スポンサーリンク

SparkでJsonをCSVへ変換する方法【実機検証あり】

★悩み★
・Sparkで複雑なJson形式を扱えるか分からない。
・SparkでJson形式のファイルを読み込む方法が分からない。
・SparkでJsonをCSVへ変換する方法が分からない。

こういった「悩み」に答えます。

★本記事の内容★
(1) SparkでネストされたJsonを扱えます!
(2) SparkでJsonを読み込むread.jsonをご紹介【実機検証あり】
(3) SparkでJsonをCSVへ変換する方法をご紹介【実機検証あり】

この記事を書いている筆者は、4年間、データエンジニアとして従事しています。

データエンジニアとして、Spark関連の運用保守であったり、Spark関連の技術を用いたデータ分析基盤の提案や導入をしたりしてきました。

また、趣味としてSparkを活用したSNS分析も行なっています。
分析内容に興味がある方は、下記の記事を参照してみてください。

Sparkを普段から使い続けてはや4年な筆者が、「SparkでJsonをCSVへ変換する方法」に関して分かりやすく解説していきます。

SparkでネストされたJsonを扱えます!

データ分析などをする上で、以下のようなネストされたJsonを利用することが多々あります。

{"col1":{"col2":"val2","col3":["arr1","arr2"]}, "col4": 1.2, "col5": {"col6": 1, "col7":2}}

結論から言いますと、Sparkでも上記のようなネストされたJsonを扱うことができます

もちろん、下記のようなネストされていないJsonもSparkで簡単に扱うことができます。

{"col1":1,"col4": 1.2}

以降で、具体的にどのようにしてJsonをSparkで読み込むのかに関してご紹介します。

SparkでJsonを読み込むread.json【実機検証あり】

それでは、SparkでJsonファイルを読み込む方法に関してご紹介します。

前提条件

以降で紹介するプログラムでは、「Python」を利用します。

Pythonをインストールされていない方は、以下の記事をご覧ください。Pythonを簡単にインストールできます。

PythonからSparkを利用することができる「PySpark」というものを使っていきます。

実機検証

PySparkでJsonを読み込むプログラムは、以下のようになります。
以下のプログラムをjson_test.pyとして保存します。

from datetime import datetime
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
import sys
import json

## 1. sparkの設定を作成
spark = SparkSession \
    .builder \
    .appName("test application") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark_context = spark.sparkContext

## 2. HDFSに格納されたJsonの読み込み
test_df = spark.read.json("spark-test/test.json")

## 3. 構造を表示
test_df.printSchema()

## 4. データを表示
test_df.show()
test_df.select("col1.col2").show()
test_df.select("col5").show()

次に、HDFS上にJsonファイルを格納します。

$ hadoop fs -mkdir spark-test
$ cat test.json
{"col1":{"col2":"val2","col3":["arr1","arr2"]}, "col4": 1.2, "col5": {"col6": 1, "col7":2}}
$ hadoop fs -put test.json spark-test/test.json
$ hadoop fs -cat spark-test/test.json
{"col1":{"col2":"val2","col3":["arr1","arr2"]}, "col4": 1.2, "col5": {"col6": 1, "col7":2}}

その後、json_test.pyを実行します。

$ export SPARK_CMD="/opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit"
$ ${SPARK_CMD} --master yarn --conf "spark.executor.memoryOverhead=600" \
             --num-executors 2 \
             --executor-cores 1 \
             --executor-memory 3G \
             json_test.py

json_test.pyを実行すると以下のように「Jsonの構造情報」と「Jsonファイルに格納されたデータ」が表示されます。

root
 |-- col1: struct (nullable = true)
 |    |-- col2: string (nullable = true)
 |    |-- col3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- col4: double (nullable = true)
 |-- col5: struct (nullable = true)
 |    |-- col6: long (nullable = true)
 |    |-- col7: long (nullable = true)

+----+
|col2|
+----+
|val2|
+----+

+------+
|  col5|
+------+
|[1, 2]|
+------+

上記のように複雑なネストで構成されたJsonファイルもSparkでは簡単に扱うことができます。

注目するべき点は、Jsonファイルを読み込むだけで、データの型まで自動で判断してくれるという点です。

Sparkというソフトウェアは、なかなか賢くできています。

SparkでJsonをCSVへ変換する方法【実機検証あり】

Jsonファイルから必要な情報を抽出し、CSV形式にしたいという要望もデータ分析では多々あります。

結論から言いいますと、SparkではJsonをCSVへ簡単に変換することができます。

それでは、実機検証で確認したことをご紹介します。

実機検証

PySparkでJsonをCSVへ変換するプログラムは、以下のようになります。
以下のプログラムをjson2csv.pyとして保存します。

from datetime import datetime
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
import sys
import json

## 1. sparkの設定を作成
spark = SparkSession \
    .builder \
    .appName("Python Spark CALC FAMOUS WORDS OF SPOTS") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark_context = spark.sparkContext

## 2. HDFSに格納されたJsonの読み込み
test_df = spark.read.json("spart-test/test.json")

## 3. データを表示
test_df.select("col4","col5.col6").write.option("header","true").csv("test.csv")

json2csv.pyを以下のようにして実行します。

$ export SPARK_CMD="/opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit"
$ ${SPARK_CMD} --master yarn --conf "spark.executor.memoryOverhead=600" \
             --num-executors 2 \
             --executor-cores 1 \
             --executor-memory 3G \
             json_test.py

その後、HDFS上に生成された以下のファイルを確認します。

$ hadoop fs -ls test.csv
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2020-09-12 23:48 test.csv/_SUCCESS
-rw-r--r--   1 hadoop supergroup         16 2020-09-12 23:48 test.csv/part-00000-fda68adb-b7ab-4fd9-b83b-1bec90fad67b-c000.csv
$ hadoop fs -cat test.csv/part-00000-fda68adb-b7ab-4fd9-b83b-1bec90fad67b-c000.csv
col4,col6
1.2,1
$

以上のように簡単にSparkでJsonからCSV形式へ変更することができました。

【まとめ】SparkでJsonをCSVへ変換する方法

今回の記事を通して、pySparkを利用したJsonをCSVへ変換する方法をご紹介することで、以下の悩みを解消しました。

★悩み★
・Sparkで複雑なJson形式を扱えるか分からない。
・SparkでJson形式のファイルを読み込む方法が分からない。
・SparkでJsonをCSVへ変換する方法が分からない。

SparkでJsonをCSV変換することができないか?と悩んでいるあなた」にこの記事が少しでも役に立てれば幸いです。

コメント

タイトルとURLをコピーしました