pythonETLツールpyetl

pyetlは、純粋なpythonで開発されたETLフレームワークです。sqoopやdataxなどのETLツールと比較して、pyetlは各フィールドにudf関数を追加できるため、データ変換プロセスがより柔軟になります。プロのETLツールpyetlと比較すると、軽量で純粋なpythonです。開発者の習慣に沿ったコード操作

インストール

pip3 install pyetl

使用例

データベーステーブル間のデータ同期

from pyetl import Task, DatabaseReader, DatabaseWriter
reader =DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer =DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")Task(reader, writer).start()

データベーステーブルからハイブテーブルへの同期

from pyetl import Task, DatabaseReader, HiveWriter2
reader =DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer =HiveWriter2("hive://localhost:10000/default", table_name="target")Task(reader, writer).start()

データベーステーブルの同期

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader =DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer =ElasticSearchWriter(hosts=["localhost"], index_name="tartget")Task(reader, writer).start()

元のテーブルターゲットテーブルのフィールド名が異なるため、フィールドマッピングを追加する必要があります

追加

# 元のテーブルソースにはuuid、fullが含まれています_名前フィールド
reader =DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# ターゲットテーブルターゲットには、IDフィールドと名前フィールドが含まれます
writer =DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# 列は、ターゲットテーブルと元のテーブルの間のフィールドマッピング関係を構成します
columns ={"id":"uuid","name":"full_name"}Task(reader, writer, columns=columns).start()

フィールドのUDFマッピング、ルール検証、データ標準化、データクリーニングなど。

# 関数構成フィールドのudfマッピングは次のとおりです。
functions={"id": str,"name": lambda x: x.strip()}Task(reader, writer, columns=columns, functions=functions).start()

Taskクラスを継承して、ETLタスクを柔軟に拡張します

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

classNewTask(Task):
 reader =DatabaseReader("sqlite:///db.sqlite3", table_name="source")
 writer =DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
  
 def get_columns(self):"""機能ごとにフィールドマッピング構成を生成します。これにより、より柔軟に使用できます。"""
 # 次の例では、データベースのフィールドマッピング構成を取得し、それを辞書タイプに返します。
 sql ="select columns from task where name='new_task'"
 columns = self.writer.db.read_one(sql)["columns"]return json.loads(columns)
   
 def get_functions(self):"""関数を介してフィールドudfマッピングを生成します"""
 # 次の例では、各フィールドタイプを文字列に変換します
 return{col: str for col in self.columns}
   
 def apply_function(self, record):"""データストリーム内のデータ全体のUDF"""
 record["flag"]=int(record["id"])%2return record

 def before(self):"""タスクを開始する前に行うこと,タスクテーブルの初期化、ターゲットテーブルの作成など。"""
 sql ="create table destination_table(id int, name varchar(100))"
 self.writer.db.execute(sql)
  
 def after(self):"""タスクステータスの更新など、タスクの完了後に実行される操作。"""
 sql ="update task set status='done' where name='new_task'"
 self.writer.db.execute(sql)NewTask().start()

現在実装されているリーダーリストとライターリスト

リーダー はじめに
DatabaseReader すべてのリレーショナルデータベースの読み取りをサポート
FileReader csvファイルなどの構造化テキストデータを読み取る
ExcelReader Excelテーブルファイルの読み取り
ライター はじめに
DatabaseWriter すべてのリレーショナルデータベースへの書き込みをサポート
ElasticSearchWriter esインデックスへのバッチ書き込みデータ
HiveWriter 一括挿入ハイブテーブル
HiveWriter2 ハイブテーブルをインポートするためのデータメソッドのロード(推奨)
FileWriter テキストファイルにデータを書き込む

プロジェクトアドレスpyetl

総括する

これまでのところ、python ETLツールpyetlに関するこの記事が紹介されています。より関連性の高いpythonETLツールpyetlコンテンツについては、ZaLou.Cnで以前の記事を検索するか、以下の関連記事を引き続き参照してください。今後、ZaLou.Cnをさらにサポートしていただければ幸いです。

Recommended Posts

pythonETLツールpyetl
Pythonマルチスレッドポートスキャンツール