Java 6 までの Concurrency Framework の主役は、 Executor です。Executor を使うと、 非同期処理や複数セッション処理の並列化を、効率的に実装することができます。
Java 7 では、 Fork/Join という新しい仕組みが登場します。これは、 Executor とは異なった種類の計算、 CPU が律速となるような重い計算をターゲットにしています。重い計算を小さなタスクに分割して、小分けになったタスクを複数のスレッド (プロセッサ) が並列実行することで、高速に処理する、という実行モデルです。OpenMP に似ているはずです。使ったことないけど。
Fork/Join の仕組みについて、詳しくは、下記桜庭さんの記事をお読みください。スレッドごとの実行待ち行列の取り回しの仕方など、面白いです。
Java SE 7徹底理解 第2回 細粒度の並行処理 - Fork/Join Framework
仕組みは分かったつもりになりましたが、やっぱり動かしてみて、どれだけ効率的に処理できるのか見てみたいです。ということで、 Fork/Join を使ってマージソートとクイックソートを書いてみました。両方とも再帰的な分割統治法のアルゴリズムで、個々のタスクが子タスク以外に依存しないので、 Fork/Join で実装するのにもってこいです。
以下載せるコードは、 BitBucket に公開しています。
https://bitbucket.org/miyakawataku/parallelsort/
マージソート
まずはマージソート。マージソートは通常、ソート対象の部分リストを引数にもらって、ソートされた新しいリストを戻す、という具合に実装します。こういった関数的なタスクを Fork/Join フレームワーク上で実装するには、 RecursiveTask クラスを継承します。メインの処理は compute メソッドに書きます。
public class MergeSortTask< T extends Comparable< ? super T > > extends RecursiveTask< List< T > > { /** ソートするリスト. */ private final List< T > list; public MergeSortTask( List< T > list ) { this.list = list; } /** ソートされた新しいリストを戻す。 */ @Override protected List< T > compute() { if ( this.list.size() <= 1 ) { return this.list; } else { int mid = this.list.size() / 2; // (1) 左側の部分リストをソートするタスク MergeSortTask< T > leftTask = new MergeSortTask<>( list.subList( 0 , mid ) ); // (2) 右側の部分リストをソートするタスク MergeSortTask< T > rightTask = new MergeSortTask<>( list.subList( mid , this.list.size() ) ); // (3) 右側のタスクを実行待ち行列に入れる rightTask.fork(); // (4) 左側のタスクを実行する List< T > leftSorted = leftTask.compute(); // (5) 右側のタスクの結果を取る List< T > rightSorted = rightTask.join(); return merge( leftSorted , rightSorted ); } } /** 引数のリストをマージした新しいリストを戻す。 */ private List< T > merge( List< T > leftList , List< T > rightList ) { // 省略 } }
右側の部分リストをソートするタスクは、いったん実行待ち行列 *1 に入れて (fork) 、結果をもらって (join) います。左側の部分リストをソートするタスクについては、単に compute メソッドを再帰的に呼び出しています。ここで、(3)と(4)、もしくは(4)と(5)の順序を入れ替えると、残念なことになります。
この MergeSortTask を使ってソート処理を行うには、 ForkJoinPool という、 Fork/Join 専用のスレッドプールに対してタスクを投げます。
// 何も引数を与えないと、プロセッサ数と同じだけの数のスレッドが作られる ForkJoinPool pool = new ForkJoinPool(); List< Integer > result = pool.invoke( new MergeSortTask< Integer >( numbers ) );
クイックソート
続いてクイックソート。クイックソートは通常、ソート対象の部分リストについて、左側に小さい値を、右側に大きい値をより分けた上で、左右それぞれの部分リストに対してさらにクイックソートを施します。こうした、副作用によって計算結果を保存するタスクを Fork/Join フレームワーク上で実装するには、 RecursiveAction クラスを継承します。メインの処理は compute メソッドに書きます。
public class QuickSortTask< T extends Comparable< ? super T > > extends RecursiveAction { /** ソート対象のリスト。computeメソッドによって内容が変更される。 */ private final List< T > list; public QuickSortTask( List< T > list ) { this.list = list; } /** リストを破壊的にソートする。 */ @Override public void compute() { if ( this.list.size() <= 1 ) { return; } int partition = partition(); QuickSortTask< T > leftTask = new QuickSortTask<>( this.list.subList( 0 , partition ) ); QuickSortTask< T > rightTask = new QuickSortTask<>( this.list.subList( partition + 1 , this.list.size() ) ); rightTask.fork(); leftTask.compute(); rightTask.join(); } /** 小さい値を左に、大きい値を右に振り分け、閾値のインデクスを戻す。 */ private int partition() { // 省略 } }
compute メソッドが値を戻さないことの他は、マージソートとほとんど同じです。
QuickSortTask を使ってソート処理を行うコードは、次のとおりです。
ForkJoinPool pool = new ForkJoinPool(); // numbers は破壊的にソートされる pool.invoke( new QuickSortTask< Integer >( numbers ) );
実行してみた
Amazon EC2 の High-CPU Extra Large Instance (2.5コア相当×8仮想CPU) で実行してみました。
2000万件のランダムな整数列に対し、 ForkJoinPool のスレッド数を1から12まで変化させて *2 、3回ずつ実行しました。対照のために、処理系付属の Collections.sort メソッドでソートした時間も測りました。
クイックソート
スレッド数 | 平均実行時間 (ミリ秒) | 並列度=1の時の実行時間との逆比 |
---|---|---|
1 | 30778 | 1 |
2 | 18290 | 1.68278 |
3 | 14609 | 2.10683 |
4 | 12634 | 2.43618 |
5 | 12659 | 2.43124 |
6 | 12297 | 2.50294 |
7 | 11576 | 2.65883 |
8 | 11249 | 2.73604 |
9 | 11294 | 2.72506 |
10 | 11236 | 2.73929 |
11 | 11402 | 2.69941 |
12 | 11071 | 2.78003 |
8並列あたりで上限に達する感じです。
マージソート
スレッド数 | 平均実行時間 (ミリ秒) | 並列度=1の時の実行時間との逆比 |
---|---|---|
1 | 43624 | 1 |
2 | 31950 | 1.36539 |
3 | 29216 | 1.49313 |
4 | 29864 | 1.46073 |
5 | 28704 | 1.5198 |
6 | 34297 | 1.27196 |
7 | 29217 | 1.4931 |
8 | 28446 | 1.53357 |
9 | 28883 | 1.51035 |
10 | 29324 | 1.48765 |
11 | 28479 | 1.53177 |
12 | 30070 | 1.45076 |
スレッドを増やしても、2倍まで達しませんでした。クイックソートと比べてスケールしないのは、タスクごとに結果リストをnewするからかも知れません。ヒープはスレッド間の共有リソースなので、GCが頻発する状況では、並行性が落ちます。
まとめ
- Fork/Join は CPU 律速な重い処理を、細粒度で並列実行して実行速度を稼ぐためのフレームワークです
- 関数的なタスクは RecursiveTask を継承して実装します
- 破壊的なタスクは RecursiveAction を継承して実装します
- Fork/Join のタスクは ForkJoinPool で実行します
- タスク内でヒープをたくさん使うと、いまひとつ並行性が上がらないかも知れません (未確認)
日頃 OpenMP 等で CPU をしばき上げている人が、 Java でコードを書く時に使える道具だと思います。僕はSI屋で仕事をしているのですが、その仕事の限りにおいて必要になることは、多分ないんじゃないかと思います。たった1つの重い処理を、CPU上でぶん回す、ということがまれだからです。