pyetlは、純粋なpythonで開発されたETLフレームワークです。sqoopやdataxなどのETLツールと比較して、pyetlは各フィールドにudf関数を追加できるため、データ変換プロセスがより柔軟になります。プロのETLツールpyetlと比較すると、軽量で純粋なpythonです。開発者の習慣に沿ったコード操作
インストール
pip3 install pyetl
使用例
データベーステーブル間のデータ同期
from pyetl import Task, DatabaseReader, DatabaseWriter
reader =DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer =DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")Task(reader, writer).start()
データベーステーブルからハイブテーブルへの同期
from pyetl import Task, DatabaseReader, HiveWriter2
reader =DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer =HiveWriter2("hive://localhost:10000/default", table_name="target")Task(reader, writer).start()
データベーステーブルの同期
from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader =DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer =ElasticSearchWriter(hosts=["localhost"], index_name="tartget")Task(reader, writer).start()
元のテーブルターゲットテーブルのフィールド名が異なるため、フィールドマッピングを追加する必要があります
追加
# 元のテーブルソースにはuuid、fullが含まれています_名前フィールド
reader =DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# ターゲットテーブルターゲットには、IDフィールドと名前フィールドが含まれます
writer =DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# 列は、ターゲットテーブルと元のテーブルの間のフィールドマッピング関係を構成します
columns ={"id":"uuid","name":"full_name"}Task(reader, writer, columns=columns).start()
フィールドのUDFマッピング、ルール検証、データ標準化、データクリーニングなど。
# 関数構成フィールドのudfマッピングは次のとおりです。
functions={"id": str,"name": lambda x: x.strip()}Task(reader, writer, columns=columns, functions=functions).start()
Taskクラスを継承して、ETLタスクを柔軟に拡張します
import json
from pyetl import Task, DatabaseReader, DatabaseWriter
classNewTask(Task):
reader =DatabaseReader("sqlite:///db.sqlite3", table_name="source")
writer =DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
def get_columns(self):"""機能ごとにフィールドマッピング構成を生成します。これにより、より柔軟に使用できます。"""
# 次の例では、データベースのフィールドマッピング構成を取得し、それを辞書タイプに返します。
sql ="select columns from task where name='new_task'"
columns = self.writer.db.read_one(sql)["columns"]return json.loads(columns)
def get_functions(self):"""関数を介してフィールドudfマッピングを生成します"""
# 次の例では、各フィールドタイプを文字列に変換します
return{col: str for col in self.columns}
def apply_function(self, record):"""データストリーム内のデータ全体のUDF"""
record["flag"]=int(record["id"])%2return record
def before(self):"""タスクを開始する前に行うこと,タスクテーブルの初期化、ターゲットテーブルの作成など。"""
sql ="create table destination_table(id int, name varchar(100))"
self.writer.db.execute(sql)
def after(self):"""タスクステータスの更新など、タスクの完了後に実行される操作。"""
sql ="update task set status='done' where name='new_task'"
self.writer.db.execute(sql)NewTask().start()
現在実装されているリーダーリストとライターリスト
リーダー | はじめに |
---|---|
DatabaseReader | すべてのリレーショナルデータベースの読み取りをサポート |
FileReader | csvファイルなどの構造化テキストデータを読み取る |
ExcelReader | Excelテーブルファイルの読み取り |
ライター | はじめに |
---|---|
DatabaseWriter | すべてのリレーショナルデータベースへの書き込みをサポート |
ElasticSearchWriter | esインデックスへのバッチ書き込みデータ |
HiveWriter | 一括挿入ハイブテーブル |
HiveWriter2 | ハイブテーブルをインポートするためのデータメソッドのロード(推奨) |
FileWriter | テキストファイルにデータを書き込む |
プロジェクトアドレスpyetl
総括する
これまでのところ、python ETLツールpyetlに関するこの記事が紹介されています。より関連性の高いpythonETLツールpyetlコンテンツについては、ZaLou.Cnで以前の記事を検索するか、以下の関連記事を引き続き参照してください。今後、ZaLou.Cnをさらにサポートしていただければ幸いです。