pyetl is an ETL framework written in pure Python. Compared with ETL tools such as sqoop and datax, pyetl lets you attach a UDF to each field, which makes the data conversion process more flexible. Compared with professional ETL tools, pyetl is more lightweight and is driven entirely by Python code, which better matches developers' habits.
Installation
pip3 install pyetl
Usage example
Data synchronization between database tables
from pyetl import Task, DatabaseReader, DatabaseWriter

# Read every row from the "source" table and write it to the "target" table
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()
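To try the example above locally, both SQLite files need to exist and contain a `source` and a `target` table. The following is a minimal setup sketch that uses only the standard library `sqlite3` module. The column names `id` and `name`, the sample rows, and the helper names `prepare_databases` and `count_rows` are assumptions made for illustration; none of them come from pyetl.

```python
import os
import sqlite3
import tempfile


def prepare_databases(directory):
    """Create db1.sqlite3 (with sample rows) and db2.sqlite3 (empty target table)."""
    src_path = os.path.join(directory, "db1.sqlite3")
    dst_path = os.path.join(directory, "db2.sqlite3")

    # Source database: a "source" table with two hypothetical sample rows
    src = sqlite3.connect(src_path)
    src.execute("CREATE TABLE IF NOT EXISTS source (id INTEGER, name TEXT)")
    src.executemany(
        "INSERT INTO source (id, name) VALUES (?, ?)",
        [(1, "alice"), (2, "bob")],
    )
    src.commit()
    src.close()

    # Target database: an empty "target" table with the same columns
    dst = sqlite3.connect(dst_path)
    dst.execute("CREATE TABLE IF NOT EXISTS target (id INTEGER, name TEXT)")
    dst.commit()
    dst.close()
    return src_path, dst_path


def count_rows(path, table):
    """Return the number of rows in `table` (table name is trusted input here)."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    finally:
        conn.close()


# The demo uses a temporary directory so it does not touch the working directory
workdir = tempfile.mkdtemp()
src_path, dst_path = prepare_databases(workdir)
print(count_rows(src_path, "source"), count_rows(dst_path, "target"))
```

Calling `prepare_databases(".")` instead would create the files in the current directory, which is where the relative URLs `sqlite:///db1.sqlite3` and `sqlite:///db2.sqlite3` are expected to point, assuming they are resolved relative to the working directory.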
Database table to Hive table synchronization
from pyetl import Task, DatabaseReader, HiveWriter2

reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()
Database table to Elasticsearch synchronization
from pyetl import Task, DatabaseReader, ElasticSearchWriter

reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="target")
Task(reader, writer).start()
When the source and target tables use different field names, add a field mapping
from pyetl import Task, DatabaseReader, DatabaseWriter

# The source table contains the fields uuid and full_name
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# The target table contains the fields id and name
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns maps each target-table field to the source-table field it is read from
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()
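The direction of the mapping is easy to get backwards: the keys are target field names and the values are source field names. A plain-Python sketch of that idea follows; `map_record` is a hypothetical helper written for illustration and is not how pyetl implements the mapping internally.

```python
def map_record(source_record, columns):
    """Build a target record; `columns` maps target field -> source field."""
    return {target: source_record[source] for target, source in columns.items()}


columns = {"id": "uuid", "name": "full_name"}
source_row = {"uuid": 42, "full_name": " Ada Lovelace "}

target_row = map_record(source_row, columns)
print(target_row)  # {'id': 42, 'name': ' Ada Lovelace '}
```

The values are copied unchanged at this stage; only the field names differ between the two records.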
Attach UDFs to fields for rule validation, data standardization, data cleaning, etc.
# functions maps each target field to the UDF applied to its value.
# reader, writer, and columns are the ones defined in the previous example.
functions = {"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()
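As a rough illustration of what a per-field UDF dictionary does, the sketch below applies one function to each named field and leaves the others untouched. `apply_functions` and `non_empty` are hypothetical names chosen for this example, and how pyetl itself reacts when a UDF raises an exception is not covered by this article, so treat the validation part as an assumption.

```python
def non_empty(value):
    """Validation-style UDF: strip whitespace and reject blank values."""
    value = value.strip()
    if not value:
        raise ValueError("value must not be empty")
    return value


def apply_functions(record, functions):
    """Apply each field's UDF; fields without a UDF are passed through unchanged."""
    return {
        field: functions[field](value) if field in functions else value
        for field, value in record.items()
    }


functions = {"id": str, "name": non_empty}
mapped_row = {"id": 42, "name": "  Ada Lovelace ", "age": 36}

cleaned_row = apply_functions(mapped_row, functions)
print(cleaned_row)  # {'id': '42', 'name': 'Ada Lovelace', 'age': 36}
```

Because a UDF is just a callable that takes a value and returns a value, type conversion (`str`), cleaning (`strip`), and validation (raising on bad input) can all be expressed the same way.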
Subclass Task to flexibly extend ETL tasks
import json
from pyetl import Task, DatabaseReader, DatabaseWriter


class NewTask(Task):
    reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
    writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")

    def get_columns(self):
        """Generate the field mapping from a function, which is more flexible."""
        # This example reads the field mapping from the database and returns it as a dict
        sql = "select columns from task where name='new_task'"
        columns = self.writer.db.read_one(sql)["columns"]
        return json.loads(columns)

    def get_functions(self):
        """Generate the per-field UDF mapping from a function."""
        # This example converts every field to a string
        return {col: str for col in self.columns}

    def apply_function(self, record):
        """UDF applied to a whole record in the data stream."""
        record["flag"] = int(record["id"]) % 2
        return record

    def before(self):
        """Runs before the task starts, e.g. to initialize the task table or create the target table."""
        sql = "create table destination_table(id int, name varchar(100))"
        self.writer.db.execute(sql)

    def after(self):
        """Runs after the task completes, e.g. to update the task status."""
        sql = "update task set status='done' where name='new_task'"
        self.writer.db.execute(sql)


NewTask().start()
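The hooks above follow a template-method pattern: the base class drives the run and calls the overridable methods at fixed points. The toy class below sketches that pattern in plain Python. `MiniTask` is a hypothetical stand-in, not pyetl's `Task`; in particular, the order in which it calls `before`, `get_columns`, and `get_functions`, and the choice to run per-field UDFs before `apply_function`, are assumptions made for illustration.

```python
class MiniTask:
    """Toy stand-in for a task with hooks; not pyetl's Task class."""

    def __init__(self, rows):
        self.rows = rows      # stands in for a reader
        self.written = []     # stands in for a writer
        self.calls = []       # records the order in which hooks run

    def get_columns(self):
        self.calls.append("get_columns")
        return {"id": "uuid", "name": "full_name"}

    def get_functions(self):
        self.calls.append("get_functions")
        return {"id": str, "name": str.strip}

    def apply_function(self, record):
        # Record-level UDF: derive a new field from the whole record
        record["flag"] = int(record["id"]) % 2
        return record

    def before(self):
        self.calls.append("before")

    def after(self):
        self.calls.append("after")

    def start(self):
        self.before()
        columns = self.get_columns()
        functions = self.get_functions()
        for row in self.rows:
            # 1. rename fields, 2. per-field UDFs, 3. record-level UDF
            record = {target: row[source] for target, source in columns.items()}
            record = {
                field: functions[field](value) if field in functions else value
                for field, value in record.items()
            }
            self.written.append(self.apply_function(record))
        self.after()


task = MiniTask([{"uuid": 7, "full_name": " Ada "}])
task.start()
print(task.calls)
print(task.written)
```

Subclasses only override the hooks they need, while `start()` keeps the overall sequence in one place.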
Currently implemented Readers and Writers
Reader | Description |
---|---|
DatabaseReader | Reads from any relational database |
FileReader | Reads structured text data, such as CSV files |
ExcelReader | Reads Excel spreadsheet files |

Writer | Description |
---|---|
DatabaseWriter | Writes to any relational database |
ElasticSearchWriter | Batch-writes data to an Elasticsearch index |
HiveWriter | Batch-inserts data into a Hive table |
HiveWriter2 | Imports data into a Hive table using the load data method (recommended) |
FileWriter | Writes data to a text file |
Project address: pyetl