日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

airflow sql_alchemy_conn mysql_airflow的安装和使用 - 完全版

發布時間:2023/11/27 生活经验 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 airflow sql_alchemy_conn mysql_airflow的安装和使用 - 完全版 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

之前試用了azkaban一小段時間,雖然上手快速方便,但是功能還是太簡單,不夠靈活。

Airflow使用代碼來管理任務,這樣應該是最靈活的,決定試一下。

我是python零基礎,在使用airflow的過程中可謂吃盡了苦頭。。好歹最后實現所有要求,兩三周的時間沒有白費

看完這篇文章,可以達到如下目標:

安裝airflow

如何修改界面右上角顯示的時間到當前時區

如何添加任務

調試任務python代碼

如何啟動spark任務

如何限定任務同時執行的個數

如何手動觸發任務時傳入參數

如何在airflow界面上重新運行任務

如何查看任務log及所有任務的運行記錄

如何在任務失敗時發郵件(騰訊企業郵箱)

如何在任務失敗時發消息到企業微信

以下過程已經過去了有一段時間,當時記錄的也不一定很全面,如果有的不能執行,請留言告知。

安裝airflow

系統:Ubuntu 16

python: 3.7

airflow版本:1.10.10

保持pip3到最新版本

pip3 install --upgrade pip

安裝使用pip3

切換到root用戶執行: pip3 install apache-airflow

你以為敲完這條命令就可以去把個妹或者撩個漢再回來就裝好了,請坐下。

我碰到的錯誤:

Python.h not found

運行

sudo apt-get install python3.7-dev

某些依賴版本不對:

ERROR: pendulum 1.4.4 has requirement python-dateutil<3.0.0.0,>=2.6.0.0, but you'll have python-dateutil 2.4.2 which is incompatible.

ERROR: pandas 0.25.3 has requirement python-dateutil>=2.6.1, but you'll have python-dateutil 2.4.2 which is incompatible.

運行

pip install python-dateutil --upgrade

哪個包版本不對,更新哪個

數據庫使用mysql

相信你看這個文章的時候應該不會還沒有嘗試裝過airflow,所以airflow.cfg這個文件已經有了,在哪也很清楚

修改airflow.cfg:

sql_alchemy_conn = mysql://airflow:password@jjh1810:3306/airflow

使用root用戶連接到mysql:

create user 'airflow'@'%' identified by '123';

grant all privileges on airflow.* to 'airflow'@'%';

flush privileges;

set explicit_defaults_for_timestamp = 1; --這一行至關重要

再使用airflow用戶登錄mysql:

create database airflow CHARACTER SET = utf8mb4;

初始化數據庫

airflow initdb

這時候會報mysql依賴問題,如:

No module named '_mysql'

這個時候終于可以啟動airflow了:

** 啟的時候不要使用root用戶,回到普通用戶 **

airflow webserver -p 8080

airflow scheduler

如何修改界面右上角顯示的時間到當前時區

相信應該所有人都會干這個事情:

喲?airflow里有個時區的配置,改了應該就好了

default_timezone = Asia/Shanghai

然后去刷一下頁面

還是UTC嘛,這配置騙人的嗎?

那么看這一篇文章吧:

Airflow 修改時區

** 改的時候注意:** python的代碼是根據縮進來區別代碼塊的,所以拷代碼的時候一定要注意縮進沒有問題

如何添加任務

在~/airflow下創建dags文件夾,把.py文件放入即可

airflow啟動了一個叫 DagFileProcessorManager 的進程來掃描dags目錄,一但有文件個數變更,或者內容變更都會很快被檢測到

這個進程有相應的log文件,可以提供一些文件處理錯誤信息

調試任務python代碼

關閉schedule

這個時候已經開始寫任務的python代碼了,對于python小白與剛開始接觸airflow的小哥或老哥來說,簡直就是痛不欲生

有一個配置在調試的時候比較實用,就是關掉任務的schudle,只有手動觸發才會執行。

把dag的schedule_interval設置為None

schedule_interval=None

python小白實用技巧

還有python代碼里單引號和雙引號是等價的,如果有引號嵌套可以分開使用,避免麻煩的轉義,如:

