SparkでPostgreSQLに接続する手順【Python/PySpark】

SparkでPostgreSQLに接続する手順【Python/PySpark】

 

★悩み★
・Apache SparkでPostgreSQLに接続ってできるのかな。
・Python on Spark(PySpark)でPostgreSQLのデータを操作できるのかな?
・PySparkのアプリ(Python on Spark)でPostgreSQLに接続する手順は?



こういった「悩み」に答えます。
 

★本記事の内容★
① Spark(PySpark)でPostgreSQLに接続するための準備をご紹介
② Spark(PySpark)でPostgreSQLに接続する手順をご紹介



これからご紹介する「SparkでPostgreSQLに接続する手順【Python/PySpark】」を実践したことで、2時間以内で「PySparkアプリ(Pythonで実装したSparkのアプリケーション)でPostgreSQLのデータを集計」できました。

記事の前半では「Apache SparkでPostgreSQLに接続するための準備」を解説しつつ、記事の後半では「Apache SparkでPostgreSQLに接続する手順」を紹介します。

この記事を読み終えることで、「Apache SparkからPostgreSQLに接続できる」状態になります。


 

【PR】この記事には広告を含む場合があります

Spark(PySpark)でPostgreSQLに接続するための準備

Spark(PySpark)でPostgreSQLに接続するための準備


「Spark(PySpark)でPostgreSQLに接続するための準備」に関してご紹介します。

★Spark(PySpark)でPostgreSQLに接続するための準備★
手順1:PostgreSQLのインストール
手順2:Apache Sparkの実行環境を構築
手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード



上記の流れで、「Spark(PySpark)でPostgreSQLに接続するための準備」ができます。
 

上記の各手順は、以下の日時と環境で動作確認済みです。
動作確認済み日時:2021年7月20日
動作確認済み環境:CentOS Linux release 7.7.1908 (Core)



以降で、上記「Spark(PySpark)でPostgreSQLに接続するための準備」の各手順に関してご説明します。
 

手順1:PostgreSQLのインストール

「手順1:PostgreSQLのインストール」に関してご説明します。
 

PySparkアプリ(Pythonで実装したSparkのアプリケーション)でPostgreSQLに接続するためにも、PostgreSQLをインストールしましょう。



「PostgreSQLのインストール手順」に関しては、以下の記事「5ステップでPostgreSQL13をインストールする方法」をご覧ください。
 



また、Kubernetes上にPostgreSQLをインストールしたい場合、以下の記事をご覧ください。


以上で、「手順1:PostgreSQLのインストール」は完了です。
 

手順2:Apache Sparkの実行環境を構築

「手順2:Apache Sparkの実行環境を構築」に関してご説明します。
 

PySparkアプリ(Pythonで実装したSparkのアプリケーション)でPostgreSQLに接続するためにも、Apache Sparkをインストールします。



「Apache Sparkの実行環境を作成する手順」に関しては、以下の記事をご覧ください。
 



また、Kubernetes上にApache Sparkをインストールしたい場合、以下の記事をご覧ください。
 


以上で、「手順2:Apache Sparkの実行環境を構築」は完了です。
 

手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード

「手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード」に関してご説明します。
 

PySparkアプリ(Pythonで実装したSparkアプリ)からPostgreSQLに接続するためには、JDBC Driverというファイルが必要です。



PySparkアプリからPostgreSQLに接続するために、JDBC Driverを下記サイト(下図の赤枠)からダウンロードして下さい。
PostgreSQL用のJDBC Driverダウンロードサイト(公式サイト)
 

手順3:PostgreSQLに接続するためのJDBC Driverファイルをダウンロード



PySparkアプリをHadoop上で実行したい場合、ダウンロードしたJDBC Driverファイルをscpコマンドなどで各サーバに転送して下さい。


以上で、「手順3:PostgreSQLに接続するためのJDBCファイルをダウンロード」は完了です。
 

上記の流れで、「Apache Spark(PySpark)でPostgreSQLに接続するための準備」が完了しました。


 

Spark(PySpark)でPostgreSQLに接続する手順

