日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python etl 大猩猩_Airflow教程-使用Airflow实现ETL调度

發布時間:2025/3/20 python 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python etl 大猩猩_Airflow教程-使用Airflow实现ETL调度 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、Airflow是什么

airflow 是一個編排、調度和監控workflow的平臺,由Airbnb開源,現在在Apache Software Foundation 孵化。airflow 將workflow編排為由tasks組成的DAGs(有向無環圖),調度器在一組workers上按照指定的依賴關系執行tasks。同時,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操作,并且airflow提供了監控和報警系統。

二、Airflow的核心概念

DAGs:即有向無環圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴關系組織起來,描述的是所有tasks執行的順序。

Operators:airflow內置了很多operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用于發送郵件,HTTPOperator 用于發送HTTP請求, SqlOperator 用于執行SQL命令...同時,用戶可以自定義Operator,這給用戶提供了極大的便利性。可以理解為用戶需要的一個操作,是Airflow提供的類

Tasks:Task 是 Operator的一個實例

Task Instance:由于Task會被重復調度,每次task的運行就是不同的task instance了。Task instance 有自己的狀態,包括"running", "success", "failed", "skipped", "up for retry"等。

Task Relationships:DAGs中的不同Tasks之間可以有依賴關系

三、使用AirFlow完成天級的任務調度

說了這么多抽象的概念,估計看官還是云里霧里,下面就直接舉個例子來說明吧。

1. 安裝airflow

Airflow可以約等于只支持linux和mac,Windows上極其難裝,筆者放棄了.

安裝也很簡單,以下代碼來自官方文檔,使用了Python的pip管理:

# airflow needs a home, ~/airflow is the default,

# but you can lay foundation somewhere else if you prefer

# (optional)

export AIRFLOW_HOME=~/airflow

# install from pypi using pip

pip install apache-airflow

# initialize the database

airflow initdb

# start the web server, default port is 8080

airflow webserver -p 8080

# start the scheduler

airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

安裝好了以后訪問localhost:8080即可訪問ui界面

2. 基本配置

需要創建~/airflow/dags目錄,這個目錄是默認的存放DAG的地方,想修改的話可以修改~/airflow/airflow.cfg文件

修改airflow的數據庫

airflow會使用sqlite作為默認的數據庫,此情況下airflow進行調度的任務都只能單個的執行.在調度任務量不大的情況下,可以使用sqlite作為backend.如果想scale out的話,需要修改配置文件,官方推薦使用mysql或者postgresql作為backend數據庫.

3. 使用PostgresOperator執行SQL完成ETL任務

通過搜集信息,了解到PostgresOperator能執行SQL,并且還支持傳參數.能解決大多數ETL任務中的傳參問題.傳參使用的是Python的Jinjia模塊.

創建DAG

首先創建一個test_param_sql.py文件.內容如下:

from datetime import datetime, timedelta

import airflow

from airflow.operators.postgres_operator import PostgresOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.models import Variable

args = {

'owner': 'airflow',

'depends_on_past': False,

'start_date': datetime(2019, 7, 26), #start_date會決定這個DAG從哪天開始生效

'email': ['airflow@example.com'],

'email_on_failure': False,

'email_on_retry': False,

'retries': 1,

'retry_delay': timedelta(minutes=5),

# 'queue': 'bash_queue',

# 'pool': 'backfill',

# 'priority_weight': 10,

# 'end_date': datetime(2016, 1, 1),

}

# Variable是Airflow提供的用戶自定義變量的功能,在UI界面的Admin -> Variable下可以進行增刪改查,此處筆者定義了sql_path作為存放sql文件的地方

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(

'test_param_sql',

schedule_interval=timedelta(days=1), # schedule_interval是調度的頻率

template_searchpath=tmpl_search_path,

default_args=args,

max_active_runs=1)

test_param_sql = PostgresOperator(

task_id='test_param_sql',

postgres_conn_id='postgres_default',

sql='param_sql.sql',

dag=dag,

params={'period': '201905'},

pool='pricing_pool')

match_finish = DummyOperator(

task_id='match_finish',

dag=dag

)

test_param_sql >> match_finish

準備要執行的Sql文件

創建test_sql.sql文件.

SQL文件會被Jinjia解析,可以使用一些宏來實現時間的替換 例

{{ ds }} 會被轉換為當天的 YYYY-MM-DD 格式的日期

{{ ds_nodash }} 會被轉換為當天的 YYYYMMDD的格式的日期

在本例里則是通過{{params.period}} 取到了 DAG上傳入的參數,

insert into test.param_sql_test

select * from test.dm_input_loan_info_d

where period = {{params.period}};

整體的目錄結構如下

dags/

test_param_sql.py

sql/

test_sql.sql

測試dag是否正確

可以使用 airflow test dag_id task_id date 進行測試,測試會執行Operator,Operator指定的行為會進行調度. 但是不會將執行的行為記錄到Airflow的數據庫里

發布

把文件放到~/airflow/dags目錄下,sql文件不要放在dags目錄下,可以找其他地方(比如同級目錄),配置好上文說到的Variable,能找到即可.筆者的理解是,airflow會掃描dags目錄下的內容,并嘗試解析成dag,如果有不能成功解析的內容,ui界面上會有錯誤提示,導致dag顯示不出來等問題.

其他有用的信息

如何在dag.py里引入其他的本地python模塊

需要把本地的python模塊放到一個zip文件里,例如:

my_dag1.py

my_dag2.py

package1/init.py

package1/functions.py

然后把這個zip文件放到dags目錄下,才能被正確解析

pooling可以控制任務的并行度,如果給DAG指定了一個不存在的pooling,任務會一直處于scheduled的狀態,不繼續進行

總結

以上是生活随笔為你收集整理的python etl 大猩猩_Airflow教程-使用Airflow实现ETL调度的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。