SparkのRDDとDataFrameでそれぞれwordcount

Sparkでデータ処理プログラムを書くためのAPIには、RDDとDataFrameの二種類がある。2つのAPIを用いてwordcountを書いてみる。wordcountは、テキスト中の単語の出現回数を数えるプログラムであり、分散データ処理の必修課題である。

RDDは低レベルなAPIで、データのレコードにはスキーマがない。データ処理は、map関数やflatMap関数などリスト処理的な高階関数によって記述する。reduceByKeyなどいくつかの操作は、レコードが(key, value)のタプルであることを要求するが、その検査はジョブ投入時ではなく、タスク実行時に行われる。総じて、古式ゆかしいMapReduceの感覚で扱える。

DataFrameは高レベルのAPIで、データのレコードにはスキーマが適用される。データ処理は、SQLによって記述するか、あるいはホスト言語上のDSL(以下クエリDSL)を用いてクエリを組み立てることにより行う。レコード内のデータに対する演算は、Rの式オブジェクトのようなものを組み立てて演算子に渡す。式オブジェクトの表現力が不足するときには、ホスト言語のクロージャを用いてUDFを定義し、クエリで用いる。総じて、RのDataFrameに、JPAのJPQLとCriteria APIを組み合わせたような感覚で扱える。

DataFrameの方が新しいAPIである。DataFrameの利点としては、ジョブ投入時点でスキーマの整合性が検査されること、スキーマを用いて実行計画の最適化が行われること、Hiveとの統合が容易なことが挙げられる。主流はこちらに移りつつあり、Spark上の機械学習ライブラリも、RDDベースのspark.mllibからDataFrameベースのspark.mlへの移行が予定されている。

環境構築

Google Cloud PlatformのCloud Dataprocを使う。Cloud DataprocはHadoop/Sparkクラスタを立ち上げるサービスである。

Dataproc APIを有効にした上で次のコマンドを実行し、「wc」という名前のHadoop/Sparkクラスタを立ち上げる。

gcloud dataproc clusters create wc --zone asia-northeast1-b

プログラムの共通仕様

  • ホスト言語としてPythonを用いる。
  • 次の3つのコマンドライン引数を取る。
  • 入力テキストの全wordcountをCSV形式で出力パスに書く。
  • 頻出語のwordcountを標準出力に印字する。

RDDによるwordcount

#!/usr/bin/env python
# coding: utf-8
# vim: et sw=4 sts=4

import argparse

from pyspark import SparkContext

def wordcount_rdd(input_path, output_path, min_popular):
    # (1) RDDの操作はSparkContext起点に行う
    spark_context = SparkContext()

    # (2) テキスト行をレコードとするRDD
    lines = spark_context.textFile(input_path)

    # (3) 単語をレコードとするRDD
    words = lines.flatMap(lambda line: line.split())

    # (4) (単語, 1)をレコードとするRDD
    word_ones = words.map(lambda word: (word, 1))

    # (5) (単語, 出現回数)をレコードとするRDD
    word_counts = word_ones.reduceByKey(lambda count1, count2: count1 + count2)

    # (6) ここまでの計算結果を(8)以降で流用するためにキャッシュする
    word_counts.persist()

    # (7) CSV形式で出力
    csv = word_counts.map(lambda (word, count): u'%s, %s' % (word, count))
    csv.saveAsTextFile(output_path)

    # (8) 頻出語を絞り込む
    popular = word_counts.filter(lambda (word, count): count >= min_popular)

    # (9) ローカルにデータを取ってきて印字
    for (word, count) in popular.collect():
        print '%s => %s' % (word, count)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_path')
    parser.add_argument('output_path')
    parser.add_argument('min_popular', type = int)
    args = parser.parse_args()

    wordcount_rdd(args.input_path, args.output_path, args.min_popular)

