centos7
Airflow 1.10.6
Python 3.6.8
Mysql 5.6
redis 3.3
少し(バイドゥによる)
リモート接続を開くことに注意してください(ファイアウォールをオフにしてください)
文字セットは、文字化けを防ぐためにUTF8に一律に変更されます(utf8mb4も使用できます)。
mysqlまたはMariaDBの上位バージョンは、VARCHAR(5000)のエラーを報告します。下位バージョンを提案します。
その理由は、データベースの高バージョンが効率のためにVARCHERの最大長を制限するためです。
postgresqlは後で追加しようとしませんでした
Pythonのわずかなインストール(Baiduによる)
環境変数にpythonを追加してください(便利)
vim ~/.bashrc
# 環境変数exportAIRFLOWの行を追加します_HOME=/opt/airflow
source ~/.bashrc
export SLUGIFY_USES_TEXT_UNIDECODE=yes
# 構成ファイルを生成します。一部のエラーが報告される場合があります。無視してください。AIRFLOWを確認してください。_HOMEディレクトリの下に生成.cfgおよび関連ドキュメントは、この実行の成功を証明します#pythoの環境変数が構成されている場合は、直接実行します#で構成されていません${PYTHON_HOME}/lib/python3.6/sit-packages/airflow/binディレクトリで実行します`./airflow`
pip install apache-airflow
pip install'apache-airflow[mysql]'
pip install'apache-airflow[celery]'
pip install'apache-airflow[redis]'
# sqlalchemyリンク
sql_alchemy_conn = mysql://username:password@localhost:3306/airflow
# アクチュエータを構成する
executor=CeleryExecutor
# セロリブローカーを構成する_url
broker_url = redis://lochost:5379/0
# メタデータ情報管理を構成する
result_backend = db+mysql://username:password@localhost:3306/airflow
# ユーザーグループとユーザーグループを作成し、エアフローを追加します
useradd airflow -g airflow
# 意志{AIRFLOW_HOME}ディレクトリ修復ユーザーグループCD/opt/
chgrp -R airflow airflow
# フォアグラウンドでWebサービスを開始します
airflow webserver
# バックグラウンドでWebサービスを開始する
airflow webserver -D
# フォアグラウンドでスケジューラを開始します
airflow schedule
# バックグラウンドでスケジューラを開始する
airflow scheduler -D
# ワーカーホストは、通常のユーザーでエアフローワーカーを開くだけで済みます#ユーザーairflowuseraddairflowを作成します
# ユーザーテスト用のパスワードpasswdエアフローを設定します
# rootユーザーの下で、airflowフォルダーのアクセス許可を変更し、chmodを完全に開くように設定します-R 777 /opt/airflow
# 通常のユーザーに切り替えて、airflowworkerコマンドを実行します#通常のユーザーが起動時に読んだことがわかりました~/.bashrcファイルに一貫性がなく、AIRFLOWに再参加します_HOMEがやる#新しい通常のユーザーを作成する前に環境変数を構成する場合、この問題は発生しない可能性があります。ユーザーの作成後に環境変数を変更しました。
airflow worker
worker.png
# ワーカーを実行する前に一時変数を実行します(一時は永続的に使用できません)export C_FORCE_ROOT="true"#ユーザーCDを切り替える必要はありません/usr/local/python3/bin/
# フォアグラウンドでワーカーサービスを開始します
airflow worker
# バックグラウンドで作業サービスを開始します
airflow worker -D
default_timezone = Asia/Shanghai
参照は次のとおりです。
cd /usr/local/lib/python3.6/site-packages/airflow
# utcで= pendulum.timezone(‘UTC’)この行(27行目)コードの下の気流から追加.configuration import conf
try:
tz = conf.get("core","default_timezone")if tz =="system":
utc = pendulum.local_timezone()else:
utc = pendulum.timezone(tz)except Exception:pass#utcnowを変更する()関数(69行目)
元のコードd= dt.datetime.utcnow()
dに修正= dt.datetime.now()
# utcで= pendulum.timezone(‘UTC’)この行(37行目)コードの下の気流から追加.configuration import conf
try:
tz = conf.get("core","default_timezone")if tz =="system":
utc = pendulum.local_timezone()else:
utc = pendulum.timezone(tz)except Exception:pass
コードを入力varUTCseconds=(x.getTime()+ x.getTimezoneOffset()*60*1000);
varUTCsecondsに変更します= x.getTime();
コードを入れて"timeFormat":"H:i:s %UTC%",
に"timeFormat":"H:i:s",
[aiflow公式文書](https://airflow.apache.org/docs/stable/howto/email-config.html)を参照してください
email_backend = airflow.utils.email.send_email_smtp
設定するメールボックスサーバーアドレスのsmtpで、メールボックス設定を確認します(ここでは163
smtp_host = smtp.163.com
メールボックス通信プロトコル
smtp_starttls = False
smtp_ssl = True
メールアドレス
smtp_user = [email protected]
メール設定またはBaiduでメール認証コードを確認してください
smtp_password = 16ビットの認証コード
メールボックスサービスポート
smtp_port =ポート
あなたのメールアドレスsmtp_mail_from = [email protected]
dagのdefault_argsにパラメーターを追加します
default_args ={
# メールボックスを受け入れる
' email':['[email protected]''],
# タスクが失敗したときにメールを送信するかどうか
' email_on_failure': True,
# タスクがメールの送信を再試行するかどうか
' email_on_retry': False,}
——————————————————————————————————————————————
気流のグローバル変数に設定
DAGにパラメータを追加して、dag全体を制御します
dag =DAG(f"dag_name",
default_args=default_args,
schedule_interval="0 12 * * *",
max_active_runs =1)
各タスクのオペレーターでパラメーターを設定します
t3 =PythonOperator(
task_id='demo_task',
provide_context=True,
python_callable=demo_task,
task_concurrency=1,
dag=dag)
Recommended Posts