日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > windows >内容正文

windows

用 Celery 实现邮件推送系统

發(fā)布時(shí)間:2025/7/14 windows 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 用 Celery 实现邮件推送系统 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

系統(tǒng)需求

本文以Celery 實(shí)現(xiàn)分布式任務(wù)隊(duì)列為基礎(chǔ),簡(jiǎn)述了一個(gè)郵件推送系統(tǒng)的模型。

Celery 是 Distributed Task Queue,分布式任務(wù)隊(duì)列,分布式?jīng)Q定了可以有多個(gè) worker 的存在,隊(duì)列表示其是異步操作,即存在一個(gè)產(chǎn)生任務(wù)提出需求的工頭,和一群等著被分配工作的碼農(nóng)。

需求:

1.在郵件推送系統(tǒng)中,我們需要對(duì)成千上萬的用戶發(fā)送郵件,發(fā)送郵件具有時(shí)效性,即不能說今天開始發(fā)郵件,要等到明天才能發(fā)送完畢。

2.發(fā)送郵件過程中,可能會(huì)遇到過于頻繁,郵件服務(wù)器上信件堆積無法及時(shí)接受新信件而產(chǎn)生的拒信,或者郵件服務(wù)器將我們的郵件判決為垃圾郵件。

3.郵件發(fā)送的 I/O 時(shí)間較長(zhǎng),不能讓程序在等待郵件服務(wù)器返回消息上浪費(fèi)時(shí)間。

所以我們的推送系統(tǒng)要有以下特性:1.分布式處理作業(yè);2.閉環(huán)監(jiān)控;3.異步式分發(fā)作業(yè)

系統(tǒng)框圖

前端通過 ajax 調(diào)用 views 中的 callpush 接口,該接口將被推送用戶的篩選條件傳入 service,然后 service 請(qǐng)求數(shù)據(jù)庫(kù),將返回?cái)?shù)據(jù)作為參數(shù)調(diào)用 celery 接口中 addtask 函數(shù)。celery 接口中 addtask 根據(jù) action 參數(shù)來判斷所要添加的任務(wù)類型,根據(jù)不同的類型分別進(jìn)行處理,放入隊(duì)列。

系統(tǒng)的另外一頭,worker 從隊(duì)列中取出任務(wù),用 mail 函數(shù)推送郵件,如果發(fā)送失敗就調(diào)用 error_handler 進(jìn)行異常處理,此處我們將所有 task 的執(zhí)行情況放入 redis 中,給每個(gè)任務(wù)進(jìn)行標(biāo)記,如果成功則標(biāo)記為 1,失敗則 0.

前端可以通過 ajax 調(diào)用 pushstatus 來向 redis 中讀取任務(wù)執(zhí)行情況,此處我們返回了成功和失敗任務(wù)的個(gè)數(shù)。

偽代碼實(shí)現(xiàn)

# Controller from redis import StrictRedis red = StrictRedis(host='localhost', port=6379, db=0)def callpush(request):area = request.POST.get('area')return HttpResponse(str(mailpush(area)))def pushstatus(request):failure = red.scard('status:0:task')success = red.scard('status:1:task')return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))# Service def mailpush(**kargs):targets = MtUser.objects.filter(kargs).values('username', 'address')addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings')return len(targets)# Celery Interface (Dispatcher) from celery import Celeryapp = Celery() app.config_from_object('celeryconfig')def addtask(action, data, **kargs):if action == 'mailpush':for (address, username) in data:app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler'))elif action == 'messagepush':passelse:pass# Celery Backend (Worker) from celery import Celery from celery import Task from redis import StrictRedisapp = Celery() app.config_from_object('celeryconfig') red = StrictRedis(host='localhost', port=6379, db=0)@app.task(bind=True) def mail(self, subject, content, address):from django.core.mail import EmailMessagemsg = EmailMessage(subject, content, 'admin@admin.com', address)msg.content_subtype = 'html'msg.send()red.sadd('status:1:task', self.request.id)# Overwrite the on_failure function in trace.py @app.task def error_handler(uuid, args):print uuidprint argsred.set(uuid, args)red.sadd('status:0:task', uuid)red.srem('status:1:task', uuid)

轉(zhuǎn)載于:https://my.oschina.net/shinedev/blog/500554

總結(jié)

以上是生活随笔為你收集整理的用 Celery 实现邮件推送系统的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。