hour = '{{ dag_run.conf["hour"] }}'

Jinja template

反正我第一眼看到這個東西,特別是官方教程里那一大塊的模板文本的時候,心里只有一個字: WTF?!

templated_command = """

{% for i in range(5) %}

echo "{{ ds }}"

echo "{{ macros.ds_add(ds, 7)}}"

echo "{{ params.my_param }}"

{% endfor %}

"""

其實也不是很復雜,這個玩意理解了以后還是比較方便的。除了在代碼中使用普通的python變量或者airflow內置變量之外,很多時候在字符串中也需要使用airflow的內置變量會比較靈活和方便,Jinja template提供的就是這個功能。

如何啟動spark任務

airflow是很強大很靈活的,官方提供了功能豐富的各種插件,各種JobOperator。來,簡單好用童叟無欺的SparkSubmitOperator了解一下?

我的需求很簡單,可以提交任務到不同的spark集群。這樣就要求不能使用機器默認的hadoop和spark環境變量,必須為每個任務指定獨立的配置文件。不知道是不是有大牛一次性成功的,反正我是試了無數次,一句話在心里不停的重復:“這什么吊東西!”

小可愚鈍,google能搜出來的都看過了,怎么都不行,死活都不行,主要是環境變量不對。

調用linux腳本執行spark-submit是最靈活方便的辦法

轉念一想,還是傳統的spark提交方式最好用啊,就是執行sh腳本利用spark-submit進行提交,這樣spark就與airflow無關了,而且不管是環境變量還是參數配置都最靈活,傳入參數也很方便。

這樣只要使用普通的BashOperator就可以了,而且airflow應該專注如何調度任務,而不是還要兼顧任務的配置,就算SparkSubmitOperator可以工作,也是使用sh腳本的方式更好。

如何限定任務同時執行的個數

像spark任務通常使用的資源都會比較多,如果dag執行開始時間到當前時間間隔很長,或是暫停很長時間再開啟,那么一開啟的時候schedule會瞬間創建大量任務,提交到默認的pool,這個pool默認的大小是128。這樣肯定是大家不希望看到的。

一個解決辦法,為每個spark任務創建單獨的pool,大小設置為1,這樣一個spark任務一次就只能有一個在運行狀態,后面都排隊。

界面上操作:[Admin] -> [Pools],slots設為1。

然后在spark task的operator里添加參數:pool='PoolName'

如何手動觸發任務時傳入參數

假設任務是每小時觸發一次,處理24小時前的數據,也就是今天8點處理昨天8點這一個小時的數據。除了schedule自動執行的邏輯,我們還希望可以手動觸發任務,并且指定某個小時重新處理。

** 注: ** 這個功能只有1.10.10才支持,就是在界面上點擊 [Trigger DAG] 的時候可以填入參數(固定為Json格式)。

先來看一下最終的結果

hour='{{ dag_run.conf["hour"] if dag_run.conf["hour"] else (execution_date - macros.timedelta(hours=24)).strftime("%Y%m%d%H") }}'

這里使用了Jinja template,通過dag_run對象可以獲取界面上傳入的參數,如果沒有這個參數(也就是通過schedule觸發),那么執行默認的邏輯(這里是24之前),并且格式化時間與界面輸入保持一致。

如何在airflow界面上重新運行任務

這個功能默認的Executor是不支持的,需要安裝CeleryExecutor,而CeleryExecutor一個存放消息的框架,這里我們選擇rabbitmq。

假定rabbitmq已經裝好。

安裝請看官方文檔:Celery Executor

配置

executor = CeleryExecutor

borker_url = amqp://user:password@host:port

** 注:** 如果rabbitmq是集群模式,這里也是挑一臺出來使用。指定所有節點我還沒有配置成功,如果有會配置的,請留言告知。

如何在界面上重跑任務呢?

界面上點擊dag進入dag管理界面,點擊[Tree View]。

Task每次運行都會用一個小方塊來表示,點擊小方塊,再點擊 [Run] 按鈕就可以了。

** 注:** Tree View 這里最多只顯示固定數量的歷史記錄,如果再早的時間只能通過點擊 [Trigger DAG] 再指定參數運行。

任務運行時間的問題

