Java 7 の Fork/Join で 並列マージソート & クイックソート

Java 6 までの Concurrency Framework の主役は、 Executor です。Executor を使うと、 非同期処理や複数セッション処理の並列化を、効率的に実装することができます。

Java 7 では、 Fork/Join という新しい仕組みが登場します。これは、 Executor とは異なった種類の計算、 CPU が律速となるような重い計算をターゲットにしています。重い計算を小さなタスクに分割して、小分けになったタスクを複数のスレッド (プロセッサ) が並列実行することで、高速に処理する、という実行モデルです。OpenMP に似ているはずです。使ったことないけど。

Fork/Join の仕組みについて、詳しくは、下記桜庭さんの記事をお読みください。スレッドごとの実行待ち行列の取り回しの仕方など、面白いです。

Java SE 7徹底理解 第2回 細粒度の並行処理 - Fork/Join Framework

仕組みは分かったつもりになりましたが、やっぱり動かしてみて、どれだけ効率的に処理できるのか見てみたいです。ということで、 Fork/Join を使ってマージソートクイックソートを書いてみました。両方とも再帰的な分割統治法のアルゴリズムで、個々のタスクが子タスク以外に依存しないので、 Fork/Join で実装するのにもってこいです。

以下載せるコードは、 BitBucket に公開しています。

https://bitbucket.org/miyakawataku/parallelsort/

マージソート

まずはマージソートマージソートは通常、ソート対象の部分リストを引数にもらって、ソートされた新しいリストを戻す、という具合に実装します。こういった関数的なタスクを Fork/Join フレームワーク上で実装するには、 RecursiveTask クラスを継承します。メインの処理は compute メソッドに書きます。

public class MergeSortTask< T extends Comparable< ? super T > >
        extends RecursiveTask< List< T > > {

    /** ソートするリスト. */
    private final List< T > list;

    public MergeSortTask( List< T > list ) {
        this.list = list;
    }

    /** ソートされた新しいリストを戻す。 */
    @Override
    protected List< T > compute() {
        if ( this.list.size() <= 1 ) {
            return this.list;
        } else {
            int mid = this.list.size() / 2;
            // (1) 左側の部分リストをソートするタスク
            MergeSortTask< T > leftTask = new MergeSortTask<>(
                    list.subList( 0 , mid ) );
            // (2) 右側の部分リストをソートするタスク
            MergeSortTask< T > rightTask = new MergeSortTask<>(
                    list.subList( mid , this.list.size() ) );
            // (3) 右側のタスクを実行待ち行列に入れる
            rightTask.fork();
            // (4) 左側のタスクを実行する
            List< T > leftSorted = leftTask.compute();
            // (5) 右側のタスクの結果を取る
            List< T > rightSorted = rightTask.join();
            return merge( leftSorted , rightSorted );
        }
    }

    /** 引数のリストをマージした新しいリストを戻す。 */
    private List< T > merge( List< T > leftList , List< T > rightList ) {
        // 省略
    }
}

右側の部分リストをソートするタスクは、いったん実行待ち行列 *1 に入れて (fork) 、結果をもらって (join) います。左側の部分リストをソートするタスクについては、単に compute メソッドを再帰的に呼び出しています。ここで、(3)と(4)、もしくは(4)と(5)の順序を入れ替えると、残念なことになります。

この MergeSortTask を使ってソート処理を行うには、 ForkJoinPool という、 Fork/Join 専用のスレッドプールに対してタスクを投げます。

// 何も引数を与えないと、プロセッサ数と同じだけの数のスレッドが作られる
ForkJoinPool pool = new ForkJoinPool();
List< Integer > result = pool.invoke( new MergeSortTask< Integer >( numbers ) );

クイックソート

続いてクイックソートクイックソートは通常、ソート対象の部分リストについて、左側に小さい値を、右側に大きい値をより分けた上で、左右それぞれの部分リストに対してさらにクイックソートを施します。こうした、副作用によって計算結果を保存するタスクを Fork/Join フレームワーク上で実装するには、 RecursiveAction クラスを継承します。メインの処理は compute メソッドに書きます。

