【Java】SparkのDataSetを結合(join)する手順【ソースコードあり】

★悩み★
・JavaでSparkを利用する手順が分からない。
・SparkのDataSetで結合(join)はどうすればいいのだろうか。
・結合(join)したSparkのDataSetから特定列の抽出手順を知りたい。


こういった「悩み」に答えます。

★本記事の内容★
①【Java】SparkのDataSetを4手順で結合(join)する手順をご紹介
②【Java】結合(join)したSparkのDataSetから特定列を抽出する手順をご紹介


これからご紹介する「SparkのDataSetで結様(join)する手順」を実践したことで、筆者は30分以内でJavaでSparkのDataSetを用いた結合(join)できました。

記事の前半では「JavaでSparkのDataSetを4手順で結合(join)する手順」をソースコードを交えながら紹介します。
記事の後半では「結合(join)したSparkのDataSetから特定列を抽出する手順」をソースコードを交えながら紹介します。

この記事を読み終えることで、「JavaでSparkのDataSetを結合(join)する手順」を把握できるだけでなく、「JavaでSparkを利用する手順」も把握した状態になります。

Apache Sparkをインストールされていない方は、以下の記事をご覧ください。

【Java】SparkのDataSetを4手順で結合(join)する手順を

JavaでSparkのDataSetを用いて、結合(join)する方法についてご紹介します。

★JavaでSparkのDataSetを結合(join)する手順の流れ★
手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】
手順2:SparkのDataSetを結合(join)【Javaのコードあり】
手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド
手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行


上記の流れで、JavaでSparkのDataSetを結合(join)できます。

以降で、「JavaでSparkのDataSetを結合(join)する手順の流れ」の各手順に関して簡単に説明します。

今回は、以下で紹介したデータを元に説明します。

手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】

「手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】」に関してご説明します。
 

データをSparkのDataSetとして読み込む方法に関してご紹介します。


JavaのSparkでは、以下の「spark.read().json」でJsonファイルをDataSetとして読み込めます。

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.Column;


public class ConcatJson {
  public static void main(String[] args) {
    // sparkの設定を作成
    SparkSession spark = SparkSession.builder().appName("Concat Application").getOrCreate();
    
    // jsonファイルのPath
    String userFile = "rocketchat_data/users.json";
    String messageFile = "rocketchat_data/message.json";

    // DataSetとしてファイル読み込み
    Dataset<Row> userData = spark.read().json(userFile);
    Dataset<Row> messageData = spark.read().json(messageFile);

    // sparkのオブジェクトを解放
    spark.stop();
  }
}

spark.read().jsonの引数として、HDFS上のJsonファイルPathを指定します。


以上で、「手順1:データをSparkのDataSetとして読み込む【Javaのコードあり】」は完了です。

手順2:SparkのDataSetを結合(join)【Javaのコードあり】

「手順2:SparkのDataSetを結合(join)【Javaのコードあり】」に関してご説明します。

JavaのSparkでは、以下の「join」メソッドを利用することで2つのDataSetを結合(join)できます。

// DataSetとしてファイル読み込み
~~~

// 外部結合を実施
Dataset<Row> joinedDf = messageData.join(userData, messageData.col("u._id").equalTo(userData.col("_id")), "inner");
joinedDf.printSchema();

// sparkのオブジェクトを解放


手順1と手順2で紹介したソースコードをマージし、ファイル名をConcatJson.javaとして保存しましょう。

joinに指定する引数は以下となります。
第1引数:結合したいDataSetを指定
第2引数:結合条件を指定
第3引数:結合の種類を指定(11種類)

参照先:Sparkのdatasetに関する公式サイト


以上で、「手順2:SparkのDataSetを結合(join)【Javaのコードあり】」は完了です。

手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド

「手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド」に関してご説明します。
 

Javaで実装したSparkのプログラムは、ビルド/コンパイルが必要です。


まずは、以下のディレクトリ構造になっていることを確認しましょう。

# su - hadoop # Hadoopを操作できるユーザに変更
$ ls -1
ConcatJson.java
$


その後、以下のコマンドを実行しJavaプログラムをビルド/コンパイルしましょう。

$ javac -cp '/opt/oss/spark-2.4.5-bin-without-hadoop/jars/*' ConcatJson.java
$ jar cvf ConcatJson.jar ConcatJson.class 
マニフェストが追加されました
ConcatJson.classを追加中です(入=2127)(出=966)(54%収縮されました)
$

 

