2009年7月8日水曜日

C++: マルチコアCPUを利用した並列化による高速な階層的クラスタリング

このブログ記事をはてなブックマークに追加

比較的データ数の多い階層的クラスタリングを行う必要があり、手元にあるRで処理を始めたのだが思ったよりも遅かった。そこで、マルチコアCPUを利用した並列化で高速化することにした。RでもGPGPUを使って高速化したプログラムがあるようなのだが、すぐに使える高性能GPUを用意できなかったし、それに、TBBライブラリを使った並列化は手間も時間も掛からないので作ってしまった方が良いと判断した。

尚、このプログラムで作成したクラスタデータのデンドログラム描画や閾値による区分けについては一つの記事で書くには大きすぎるので、記事を分けて、「Python: 階層的クラスタリングのデンドログラム描画と閾値による区分け」に回すことにする。

まずは、「集合知プログラミング」にPythonによる階層的クラスタリングのソースコードが載っていたのでそれをC++で書き直した。アルゴリズム自体はそれほど複雑なものではないのでコーディングは簡単だった。ただ、クラスタ同士の距離を最遠隣法(complete link)にしたかったので、それについては変更を加えた。また、TBBライブラリを利用するために、分割して計算を行う部分については関数オブジェクトで実装した。

余談だが、この「集合知プログラミング」はかなり良い書籍だと思う。Pythonを使っているのでコードも読みやすい。興味のある方は是非読んでみて欲しい。

完成したソースコードは最後に示しておいた。赤で示した部分がユークリッド距離を計算する関数であり、この部分を変更することで、べき乗距離にすることも、マンハッタン距離にすることも、別のどんな指標にすることも、簡単にできる。因みに自分はRMSDの2乗で計算を行った。また、青で示した部分が最遠隣法の計算部分であり、ここを書き換えることで群平均法や重心法などの他の結合方法に変更することができる。

入出力データについてだが、任意の次元数の1データをスペース区切りの一行とし、複数データを格納したファイルを入力ファイルとしている。そして、結合する距離、データの番号のペアを一行としたデータが出力ファイルとして書き出される。番号は入力データを正の整数(0~)として、データの集合体であるクラスタを負の整数(-1~)として扱っている。

それでは実際に計算してみよう。まず、以下のような入力ファイルを用意する。この例では次元数3でデータ数は20となる。

test.dat

3.790 7.255 10.259 3.848 7.352 9.208 3.869 7.320 9.771 3.937 7.418 10.621 3.848 7.050 10.359 3.840 7.232 10.370 3.944 7.509 10.918 3.833 7.168 10.598 3.908 7.260 10.822 3.904 7.112 10.834 3.786 7.336 10.060 3.856 7.155 10.722 3.723 6.424 9.654 3.796 6.936 10.652 3.843 7.069 10.440 3.829 7.417 10.582 3.863 7.289 10.669 3.905 6.078 8.719 3.830 6.668 10.246 3.811 6.202 8.308

これを、以下のように実行する。ここでの実行ファイルは clusters_tbb としている。

clusters_tbb test.dat test.out

出力ファイル test.out の内容は以下の通り。

test.out

0.0833487 4 14 0.11483 3 15 0.123895 0 5 0.126783 7 11 0.144271 16 -4 0.14854 8 9 0.253505 -5 -6 0.264889 -1 -3 0.301108 2 10 0.366858 6 -2 0.382649 13 -7 0.439469 17 19 0.588505 18 -8 0.648837 -10 -11 0.80762 -9 -13 1.03717 1 12 1.16488 -14 -15 1.46078 -12 -16 2.92199 -17 -18

一番最初の行は、距離0.11483で3番(先頭は0番)と15番のデータが結合してクラスタを作っていることを示し、最後の行では、距離2.92199で17番(先頭は1番、-1と表示される)と18番のクラスタが結合していることを示している。

最後にRとの速度比較を行った。入力として次元数45の2000データを使用した。また、Rについては、Rscriptを利用して以下のように実行している。

Rscript test.R

test.R

data = read.table("test.dat") d = dist(data) hc <- hclust(d, method="complete") write.table(hc["merge"], "merge.out", row.names=F, col.names=F) write.table(hc["height"], "height.out", row.names=F, col.names=F)

結果は以下の通り。

Fedora 8 / Intel Core2 Duo E8400@3.00GHz x 1 CPU / 2 GB R 2.6.1 Rscript / gcc 4.1.2, tbb 2.1 : g++ clusters_tbb.cpp -O3 -ltbb R 17.46秒 1 core 13.02秒 2 cores 6.91秒

Red Hat Enterprise Linux 5 / Quad-Core AMD Opteron 8356 x 8 CPU / 196 GB R 2.9.0 Rscript / icpc 11.0, tbb 2.1 : icpc clusters_tbb.cpp -O3 -ltbb R 36.11秒 1 core 27.25秒 2 cores 14.58秒 4 cores 7.45秒 8 cores 4.50秒

