日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

AirFlow官方入门DAG示例

發(fā)布時(shí)間:2025/3/11 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 AirFlow官方入门DAG示例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

經(jīng)過前兩篇文章的簡(jiǎn)單介紹之后,我們安裝了自己的AirFlow以及簡(jiǎn)單了解了DAG的定義文件.現(xiàn)在我們要實(shí)現(xiàn)自己的一個(gè)DAG.

1. 啟動(dòng)Web服務(wù)器

使用如下命令啟用:

airflow webserver

現(xiàn)在可以通過將瀏覽器導(dǎo)航到啟動(dòng)Airflow的主機(jī)上的8080端口來(lái)訪問Airflow UI,例如:http://localhost:8080/admin/

備注

Airflow附帶了許多示例DAG。 請(qǐng)注意,在你自己的`dags_folder`中至少有一個(gè)DAG定義文件之前,這些示例可能無(wú)法正常工作。你可以通過更改`airflow.cfg`中的`load_examples`設(shè)置來(lái)隱藏示例DAG。

2. 第一個(gè)AirFlow DAG

現(xiàn)在一切都準(zhǔn)備好了,我們開始寫一些代碼,來(lái)實(shí)現(xiàn)我們的第一個(gè)DAG。 我們將首先創(chuàng)建一個(gè)Hello World工作流程,其中除了向日志發(fā)送"Hello world!"之外什么都不做。

創(chuàng)建你的dags_folder,那就是你的DAG定義文件存儲(chǔ)目錄---$AIRFLOW_HOME/dags。在該目錄中創(chuàng)建一個(gè)名為hello_world.py的文件。

AIRFLOW_HOME ├── airflow.cfg ├── airflow.db ├── airflow-webserver.pid ├── dags │ ├── hello_world.py │ └── hello_world.pyc └── unittests.cfg

將以下代碼添加到dags/hello_world.py中:

# -*- coding: utf-8 -*-import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from datetime import timedelta#------------------------------------------------------------------------------- # these args will get passed on to each operator # you can override them on a per-task basis during operator initializationdefault_args = {'owner': 'jifeng.si','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(2),'email': ['1203745031@qq.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5) }#------------------------------------------------------------------------------- # dagdag = DAG('example_hello_world_dag',default_args=default_args,description='my first DAG',schedule_interval=timedelta(days=1))#------------------------------------------------------------------------------- # first operatordate_operator = BashOperator(task_id='date_task',bash_command='date',dag=dag)#------------------------------------------------------------------------------- # second operatorsleep_operator = BashOperator(task_id='sleep_task',depends_on_past=False,bash_command='sleep 5',dag=dag)#------------------------------------------------------------------------------- # third operatordef print_hello():return 'Hello world!'hello_operator = PythonOperator(task_id='hello_task',python_callable=print_hello,dag=dag)#------------------------------------------------------------------------------- # dependenciessleep_operator.set_upstream(date_operator) hello_operator.set_upstream(date_operator)

該文件創(chuàng)建一個(gè)簡(jiǎn)單的DAG,只有三個(gè)運(yùn)算符,兩個(gè)BaseOperator(一個(gè)打印日期一個(gè)休眠5秒),另一個(gè)為PythonOperator在執(zhí)行任務(wù)時(shí)調(diào)用print_hello函數(shù)。

3. 測(cè)試代碼

使用如下命令測(cè)試一下我們寫的代碼的正確性

python ~/opt/airflow/dags/hello_world.py

如果你的腳本沒有拋出異常,這意味著你代碼中沒有錯(cuò)誤,并且你的Airflow環(huán)境是健全的。

下面測(cè)試一下我們的DAG中的Task.使用如下命令查看我們example_hello_world_dagDAG下有什么Task:

xiaosi@yoona:~$ airflow list_tasks example_hello_world_dag

可以看到我們有三個(gè)Task:

date_task hello_task sleep_task

下面分別測(cè)試一下這幾個(gè)Task:

(1) 測(cè)試date_task

xiaosi@yoona:~$ airflow test example_hello_world_dag date_task 20170803

(2) 測(cè)試hello_task

xiaosi@yoona:~$ airflow test example_hello_world_dag hello_task 20170803

如果沒有問題,我們就可以運(yùn)行我們的DAG了.

4. 運(yùn)行DAG

為了運(yùn)行你的DAG,打開另一個(gè)終端,并通過如下命令來(lái)啟動(dòng)Airflow調(diào)度程序:

airflow scheduler

備注

調(diào)度程序?qū)l(fā)送任務(wù)進(jìn)行執(zhí)行。默認(rèn)Airflow設(shè)置依賴于一個(gè)名為`SequentialExecutor`的執(zhí)行器,它由調(diào)度程序自動(dòng)啟動(dòng)。在生產(chǎn)中,你可以使用更強(qiáng)大的執(zhí)行器,如`CeleryExecutor`。

當(dāng)你在瀏覽器中重新加載Airflow UI時(shí),應(yīng)該會(huì)在Airflow UI中看到你的hello_world?DAG。

為了啟動(dòng)DAG Run,首先打開工作流(off鍵),然后單擊Trigger Dag按鈕(Links 第一個(gè)按鈕),最后單擊Graph View按鈕(Links 第三個(gè)按鈕)以查看運(yùn)行進(jìn)度:

你可以重新加載圖形視圖,直到兩個(gè)任務(wù)達(dá)到狀態(tài)成功。完成后,你可以單擊hello_task,然后單擊View Log查看日志。如果一切都按預(yù)期工作,日志應(yīng)該顯示一些行,其中之一是這樣的:

[2017-08-03 09:46:43,236] {base_task_runner.py:95} INFO - Subtask: [2017-08-03 09:46:43,235] {python_operator.py:81} INFO - Done. Returned value was: Hello world![2017-08-03 09:46:47,378] {jobs.py:2083} INFO - Task exited with return code 0

更多多資訊或疑問內(nèi)容請(qǐng)關(guān)注?微信公眾號(hào) “讓夢(mèng)飛起來(lái)”?或添加小編微信,?后臺(tái)回復(fù) “Python” ,領(lǐng)取更多資料哦

? ?? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ??

總結(jié)

以上是生活随笔為你收集整理的AirFlow官方入门DAG示例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。