★悩み★
・PySparkのDataframeで結合(join)はどうすればいいのだろうか。
・結合(join)したPySparkのDataframeから特定列の抽出手順を知りたい。
こういった「悩み」に答えます。
★本記事の内容★
① PySparkのDataframeを3手順で結合(join)する手順をご紹介
② 結合(join)したPySparkのDataframeから特定列を抽出する手順をご紹介
これからご紹介する「PySparkのDataframeを3手順で結合(join)する手順」を実践したことで、筆者は30分以内でPySparkのDataframeを結合できました。
記事の前半では「PySparkのDataframeを結合(join)する手順」をソースコードを交えながら紹介します。
記事の後半では「結合(join)したPySparkのDataframeから特定列を抽出する手順」をソースコードを交えながら紹介します。
この記事を読み終えることで、「PySparkのDataframeで結合(join)する手順」を把握できるだけでなく、「PySparkの基本的な使い方」も把握した状態になります。
Apache Sparkをインストールされていない方は、以下の記事をご覧ください。
PySparkのDataframeを3手順で結合(join)する手順
「PySparkのDataframeを3手順で結合(join)する手順」についてご紹介します。
★PySparkのDataframeを結合(join)する手順の流れ★
手順1:PySparkでデータをDataframeとして読み込み
手順2:PySparkのDataframeを結合(join)
手順3:PySparkでDataframeを結合(join)するプログラムを実行
上記の流れで、PySparkのDataframeを結合(join)することができます。
以降で、「PySparkのDataframeを結合(join)する手順の流れ」の各手順に関して説明します。
今回は、以下で紹介したデータを元に、「PySparkのDataframeを作成し結合(join)する手順」を説明します。
手順1:PySparkでデータをDataframeとして読み込み
「手順1:PySparkでデータをDataframeとして読み込み」に関してご紹介します。
PySparkでは、以下の「spark.read.json」でJsonファイルをDataframeとして読み込めます。
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_, col, when
import sys
import json
# 1. 引数を変数に格納
args = sys.argv
## 2. sparkの設定を作成
spark = SparkSession \
.builder \
.appName("concat json") \
.getOrCreate()
spark_context = spark.sparkContext
## 3. jsonファイルからDataframeを作成
users_df = spark.read.json("rocketchat_data/users.json")
message_df = spark.read.json("rocketchat_data/message.json")
spark.read.jsonの引数として、HDFS上のJsonファイルPathを指定します。
以上で、「手順1:PySparkでデータをDataframeとして読み込み」は完了です。
手順2:PySparkのDataframeを結合(join)
「手順2:PySparkのDataframeを結合(join)」に関してご紹介します。
PySparkでは、以下の「join」メソッドを利用することで2つのDataframeを結合(join)できます。
## 4. 外部結合を実施
joined_df = message_df.join(users_df, message_df.u._id == users_df._id, "inner")
joined_df.show()
joinメソッドの引数は以下となります。
・第1引数:結合(join)したいDataframeを指定
・第2引数:結合(join)条件を指定
・第3引数:結合(join)の種類を指定(18種類)
参照先:Sparkのdataframeに関する公式サイト
以上で、「手順2:PySparkのDataframeを結合(join)」は完了です。
手順3:PySparkでDataframeを結合(join)するプログラムを実行
「手順3:PySparkでDataframeを結合(join)するプログラムを実行」に関してご紹介します。
先ほどのプログラムをtest.pyとし、「spark-submit」コマンドを利用し実行します。
Sparkが導入されている環境で、以下のコマンドを実行しましょう。
$ SPARK_CMD="/opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit"
$ ${SPARK_CMD} --master yarn --conf "spark.executor.memoryOverhead=600" \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 3G \
test.py
「spark-submit」コマンドの出力結果として、以下のように結合(join)したDataframeのスキーマ情報が表示されます。
★結合したDataframeのスキーマ情報(以降、スキーマ1と表記)★
root
|-- _id: string (nullable = true)
|-- _updatedAt: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- attachments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- image_dimensions: struct (nullable = true)
| | | |-- height: long (nullable = true)
| | | |-- width: long (nullable = true)
| | |-- image_preview: string (nullable = true)
| | |-- image_size: long (nullable = true)
| | |-- image_type: string (nullable = true)
| | |-- image_url: string (nullable = true)
| | |-- title: string (nullable = true)
| | |-- title_link: string (nullable = true)
| | |-- title_link_download: boolean (nullable = true)
| | |-- type: string (nullable = true)
|-- channels: array (nullable = true)
| |-- element: string (containsNull = true)
|-- file: struct (nullable = true)
| |-- _id: string (nullable = true)
| |-- name: string (nullable = true)
| |-- type: string (nullable = true)
|-- groupable: boolean (nullable = true)
|-- mentions: array (nullable = true)
| |-- element: string (containsNull = true)
|-- msg: string (nullable = true)
|-- reactions: struct (nullable = true)
| |-- :grinning:: struct (nullable = true)
| | |-- usernames: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- replies: array (nullable = true)
| |-- element: string (containsNull = true)
|-- rid: string (nullable = true)
|-- t: string (nullable = true)
|-- tcount: long (nullable = true)
|-- tlm: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- tmid: string (nullable = true)
|-- ts: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- tshow: boolean (nullable = true)
|-- u: struct (nullable = true)
| |-- _id: string (nullable = true)
| |-- username: string (nullable = true)
|-- __rooms: array (nullable = true)
| |-- element: string (containsNull = true)
|-- _id: string (nullable = true)
|-- _updatedAt: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- active: boolean (nullable = true)
|-- avatarETag: string (nullable = true)
|-- avatarOrigin: string (nullable = true)
|-- createdAt: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- emails: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address: string (nullable = true)
| | |-- verified: boolean (nullable = true)
|-- lastLogin: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- name: string (nullable = true)
|-- roles: array (nullable = true)
| |-- element: string (containsNull = true)
|-- services: struct (nullable = true)
| |-- email: struct (nullable = true)
| | |-- verificationTokens: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- address: string (nullable = true)
| | | | |-- token: string (nullable = true)
| | | | |-- when: struct (nullable = true)
| | | | | |-- $date: string (nullable = true)
| |-- email2fa: struct (nullable = true)
| | |-- changedAt: struct (nullable = true)
| | | |-- $date: string (nullable = true)
| | |-- enabled: boolean (nullable = true)
| |-- password: struct (nullable = true)
| | |-- bcrypt: string (nullable = true)
| | |-- reset: struct (nullable = true)
| | | |-- email: string (nullable = true)
| | | |-- reason: string (nullable = true)
| | | |-- token: string (nullable = true)
| | | |-- when: struct (nullable = true)
| | | | |-- $date: string (nullable = true)
| |-- resume: struct (nullable = true)
| | |-- loginTokens: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- hashedToken: string (nullable = true)
| | | | |-- when: struct (nullable = true)
| | | | | |-- $date: string (nullable = true)
|-- status: string (nullable = true)
|-- statusConnection: string (nullable = true)
|-- statusDefault: string (nullable = true)
|-- type: string (nullable = true)
|-- username: string (nullable = true)
|-- utcOffset: long (nullable = true)
以上で、「手順3:PySparkでDataframeを結合(join)するプログラムを実行」は完了です。
上記の3手順で、PySparkのDataframeで結合(join)できました。
結合(join)したPySparkのDataframeから特定列を抽出する手順
「結合(join)したPySparkのDataframeから特定列を抽出する手順」に関してご紹介します。
★結合(join)したPySparkのDataframeから特定列を抽出する流れ★
手順1:PySparkでネストされたJsonをDataframeとして読み込み
手順2:結合(join)したPySparkのDataframeから特定列を抽出
手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行
上記の3手順で、結合(join)したPySparkのDataframeから特定列を抽出できます。
以降で、「結合(join)したPySparkのDataframeから特定列を抽出する流れ」の各手順に関して説明します。
手順1:PySparkでネストされたJsonをDataframeとして読み込み
「手順1:PySparkでネストされたJsonをDataframeとして読み込み」に関してご紹介します。
「spark.read.json」を使うことで、PySparkではネストされたJsonもDataframeとして読み込めます。
今回は、複雑なネスト構造をしている「PySparkのDataframeを3手順で結合(join)する手順」で結合(join)したDataframeを利用します。
以上で、「手順1:PySparkでネストされたJsonをDataframeとして読み込み」は完了です。
手順2:結合(join)したPySparkのDataframeから特定列を抽出
「手順2:結合(join)したPySparkのDataframeから特定列を抽出」に関してご紹介します。
PySparkでDataframeから特定列を抽出したい場合、「select/col/getItem」を利用します。
・selectメソッド
→ Dataframeから指定したカラムを抽出します。
・colメソッド
→ 指定したカラムをTypedColumn型のオブジェクトとして扱います。
・getItemメソッド
→ カラムに配列データが格納されている場合、
getItemメソッドを使うことで指定した要素を抽出できます。
スキーマ1の下記情報を抽出してみましょう。
・username
・msg
・emailsの1番目のaddress
上記情報を抽出するコードは、以下となります。
## 5. ネストされた情報を抽出し表示
joined_df.select("username","msg", col("emails").getItem(0)["address"]).show()
以上で、「手順2:結合(join)したPySparkのDataframeから特定列を抽出」は完了です。
手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行
「手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行」に関してご紹介します。
test.pyに手順2で紹介した「ネストされた情報を抽出し表示」のコードを追記し、「spark-submit」コマンドを実行します。
Sparkが導入されている環境で以下のコマンドを実行しましょう。
$ SPARK_CMD="/opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit"
$ ${SPARK_CMD} --master yarn --conf "spark.executor.memoryOverhead=600" \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 3G \
test.py
「spark-submit」コマンドの実行結果として、指定した情報のみ出力できます。
+--------+----------------+--------------------+
|username| msg| emails[0].address|
+--------+----------------+--------------------+
| xxxxxxx| yyyyyyy|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| |xxxxxxxxxxxxxxxx@...|
| xxxxxxx|わかりましたか?|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| 2|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| 1|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| 3|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| YES!|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| test|xxxxxxxxxxxxxxxx@...|
| xxxxxxx| da!!!!|xxxxxxxxxxxxxxxx@...|
+--------+----------------+--------------------+
以上で、「手順3:結合(join)したPySparkのDataframeから特定列を抽出するプログラムを実行」は完了です。
上記の3手順で、「結合(join)したPySparkのDataframeから特定列を抽出する手順」ができました。
【まとめ】PySparkのDataframeを3手順で結合(join)する
今回の記事を通して、PySparkのDataframeを3手順で結合(join)するをご紹介することで、以下の悩みを解消しました。
★悩み★
・PySparkのDataframeで結合(join)はどうすればいいのだろうか。
・結合(join)したPySparkのDataframeから特定列の抽出手順を知りたい。
「PySparkでDataframeを結合(join)する方法は?」と悩んでいるあなたにこの記事が少しでも役に立てれば幸いです。
コメント