日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

BeetleX.FastHttpApi之控制器调度设计

發布時間:2023/12/4 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 BeetleX.FastHttpApi之控制器调度设计 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? 為了可以更靈活地在Webapi應用服務中分配線程資源,BeetleX.FastHttpApi在線程調度上直接細化到Action級別;組件不僅可以精準控制每個Action的最大RPS限制,還能精細到控制使用多少線程資源來處理這些API的請求。接下來詳細講解組件針對這一塊的實現結構和代碼。

需求

? ? ? ? 為什么要做到這么精細的控制呢?如果有足夠資源那是不用考慮這方面的問題;但實際應用中資源不足是經常需要面對的問題。在整個服務中往往有些API非常占用資源,這個時候就希望通過簡單配置來控制API使用的線程數達到一個理想的資源分配結果。在控制上最直接的辦法是控制對應的RPS數量,但有時候希望以線程資源的方式來分配。

使用

????????組件可以在控制器的Action上根據需求標記對應的限制屬性

????????//每秒最大處理數100,超過就拒絕[RequestMaxRPS(100)]public object MasRps(IHttpContext context){return DateTime.Now;}//不限制,由框架通過線程池調度public object None(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}//所有請求用一個線程有序處理[ThreadQueue(ThreadQueueType.Single)]public object SingleQueue(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}//所有請求分配兩個線程有序處理[ThreadQueue(ThreadQueueType.Multiple, 2)]public object MultipleQueue(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}//根據Name的值一致線程處理,同一值會分配到一個線程中有序處理[ThreadQueue("name")]public object UniqueQueue(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}

設計實現

????????接下來看一下BeetleX.FastHttpApi組件代碼是如何進行工作的。由于需要線程控制,那自然就需要一個隊列;組件提供一個NextQueue的隊列來完成這方面的工作,每個NextQueue會分配一個線程來處理。

public class NextQueue : IDisposable{public NextQueue(){mQueue = new System.Collections.Concurrent.ConcurrentQueue<IEventWork>();ID = System.Threading.Interlocked.Increment(ref mID);}public long ID { get; set; }private static long mID;private readonly object _workSync = new object();private bool _doingWork;private int mCount;private System.Collections.Concurrent.ConcurrentQueue<IEventWork> mQueue;public int Count => mCount;//添加任務到隊列中public void Enqueue(IEventWork item){mQueue.Enqueue(item);System.Threading.Interlocked.Increment(ref mCount);lock (_workSync){//當前隊列是否工作中if (!_doingWork){//獲取一個線程進行工作System.Threading.ThreadPool.QueueUserWorkItem(OnStart);_doingWork = true;}}}private void OnError(Exception e, IEventWork work){try{Error?.Invoke(e, work);}catch{}}public static Action<Exception, IEventWork> Error { get; set; }private async void OnStart(object state){while (true){//獲取隊列任務并執行while (mQueue.TryDequeue(out IEventWork item)){System.Threading.Interlocked.Decrement(ref mCount);using (item){try{//等待任務執行await item.Execute();}catch (Exception e_){OnError(e_, item);}}}lock (_workSync){//隊列為空跑出線程if (mQueue.IsEmpty){try{Unused?.Invoke();}catch { }_doingWork = false;return;}}}}public Action Unused { get; set; }public void Dispose(){while (mQueue.TryDequeue(out IEventWork work)){try{work.Dispose();}catch{}}}}

NextQueue是一個支持異步任務的處理隊列,它確保添加進來的任務都是有序執行,即使任務內部處理的任務是異步。

ActionContext

????????該對象是用于執行控制器方法,包括webapi控制器和Websocket控制器。在這里只講述控制怎樣調度執行的,更詳細了解可以查看

https://github.com/beetlex-io/FastHttpApi/blob/master/src/ActionContext.cs

主要講解一下Execute方法是怎樣調用控制器方法的

internal async Task Execute(IActionResultHandler resultHandler){//驗證RPSif (Handler.ValidateRPS()){Handler.IncrementRequest();//是否存在隊列控制配置if (Handler.ThreadQueue == null || Handler.ThreadQueue.Type == ThreadQueueType.None){if?(Handler.Async)//異步方法{await OnAsyncExecute(resultHandler);}else{//同步方法OnExecute(resultHandler);}}else{//配置了隊列控制r妊ActionTask actionTask = new ActionTask(this, resultHandler,new TaskCompletionSource<object>());//獲取異步隊列var queue = Handler.ThreadQueue.GetQueue(this.HttpContext);//階列是否有效,為了安全隊列都有最大等待數限制,超過就拒絕處理if (Handler.ThreadQueue.Enabled(queue)){this.HttpContext.Queue = queue;//把當前任務插入隊列queue.Enqueue(actionTask);//等待隊執行結果通知await actionTask.CompletionSource.Task;}else{Handler.IncrementError();resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of queue limit!"), EventArgs.LogType.Warring, 500);}}}else{Handler.IncrementError();resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of max rps!"), EventArgs.LogType.Warring, 509);}}

GetQueue

????????應該方法根據當前請示信息和配置來獲取對應的異步隊列

public NextQueue GetQueue(IHttpContext context){//單隊執行,永遠返回針對當前控制器方法的第一個隊列if (Type == ThreadQueueType.Single)return QueueGroup.Queues[0];//輪循當前分配最大隊列數else if (Type == ThreadQueueType.Multiple)return QueueGroup.Next();//針對請求數據做一致性隊列分配else if (Type == ThreadQueueType.DataUnique){string value = null;if (UniqueName != null){if (string.Compare(UniqueName, "$path", true) == 0){value = context.Request.GetSourcePath();}else if(UniqueName.IndexOf("__")==0){return mUniqueQueueGroup.Has(UniqueName.GetHashCode());}else{value = context.Request.Header[UniqueName];if (value == null)context.Data.TryGetString(UniqueName, out value);}}if (value == null)value = context.Request.GetSourceUrl();return mUniqueQueueGroup.Has(value.GetHashCode());}//如果都沒匹配到就獲取輪循的下一個return QueueGroup.Next();}

ActionTask

????????方法異步任務對象,隊列會有序地執行相關對象,這對象的實現非常簡單。

struct ActionTask : IEventWork{public ActionTask(ActionContext context, IActionResultHandler resultHandler, TaskCompletionSource<object> completionSource){Context = context;ResultHandler = resultHandler;CompletionSource = completionSource;}public TaskCompletionSource<object> CompletionSource { get; set; }public ActionContext Context { get; set; }public IActionResultHandler ResultHandler { get; set; }public void Dispose(){}public async Task Execute(){try{if (Context.Handler.Async){//異步方法await Context.OnAsyncExecute(ResultHandler);}else{//同步方法Context.OnExecute(ResultHandler);}}finally{//回調執行完成,讓隊列繼續下一個任務。CompletionSource?.TrySetResult(new object());}}}

總結

????????到這里整個線程調度的核心就介紹完成了,如果不了解一些基礎知識會感覺完成這些功能很復雜,其實都是一些基礎功能的應用;?完成這些功能主要涉及幾個基礎知識分別是:隊列,線程池和用于處理異步回調的TaskCompletionSource對象。

BeetleX

開源跨平臺通訊框架(支持TLS)
提供高性能服務和大數據處理解決方案

https://beetlex.io

總結

以上是生活随笔為你收集整理的BeetleX.FastHttpApi之控制器调度设计的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。