日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

【部署类】专题:消息队列MQ、进程守护Supervisor

發布時間:2023/11/27 生活经验 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【部署类】专题:消息队列MQ、进程守护Supervisor 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

1 背景需求

2 技術方案

2.1 消息隊列

2.2 進程守護

3 源碼介紹

3.1 supervisor部分

3.1.1 supervisord.conf 內容

3.1.2?MM3D.conf 和 MM3D_2.conf 內容

3.2?算法程序(也就是我的主函數)


1 背景需求

某C端產品,前端嵌入式(安卓)將采集的數據發送給后端,后端服務器(Java)要負責將數據交到算法服務器(python,C++),算法服務器收到數據并處理完后將結果再返回給后端,后端拿著結果二次加工后再發給前端顯示。

基本要求:

  1. 算法服務器有多臺,要充分利用,要滿足并發。
  2. 異常崩潰、關機等,算法要自啟動。

2 技術方案

  1. 應對并發問題:后端和算法端采用RabbitMQ消息訂閱方案。
  2. 應對異常自啟動問題:采用Supervisor進程守護。

架構圖如下:

2.1 消息隊列

消息隊列(Message queue)原理比較簡單(當然細節很多),主要作用就是把所有生產者的數據放到一個隊列中,所有消費者從從這個隊列里取,確保每個數據僅被消費一次,互相不沖突。

詳細原理可參考:

消息隊列(mq)是什么? - 知乎

什么是消息隊列啊? - 知乎

RabbitMQ 入門系列(9)— Python 的 pika 庫常用函數及參數說明_wohu1104的專欄-CSDN博客

2.2 進程守護

進程守護的目的是讓異常崩潰的程序能自動重啟。

Supervisor是用Python開發的一套通用的進程管理程序,能將一個普通的命令行進程變為后臺daemon,并監控進程狀態,異常退出時能自動重啟。

幾個要點的解釋:

  1. Supervisor為什么能啟動程序?
    1. 答:Supervisor自己本身是某種程序,它能在Linux系統,通過自定義的配置去指定任意個子程序(每個子程序要定義一個唯一名稱),而每個子進程被啟動后會去執行一個shell文件(.sh文件),而你可以在這個shell文件中自定義任何命令行代碼,所以你能以任何方式去啟動任意位置的任意多個程序。
  2. Supervisor為什么能自動啟動崩潰的程序?
    1. 答:由于supervisor的子進程會通過指定的shell腳本去啟動其他“孫”進程(也就是你想啟動的程序),并且子進程能和孫進程通信,所以,當你的程序崩潰時,其所屬的supervisor子進程會重新執行一次shell腳本,把這個崩潰的程序再啟動。這里重啟的規則和配置有很多方式,很靈活。

更多信息,我看了比較好的參考如下(推薦級分先后順序):

??????Supervisor使用詳解 - 浪淘沙& - 博客園

詳解Supervisor進程守護監控 - 請叫我頭頭哥 - 博客園

supervisor 使用詳解_11111-CSDN博客_supervisor

3 源碼介紹

算法服務器部分運行的邏輯是:

  1. 算法服務器開機。
  2. supervisor程序自動啟動,通過配置文件,自動開啟相應的子進程。每個子進程啟動后再去調用一個shell文件,把算法程序逐一啟動起來。
  3. 眾多算法程序開始實時訂閱唯一的文件服務器消息。
  4. 某個算法程序從MQ隊列中拿到一個文件包路徑和名字后,就會通過FTP去文件服務器下載數據到算法服務器本地,然后算法模塊開始處理數據、返回數據給后端,然后重新監聽。

3.1 supervisor部分

supervisor安裝好后,配置文件一般放在/etc/supervisor文件夾內,里面有如下兩個文件:

  • supervisord.conf:supervisor的基本配置文件
  • conf.d:一個文件夾,里面存放supervisor每個子進程的配置文件。(我有個疑問是為什么一個文件夾要用.d起名字,看起來還以為是個文件)
    • MM3D.conf:我定義的一個子進程配置。這個conf文件的名字可以隨便取。
    • MM3D_2.conf:我定義的第二個子進程配置。

3.1.1 supervisord.conf 內容