ビルド/コンパイルする際にsparkのjarsを指定しましょう。


以上で、「手順3:Javaで実装したSparkのDataSetを結合(join)するプログラムをビルド」は完了です。

手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行

「手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行」に関してご説明します。
 

Javaで実装したSparkのプログラムを実行する方法をご紹介します。


ビルド/コンパイルしたプログラムを「spark-submit」コマンドを利用し実行します。

Sparkが導入されている環境で以下のコマンドを実行しましょう。

$ hadoop fs -ls rocketchat_data
Found 3 items
-rw-r--r--   1 hadoop supergroup       3183 2020-09-27 19:25 rocketchat_data/message.json
-rw-r--r--   1 hadoop supergroup       1979 2020-09-27 19:25 rocketchat_data/room.json
-rw-r--r--   1 hadoop supergroup       1376 2020-09-27 19:25 rocketchat_data/users.json

$ /opt/oss/spark-2.4.5-bin-without-hadoop/bin/spark-submit --master yarn --conf "spark.executor.memoryOverhead=600" \
             --num-executors 2 \
             --executor-cores 1 \
             --executor-memory 3G \
             --class ConcatJson ConcatJson.jar


上記コマンドの出力結果として、以下のように結合したDataSetのスキーマ情報が表示されます。

root
 |-- _id: string (nullable = true)
 |-- _updatedAt: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- attachments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- image_dimensions: struct (nullable = true)
 |    |    |    |-- height: long (nullable = true)
 |    |    |    |-- width: long (nullable = true)
 |    |    |-- image_preview: string (nullable = true)
 |    |    |-- image_size: long (nullable = true)
 |    |    |-- image_type: string (nullable = true)
 |    |    |-- image_url: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- title_link: string (nullable = true)
 |    |    |-- title_link_download: boolean (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- file: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- groupable: boolean (nullable = true)
 |-- mentions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- msg: string (nullable = true)
 |-- reactions: struct (nullable = true)
 |    |-- :grinning:: struct (nullable = true)
 |    |    |-- usernames: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rid: string (nullable = true)
 |-- t: string (nullable = true)
 |-- tcount: long (nullable = true)
 |-- tlm: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- tmid: string (nullable = true)
 |-- ts: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- tshow: boolean (nullable = true)
 |-- u: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- username: string (nullable = true)
 |-- __rooms: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- _id: string (nullable = true)
 |-- _updatedAt: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- active: boolean (nullable = true)
 |-- avatarETag: string (nullable = true)
 |-- avatarOrigin: string (nullable = true)
 |-- createdAt: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- emails: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- verified: boolean (nullable = true)
 |-- lastLogin: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- services: struct (nullable = true)
 |    |-- email: struct (nullable = true)
 |    |    |-- verificationTokens: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- address: string (nullable = true)
 |    |    |    |    |-- token: string (nullable = true)
 |    |    |    |    |-- when: struct (nullable = true)
 |    |    |    |    |    |-- $date: string (nullable = true)
 |    |-- email2fa: struct (nullable = true)
 |    |    |-- changedAt: struct (nullable = true)
 |    |    |    |-- $date: string (nullable = true)
 |    |    |-- enabled: boolean (nullable = true)
 |    |-- password: struct (nullable = true)
 |    |    |-- bcrypt: string (nullable = true)
 |    |    |-- reset: struct (nullable = true)
 |    |    |    |-- email: string (nullable = true)
 |    |    |    |-- reason: string (nullable = true)
 |    |    |    |-- token: string (nullable = true)
 |    |    |    |-- when: struct (nullable = true)
 |    |    |    |    |-- $date: string (nullable = true)
 |    |-- resume: struct (nullable = true)
 |    |    |-- loginTokens: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- hashedToken: string (nullable = true)
 |    |    |    |    |-- when: struct (nullable = true)
 |    |    |    |    |    |-- $date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- statusConnection: string (nullable = true)
 |-- statusDefault: string (nullable = true)
 |-- type: string (nullable = true)
 |-- username: string (nullable = true)
 |-- utcOffset: long (nullable = true)


以上で、「手順4:JavaでSparkのDataSetを結合(join)するプログラムを実行」は完了です。

上記の4手順で、JavaでSparkのDataSetを結合(join)できました。

【Java】結合(join)したSparkのDataSetから特定列を抽出する手順

「【Java】結合(join)したSparkのDataSetから特定列を抽出する手順」に関してご説明します。

★結合(join)したSparkのDataSetから特定列を抽出する流れ★
手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】
手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】
手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド
手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行


上記の4手順で、結合(join)したPySparkのDataframeから特定列を抽出できます。

以降で、「結合(join)したSparkのDataSetから特定列を抽出する流れ」の各手順に関して説明します。

手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】

「手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】」に関してご説明します。

spark.read().json」を使うことでネストされたJsonもDataSetとして読み込めます。

今回は、複雑なネスト構造をしている「【Java】SparkのDataSetを4手順で結合(join)する手順を」で結合(join)したDataSetを利用します。

以上で、「手順1:ネストされたJsonをSparkのDataSetとして読み込む【Javaのコードあり】」は完了です。

手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】

「手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】」に関してご説明します。

JavaのSparkでDataSetから情報を抽出したい場合は、「select/col/getItem」を利用します。

・selectメソッド
 → DataSetから指定したカラムを抽出します。
・colメソッド
 → 指定したカラムをTypedColumn型のオブジェクトとして扱います。
・getItemメソッド
 → カラムに配列データが格納されている場合、
   getItemメソッドを使うことで指定した要素を抽出できます。


前章で表示したスキーマ内の下記情報を抽出してみましょう。
・username
・msg
・emailsの1番目のaddress

上記情報を抽出するコードは、以下となります。

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.Column;


public class ConcatJson {
  public static void main(String[] args) {
    String userFile = "rocketchat_data/users.json"; // Should be some file on your system
    String messageFile = "rocketchat_data/message.json"; // Should be some file on your system
    String roomFile = "rocketchat_data/room.json"; // Should be some file on your system

    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<Row> userData = spark.read().json(userFile);
    Dataset<Row> messageData = spark.read().json(messageFile);

    Dataset<Row> joinedDf = messageData.join(userData, messageData.col("u._id").equalTo(userData.col("_id")), "inner");
    joinedDf.select(col("username"),col("msg"), col("emails").getItem(0).getItem("address")).show();
    spark.stop();
  }
}


以上で、「手順2:結合(join)したSparkのDataSetから特定列を抽出【Javaのコードあり】」は完了です。

手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド

「手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド」に関してご説明します。

「【Java】SparkのDataSetを4手順で結合(join)する手順」の手順3と同じ手順でJavaで実装したSparkのプログラムをビルド/コンパイルしましょう。

以上で、「手順3:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムをビルド」は完了です。

手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行

「手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行」に関してご説明します。

「【Java】SparkのDataSetを4手順で結合(join)する手順」の手順4と同じで手順でJavaのプログラムをSparkで実行しましょう。

「spark-submit」コマンドの実行結果として、指定した情報のみ出力できます。

+--------+----------------+--------------------+
|username|             msg|   emails[0].address|
+--------+----------------+--------------------+
| suehiro|         xxx|xxx@...|
| suehiro|                |xxx@...|
| suehiro|わかりましたか?|xxx@...|
| suehiro|               2|xxx@...|
| suehiro|               1|xxx@...|
| suehiro|               3|xxx@...|
| suehiro|            YES!|xxx@...|
| suehiro|            test|xxx@...|
| suehiro|          da!!!!|xxx@...|
+--------+----------------+--------------------+


以上で、「手順4:結合(join)したSparkのDataSetから特定列を抽出するJavaプログラムを実行」は完了です。

上記の4手順で、「結合(join)したSparkのDataSetから特定列を抽出する手順」ができました。

【まとめ】SparkのDataSetを結合(join)する手順

今回の記事を通して、JavaでSparkのDataSetを結合(join)する手順をご紹介することで、以下の悩みを解消しました。

★悩み★
・JavaでSparkを利用する手順が分からない。
・SparkのDataSetで結合(join)はどうすればいいのだろうか。
・結合(join)したSparkのDataSetから特定列の抽出手順を知りたい。

「JavaのSparkでDataSetを結合(join)する方法は?」と悩んでいるあなたにこの記事が少しでも役に立てれば幸いです。

コメント