PySparkのDataframeを3手順で結合(join)する【ソースコードあり】

★悩み★
・PySparkのDataframeで結合(join)はどうすればいいのだろうか。
・結合(join)したPySparkのDataframeから特定列の抽出手順を知りたい。


こういった「悩み」に答えます。

★本記事の内容★
① PySparkのDataframeを3手順で結合(join)する手順をご紹介
② 結合(join)したPySparkのDataframeから特定列を抽出する手順をご紹介


これからご紹介する「PySparkのDataframeを3手順で結合(join)する手順」を実践したことで、筆者は30分以内でPySparkのDataframeを結合できました。

記事の前半では「PySparkのDataframeを結合(join)する手順」をソースコードを交えながら紹介します。
記事の後半では「結合(join)したPySparkのDataframeから特定列を抽出する手順」をソースコードを交えながら紹介します。

この記事を読み終えることで、「PySparkのDataframeで結合(join)する手順」を把握できるだけでなく、「PySparkの基本的な使い方」も把握した状態になります。

Apache Sparkをインストールされていない方は、以下の記事をご覧ください。

PySparkのDataframeを3手順で結合(join)する手順

「PySparkのDataframeを3手順で結合(join)する手順」についてご紹介します。

★PySparkのDataframeを結合(join)する手順の流れ★
手順1:PySparkでデータをDataframeとして読み込み
手順2:PySparkのDataframeを結合(join)
手順3:PySparkでDataframeを結合(join)するプログラムを実行


上記の流れで、PySparkのDataframeを結合(join)することができます。

以降で、「PySparkのDataframeを結合(join)する手順の流れ」の各手順に関して説明します。

今回は、以下で紹介したデータを元に、「PySparkのDataframeを作成し結合(join)する手順」を説明します。


手順1:PySparkでデータをDataframeとして読み込み

「手順1:PySparkでデータをDataframeとして読み込み」に関してご紹介します。

PySparkでは、以下の「spark.read.json」でJsonファイルをDataframeとして読み込めます。

from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_, col, when
import sys
import json

# 1. 引数を変数に格納
args = sys.argv

## 2. sparkの設定を作成
spark = SparkSession \
    .builder \
    .appName("concat json") \
    .getOrCreate()
spark_context = spark.sparkContext

## 3. jsonファイルからDataframeを作成
users_df   = spark.read.json("rocketchat_data/users.json")
message_df = spark.read.json("rocketchat_data/message.json")


spark.read.jsonの引数として、HDFS上のJsonファイルPathを指定します。


以上で、「手順1:PySparkでデータをDataframeとして読み込み」は完了です。

手順2:PySparkのDataframeを結合(join)

「手順2:PySparkのDataframeを結合(join)」に関してご紹介します。

PySparkでは、以下の「join」メソッドを利用することで2つのDataframeを結合(join)できます。

## 4. 外部結合を実施
joined_df = message_df.join(users_df, message_df.u._id == users_df._id, "inner")
joined_df.show()


joinメソッドの引数は以下となります。
・第1引数:結合(join)したいDataframeを指定
・第2引数:結合(join)条件を指定
・第3引数:結合(join)の種類を指定(18種類)
参照先:Sparkのdataframeに関する公式サイト


以上で、「手順2:PySparkのDataframeを結合(join)」は完了です。

手順3:PySparkでDataframeを結合(join)するプログラムを実行

「手順3:PySparkでDataframeを結合(join)するプログラムを実行」に関してご紹介します。

先ほどのプログラムをtest.pyとし、「spark-submit」コマンドを利用し実行します。

Sparkが導入されている環境で、以下のコマンドを実行しましょう。

$ SPARK_CMD="/opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit"
$ ${SPARK_CMD} --master yarn --conf "spark.executor.memoryOverhead=600" \
             --num-executors 2 \
             --executor-cores 1 \
             --executor-memory 3G \
             test.py


「spark-submit」コマンドの出力結果として、以下のように結合(join)したDataframeのスキーマ情報が表示されます。

★結合したDataframeのスキーマ情報(以降、スキーマ1と表記)★

