Apache Airflow 2.0.0 已正式發布,Airflow 是一個靈活、可擴展的工作流自動化和調度系統,可編集和管理數百 PB 的數據流。項目可輕鬆編排複雜的計算工作流,通過智能調度、資料庫和依賴關係管理、錯誤處理和日誌記錄,Airflow 可以對從單個伺服器到大規模集群的資源進行自動化管理。Airflow 採用 Python 編寫,具有高擴展性,能夠運行其他語言編寫的任務,並允許與常用的體系結構和項目集成,如 AWS S3、Docker、Kubernetes、MySQL、PostgresSQL 等。
下面簡單介紹 2.0 版本的主要新特性。
更新 UI
Airflow 2.0 對 UI 進行了重大更新,並升級了部分樣式。
引入編寫 dag(有向無環圖)的新方法:TaskFlow API
新的方法對依賴關係的處理更清晰,XCom 也更易於使用。
示例
from airflow.decorators import dag, taskfrom airflow.utils.dates import days_ago@dag(default_args={'owner': 'airflow'}, schedule_interval=None, start_date=days_ago(2))def tutorial_taskflow_api_etl(): @task def extract(): return {"1001": 301.27, "1002": 433.21, "1003": 502.22} @task def transform(order_data_dict: dict) -> dict: total_order_value = 0 for value in order_data_dict.values(): total_order_value += value return {"total_order_value": total_order_value} @task() def load(total_order_value: float): print("Total order value is: %.2f" % total_order_value) order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"])tutorial_etl_dag = tutorial_taskflow_api_etl()
詳情查看
TaskFlow API Tutorial
TaskFlow API Documentation
提升 Scheduler 性能
2.0 版本提升了 Airflow Scheduler 性能,啟動任務的速度明顯變快。在 Astronomer.io 上對此 Scheduler 進行基準測試的結果顯示,它的速度快到連開發者都懷疑結果出錯了。
Scheduler 現已兼容 HA
支持運行多個 scheduler 實例,這對於彈性使用(如果 scheduler 發生故障)和調度性能都非常有意義。若希望完全使用此功能,需要 Postgres 9.6+ 或 MySQL 8+(MySQL 5 和 MariaDB 可能無法使用多個 scheduler)。
運行多個 scheduler 不需要任何配置或其他設置,只需在其他地方啟動 scheduler (確保它可以訪問 DAG 文件),它將通過資料庫與現有的 scheduler 配合使用。
詳情查看 Scheduler HA 文檔。
簡化 KubernetesExecutor
Airflow 2.0 重新建立了 KubernetesExecutor 架構,為 Airflow 用戶提供更快、更容易理解和更靈活的使用方式。用戶現在可以訪問完整的 Kubernetes API 來創建一個 .yaml pod_template_file,而不是在 airflow.cfg 中指定參數。
此外還用pod_override參數替換了executor_config詞典,此項變化從 KubernetesExecutor 刪除了三千多行代碼,使其運行速度更快,並減少潛在錯誤。
詳情查看
Docs on pod_template_file
Docs on pod_override
完整發布公告 https://airflow.apache.org/blog/airflow-two-point-oh-is-here/。