★悩み★
・Apache SparkでPostgreSQLに接続ってできるのかな。
・Python on Spark(PySpark)でPostgreSQLのデータを操作できるのかな?
・PySparkのアプリ(Python on Spark)でPostgreSQLに接続する手順は?
こういった「悩み」に答えます。
★本記事の内容★
① Spark(PySpark)でPostgreSQLに接続するための準備をご紹介
② Spark(PySpark)でPostgreSQLに接続する手順をご紹介
これからご紹介する「SparkでPostgreSQLに接続する手順【Python/PySpark】」を実践したことで、2時間以内で「PySparkアプリ(Pythonで実装したSparkのアプリケーション)でPostgreSQLのデータを集計」できました。
記事の前半では「Apache SparkでPostgreSQLに接続するための準備」を解説しつつ、記事の後半では「Apache SparkでPostgreSQLに接続する手順」を紹介します。
この記事を読み終えることで、「Apache SparkからPostgreSQLに接続できる」状態になります。
Spark(PySpark)でPostgreSQLに接続するための準備
「Spark(PySpark)でPostgreSQLに接続するための準備」に関してご紹介します。
★Spark(PySpark)でPostgreSQLに接続するための準備★
手順1:PostgreSQLのインストール
手順2:Apache Sparkの実行環境を構築
手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード
上記の流れで、「Spark(PySpark)でPostgreSQLに接続するための準備」ができます。
上記の各手順は、以下の日時と環境で動作確認済みです。
動作確認済み日時:2021年7月20日
動作確認済み環境:CentOS Linux release 7.7.1908 (Core)
以降で、上記「Spark(PySpark)でPostgreSQLに接続するための準備」の各手順に関してご説明します。
手順1:PostgreSQLのインストール
「手順1:PostgreSQLのインストール」に関してご説明します。
PySparkアプリ(Pythonで実装したSparkのアプリケーション)でPostgreSQLに接続するためにも、PostgreSQLをインストールしましょう。
「PostgreSQLのインストール手順」に関しては、以下の記事「5ステップでPostgreSQL13をインストールする方法」をご覧ください。
また、Kubernetes上にPostgreSQLをインストールしたい場合、以下の記事をご覧ください。
以上で、「手順1:PostgreSQLのインストール」は完了です。
手順2:Apache Sparkの実行環境を構築
「手順2:Apache Sparkの実行環境を構築」に関してご説明します。
PySparkアプリ(Pythonで実装したSparkのアプリケーション)でPostgreSQLに接続するためにも、Apache Sparkをインストールします。
「Apache Sparkの実行環境を作成する手順」に関しては、以下の記事をご覧ください。
また、Kubernetes上にApache Sparkをインストールしたい場合、以下の記事をご覧ください。
以上で、「手順2:Apache Sparkの実行環境を構築」は完了です。
手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード
「手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード」に関してご説明します。
PySparkアプリ(Pythonで実装したSparkアプリ)からPostgreSQLに接続するためには、JDBC Driverというファイルが必要です。
PySparkアプリからPostgreSQLに接続するために、JDBC Driverを下記サイト(下図の赤枠)からダウンロードして下さい。
PostgreSQL用のJDBC Driverダウンロードサイト(公式サイト)
PySparkアプリをHadoop上で実行したい場合、ダウンロードしたJDBC Driverファイルをscpコマンドなどで各サーバに転送して下さい。
以上で、「手順3:PostgreSQLに接続するためのJDBCファイルをダウンロード」は完了です。
上記の流れで、「Apache Spark(PySpark)でPostgreSQLに接続するための準備」が完了しました。
Spark(PySpark)でPostgreSQLに接続する手順
「Spark(PySpark)でPostgreSQLに接続する手順」に関してご紹介します。
★Spark(PySpark)でPostgreSQLに接続するまでの流れ★
手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入
手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装
手順3:PostgreSQLに接続するPySparkアプリを実行
手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認
上記の流れで、「Pythonで作ったSparkアプリ(PySparkアプリ)でPostgreSQLに接続」できます。
上記の各手順は、以下の日時と環境で動作確認済みです。
動作確認済み日時:2021年7月20日
動作確認済み環境:CentOS Linux release 7.7.1908 (Core)
以降で、上記「Spark(PySpark)でPostgreSQLに接続するまでの流れ」の各手順に関してご説明します。
手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入
「手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入」に関してご説明します。
Spark(PySpark)アプリでPostgreSQLのデータを集計するためにも、PostgreSQLにデータを挿入しましょう。
「PostgreSQLへのデータ挿入手順」に関しては、以下の記事「PostgreSQL 13でデータ登録と参照をする方法」をご覧ください。
また、PostgreSQLのバックアップファイルを持っている場合、PostgreSQLのリストアでもPostgreSQLにデータを格納できます。リストア手順に関しては、、以下の記事をご覧ください。
以上で、「手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入」は完了です。
手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装
「手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装」に関してご説明します。
Pythonで「PostgreSQLに接続しデータを集計するSparkアプリ」を実装しましょう。
今回は、「下記テーブルに格納されたデータの総数を集計するPySparkアプリ(Pythonで実装したSparkアプリ)」を実装しましょう。PostgreSQLのテーブル構造とプログラムは、以下となります。
【集計対象となるPostgreSQLのテーブル構造】
testdb=# \d meibo
テーブル"public.meibo"
列 | 型 | 照合順序 | Null 値を許容 | デフォルト
-------------------+------+----------+---------------+------------
person_id | text | | not null |
name | text | | not null |
testdb=#
【PostgreSQLのテーブルに格納されたデータ総数を集計するPySparkアプリのプログラム】
#!/usr/bin/env python
# coding:utf-8
## 指定したPostgreSQLのテーブルに格納されたデータ総数を集計するsparkアプリケーション
from datetime import datetime
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
import sys
# 1. 引数を変数に格納
args = sys.argv
db_ip = args[1] # 引数からPostgreSQLが稼働しているIPアドレスを設定
db_user = args[2] # 引数からPostgreSQLのユーザ名を設定
db_pass = args[3] # 引数からPostgreSQLのパスワードを設定
db_name = args[4] # 引数からPostgreSQLのデータベース名を設定
db_port = args[5] # 引数からPostgreSQLが稼働しているポート番号を設定
## 2. sparkの設定を作成
spark = SparkSession \
.builder \
.appName("Python Spark CALC DATA NUM") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
## 3. dataframeを作成
query = "SELECT * FROM meibo"
meibo_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://"+db_ip+":"+db_port+"/"+db_name) \
.option("user", db_user) \
.option("password", db_pass) \
.option("query", query) \
.load()
## 4. meiboテーブルに格納された総データ数を算出し、出力
total_data_count = meibo_df.count()
print("Total Data Count:" + str(total_data_count))
上記のPySparkのプログラムを「pyspark-data-count.py」というファイル名で保存しましょう。
ちなみに、Kubernetes上で実装したPySparkアプリを動かしたい場合、コンテナ化が必要です。コンテナ化の手順に関しては、以下の記事「手順4:実装したPySparkアプリを含むSparkのコンテナイメージを作成」をご覧ください。
コンテナ化する場合、JDBC Driverのファイルもコンテナ内に含める必要があります。Dockerコンテナを作成する場合、「COPY」を用いてコンテナ内に含めましょう。
以上で、「手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装」は完了です。
手順3:PostgreSQLに接続するPySparkアプリを実行
「手順3:PostgreSQLに接続するPySparkアプリを実行」に関してご説明します。
手順2で作成した「PostgreSQLのテーブルに格納されたデータ総数を集計するるPySparkアプリ(Pythonで実装したSparkアプリ)」を実行しましょう。
作成した「PostgreSQLのテーブルに格納されたデータ総数を集計するPySparkアプリ(Pythonで実装したSparkアプリ)」を以下のコマンドで実行しましょう。
【Hadoop上でSparkを動作させる場合】
[root@node1 spark-3.1.2-bin-hadoop3.2]# bin/spark-submit \
--master yarn --conf "spark.executor.memoryOverhead=600" \
--conf "spark.driver.extraClassPath=/var/tmp/postgresql-42.2.6.jar" \
--conf "spark.executor.extraClassPath=/var/tmp/postgresql-42.2.6.jar" \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 3G \
pyspark-data-count.py "10.233.13.18" "postgres" "postgres" "testdb" "5432"
【Kubernetes上でSparkを動作させる場合】
[root@node1 spark-3.1.2-bin-hadoop3.2]# bin/spark-submit \
--master k8s://https://127.0.0.1:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark1 \
--conf spark.kubernetes.container.image=herokakedashi/spark-py:spark-p-hadoop \
--name spark-python-app \
--conf "spark.driver.extraClassPath=/app/postgresql-42.2.6.jar" \
--conf "spark.executor.extraClassPath=/app/postgresql-42.2.6.jar" \
local:///app/pyspark-data-count.py "10.233.13.18" "postgres" "postgres" "testdb" "5432"
以上で、「手順3:PostgreSQLに接続するPySparkアプリを実行」は完了です。
手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認
「手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認」に関してご説明します。
実行したPySparkアプリ(Pythonで実装したSparkアプリ)の実行結果を確認してみましょう。
「PySparkアプリ(Pythonで実装したSparkアプリ)」の実行結果を確認するために、以下のコマンドを実行してください。
【Hadoop上でSparkを動作させる場合】
bin/spark-submit実行後、標準出力に「Total Data Count:」が出力されていることを確認しましょう。
【Kubernetes上でSparkを動作させる場合】
kubectl logsを実行し、KubernetesのPodログに「Total Data Count:」が出力されていることを確認しましょう。
上記の確認ができた場合、正常に「PostgreSQLに接続しデータを集計するPySparkアプリ(Pythonで実装したSparkアプリ)を実行できた」と判断できます。
以上で、「手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認」は完了です。
上記の流れで、「PostgreSQLのテーブルに格納されたデータ総数を集計するPySparkアプリ(Pythonで実装したSparkアプリ)」を実行できました。
ちなみに、「Apache Sparkのアーキテクチャを体系的に学びたい」や「Apache Sparkの特徴を知りたい」方は、以下の参考書がオススメです。
動画で学びたいという方には、以下がオススメです。
【まとめ】SparkでPostgreSQLに接続する手順【Python/PySpark】
今回の記事を通して、「SparkでPostgreSQLに接続する手順【Python/PySpark】」をご紹介することで、以下の悩みを解消しました。
★悩み★
・Apache SparkでPostgreSQLに接続ってできるのかな。
・Python on Spark(PySpark)でPostgreSQLのデータを操作できるのかな?
・PySparkのアプリ(Python on Spark)でPostgreSQLに接続する手順は?
「Apache SparkでPostgreSQLに接続する手順は?」や「PySparkでPostgreSQLのデータを集計する手順は?」で悩んでいるあなたにこの記事が少しでも役に立てれば幸いです。
コメント