這里有一個關鍵的問題,在界面上點擊8個小時以前任務執行,那么任務觸發的時候,運行的是8個小時之前的時間,還是當前時間呢?

如果我們是通過之前的hour變量的來指定時間的,那任務運行的時間就是8個小時之前,任務當時觸發的時間。為什么呢?

我們在Jinja template里使用的變量 dag_run, execute_date這個并不是運行時變量,每次task觸發,相關的上下文信息都會存到數據庫里。所以8個小時之后我們再重新運行task的時候,是從數據庫中讀取當時的上下文信息,而不是現在的信息。

如何查看任務log及所有任務的運行記錄

查看所有任務的運行記錄

DAG界面里的 [Graph View] -> 點擊任務 -> [Task Instances]

主菜單里的 [Browser] -> [Task Instances]

查看log

這就比較簡單了

點擊 [Tree View] 里的小方塊,可以查看log

Task Intances 列表最后一列,也可以查看log

如何在任務失敗時發郵件(騰訊企業郵箱)

首先DAG的default_args需要配置

'email':['name@mail.com'],

'email_on_failure': True

修改airflow.cfg

smtp_host = smtp.exmail.qq.com

smtp_starttls = False

smtp_ssl = True

smtp_port = 465

smtp_mail_from = name@mail.com

smtp_user = name@mail.com

smtp_password = password

實話說,這些配置都搞了好久才試出來,這種體驗簡直讓人欲哭無淚。當然,身為一個碼畜哭個什么,到這里已經被python和airflow的種問題折磨很多天了,素質三連走起來。

首先 smtp_ssl = True, smtp_port = 465 是一個重點。再次smtp_mail_from和smtp_user都使用同一個有效的郵箱地址。

如何在任務失敗時發消息到企業微信

有時候覺得發郵件可能還不夠,想把失敗消息發到企業微信,這樣更能及時的發現問題。

添加企業微信依賴

airflow官方支持釘釘的通知,企業微信通知就是根據這個改動而來,代碼有兩個py文件:airflow企業微信支持

把這兩個py文件放到 dags 目錄,也就是和dag的py文件放在一起。

使用方法:

3. 在企業微信群中創建機器人

右鍵點擊群

選擇 [Add Group Robot],并創建

獲取機器人的key:右鍵 [View Information],可以得到一個URL

https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxx-xx-xx

這個key的值就是機器人的ID

在airflow中創建企業微信的連接:[主菜單] -> [Admin] -> [Connections],配置填寫:

Conn Id: wechat_robot

Conn Type: HTTP

Host: https://qyapi.weixin.qq.com

Password: 前面得到的key值,也就是機器人的ID

在代碼中使用

代碼中import WechatOperator

from wechat_operator import WechatOperator

創建 failure call 方法:

def failure_callback(context):

dagConf = context['dag_run'].conf

taskInst = context['task_instance']

hour = None

if 'hour' in dagConf:

hour = dagConf['hour']

else:

hour = (taskInst.execution_date - timedelta(hours=24)).strftime('%Y%m%d%H')

message = 'Airflow任務失敗:\n' \

'DAG: {}\n' \

'Task: {}\n' \

'時間: {}\n' \

.format(taskInst.dag_id,

taskInst.task_id,

hour)

return WechatOperator(

task_id='wechat_task',

wechat_conn_id='wechat_robot',

message_type='text',

message=message,

at_all=True,

).execute(context)

這個代碼應該還是很好懂的,主要是為了創建 WechatOperator 對象。

有個邏輯來重新獲取執行時間(這里必須使用代碼,而不能直接使用Jinja template),為的是在通知里面可以直接看到是哪個時間出錯了。

default_args添加 failure callback配置

'on_failure_callbak': failure_callback

結束語

到這里,總算是搭建好一個可以正式投入生產使用的環境了。

Airflow雖然很靈活,但是想真正滿足生產需求,還是經歷了不少痛苦。特別是要求會使用python,加上airflow官方文檔也不是很詳細,這兩點導致入門曲線太陡峭了。

總結

以上是生活随笔為你收集整理的airflow sql_alchemy_conn mysql_airflow的安装和使用 - 完全版的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。