Red Hat Enterprise Linux 5 / Intel Xeon E5450@3.00GHz x 2 CPU / 8 GB icpc 11.0, tbb 2.1 : icpc clusters_tbb.cpp -O3 -ltbb 1 core 11.16秒 2 cores 5.45秒 4 cores 3.25秒 8 cores 2.42秒

Microsoft Windows XP SP3 / Intel Core2 6600@2.40GHz x 1 CPU / 2 GB R 2.9.1 Rscript / cl 15.00 (Visual Studio 2008), tbb 2.1 : cl clusters_tbb.cpp /EHsc /Ox /MD R 23.65秒 1 core 43.53秒 2 cores 23.14秒

Linuxでは1コアでの計算でも今回作成したプログラムの方がRの階層的クラスタリングよりも若干速い結果を出している。また、並列化に関しては、2コアでは1コアのほぼ倍の性能を出しており、SMP環境での実行速度を見ると、4コアで3.6倍以上、8コアでも6倍以上で、十分な性能が出ていると思う。ただ、OpteronのSMP環境ではRでも自作プログラムでも遅いが何故だろう。あと、Windows環境では1コアでRの方が倍ぐらい速く、2コアでトントンだった。メモリ関連の実装で効率が違うのだろうか。

Python: 階層的クラスタリングのデンドログラム描画と閾値による区分け」に続く。

clusters_tbb.cpp