単語ごとの出現回数を数えるには、同一単語のレコードを一ヶ所にかき集める必要がある。これを行うのが(5)のreduceByKeyである。reduceByKeyは入力に(key, value)のタプルを要求して、同一keyの全valueをreduceする。reduceByKeyに渡せるようにするため、(4)で(単語, 1)のレコードを作っている。1を出現回数分足し合わせれば、当然結果は出現回数になる。

プログラム中、flatMap, map, reduceByKey, filterはRDDから別のRDDを作るtransformation(変換処理)であり、saveAsTextFile, collectは一連のデータフローの結果を得るaction(アクション)である。Sparkのデータフローはアクションのメソッドが呼ばれるたびに起動される。

デフォルトでは、各アクションはデータフローの最初から計算をし直す。処理時間短縮のため、先行するアクションの途中結果を後続のアクションで流用するには、(6)のようにpersistするか、あるいはcacheする。cacheはオプション引数を付けてpersistを呼ぶだけのメソッド。

Google Cloud Storage (GCS) を入力元・出力先としてジョブを実行するには、SparkクラスタのいずれかのVMで、次のようにコマンドを実行する。

spark-submit \
  wordcount_rdd.py \
  gs://<バケット>/<入力テキスト> \
  gs://<バケット>/<出力> \
  100

GCSではなくHDFSを使うのであれば、パスは「hdfs://...」とする。

Cloud Dataprocでは、gcloudコマンドでジョブを投入することもできる。この場合、GCPのWebコンソールにてジョブの履歴が確認できる。

gcloud dataproc jobs submit pyspark \
  wordcount_rdd.py \
  --cluster=wc \
  -- \
  gs://<バケット>/<入力テキスト> \
  gs://<バケット>/<出力> \
  100

DataFrameのSQLによるwordcount

#!/usr/bin/env python
# coding: utf-8
# vim: et sw=4 sts=4

import argparse

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructField, StructType, StringType, ArrayType

def wordcount_sql(input_path, output_path, min_popular):
    # (1) DataFrameの操作はSparkSession起点で行う
    spark_context = SparkContext()
    spark_session = SparkSession(spark_context)

    # (2) UDFの登録等はSQLContext起点で行う
    sql_context = SQLContext(spark_context, spark_session)

    # (3) (value:StringType)をスキーマとするDataFrame
    text = spark_session.read.text(input_path)

    # (4) SQLで使えるようにビューとして登録
    text.createTempView('text')

    # (5) tokenize UDF
    sql_context.registerFunction('tokenize',
        lambda s: s.split(), returnType = ArrayType(StringType()))

    # (6) (word:StringType, count:LongType)をスキーマとするDataFrame
    word_counts = spark_session.sql(
            '''SELECT word, COUNT(*) AS count
            FROM (SELECT EXPLODE(tokenize(value)) AS word FROM text) words
            GROUP BY word''')

    # (7) 後続のアクションのためにキャッシュ
    word_counts.persist()

    # (8) CSV形式で出力
    word_counts.write.save(output_path)

    # (9) SQLで使えるようにビューとして登録
    word_counts.createTempView('word_counts')

    # (10) 頻出語を絞り込む
    popular = spark_session.sql(
        'SELECT word, count FROM word_counts WHERE count >= %d' % min_popular)

    # (11) ローカルにデータを取ってきて印字
    for record in popular.collect():
        print '%s => %s' % (record['word'], record['count'])

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_path')
    parser.add_argument('output_path')
    parser.add_argument('min_popular', type = int)
    args = parser.parse_args()

    wordcount_sql(args.input_path, args.output_path, args.min_popular)

DataFrameをSQLから参照するためには、(4), (9)のようにcreateTempViewでビューとして登録する。ビューの生存期間はSparkSessionと同じ。

SQLがデフォルトでサポートしない演算については、(5)のようにUDFを作る。これについては、組み込みのsplit関数で行けそうだけど、例示のためにUDFにしてみた。