root
 |-- _id: string (nullable = true)
 |-- _updatedAt: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- image_dimensions: struct (nullable = true)
 |    |    |    |-- height: long (nullable = true)
 |    |    |    |-- width: long (nullable = true)
 |    |    |-- image_preview: string (nullable = true)
 |    |    |-- image_size: long (nullable = true)
 |    |    |-- image_type: string (nullable = true)
 |    |    |-- image_url: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- title_link: string (nullable = true)
 |    |    |-- title_link_download: boolean (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- file: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- groupable: boolean (nullable = true)
 |-- mentions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- msg: string (nullable = true)
 |-- reactions: struct (nullable = true)
 |    |-- :grinning:: struct (nullable = true)
 |    |    |-- usernames: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rid: string (nullable = true)
 |-- t: string (nullable = true)
 |-- tcount: long (nullable = true)
 |-- tlm: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- tmid: string (nullable = true)
 |-- ts: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- tshow: boolean (nullable = true)
 |-- u: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- username: string (nullable = true)
 |-- __rooms: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- _id: string (nullable = true)
 |-- _updatedAt: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- active: boolean (nullable = true)
 |-- avatarETag: string (nullable = true)
 |-- avatarOrigin: string (nullable = true)
 |-- createdAt: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- emails: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- verified: boolean (nullable = true)
 |-- lastLogin: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- services: struct (nullable = true)
 |    |-- email: struct (nullable = true)
 |    |    |-- verificationTokens: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- address: string (nullable = true)
 |    |    |    |    |-- token: string (nullable = true)
 |    |    |    |    |-- when: struct (nullable = true)
 |    |    |    |    |    |-- $date: string (nullable = true)
 |    |-- email2fa: struct (nullable = true)
 |    |    |-- changedAt: struct (nullable = true)
 |    |    |    |-- $date: string (nullable = true)
 |    |    |-- enabled: boolean (nullable = true)
 |    |-- password: struct (nullable = true)
 |    |    |-- bcrypt: string (nullable = true)
 |    |    |-- reset: struct (nullable = true)
 |    |    |    |-- email: string (nullable = true)
 |    |    |    |-- reason: string (nullable = true)
 |    |    |    |-- token: string (nullable = true)
 |    |    |    |-- when: struct (nullable = true)
 |    |    |    |    |-- $date: string (nullable = true)
 |    |-- resume: struct (nullable = true)
 |    |    |-- loginTokens: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- hashedToken: string (nullable = true)
 |    |    |    |    |-- when: struct (nullable = true)
 |    |    |    |    |    |-- $date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- statusConnection: string (nullable = true)
 |-- statusDefault: string (nullable = true)
 |-- type: string (nullable = true)
 |-- username: string (nullable = true)
 |-- utcOffset: long (nullable = true)


以上で、「手順3:PySparkでDataframeを結合(join)するプログラムを実行」は完了です。

上記の3手順で、PySparkのDataframeで結合(join)できました。


結合(join)したPySparkのDataframeから特定列を抽出する手順

「結合(join)したPySparkのDataframeから特定列を抽出する手順」に関してご紹介します。

★結合(join)したPySparkのDataframeから特定列を抽出する流れ★
手順1:PySparkでネストされたJsonをDataframeとして読み込み
手順2:結合(join)したPySparkのDataframeから特定列を抽出
手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行


上記の3手順で、結合(join)したPySparkのDataframeから特定列を抽出できます。

以降で、「結合(join)したPySparkのDataframeから特定列を抽出する流れ」の各手順に関して説明します。

手順1:PySparkでネストされたJsonをDataframeとして読み込み

「手順1:PySparkでネストされたJsonをDataframeとして読み込み」に関してご紹介します。

「spark.read.json」を使うことで、PySparkではネストされたJsonもDataframeとして読み込めます。

今回は、複雑なネスト構造をしている「PySparkのDataframeを3手順で結合(join)する手順」で結合(join)したDataframeを利用します。

以上で、「手順1:PySparkでネストされたJsonをDataframeとして読み込み」は完了です。

手順2:結合(join)したPySparkのDataframeから特定列を抽出

「手順2:結合(join)したPySparkのDataframeから特定列を抽出」に関してご紹介します。

PySparkでDataframeから特定列を抽出したい場合、「select/col/getItem」を利用します。

・selectメソッド
 → Dataframeから指定したカラムを抽出します。
・colメソッド
 → 指定したカラムをTypedColumn型のオブジェクトとして扱います。
・getItemメソッド
 → カラムに配列データが格納されている場合、
   getItemメソッドを使うことで指定した要素を抽出できます。


スキーマ1の下記情報を抽出してみましょう。
・username
・msg
・emailsの1番目のaddress

上記情報を抽出するコードは、以下となります。

## 5. ネストされた情報を抽出し表示
joined_df.select("username","msg", col("emails").getItem(0)["address"]).show()


以上で、「手順2:結合(join)したPySparkのDataframeから特定列を抽出」は完了です。

手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行

「手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行」に関してご紹介します。

test.pyに手順2で紹介した「ネストされた情報を抽出し表示」のコードを追記し、「spark-submit」コマンドを実行します。

Sparkが導入されている環境で以下のコマンドを実行しましょう。

$ SPARK_CMD="/opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit"
$ ${SPARK_CMD} --master yarn --conf "spark.executor.memoryOverhead=600" \
             --num-executors 2 \
             --executor-cores 1 \
             --executor-memory 3G \
             test.py


「spark-submit」コマンドの実行結果として、指定した情報のみ出力できます。

+--------+----------------+--------------------+
|username|             msg|   emails[0].address|
+--------+----------------+--------------------+
| xxxxxxx|         yyyyyyy|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|                |xxxxxxxxxxxxxxxx@...|
| xxxxxxx|わかりましたか?|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|               2|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|               1|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|               3|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|            YES!|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|            test|xxxxxxxxxxxxxxxx@...|
| xxxxxxx|          da!!!!|xxxxxxxxxxxxxxxx@...|
+--------+----------------+--------------------+


以上で、「手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行」は完了です。

上記の3手順で、「結合(join)したPySparkのDataframeから特定列を抽出する手順」ができました。


【まとめ】PySparkのDataframeを3手順で結合(join)する

今回の記事を通して、PySparkのDataframeを3手順で結合(join)するをご紹介することで、以下の悩みを解消しました。

★悩み★
・PySparkのDataframeで結合(join)はどうすればいいのだろうか。
・結合(join)したPySparkのDataframeから特定列の抽出手順を知りたい。


「PySparkでDataframeを結合(join)する方法は?」と悩んでいるあなたにこの記事が少しでも役に立てれば幸いです。

コメント

タイトルとURLをコピーしました