Spark(PySpark)でPostgreSQLに接続する手順


「Spark(PySpark)でPostgreSQLに接続する手順」に関してご紹介します。

★Spark(PySpark)でPostgreSQLに接続するまでの流れ★
手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入
手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装
手順3:PostgreSQLに接続するPySparkアプリを実行
手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認



上記の流れで、「Pythonで作ったSparkアプリ(PySparkアプリ)でPostgreSQLに接続」できます。
 

上記の各手順は、以下の日時と環境で動作確認済みです。
動作確認済み日時:2021年7月20日
動作確認済み環境:CentOS Linux release 7.7.1908 (Core)



以降で、上記「Spark(PySpark)でPostgreSQLに接続するまでの流れ」の各手順に関してご説明します。
 

手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入

「手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入」に関してご説明します。
 

Spark(PySpark)アプリでPostgreSQLのデータを集計するためにも、PostgreSQLにデータを挿入しましょう。



「PostgreSQLへのデータ挿入手順」に関しては、以下の記事「PostgreSQL 13でデータ登録と参照をする方法」をご覧ください。
 



また、PostgreSQLのバックアップファイルを持っている場合、PostgreSQLのリストアでもPostgreSQLにデータを格納できます。リストア手順に関しては、、以下の記事をご覧ください。
 


以上で、「手順1:PostgreSQLにSpark(PySpark)で集計したいデータを挿入」は完了です。
 

手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装

「手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装」に関してご説明します。
 

Pythonで「PostgreSQLに接続しデータを集計するSparkアプリ」を実装しましょう。



今回は、「下記テーブルに格納されたデータの総数を集計するPySparkアプリ(Pythonで実装したSparkアプリ)」を実装しましょう。PostgreSQLのテーブル構造とプログラムは、以下となります。

【集計対象となるPostgreSQLのテーブル構造】
testdb=# \d meibo
                      テーブル"public.meibo"
        列         |  型  | 照合順序 | Null 値を許容 | デフォルト 
-------------------+------+----------+---------------+------------
 person_id          | text |          | not null      | 
 name             | text |          | not null      | 

testdb=# 


【PostgreSQLのテーブルに格納されたデータ総数を集計するPySparkアプリのプログラム】
#!/usr/bin/env python
# coding:utf-8
## 指定したPostgreSQLのテーブルに格納されたデータ総数を集計するsparkアプリケーション
from datetime import datetime
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
import sys

# 1. 引数を変数に格納
args = sys.argv
db_ip = args[1]   # 引数からPostgreSQLが稼働しているIPアドレスを設定
db_user = args[2] # 引数からPostgreSQLのユーザ名を設定
db_pass = args[3] # 引数からPostgreSQLのパスワードを設定
db_name = args[4] # 引数からPostgreSQLのデータベース名を設定
db_port = args[5] # 引数からPostgreSQLが稼働しているポート番号を設定

## 2. sparkの設定を作成
spark = SparkSession \
    .builder \
    .appName("Python Spark CALC DATA NUM") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## 3. dataframeを作成
query = "SELECT * FROM meibo"
meibo_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://"+db_ip+":"+db_port+"/"+db_name) \
    .option("user", db_user) \
    .option("password", db_pass) \
    .option("query", query) \
    .load()

## 4. meiboテーブルに格納された総データ数を算出し、出力
total_data_count = meibo_df.count()
print("Total Data Count:" + str(total_data_count))



上記のPySparkのプログラムを「pyspark-data-count.py」というファイル名で保存しましょう。

ちなみに、Kubernetes上で実装したPySparkアプリを動かしたい場合、コンテナ化が必要です。コンテナ化の手順に関しては、以下の記事「手順4:実装したPySparkアプリを含むSparkのコンテナイメージを作成」をご覧ください。
 

 

コンテナ化する場合、JDBC Driverのファイルもコンテナ内に含める必要があります。Dockerコンテナを作成する場合、「COPY」を用いてコンテナ内に含めましょう。



以上で、「手順2:PostgreSQLに接続するPySpark(Pythonで作った)アプリを実装」は完了です。
 