; supervisor config file[unix_http_server]
file=/var/run/supervisor.sock   ; (the path to the socket file)
chmod=0700                       ; sockef file mode (default 0700)[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor            ; ('AUTO' child log dir, default $TEMP); the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket; The [include] section can just contain the "files" setting.  This
; setting can list multiple files (separated by whitespace or
; newlines).  It can also contain wildcards.  The filenames are
; interpreted as relative to this file.  Included files *cannot*
; include files themselves.[include]
files = /etc/supervisor/conf.d/*.conf

3.1.2?MM3D.conf 和 MM3D_2.conf 內容

MM3D.conf內容如下:

[program:MM3D]
directory=/mm3d/RV2_magic_mirror_algo/MM3D
command=sh /mm3d/RV2_magic_mirror_algo/MM3D/excute.sh
autostart=true
autorestart=true
startretries=100
redirect_stderr=true
stdout_logfile=/mm3d/RV2_magic_mirror_algo/MM3D/supervisor_out.log

MM3D_2.conf內容如下:

[program:MM3D_2]
directory=/mm3d/RV2_magic_mirror_algo/MM3D
command=sh /mm3d/RV2_magic_mirror_algo/MM3D/excute.sh
autostart=true
autorestart=true
startretries=100
redirect_stderr=true
stdout_logfile=/mm3d/RV2_magic_mirror_algo/MM3D/supervisor_2out.log

注意:conf文件中,比較重要的參數感覺有兩個:

  1. 唯一的進程名。也就是:[program:XXX]里面的XXX。后面使用supervisorctl 各種命令操控子進程需要用到這些名字。
  2. 生成日志的位置和名字。多個子進程不要把日志搞一起了。

3.2?算法程序(也就是我的主函數)

算法程序主要包括兩塊:

  1. MQ通信模塊(包括FTP拉取數據流)。
  2. 算法處理模塊以及數據上傳模塊。

代碼如下:

(config_MQ.py就省略了,里面是一些SDK、模型等地址,以及MQ的IP地址和密碼等)

import os
import numpy as npfrom pathlib import Path
from config_MQ import Config
import time
from loguru import logger
import sysimport pika
from ftplib import FTP
import jsondef ftp_connect():try:"""連接ftp:return:"""ftp = FTP()logger.debug('config.ftp_host: {}', config.ftp_host)logger.debug('config.ftp_port: {}', config.ftp_port)ftp.connect(config.ftp_host, config.ftp_port)  # 連接遠程服務器IP地址ftp.encoding = 'utf-8'ftp.set_debuglevel(1)  # 不開啟調試模式ftp.login(config.ftp_user, config.ftp_pwd)  # 登錄ftp# print(ftp.getwelcome())	# ftp服務器歡迎語except Exception as e:#print(e)logger.exception('ftp_connect error: {}', e)return Noneelse:return ftpdef read_file(file_path, target_dir, filename):ftp = ftp_connect()  # 連接ftp# ftp服務器上文件的路徑# 本地文件下載保存的路徑# 本地文件下載寫入的路徑文件# writefile = '%s/%s' % (write_path, filename)write_path = target_dir + '/%s' % (filename + '.zip')with open(write_path, "wb") as f:ftp.retrbinary('RETR %s' % file_path, f.write)ftp.close();def callbackTry(ch, method, properties, body):print(body.decode())ch.basic_ack(delivery_tag=method.delivery_tag)## 拿到消息轉jsonbodyJson = json.loads(body.decode())filepath = bodyJson['filepath']user_id = bodyJson['keypair']callback_url = bodyJson['callbackUrl'] # 回調云端地址sample_raw_dir = os.path.join(raw_data_root, user_id) #../../MM3D_RAW/B16XXXXXXXXsample_result_dir = os.path.join(result_data_root, user_id) # ../../MM3D_Result/B16XXXXXXX# 拿到ftp url下載文件并保存sample_raw_dirif not os.path.isdir(sample_raw_dir):try:os.mkdir(sample_raw_dir)except Exception as e:logger.exception('Fail to mkdir to raw data: {}', e)#print('Fail to mkdir to raw data', e)if not os.path.isdir(sample_result_dir):try:os.mkdir(sample_result_dir)except Exception as e:logger.exception('Fail to mkdir to result data: {}', e)#print('Fail to mkdir to result data', e)try:# zip_file = user_id + '.zip'# file.save(os.path.join(sample_raw_dir, zip_file))read_file(filepath, sample_raw_dir, user_id) #通過FTP拉取數據包并保存在本地except Exception as e:logger.exception('Fail to save raw data: {}', e)#print("Fail to save raw data", e)start_time = int(round(time.time() * 1000))sample_key_pair = sample_raw_dir.split('/')[-1]# 識別文件的路勁logger.debug("sample_raw_dir :{}",  sample_raw_dir)logger.debug("callback_url :{}",  callback_url)############################ 算法部分 ############################## TODO 調用算法程序識別def callback(ch, method, properties, body):try:callbackTry(ch, method, properties, body)except Exception as e:logger.exception('algo error: {}', e)#print("algo error:", e)def init_rabbitmq():# 創建連接時的登錄憑證。 username: MQ 賬號, password: MQ 密碼credentials = pika.PlainCredentials(config.rabbitmq_user, config.rabbitmq_pwd)# 阻塞式連接 MQ# parameters: 連接參數(包含主機/端口/虛擬主機/賬號/密碼等憑證信息)connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq_host, port=config.rabbitmq_port, virtual_host='/',credentials=credentials))# 獲取與 rabbitmq 通信的通道channel = connection.channel()# 聲明交換器exchange = "algoExchange"channel.exchange_declare(exchange=exchange, durable=True, exchange_type='topic')# 聲明隊列ex_queue = "algoExchange_queue"channel.queue_declare(queue=ex_queue, durable=True, auto_delete=True)# 通過路由鍵將隊列和交換器綁定channel.queue_bind(exchange=exchange, queue=ex_queue, routing_key='algoConfigRoutingKey')# 從隊列中拿到消息開始消費#(當要消費時,調用該回調函數 callback)channel.basic_consume(ex_queue, callback,auto_ack=True)  # auto_ack設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉# 處理 I/O 事件和 basic_consume 的回調, 直到所有的消費者被取消# (開始循環,直到發送退出消息)channel.start_consuming()if __name__ == "__main__":'''configue logger rotation="00:00:00",'''logger.add('../log/log-{time:YYYY-MM-DD}-PID='+ str(os.getpid()) +'.log', level="DEBUG",encoding="utf-8",  colorize=True, format="<green>{time}</green> <level>{message}</level>" )config = Config()raw_data_root = config.raw_data_rootresult_data_root = config.result_data_rootraw_data_backup_root = config.raw_data_backupraw_data_backup_root_path = Path(raw_data_backup_root)if not raw_data_backup_root_path.is_dir():os.mkdir(config.raw_data_backup)############################ 算法初始化部分 ############################## TODO 調用初始化############################ rabbitmq部分 ############################init_rabbitmq()

總結

以上是生活随笔為你收集整理的【部署类】专题:消息队列MQ、进程守护Supervisor的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。