(6)のSQL中で使っているEXPLODE関数は、レコード中の配列を縦持ちのレコード群に変換している。

ジョブを投入するコマンド。spark.sql.shuffle.partitionsは、GROUP BYが作るパーティションの数で、デフォルトは100。パーティションごとにタスクが実行される。自分が使ったデータでは、タスク起動のオーバーヘッドが大きかったので、2に変更している。DataFrameのrepartitionメソッドで、個別に指定もできる。

spark-submit \
  --conf spark.sql.shuffle.partitions=2 \
  wordcount_sql.py \
  gs://<バケット>/<入力テキスト> \
  gs://<バケット>/<出力> \
  100

gcloudコマンドを使う場合。

gcloud dataproc jobs submit pyspark \
  wordcount_sql.py \
  --cluster=wc \
  --properties=spark.sql.shuffle.partitions=2 \
  -- \
  gs://<バケット>/<入力テキスト> \
  gs://<バケット>/<出力> \
  100

DataFrameのクエリDSLによるwordcount

#!/usr/bin/env python
# coding: utf-8
# vim: et sw=4 sts=4

import argparse

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, ArrayType
from pyspark.sql.functions import udf, explode

def wordcount_df(input_path, output_path, min_popular):
    # (1) DataFrameの操作はSparkSession起点で行う
    spark_context = SparkContext()
    spark_session = SparkSession(spark_context)

    # (2) (value:StringType)をスキーマとするDataFrame
    text = spark_session.read.text(input_path)

    # (3) tokenize UDF
    tokenize_udf = udf(lambda s: s.split(), returnType = ArrayType(StringType()))

    # (4) (word:StringType)をスキーマとするDataFrame
    words = text.select(explode(tokenize_udf(text['value'])).alias('word'))

    # (5) (word:StringType, count:LongType)をスキーマとするDataFrame
    word_counts = words.groupBy('word').count()

    # (6) 後続のアクションのためにキャッシュ
    word_counts.persist()

    # (7) CSV形式で出力
    word_counts.write.csv(output_path)

    # (8) 頻出語を絞り込む
    popular = word_counts.filter(word_counts['count'] >= min_popular)

    # (9) ローカルにデータを取ってきて印字
    for rec in popular.collect():
        print '%s => %s' % (rec['word'], rec['count'])

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_path')
    parser.add_argument('output_path')
    parser.add_argument('min_popular', type = int)
    args = parser.parse_args()

    wordcount_df(args.input_path, args.output_path, args.min_popular)

(4)の「tokenize_udf(text['value'])」, (8)の「word_counts['count'] >= min_popular」は、この場で計算しているのではなく、ワーカノード上で実行するべき計算を、式オブジェクトとして作っている。(9)について言えば、「word_counts['count']」でcount列を表すColumnオブジェクトを取ってきて、Columnオブジェクトに定義された「>=」演算子で、計算結果の式オブジェクトに相当するColumnオブジェクトを生成している。

ジョブを投入するコマンド。

spark-submit \
  --conf spark.sql.shuffle.partitions=2 \
  wordcount_df.py \
  gs://<バケット>/<入力テキスト> \
  gs://<バケット>/<出力> \
  100

gcloudコマンドを使う場合。

gcloud dataproc jobs submit pyspark \
  wordcount_df.py \
  --cluster=wc \
  --properties=spark.sql.shuffle.partitions=2 \
  -- \
  gs://<バケット>/<入力テキスト> \
  gs://<バケット>/<出力> \
  100

所感

個人的には、ホスト言語の感覚でデータ処理が書けるRDDの方が好みである。

DataFrameを使うのであれば、できる限りSQLを使うのが良いと思う。クエリDSLを使うには、SQLの知識に加えて、クエリDSLと、式オブジェクトのDSLの知識も必要となる一方で、ホスト言語の型チェックによる型安全性が得られるわけでもない。利点が見つからない。