手順3:PostgreSQLに接続するPySparkアプリを実行

「手順3:PostgreSQLに接続するPySparkアプリを実行」に関してご説明します。
 

手順2で作成した「PostgreSQLのテーブルに格納されたデータ総数を集計するるPySparkアプリ(Pythonで実装したSparkアプリ)」を実行しましょう。



作成した「PostgreSQLのテーブルに格納されたデータ総数を集計するPySparkアプリ(Pythonで実装したSparkアプリ)」を以下のコマンドで実行しましょう。

【Hadoop上でSparkを動作させる場合】
[root@node1 spark-3.1.2-bin-hadoop3.2]#  bin/spark-submit \
  --master yarn --conf "spark.executor.memoryOverhead=600" \
  --conf "spark.driver.extraClassPath=/var/tmp/postgresql-42.2.6.jar" \
  --conf "spark.executor.extraClassPath=/var/tmp/postgresql-42.2.6.jar"  \
  --num-executors 2 \
  --executor-cores 1 \
  --executor-memory 3G \
  pyspark-data-count.py "10.233.13.18" "postgres" "postgres" "testdb" "5432" 


【Kubernetes上でSparkを動作させる場合】
[root@node1 spark-3.1.2-bin-hadoop3.2]#  bin/spark-submit \
  --master k8s://https://127.0.0.1:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark1 \
  --conf spark.kubernetes.container.image=herokakedashi/spark-py:spark-p-hadoop \
  --name spark-python-app \
  --conf "spark.driver.extraClassPath=/app/postgresql-42.2.6.jar" \
  --conf "spark.executor.extraClassPath=/app/postgresql-42.2.6.jar" \
  local:///app/pyspark-data-count.py "10.233.13.18" "postgres" "postgres" "testdb" "5432"



以上で、「手順3:PostgreSQLに接続するPySparkアプリを実行」は完了です。
 

手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認

「手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認」に関してご説明します。
 

実行したPySparkアプリ(Pythonで実装したSparkアプリ)の実行結果を確認してみましょう。



「PySparkアプリ(Pythonで実装したSparkアプリ)」の実行結果を確認するために、以下のコマンドを実行してください。

【Hadoop上でSparkを動作させる場合】
bin/spark-submit実行後、標準出力に「Total Data Count:」が出力されていることを確認しましょう。

【Kubernetes上でSparkを動作させる場合】
kubectl logsを実行し、KubernetesのPodログに「Total Data Count:」が出力されていることを確認しましょう。



上記の確認ができた場合、正常に「PostgreSQLに接続しデータを集計するPySparkアプリ(Pythonで実装したSparkアプリ)を実行できた」と判断できます。


以上で、「手順4:PostgreSQLに接続するPySparkアプリの実行結果を確認」は完了です。
 

上記の流れで、「PostgreSQLのテーブルに格納されたデータ総数を集計するPySparkアプリ(Pythonで実装したSparkアプリ)」を実行できました。



ちなみに、「Apache Sparkのアーキテクチャを体系的に学びたい」や「Apache Sparkの特徴を知りたい」方は、以下の参考書がオススメです。


動画で学びたいという方には、以下がオススメです。

Apache Spark入門@udemy

 
 

【まとめ】SparkでPostgreSQLに接続する手順【Python/PySpark】

SparkでPostgreSQLに接続する手順【Python/PySpark】


今回の記事を通して、「SparkでPostgreSQLに接続する手順【Python/PySpark】」をご紹介することで、以下の悩みを解消しました。

★悩み★
・Apache SparkでPostgreSQLに接続ってできるのかな。
・Python on Spark(PySpark)でPostgreSQLのデータを操作できるのかな?
・PySparkのアプリ(Python on Spark)でPostgreSQLに接続する手順は?



「Apache SparkでPostgreSQLに接続する手順は?」や「PySparkでPostgreSQLのデータを集計する手順は?」で悩んでいるあなたにこの記事が少しでも役に立てれば幸いです。

コメント

タイトルとURLをコピーしました