// hierarchical clustering using the TBB library // by nox, 2009.07.02 // // Linux - Intel C++ // icpc clusters_tbb.cpp -O3 -ltbb -o clusters_tbb // // Linux - GCC // g++ clusters_tbb.cpp -O3 -ltbb -o clusters_tbb // // Windows - Microsoft Visual Studio 2008 // cl clusters_tbb.cpp /EHsc /Ox /MD #include <iostream> #include <fstream> #include <sstream> #include <vector> #include <algorithm> #include <utility> #include <cmath> #include "tbb/task_scheduler_init.h" #include "tbb/blocked_range.h" #include "tbb/parallel_for.h" #include "tbb/parallel_reduce.h" #include "tbb/parallel_sort.h" using namespace std; using namespace tbb; double calc_distance(const vector<double>& vec1, const vector<double>& vec2) { vector<double>::const_iterator p, q; double s = 0.0; for (p = vec1.begin(), q = vec2.begin(); p != vec1.end(); ++p, ++q) s += (*p - *q) * (*p - *q); return sqrt(s); } static vector<int> table_i; struct bicluster { bicluster* left; bicluster* right; int id; vector<int> vec_ids; double distance; bicluster(bicluster* left_, bicluster* right_, int id_, vector<int> vec_ids_, double distance_) : left(left_), right(right_), id(id_), vec_ids(vec_ids_), distance(distance_) { } }; typedef vector<bicluster*>::iterator vec_t; typedef pair<vec_t, vec_t> pair_t; typedef pair<int, int> distance_pair_t; typedef vector<double> distances_t; class CompleteLink { public: double dist; const vector<int>& clust; const distances_t& distances; CompleteLink(double d_, const vector<int>& clust_, const distances_t& distances_) : dist(d_), clust(clust_), distances(distances_) { } CompleteLink(const CompleteLink& cl, split) : dist(cl.dist), clust(cl.clust), distances(cl.distances) { } void operator()(const blocked_range<vector<int>::const_iterator>& r) { for (vector<int>::const_iterator p = r.begin(); p != r.end(); ++p) { for (vector<int>::const_iterator q = clust.begin(); q != clust.end(); ++q) { int i = min(*p, *q); int j = max(*p, *q); dist = max(distances[table_i[i] + j], dist); } } } void join(const CompleteLink& cl) { dist = max(dist, cl.dist); } }; double calc_cluster_distance(const vector<int>& clust_i, const vector<int>& clust_j, const distances_t& distances) { CompleteLink cl(0.0, clust_j, distances); parallel_reduce(blocked_range<vector<int>::const_iterator>(clust_i.begin(), clust_i.end(), 100), cl); return cl.dist; } class ReduceHCluster { public: const vector<vector<double> >& data; const vector<bicluster*>& clusters; distances_t& distances; pair_t lowest_pair; double closest; ReduceHCluster(const vector<vector<double> >& data_, const vector<bicluster*>& clusters_, distances_t& distances_, pair_t lowest_pair_, double closest_) : data(data_), clusters(clusters_), distances(distances_), lowest_pair(lowest_pair_), closest(closest_) { } ReduceHCluster(const ReduceHCluster& hc, split) : data(hc.data), clusters(hc.clusters), distances(hc.distances), lowest_pair(hc.lowest_pair), closest(hc.closest) { } void operator()(const blocked_range<vec_t>& r) { for (vec_t p = r.begin(); p != r.end(); ++p) { for (vec_t q = p + 1; q != clusters.end(); ++q) { int i = min((*p)->id, (*q)->id); int j = max((*p)->id, (*q)->id); if (distances[table_i[i] + j] < 0) distances[table_i[i] + j] = calc_cluster_distance((*p)->vec_ids, (*q)->vec_ids, distances); double d = distances[table_i[i] + j]; if (d < closest) { closest = d; lowest_pair = make_pair(p, q); } } } } void join(const ReduceHCluster& hc) { if (closest > hc.closest) { closest = hc.closest; lowest_pair = hc.lowest_pair; } } }; class HClusterDistances { private: const vector<vector<double> >& data; int n; distances_t& distances; public: HClusterDistances(const vector<vector<double> >& data_, int n_, distances_t& distances_) : data(data_), n(n_), distances(distances_) { } void operator()(const blocked_range<int>& r) const { for (int i = r.begin(); i != r.end(); i++) for (int j = i + 1; j < n; j++) distances[table_i[i] + j] = calc_distance(data[i], data[j]); } }; inline int index_i(int n, int max_n) { int s = 0; for (int i = 0; i < n; i++) s += max_n - i; return s - n - 1; } void hcluster(const vector<vector<double> >& data, vector<bicluster*>& clusters) { int n = clusters.size() * 2; for (int i = 0; i < n; i++) table_i.push_back(index_i(i, n - 1)); // distance between i and j: distances[table_i(i) + j] distances_t distances(n * (n - 1) / 2, -1.0); int current_clust_id = clusters.size(); parallel_for(blocked_range<int>(0, clusters.size() - 1, 100), HClusterDistances(data, clusters.size(), distances)); while (clusters.size() > 1) { cerr << clusters.size() << endl; pair_t lowest_pair = make_pair(clusters.begin(), clusters.begin() + 1); double closest = calc_cluster_distance((*lowest_pair.first)->vec_ids, (*lowest_pair.second)->vec_ids, distances); ReduceHCluster hc(data, clusters, distances, lowest_pair, closest); parallel_reduce(blocked_range<vec_t>(clusters.begin(), clusters.end(), 50), hc); closest = hc.closest; lowest_pair = hc.lowest_pair; vector<int> vec_ids((*lowest_pair.first)->vec_ids); vec_ids.insert(vec_ids.end(), (*lowest_pair.second)->vec_ids.begin(), (*lowest_pair.second)->vec_ids.end()); vector<int>().swap((*lowest_pair.second)->vec_ids); vector<int>().swap((*lowest_pair.first)->vec_ids); bicluster* new_cluster = new bicluster(*lowest_pair.first, *lowest_pair.second, current_clust_id, vec_ids, closest); current_clust_id++; clusters.erase(lowest_pair.second); clusters.erase(lowest_pair.first); clusters.push_back(new_cluster); } } void read_data(const string fname, vector<vector<double> >& data) { fstream fs(fname.c_str(), ios_base::in); string line; stringstream ss; vector<double> v; double value; while (getline(fs, line)) { for (ss.str(line), v.clear(); ss >> value; v.push_back(value)); ss.clear(); data.push_back(v); } } void store_clusters(const bicluster* cluster, vector<pair<double, distance_pair_t> >& cluster_data, int max_n) { static int n = 0; if (cluster->left && cluster->right) cluster_data.push_back(make_pair(cluster->distance, make_pair( cluster->left->id < max_n ? cluster->left->id : -(cluster->left->id - max_n + 1), cluster->right->id < max_n ? cluster->right->id : -(cluster->right->id - max_n + 1)))); if (cluster->left) store_clusters(cluster->left, cluster_data, max_n); if (cluster->right) store_clusters(cluster->right, cluster_data, max_n); } void write_clusters(const string fname, const vector<pair<double, distance_pair_t> >& cluster_data) { fstream fs(fname.c_str(), ios_base::out | ios_base::trunc); for (vector<pair<double, distance_pair_t> >::const_iterator p = cluster_data.begin(); p != cluster_data.end(); ++p) { fs << p->first << " " << p->second.first << " " << p->second.second << endl; } } int main(int argc, char** argv) { if (argc < 3) { cerr << "Hierarchical clustering using the TBB library." << endl; cerr << endl; cerr << " Usage: " << argv[0] << " input_data_file output_cluster_file" << endl; exit(1); } task_scheduler_init init; vector<vector<double> > data; vector<bicluster*> clusters; read_data(argv[1], data); for (int i = 0; i < data.size(); i++) clusters.push_back(new bicluster(NULL, NULL, i, vector<int>(1, i), 0.0)); hcluster(data, clusters); vector<pair<double, distance_pair_t> > cluster_data; store_clusters(clusters[0], cluster_data, data.size()); parallel_sort(cluster_data.begin(), cluster_data.end()); write_clusters(argv[2], cluster_data); return 0; }

0 コメント: