任務調動的工具。有過基礎linux經驗的應該都知道crontab,但他有無法建立複雜任務依賴,簡單調閱日誌等缺點,這時就需要一個完善的ETL工具。本篇筆記簡單記錄我的學習過程,並將成果上傳到github repo
另外目前airflow已經發展到v2,而網路上還能找到不少關於v1的教學,所以在學習的時候要注意版本。官方的文章在此
特點
- 開源
- 友善的UI
- 有豐富的plugin
- 純python
Getting Started
推薦快速啟動可以使用官方docker compose
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
1
|
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.3/docker-compose.yaml'
|
創建必要volume與設定airflow執行者
1
2
|
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
|
init與執行
1
2
|
docker compose up airflow-init
docker compose up
|
DAG
DAG(Directed Acyclic Graph)有向無環圖。在Airflow中,DAG是一個工作流程的定義,描述了一系列的任務(Tasks)和它們之間的依賴關係。每個任務代表一個工作單元,可以是任何可以在Airflow中執行的操作,例如執行Python腳本、執行SQL查詢、調用外部API等。
example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# 定義預設參數
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def print_hello():
print("Hello from the PythonOperator task")
# 定義DAG
with DAG(
'simple_dag_example',
default_args=default_args,
description='A simple example DAG',
schedule_interval=timedelta(days=1), # 每天執行一次
) as dag:
# 定義兩個任務,v2開始也能使用decorator
start_task = DummyOperator(
task_id='start_task',
dag=dag,
)
python_task = PythonOperator(
task_id='python_task',
python_callable=print_hello,
dag=dag,
)
# 定義任務之間的依賴關係,這樣的範例為先執行start_task後再執行python_task
start_task >> python_task
|
Scheduler
Scheduler會定時去檢查DAGs的資料夾
- 檢查是否有任何DAGs需要DAG Run
- 對DAG Run 下的task建立schedule task instance
所以要建立一個任務最直覺的方式,就是將編寫好的DAG py檔放入DAGs資料夾。可以從airflow的example中複製並修改開始,或直接參考上個section的example
UI操作
如果scheduler已經完成update,則我們就能在ui介面上看到我們新增的DAG。關於UI的操作說明礙於平台不適合放入過多圖片,因此改提供官方的UI說明連結。
大家可以朝著最基本的操作熟悉為目標:
- 檢視DAG運作情況
- 手動觸發DAG運作
- 調閱DAG執行log