Python ETL tool pyetl

pyetl is an ETL framework written in pure Python. Unlike ETL tools such as Sqoop and DataX, pyetl lets you attach a UDF to each field, which makes the data conversion process more flexible. Compared with professional ETL tools, pyetl is more lightweight and runs as plain Python code, which fits developer habits better.

Installation

pip3 install pyetl

Usage example

Data synchronization between database tables

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()
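
To try the synchronization above, both SQLite files need to exist first. The following is a minimal sketch using only the standard library, assuming that pyetl expects the source and target tables to be present already. The helper names (prepare_demo_databases, count_rows) and the id/name schema are hypothetical and not part of pyetl.

```python
import os
import sqlite3


def prepare_demo_databases(directory):
    """Create db1.sqlite3 with a filled source table and db2.sqlite3 with an empty target table."""
    src_path = os.path.join(directory, "db1.sqlite3")
    dst_path = os.path.join(directory, "db2.sqlite3")

    # Source database: a small table with two demo rows (hypothetical schema).
    src = sqlite3.connect(src_path)
    src.execute("create table if not exists source(id integer, name varchar(100))")
    src.executemany("insert into source(id, name) values (?, ?)", [(1, "alice"), (2, "bob")])
    src.commit()
    src.close()

    # Target database: same columns, no rows yet.
    dst = sqlite3.connect(dst_path)
    dst.execute("create table if not exists target(id integer, name varchar(100))")
    dst.commit()
    dst.close()
    return src_path, dst_path


def count_rows(path, table):
    """Return the number of rows in a table, useful for checking the result of a sync."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute(f"select count(*) from {table}").fetchone()[0]
    finally:
        conn.close()
```

After running the pyetl task in the same directory, count_rows on the target table is a quick way to check whether the rows arrived.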

Database table to hive table synchronization

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()

Database table to Elasticsearch (es) index synchronization

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="target")
Task(reader, writer).start()

When the source and target tables use different field names, add a field mapping

from pyetl import Task, DatabaseReader, DatabaseWriter
# The source table contains the uuid and full_name fields
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# The target table contains the id and name fields
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns maps each target-table field to the source-table field it is filled from
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()
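
To make the direction of the mapping concrete, here is a minimal sketch in plain Python of what a target-to-source mapping does to a single record. It is not pyetl's implementation; map_columns and the sample row are hypothetical names used only for illustration.

```python
def map_columns(record, columns):
    """Build a target-side record: each target field takes the value of its mapped source field."""
    return {target: record[source] for target, source in columns.items()}


# Keys are target-table fields, values are source-table fields.
columns = {"id": "uuid", "name": "full_name"}
source_row = {"uuid": "a1b2", "full_name": "Ada Lovelace"}
target_row = map_columns(source_row, columns)
print(target_row)
```

The keys of the mapping are the field names the writer sees, and the values are the field names the reader produces.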

Field-level UDFs for rule validation, data standardization, data cleaning, etc.

# functions maps each field to the UDF applied to it
# (reader, writer and columns are the ones defined in the previous example)
functions = {"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()
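
The effect of such a functions mapping on one record can be sketched in plain Python as follows. This is an illustration under the assumption that fields without a configured function pass through unchanged; apply_functions and require_non_empty are hypothetical helpers, not pyetl APIs.

```python
def require_non_empty(value):
    """Example validation UDF: reject empty strings, return the stripped value otherwise."""
    cleaned = value.strip()
    if not cleaned:
        raise ValueError("empty value")
    return cleaned


def apply_functions(record, functions):
    """Run each configured function on its field; other fields are passed through unchanged."""
    return {
        key: functions[key](value) if key in functions else value
        for key, value in record.items()
    }


functions = {"id": str, "name": require_non_empty}
row = {"id": 7, "name": "  Ada  ", "age": 36}
cleaned_row = apply_functions(row, functions)
print(cleaned_row)
```

A UDF that raises an exception, as require_non_empty does here, is one way to express rule validation; how pyetl itself reacts to an exception in a UDF is not covered by this article.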

Subclass Task to extend ETL tasks flexibly

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
    reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
    writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")

    def get_columns(self):
        """Generate the field mapping in a method, which is more flexible."""
        # This example reads the field mapping configuration from the database and returns it as a dict
        sql = "select columns from task where name='new_task'"
        columns = self.writer.db.read_one(sql)["columns"]
        return json.loads(columns)

    def get_functions(self):
        """Generate the field UDF mapping in a method."""
        # This example converts every field to a string
        return {col: str for col in self.columns}

    def apply_function(self, record):
        """UDF applied to a whole record in the data stream."""
        record["flag"] = int(record["id"]) % 2
        return record

    def before(self):
        """Runs before the task starts, e.g. to initialize the task table or create the target table."""
        sql = "create table destination_table(id int, name varchar(100))"
        self.writer.db.execute(sql)

    def after(self):
        """Runs after the task completes, e.g. to update the task status."""
        sql = "update task set status='done' where name='new_task'"
        self.writer.db.execute(sql)


NewTask().start()
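
How these hooks could fit together can be sketched with a toy class that uses in-memory lists instead of a real reader and writer. The call order shown here (before, column mapping, field functions, apply_function, after) is an assumption made for illustration and is not taken from pyetl's source; MiniTask is a hypothetical name.

```python
class MiniTask:
    """Toy stand-in for a task: reads dict rows from a list and appends results to another list."""

    def __init__(self, rows):
        self.rows = rows    # plays the role of the reader
        self.output = []    # plays the role of the writer
        self.events = []    # records the order in which the hooks run

    def get_columns(self):
        return {"id": "uuid", "name": "full_name"}

    def get_functions(self):
        return {"id": str, "name": lambda x: x.strip()}

    def apply_function(self, record):
        # Record-level UDF: derive a new field from an existing one.
        record["flag"] = int(record["id"]) % 2
        return record

    def before(self):
        self.events.append("before")

    def after(self):
        self.events.append("after")

    def start(self):
        self.before()
        columns = self.get_columns()
        functions = self.get_functions()
        for row in self.rows:
            # 1. rename source fields to target fields
            record = {target: row[source] for target, source in columns.items()}
            # 2. apply the per-field functions
            for field, func in functions.items():
                record[field] = func(record[field])
            # 3. apply the record-level function and "write" the result
            self.output.append(self.apply_function(record))
        self.events.append("write")
        self.after()


task = MiniTask([{"uuid": 1, "full_name": " Ada "}, {"uuid": 2, "full_name": "Bob"}])
task.start()
print(task.output)
```

Keeping the hooks as overridable methods means a subclass only has to replace the steps that differ, which is the design the NewTask example relies on.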

Currently implemented Readers and Writers

Reader               Introduction
DatabaseReader       Supports reading from all relational databases
FileReader           Reads structured text data, such as CSV files
ExcelReader          Reads Excel spreadsheet files

Writer               Introduction
DatabaseWriter       Supports writing to all relational databases
ElasticSearchWriter  Batch-writes data to an es index
HiveWriter           Batch-inserts data into a Hive table
HiveWriter2          Imports data into a Hive table via the load data method (recommended)
FileWriter           Writes data to a text file

Project address pyetl