public class QuickSortTask< T extends Comparable< ? super T > >
        extends RecursiveAction {

    /** ソート対象のリスト。computeメソッドによって内容が変更される。 */
    private final List< T > list;

    public QuickSortTask( List< T > list ) {
        this.list = list;
    }

    /** リストを破壊的にソートする。 */
    @Override
    public void compute() {
        if ( this.list.size() <= 1 ) {
            return;
        }
        int partition = partition();
        QuickSortTask< T > leftTask = new QuickSortTask<>(
                this.list.subList( 0 , partition ) );
        QuickSortTask< T > rightTask = new QuickSortTask<>(
                this.list.subList( partition + 1 , this.list.size() ) );
        rightTask.fork();
        leftTask.compute();
        rightTask.join();
    }

    /** 小さい値を左に、大きい値を右に振り分け、閾値のインデクスを戻す。 */
    private int partition() {
        // 省略
    }
}

compute メソッドが値を戻さないことの他は、マージソートとほとんど同じです。

QuickSortTask を使ってソート処理を行うコードは、次のとおりです。

ForkJoinPool pool = new ForkJoinPool();
// numbers は破壊的にソートされる
pool.invoke( new QuickSortTask< Integer >( numbers ) );

実行してみた

Amazon EC2 の High-CPU Extra Large Instance (2.5コア相当×8仮想CPU) で実行してみました。

2000万件のランダムな整数列に対し、 ForkJoinPool のスレッド数を1から12まで変化させて *2 、3回ずつ実行しました。対照のために、処理系付属の Collections.sort メソッドでソートした時間も測りました。

クイックソート
スレッド数 平均実行時間 (ミリ秒) 並列度=1の時の実行時間との逆比
1 30778 1
2 18290 1.68278
3 14609 2.10683
4 12634 2.43618
5 12659 2.43124
6 12297 2.50294
7 11576 2.65883
8 11249 2.73604
9 11294 2.72506
10 11236 2.73929
11 11402 2.69941
12 11071 2.78003

8並列あたりで上限に達する感じです。

マージソート
スレッド数 平均実行時間 (ミリ秒) 並列度=1の時の実行時間との逆比
1 43624 1
2 31950 1.36539
3 29216 1.49313
4 29864 1.46073
5 28704 1.5198
6 34297 1.27196
7 29217 1.4931
8 28446 1.53357
9 28883 1.51035
10 29324 1.48765
11 28479 1.53177
12 30070 1.45076

スレッドを増やしても、2倍まで達しませんでした。クイックソートと比べてスケールしないのは、タスクごとに結果リストをnewするからかも知れません。ヒープはスレッド間の共有リソースなので、GCが頻発する状況では、並行性が落ちます。

Collections.sort
スレッド数 平均実行時間 (ミリ秒)
1 (固定) 222704

Fork/Join 版のクイックソートを2スレッド以上で動かした時の方が、 Collections.sort *3 よりも若干速くなりました。

まとめ

  • Fork/Join は CPU 律速な重い処理を、細粒度で並列実行して実行速度を稼ぐためのフレームワークです
  • 関数的なタスクは RecursiveTask を継承して実装します
  • 破壊的なタスクは RecursiveAction を継承して実装します
  • Fork/Join のタスクは ForkJoinPool で実行します
  • タスク内でヒープをたくさん使うと、いまひとつ並行性が上がらないかも知れません (未確認)

日頃 OpenMP 等で CPU をしばき上げている人が、 Java でコードを書く時に使える道具だと思います。僕はSI屋で仕事をしているのですが、その仕事の限りにおいて必要になることは、多分ないんじゃないかと思います。たった1つの重い処理を、CPU上でぶん回す、ということがまれだからです。

*1: 実体はスレッドごとの双方向キューです

*2: 12スレッドまで調べている理由は、8コアのインスタンスだと思っていたからです。 /proc/cpuinfo にも processor が8つ見えていました。ここらへん難しいので、本当は物理マシンで測る方がよいと思います

*3: 内部実装はマージソートです