程序员过关斩将--自定义线程池来实现文档转码
背景
我司在很久之前,一位很久之前的同事寫(xiě)過(guò)一個(gè)文檔轉(zhuǎn)圖片的服務(wù),具體業(yè)務(wù)如下:
1. 用戶(hù)在客戶(hù)端上傳文檔,可以是ppt,word,pdf 等格式,用戶(hù)上傳完成可以在客戶(hù)端預(yù)覽上傳的文檔,預(yù)覽的時(shí)候采用的是圖片形式(不要和我說(shuō)用別的方式預(yù)覽,現(xiàn)在已經(jīng)來(lái)不及了)
2. 當(dāng)用戶(hù)把文檔上傳到云端之后(阿里云),把文檔相關(guān)的信息記錄在數(shù)據(jù)庫(kù),然后等待轉(zhuǎn)碼完成
3. 服務(wù)器有一個(gè)轉(zhuǎn)碼服務(wù)(其實(shí)就是一個(gè)windows service)不停的在輪訓(xùn)待轉(zhuǎn)碼的數(shù)據(jù),如果有待轉(zhuǎn)碼的數(shù)據(jù),則從數(shù)據(jù)庫(kù)取出來(lái),然后根據(jù)文檔的網(wǎng)絡(luò)地址下載到本地進(jìn)行轉(zhuǎn)碼(轉(zhuǎn)成多張圖片)
4. 當(dāng)文檔轉(zhuǎn)碼完畢,把轉(zhuǎn)碼出來(lái)的圖片上傳到云端,并把云端圖片的信息記錄到數(shù)據(jù)庫(kù)
5. 客戶(hù)端有預(yù)覽需求的時(shí)候,根據(jù)數(shù)據(jù)庫(kù)來(lái)判斷有沒(méi)有轉(zhuǎn)碼成功,如果成功,則獲取數(shù)據(jù)來(lái)顯示。
文檔預(yù)覽的整體過(guò)程如以上所說(shuō),老的轉(zhuǎn)碼服務(wù)現(xiàn)在什么問(wèn)題呢?
1. 由于一個(gè)文檔同時(shí)只能被一個(gè)線(xiàn)程進(jìn)行轉(zhuǎn)碼操作,所以老的服務(wù)采用了把待轉(zhuǎn)碼數(shù)據(jù)劃分管道的思想,一共有六個(gè)管道,映射到數(shù)據(jù)庫(kù)大體就是 Id=》管道ID 這個(gè)樣子。
2. 一個(gè)控制臺(tái)程序,根據(jù)配置文件信息,讀取某一個(gè)管道待轉(zhuǎn)碼的文檔,然后單線(xiàn)程進(jìn)行轉(zhuǎn)碼操作
3. 一共有六個(gè)管道,所以服務(wù)器上起了六個(gè)cmd的黑窗口......
4. 有的時(shí)候個(gè)別文檔由于格式問(wèn)題或者其他問(wèn)題 轉(zhuǎn)碼過(guò)程中會(huì)卡住,具體的表現(xiàn)為:停止了轉(zhuǎn)碼操作。
5. 如果程序卡住了,需要運(yùn)維人員重新啟動(dòng)轉(zhuǎn)碼cmd窗口(這種維護(hù)比較蛋疼)
后來(lái)機(jī)緣巧合,這個(gè)程序的維護(hù)落到的菜菜頭上,維護(hù)了一周左右,大約重啟了10多次,終于忍受不了了,重新搞一個(gè)吧。仔細(xì)分析過(guò)后,刨除實(shí)際文檔轉(zhuǎn)碼的核心操作之外,整個(gè)轉(zhuǎn)碼流程其實(shí)還有很多注意點(diǎn)
1. 需要保證轉(zhuǎn)碼服務(wù)不被卡住,如果和以前一樣就沒(méi)有必要重新設(shè)計(jì)了
2. 盡量避免開(kāi)多個(gè)進(jìn)程的方式,其實(shí)在這個(gè)業(yè)務(wù)場(chǎng)景下,多個(gè)進(jìn)程和多個(gè)線(xiàn)程作用是一致的。
3. 每個(gè)文檔只能被轉(zhuǎn)碼一次,如果一個(gè)文檔被轉(zhuǎn)碼多次,不僅浪費(fèi)了服務(wù)器資源,而且還有可能會(huì)有數(shù)據(jù)不一致的情況發(fā)生
4. 轉(zhuǎn)碼失敗的文檔需要有一定次數(shù)的重試,因?yàn)橐淮问〔淮淼诙问?#xff0c;所以一定要給失敗的文檔再次被操作的機(jī)會(huì)
5. 因?yàn)槌绦虿煌5陌盐臋n轉(zhuǎn)碼成本地圖片,所以需要保證這些文件在轉(zhuǎn)碼完成在服務(wù)器上刪除,不然的話(huà),時(shí)間長(zhǎng)了會(huì)生成很多無(wú)用的文件
說(shuō)了這么多,其實(shí)需要注意的點(diǎn)還是很多的。以整個(gè)的轉(zhuǎn)碼流程來(lái)說(shuō),本質(zhì)上是一個(gè)任務(wù)池的生產(chǎn)和消費(fèi)問(wèn)題,任務(wù)池中的任務(wù)就是待轉(zhuǎn)碼的文檔,生產(chǎn)者不停的把待轉(zhuǎn)碼文檔丟進(jìn)任務(wù)池,消費(fèi)者不停的把任務(wù)池中文檔轉(zhuǎn)碼完成。
線(xiàn)程池
這很顯然和線(xiàn)程池很類(lèi)似,菜菜之前就寫(xiě)過(guò)一個(gè)線(xiàn)程池的文章,有興趣的同學(xué)可以去翻翻歷史。今天我們就以這個(gè)線(xiàn)程池來(lái)解決這個(gè)轉(zhuǎn)碼問(wèn)題。線(xiàn)程池的本質(zhì)是初始化一定數(shù)目的線(xiàn)程,不停的執(zhí)行任務(wù)。
?//線(xiàn)程池定義?public?class?LXThreadPool:IDisposable{bool?PoolEnable?=?true;?//線(xiàn)程池是否可用?List<Thread>?ThreadContainer?=?null;?//線(xiàn)程的容器ConcurrentQueue<ActionData>?JobContainer?=?null;?//任務(wù)的容器int?_maxJobNumber;?//線(xiàn)程池最大job容量ConcurrentDictionary<string,?DateTime>?JobIdList?=?new?ConcurrentDictionary<string,?DateTime>();?//job的副本,用于排除某個(gè)job?是否在運(yùn)行中public?LXThreadPool(int?threadNumber,int?maxJobNumber=1000){if(threadNumber<=0?||?maxJobNumber?<=?0){throw?new?Exception("線(xiàn)程池初始化失敗");}_maxJobNumber?=?maxJobNumber;ThreadContainer?=?new?List<Thread>(threadNumber);JobContainer?=?new?ConcurrentQueue<ActionData>();for?(int?i?=?0;?i?<?threadNumber;?i++){var?t?=?new?Thread(RunJob);t.Name?=?$"轉(zhuǎn)碼線(xiàn)程{i}";ThreadContainer.Add(t);t.Start();}//清除超時(shí)任務(wù)的線(xiàn)程var?tTimeOutJob?=?new?Thread(CheckTimeOutJob);tTimeOutJob.Name?=?$"清理超時(shí)任務(wù)線(xiàn)程";tTimeOutJob.Start();}//往線(xiàn)程池添加一個(gè)線(xiàn)程,返回線(xiàn)程池的新線(xiàn)程數(shù)public?int?AddThread(int?number=1){if(!PoolEnable?||?ThreadContainer==null?||?!ThreadContainer.Any()?||?JobContainer==null||?!JobContainer.Any()){return?0;}while?(number?<=?0){var?t?=?new?Thread(RunJob);ThreadContainer.Add(t);t.Start();number?-=?number;}return?ThreadContainer?.Count????0;}//向線(xiàn)程池添加一個(gè)任務(wù),返回0:添加任務(wù)失敗?? 1:成功public?int?AddTask(Action<object>?job,?object?obj,string?actionId,?Action<Exception>?errorCallBack?=?null){if?(JobContainer?!=?null){if(JobContainer.Count>=?_maxJobNumber){return?0;}//首先排除10分鐘還沒(méi)轉(zhuǎn)完的var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(10)?<?DateTime.Now);if(timeoOutJobList!=null&&?timeoOutJobList.Any()){foreach?(var?timeoutJob?in?timeoOutJobList){JobIdList.TryRemove(timeoutJob.Key,out?DateTime?v);}}if?(!JobIdList.Any(s?=>?s.Key?==?actionId)){if(JobIdList.TryAdd(actionId,?DateTime.Now)){JobContainer.Enqueue(new?ActionData?{?Job?=?job,?Data?=?obj,?ActionId?=?actionId,?ErrorCallBack?=?errorCallBack?});return?1;}else{return?101;}}else{return?100;}????????????}return?0;}??private?void?RunJob(){while?(JobContainer?!=?null??&&?PoolEnable){//任務(wù)列表取任務(wù)ActionData?job?=?null;JobContainer?.TryDequeue(out?job);if?(job?==?null){//如果沒(méi)有任務(wù)則休眠Thread.Sleep(20);continue;}try{//執(zhí)行任務(wù)job.Job.Invoke(job.Data);}catch?(Exception?error){//異常回調(diào)if?(job?!=?null&&?job.ErrorCallBack!=null){job?.ErrorCallBack(error);}}finally{if?(!JobIdList.TryRemove(job.ActionId,out?DateTime?v)){}}}}//終止線(xiàn)程池public?void?Dispose(){PoolEnable?=?false;JobContainer?=?null;if?(ThreadContainer?!=?null){foreach?(var?t?in?ThreadContainer){//強(qiáng)制線(xiàn)程退出并不好,會(huì)有異常t.Join();}ThreadContainer?=?null;}}//清理超時(shí)的任務(wù)private?void?CheckTimeOutJob(){//首先排除10分鐘還沒(méi)轉(zhuǎn)完的var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(10)?<?DateTime.Now);if?(timeoOutJobList?!=?null?&&?timeoOutJobList.Any()){foreach?(var?timeoutJob?in?timeoOutJobList){JobIdList.TryRemove(timeoutJob.Key,?out?DateTime?v);}}System.Threading.Thread.Sleep(60000);}}public?class?ActionData{//任務(wù)的id,用于排重public?string?ActionId?{?get;?set;?}//執(zhí)行任務(wù)的參數(shù)public?object?Data?{?get;?set;?}//執(zhí)行的任務(wù)public?Action<object>?Job?{?get;?set;?}//發(fā)生異常時(shí)候的回調(diào)方法public?Action<Exception>?ErrorCallBack?{?get;?set;?}}以上就是一個(gè)線(xiàn)程池的具體實(shí)現(xiàn),和具體的業(yè)務(wù)無(wú)關(guān),完全可以用于任何適用于線(xiàn)程池的場(chǎng)景,其中有一個(gè)注意點(diǎn),我新加了任務(wù)的標(biāo)示,主要用于排除重復(fù)的任務(wù)被投放多次(只排除正在運(yùn)行中的任務(wù))。當(dāng)然代碼不是最優(yōu)的,有需要的同學(xué)可以自己去優(yōu)化
使用線(xiàn)程池
接下來(lái),我們利用以上的線(xiàn)程池來(lái)完成我們的文檔轉(zhuǎn)碼任務(wù),首先我們啟動(dòng)的時(shí)候初始化一個(gè)線(xiàn)程池,并啟動(dòng)一個(gè)獨(dú)立線(xiàn)程來(lái)不停的往線(xiàn)程池來(lái)輸送任務(wù),順便起了一個(gè)監(jiān)控線(xiàn)程去監(jiān)視發(fā)送任務(wù)的線(xiàn)程
string?lastResId?=?null;string?lastErrorResId?=?null;Dictionary<string,?int>?ResErrNumber?=?new?Dictionary<string,?int>();?//轉(zhuǎn)碼失敗的資源重試次數(shù)int?MaxErrNumber?=?5;//最多轉(zhuǎn)碼錯(cuò)誤的資源10次Thread?tPutJoj?=?null;LXThreadPool?pool?=?new?LXThreadPool(4,100);public?void?OnStart(){//初始化一個(gè)線(xiàn)程發(fā)送轉(zhuǎn)碼任務(wù)tPutJoj?=?new?Thread(PutJob);tPutJoj.IsBackground?=?true;tPutJoj.Start();//初始化?監(jiān)控線(xiàn)程var?tMonitor?=?new?Thread(MonitorPutJob);tMonitor.IsBackground?=?true;tMonitor.Start();}//監(jiān)視發(fā)放job的線(xiàn)程private?void?MonitorPutJob(){while?(true){if(tPutJoj?==?null||?!tPutJoj.IsAlive){Log.Error($"發(fā)送轉(zhuǎn)碼任務(wù)線(xiàn)程停止==========");tPutJoj?=?new?Thread(PutJob);tPutJoj.Start();Log.Error($"發(fā)送轉(zhuǎn)碼任務(wù)線(xiàn)程重新初始化并啟動(dòng)==========");}System.Threading.Thread.Sleep(5000);}}private?void?PutJob(){???????????while?(true){try{//先搜索等待轉(zhuǎn)碼的var?fileList?=?DocResourceRegisterProxy.GetFileList(new?int[]?{?(int)FileToImgStateEnum.Wait?},?30,?lastResId);Log.Error($"拉取待轉(zhuǎn)碼記錄===總數(shù):lastResId:{lastResId},結(jié)果:{fileList?.Count()????0}");if?(fileList?==?null?||?!fileList.Any()){lastResId?=?null;Log.Error($"待轉(zhuǎn)碼數(shù)量為0,開(kāi)始拉取轉(zhuǎn)碼失敗記錄,重新轉(zhuǎn)碼==========");//如果無(wú)待轉(zhuǎn),則把出錯(cuò)的?嘗試fileList?=?DocResourceRegisterProxy.GetFileList(new?int[]?{?(int)FileToImgStateEnum.Error,?(int)FileToImgStateEnum.TimeOut,?(int)FileToImgStateEnum.Fail?},?1,?lastErrorResId);if?(fileList?==?null?||?!fileList.Any()){lastErrorResId?=?null;}else{// Log.Error($"開(kāi)始轉(zhuǎn)碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");List<DocResourceRegister>?errFilter?=?new?List<DocResourceRegister>();foreach?(var?errRes?in?fileList){if?(ResErrNumber.TryGetValue(errRes.res_id,?out?int?number)){if?(number?>?MaxErrNumber){Log.Error($"資源:{errRes.res_id}?轉(zhuǎn)了{(lán)MaxErrNumber}次不成功,放棄===========");continue;}else{errFilter.Add(errRes);ResErrNumber[errRes.res_id]?=?number?+?1;}}else{ResErrNumber.Add(errRes.res_id,?1);errFilter.Add(errRes);}}fileList?=?errFilter;if?(fileList.Any()){lastErrorResId?=?fileList.Select(s?=>?s.res_id).Max();}}}else{lastResId?=?fileList.Select(s?=>?s.res_id).Max();}if?(fileList?!=?null?&&?fileList.Any()){foreach?(var?file?in?fileList){//如果?任務(wù)投放線(xiàn)程池失敗,則等待一面繼續(xù)投放int?poolRet?=?0;while?(poolRet?<=?0){poolRet?=?pool.AddTask(s?=>?{AliFileService.ConvertToImg(file.res_id?+?$".{file.res_ext}",?FileToImgFac.Instance(file.res_ext));},?file,?file.res_id);if?(poolRet?<=?0?||?poolRet?>?1){Log.Error($"發(fā)放轉(zhuǎn)碼任務(wù)失敗==========線(xiàn)程池返回結(jié)果:{poolRet}");System.Threading.Thread.Sleep(1000);}}}}//每一秒去數(shù)據(jù)庫(kù)取一次數(shù)據(jù)System.Threading.Thread.Sleep(3000);}catch{continue;}}}以上就是發(fā)放任務(wù),線(xiàn)程池執(zhí)行任務(wù)的所有代碼,由于具體的轉(zhuǎn)碼代碼涉及到隱私,這里不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優(yōu)的方式,但是我覺(jué)得線(xiàn)程池這樣的思想可能會(huì)對(duì)部分人有幫助,其中任務(wù)超時(shí)的核心代碼如下(采用了polly插件):
var?policy=?Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut),?onTimeout:?(context,?timespan,?task)?=>{ret.State=Enum.FileToImgStateEnum.TimeOut;???????????????????});policy.Execute(s=>{.....});把你的更優(yōu)方案寫(xiě)在留言區(qū)吧,2020年大家越來(lái)越好
●程序員修神之路--打通Docker鏡像發(fā)布容器運(yùn)行流程
●程序員修神之路--容器技術(shù)為什么會(huì)這么流行(記得去抽獎(jiǎng))
●程序員修神之路--kubernetes是微服務(wù)發(fā)展的必然產(chǎn)物
●程序員過(guò)關(guān)斬將--要想獲取我的用戶(hù)信息,就得按照規(guī)矩來(lái)
●程序員過(guò)關(guān)斬將--更加優(yōu)雅的Token認(rèn)證方式JWT
●程序員過(guò)關(guān)斬將--cookie和session的關(guān)系其實(shí)很簡(jiǎn)單
●程序員修神之路--用NOSql給高并發(fā)系統(tǒng)加速
●程序員修神之路--高并發(fā)系統(tǒng)設(shè)計(jì)負(fù)載均衡架構(gòu)
●程序員修神之路--做好分庫(kù)分表其實(shí)很難之一(繼續(xù)送書(shū))
●程序員修神之路--做好分庫(kù)分表其實(shí)很難之二(送書(shū)繼續(xù))
●程序員過(guò)關(guān)斬將--你為什么還在用存儲(chǔ)過(guò)程?
●程序員過(guò)關(guān)斬將--小小的分頁(yè)引發(fā)的加班血案
●程序員修神之路--問(wèn)世間異步為何物?
●程序員修神之路--提高網(wǎng)站的吞吐量????
總結(jié)
以上是生活随笔為你收集整理的程序员过关斩将--自定义线程池来实现文档转码的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: dotNetCore操作Redis(含C
- 下一篇: Xamarin.Forms弹出对话框插件