★悩み★
・JavaでSparkを利用する手順が分からない。
・SparkのDataSetで結合(join)はどうすればいいのだろうか。
・結合(join)したSparkのDataSetから特定列の抽出手順を知りたい。
こういった「悩み」に答えます。
★本記事の内容★
①【Java】SparkのDataSetを4手順で結合(join)する手順をご紹介
②【Java】結合(join)したSparkのDataSetから特定列を抽出する手順をご紹介
これからご紹介する「SparkのDataSetで結様(join)する手順」を実践したことで、筆者は30分以内でJavaでSparkのDataSetを用いた結合(join)できました。
記事の前半では「JavaでSparkのDataSetを4手順で結合(join)する手順」をソースコードを交えながら紹介します。
記事の後半では「結合(join)したSparkのDataSetから特定列を抽出する手順」をソースコードを交えながら紹介します。
この記事を読み終えることで、「JavaでSparkのDataSetを結合(join)する手順」を把握できるだけでなく、「JavaでSparkを利用する手順」も把握した状態になります。
Apache Sparkをインストールされていない方は、以下の記事をご覧ください。
【Java】SparkのDataSetを4手順で結合(join)する手順を
JavaでSparkのDataSetを用いて、結合(join)する方法についてご紹介します。
★JavaでSparkのDataSetを結合(join)する手順の流れ★
手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】
手順2:SparkのDataSetを結合(join)【Javaのコードあり】
手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド
手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行
上記の流れで、JavaでSparkのDataSetを結合(join)できます。
以降で、「JavaでSparkのDataSetを結合(join)する手順の流れ」の各手順に関して簡単に説明します。
今回は、以下で紹介したデータを元に説明します。
手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】
「手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】」に関してご説明します。
データをSparkのDataSetとして読み込む方法に関してご紹介します。
JavaのSparkでは、以下の「spark.read().json」でJsonファイルをDataSetとして読み込めます。
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.Column;
public class ConcatJson {
public static void main(String[] args) {
// sparkの設定を作成
SparkSession spark = SparkSession.builder().appName("Concat Application").getOrCreate();
// jsonファイルのPath
String userFile = "rocketchat_data/users.json";
String messageFile = "rocketchat_data/message.json";
// DataSetとしてファイル読み込み
Dataset<Row> userData = spark.read().json(userFile);
Dataset<Row> messageData = spark.read().json(messageFile);
// sparkのオブジェクトを解放
spark.stop();
}
}
spark.read().jsonの引数として、HDFS上のJsonファイルPathを指定します。
以上で、「手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】」は完了です。
手順2:SparkのDataSetを結合(join)【Javaのコードあり】
「手順2:SparkのDataSetを結合(join)【Javaのコードあり】」に関してご説明します。
JavaのSparkでは、以下の「join」メソッドを利用することで2つのDataSetを結合(join)できます。
// DataSetとしてファイル読み込み
~~~
// 外部結合を実施
Dataset<Row> joinedDf = messageData.join(userData, messageData.col("u._id").equalTo(userData.col("_id")), "inner");
joinedDf.printSchema();
// sparkのオブジェクトを解放
手順1と手順2で紹介したソースコードをマージし、ファイル名をConcatJson.javaとして保存しましょう。
joinに指定する引数は以下となります。
第1引数:結合したいDataSetを指定
第2引数:結合条件を指定
第3引数:結合の種類を指定(11種類)
参照先:Sparkのdatasetに関する公式サイト
以上で、「手順2:SparkのDataSetを結合(join)【Javaのコードあり】」は完了です。
手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド
「手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド」に関してご説明します。
Javaで実装したSparkのプログラムは、ビルド/コンパイルが必要です。
まずは、以下のディレクトリ構造になっていることを確認しましょう。
# su - hadoop # Hadoopを操作できるユーザに変更
$ ls -1
ConcatJson.java
$
その後、以下のコマンドを実行しJavaプログラムをビルド/コンパイルしましょう。
$ javac -cp '/opt/oss/spark-2.4.5-bin-without-hadoop/jars/*' ConcatJson.java
$ jar cvf ConcatJson.jar ConcatJson.class
マニフェストが追加されました
ConcatJson.classを追加中です(入=2127)(出=966)(54%収縮されました)
$
ビルド/コンパイルする際にsparkのjarsを指定しましょう。
以上で、「手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド」は完了です。
手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行
「手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行」に関してご説明します。
Javaで実装したSparkのプログラムを実行する方法をご紹介します。
ビルド/コンパイルしたプログラムを「spark-submit」コマンドを利用し実行します。
Sparkが導入されている環境で以下のコマンドを実行しましょう。
$ hadoop fs -ls rocketchat_data
Found 3 items
-rw-r--r-- 1 hadoop supergroup 3183 2020-09-27 19:25 rocketchat_data/message.json
-rw-r--r-- 1 hadoop supergroup 1979 2020-09-27 19:25 rocketchat_data/room.json
-rw-r--r-- 1 hadoop supergroup 1376 2020-09-27 19:25 rocketchat_data/users.json
$ /opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit --master yarn --conf "spark.executor.memoryOverhead=600" \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 3G \
--class ConcatJson ConcatJson.jar
上記コマンドの出力結果として、以下のように結合したDataSetのスキーマ情報が表示されます。
root
|-- _id: string (nullable = true)
|-- _updatedAt: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- attachments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- image_dimensions: struct (nullable = true)
| | | |-- height: long (nullable = true)
| | | |-- width: long (nullable = true)
| | |-- image_preview: string (nullable = true)
| | |-- image_size: long (nullable = true)
| | |-- image_type: string (nullable = true)
| | |-- image_url: string (nullable = true)
| | |-- title: string (nullable = true)
| | |-- title_link: string (nullable = true)
| | |-- title_link_download: boolean (nullable = true)
| | |-- type: string (nullable = true)
|-- channels: array (nullable = true)
| |-- element: string (containsNull = true)
|-- file: struct (nullable = true)
| |-- _id: string (nullable = true)
| |-- name: string (nullable = true)
| |-- type: string (nullable = true)
|-- groupable: boolean (nullable = true)
|-- mentions: array (nullable = true)
| |-- element: string (containsNull = true)
|-- msg: string (nullable = true)
|-- reactions: struct (nullable = true)
| |-- :grinning:: struct (nullable = true)
| | |-- usernames: array (nullable = true)
| | | |-- element: string (containsNull = true)
|-- replies: array (nullable = true)
| |-- element: string (containsNull = true)
|-- rid: string (nullable = true)
|-- t: string (nullable = true)
|-- tcount: long (nullable = true)
|-- tlm: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- tmid: string (nullable = true)
|-- ts: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- tshow: boolean (nullable = true)
|-- u: struct (nullable = true)
| |-- _id: string (nullable = true)
| |-- username: string (nullable = true)
|-- __rooms: array (nullable = true)
| |-- element: string (containsNull = true)
|-- _id: string (nullable = true)
|-- _updatedAt: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- active: boolean (nullable = true)
|-- avatarETag: string (nullable = true)
|-- avatarOrigin: string (nullable = true)
|-- createdAt: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- emails: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address: string (nullable = true)
| | |-- verified: boolean (nullable = true)
|-- lastLogin: struct (nullable = true)
| |-- $date: string (nullable = true)
|-- name: string (nullable = true)
|-- roles: array (nullable = true)
| |-- element: string (containsNull = true)
|-- services: struct (nullable = true)
| |-- email: struct (nullable = true)
| | |-- verificationTokens: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- address: string (nullable = true)
| | | | |-- token: string (nullable = true)
| | | | |-- when: struct (nullable = true)
| | | | | |-- $date: string (nullable = true)
| |-- email2fa: struct (nullable = true)
| | |-- changedAt: struct (nullable = true)
| | | |-- $date: string (nullable = true)
| | |-- enabled: boolean (nullable = true)
| |-- password: struct (nullable = true)
| | |-- bcrypt: string (nullable = true)
| | |-- reset: struct (nullable = true)
| | | |-- email: string (nullable = true)
| | | |-- reason: string (nullable = true)
| | | |-- token: string (nullable = true)
| | | |-- when: struct (nullable = true)
| | | | |-- $date: string (nullable = true)
| |-- resume: struct (nullable = true)
| | |-- loginTokens: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- hashedToken: string (nullable = true)
| | | | |-- when: struct (nullable = true)
| | | | | |-- $date: string (nullable = true)
|-- status: string (nullable = true)
|-- statusConnection: string (nullable = true)
|-- statusDefault: string (nullable = true)
|-- type: string (nullable = true)
|-- username: string (nullable = true)
|-- utcOffset: long (nullable = true)
以上で、「手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行」は完了です。
上記の4手順で、JavaでSparkのDataSetを結合(join)できました。
【Java】結合(join)したSparkのDataSetから特定列を抽出する手順
「【Java】結合(join)したSparkのDataSetから特定列を抽出する手順」に関してご説明します。
★結合(join)したSparkのDataSetから特定列を抽出する流れ★
手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】
手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】
手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド
手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行
上記の4手順で、結合(join)したPySparkのDataframeから特定列を抽出できます。
以降で、「結合(join)したSparkのDataSetから特定列を抽出する流れ」の各手順に関して説明します。
手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】
「手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】」に関してご説明します。
「spark.read().json」を使うことでネストされたJsonもDataSetとして読み込めます。
今回は、複雑なネスト構造をしている「【Java】SparkのDataSetを4手順で結合(join)する手順を」で結合(join)したDataSetを利用します。
以上で、「手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】」は完了です。
手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】
「手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】」に関してご説明します。
JavaのSparkでDataSetから情報を抽出したい場合は、「select/col/getItem」を利用します。
・selectメソッド
→ DataSetから指定したカラムを抽出します。
・colメソッド
→ 指定したカラムをTypedColumn型のオブジェクトとして扱います。
・getItemメソッド
→ カラムに配列データが格納されている場合、
getItemメソッドを使うことで指定した要素を抽出できます。
前章で表示したスキーマ内の下記情報を抽出してみましょう。
・username
・msg
・emailsの1番目のaddress
上記情報を抽出するコードは、以下となります。
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.Column;
public class ConcatJson {
public static void main(String[] args) {
String userFile = "rocketchat_data/users.json"; // Should be some file on your system
String messageFile = "rocketchat_data/message.json"; // Should be some file on your system
String roomFile = "rocketchat_data/room.json"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<Row> userData = spark.read().json(userFile);
Dataset<Row> messageData = spark.read().json(messageFile);
Dataset<Row> joinedDf = messageData.join(userData, messageData.col("u._id").equalTo(userData.col("_id")), "inner");
joinedDf.select(col("username"),col("msg"), col("emails").getItem(0).getItem("address")).show();
spark.stop();
}
}
以上で、「手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】」は完了です。
手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド
「手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド」に関してご説明します。
「【Java】SparkのDataSetを4手順で結合(join)する手順」の手順3と同じ手順でJavaで実装したSparkのプログラムをビルド/コンパイルしましょう。
以上で、「手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド」は完了です。
手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行
「手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行」に関してご説明します。
「【Java】SparkのDataSetを4手順で結合(join)する手順」の手順4と同じで手順でJavaのプログラムをSparkで実行しましょう。
「spark-submit」コマンドの実行結果として、指定した情報のみ出力できます。
+--------+----------------+--------------------+
|username| msg| emails[0].address|
+--------+----------------+--------------------+
| suehiro| xxx|xxx@...|
| suehiro| |xxx@...|
| suehiro|わかりましたか?|xxx@...|
| suehiro| 2|xxx@...|
| suehiro| 1|xxx@...|
| suehiro| 3|xxx@...|
| suehiro| YES!|xxx@...|
| suehiro| test|xxx@...|
| suehiro| da!!!!|xxx@...|
+--------+----------------+--------------------+
以上で、「手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行」は完了です。
上記の4手順で、「結合(join)したSparkのDataSetから特定列を抽出する手順」ができました。
【まとめ】SparkのDataSetを結合(join)する手順
今回の記事を通して、JavaでSparkのDataSetを結合(join)する手順をご紹介することで、以下の悩みを解消しました。
★悩み★
・JavaでSparkを利用する手順が分からない。
・SparkのDataSetで結合(join)はどうすればいいのだろうか。
・結合(join)したSparkのDataSetから特定列の抽出手順を知りたい。
「JavaのSparkでDataSetを結合(join)する方法は?」と悩んでいるあなたにこの記事が少しでも役に立てれば幸いです。
コメント