.Net 如何模拟会话级别的信号量,对http接口调用频率进行限制(有demo)
現在,因為種種因素,你必須對一個請求或者方法進行頻率上的訪問限制。?
比如, 你對外提供了一個API接口,注冊用戶每秒鐘最多可以調用100次,非注冊用戶每秒鐘最多可以調用10次。
比如, 有一個非常吃服務器資源的方法,在同一時刻不能超過10個人調用這個方法,否則服務器滿載。
比如, 有一些特殊的頁面,訪客并不能頻繁的訪問或發言。
比如, 秒殺活動等進行。
比如 ,防范DDOS,當達到一定頻率后調用腳本iis服務器ip黑名單,防火墻黑名單。
如上種種的舉例,也就是說,如何從一個切面的角度對調用的方法進行頻率上的限制。而對頻率限制,服務器層面都有最直接的解決方法,現在我說的則是代碼層面上的頻率管控。
本文給出兩個示例,一個是基于單機環境的實現,第二個則是基于分布式的Redis實現。
--------------------
以第一個API接口需求為例,先說下單機環境下的實現。
按照慣性思維,我們自然會想到緩存的過期策略這種方法,但是嚴格來講就HttpRuntime.Cache而言,通過緩存的過期策略來對請求進行頻率的并發控制是不合適的。
HttpRuntime.Cache 是應用程序級別的Asp.Net的緩存技術,通過這個技術可以申明多個緩存對象,可以為每個對象設置過期時間,當過期時間到達后該緩存對象就會消失(也就是當你訪問該對象的時候為Null)
為什么這樣說呢?比如對某個方法(方法名:GetUserList)我們要進行1秒鐘最多10次的限制,現在我們就新建一個int型的Cache對象,然后設置1秒鐘后過期消失。那么每當訪問GetUserList方法前,我們就先判斷這個Cache對象的值是否大于10,如果大于10就不執行GetUserList方法,如果小于10則允許執行。每當訪問該對象的時候如果不存在或者過期就新建,這樣周而復始,則該對象永遠不可能超過10。
if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大于10請求失敗
? {
? ? ?Console.WriteLine("禁止請求");
? }
? else
? {
? ? ?HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] + 1; //否則該緩存對象的值+1
? ? ?Console.WriteLine("允許請求");
? }
這樣的思想及實現相對來說非常簡單,但是基于這樣的一個模型設定,那么就會出現這種情況:
?
?
如上圖,每個點代表一次訪問請求,我在0秒的時候 新建了一個名字為GetUserListNum的緩存對象。?
在0~0.5秒期間 我訪問了3次,在0.5~1秒期間,我們訪問了7次。此時,該對象消失,然后我們接著訪問,該對象重置為0.?
? ?在第1~1.5秒期間,還是訪問了7次,在第1.5秒~2秒期間訪問了3次。
基于這種簡單緩存過期策略的模型,在這2秒鐘內,我們雖然平均每秒鐘都訪問了10次,滿足這個規定,但是如果我們從中取一個期間段,0.5秒~1.5秒期間,也是1秒鐘,但是卻實實在在的訪問了14次!遠遠超過了我們設置的 1秒鐘最多訪問10次的 限制。
?
那么如何科學的來解決上面的問題呢?我們可以通過模擬會話級別的信號量這一手段,這也就是我們今天的主題了。
什么是信號量?僅就以代碼而言,??static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5);? 它的意思就代表在多線程情況下,在任何一時刻,只能同時5個線程去訪問。
?
4容器4線程模型
現在,在實現代碼的之前我們先設計一個模型。
假設我們有一個用戶A的管道,這個管道里裝著用戶A的請求,比如用戶A在一秒鐘發出了10次請求,那么每一個請求過來,管道里的元素都會多一個。但是我們設定這個管道最多只能容納10個元素,而且每個元素的存活期為1秒,1秒后則該元素消失。那么這樣設計的話,無論是速率還是數量的突進,都會有管道長度的限制。這樣一來,無論從哪一個時間節點或者時間間隔出發,這個管道都能滿足我們的頻率限制需求。
而這里的管道,就必須和會話Id來對應了。每當有新會話進來的時候就生成一個新管道。這個會話id根據自己場景所定,可以是sessionId,可以是ip,也可以是token。
那么既然這個管道是會話級別的,我們肯定得需要一個容器,來裝這些管道。現在,我們以IP來命名會話管道,并把所有的管道都裝載在一個容器中,如圖
而基于剛才的設定,我們還需要對容器內的每條管道的元素進行處理,把過期的給剔除掉,為此,還需要單獨為該容器開辟出一個線程來為每條管道進行元素的清理。而當管道的元素為0時,我們就清掉該管道,以便節省容器空間。
?
當然,由于用戶量多,一個容器內可能存在上萬個管道,這個時候僅僅用一個容器來裝載來清理,在效率上顯然是不夠的。這個時候,我們就得對容器進行橫向擴展了。
比如,我們可以根據Cpu核心數自動生成對應的數量的容器,然后根據一個算法,對IP來進行導流。我當前cpu是4個邏輯核心,就生成了4個容器,每當用戶訪問的時候,都會最先經過一個算法,這個算法會對IP進行處理,如192.168.1.11~192.168.1.13這個Ip段進第一個容器,xxx~xxx進第二個容器,依次類推,相應的,也就有了4個線程去分別處理4個容器中的管道。
?
那么,最終就形成了我們的4容器4線程模型了。
現在,著眼于編碼實現:
首先我們需要一個能承載這些容器的載體,這個載體類似于連接池的概念,可以根據一些需要自動生成適應數量的容器,如果有特殊要求的話,還可以在容器上切出一個容器管理的面,在線程上切出一個線程管理的面以便于實時監控和調度。如果真要做這樣一個系統,那么 容器的調度 和 線程的調度功能 是必不可少的,而本Demo則是完成了主要功能,像容器和線程在代碼中我也沒剝離開來,算法也是直接寫死的,實際設計中,對算法的設計還是很重要的,還有多線程模型中,怎樣上鎖才能讓效率最大化也是重中之重的。
而這里為了案例的直觀就直接寫死成4個容器。
public static List<Container> ContainerList = new List<Container>(); //容器載體
static Factory()
{
? ? ?for (int i = 0; i < 4; i++)
? ? ?{
? ? ? ? ContainerList.Add(new Container(i)); ?//遍歷4次 ?生成4個容器
? ? ?}
? ? ?foreach (var item in ContainerList)
? ? ?{
? ? ? ? item.Run(); ? ?//開啟線程
? ? ?}
}
現在,我們假定 有編號為 0 到 40 這樣的 41個用戶。那么這個導流算法 我也就直接寫死,編號0至9的用戶 將他們的請求給拋轉到第一個容器,編號10~19的用戶 放到第二個容器,編號20~29放到第三個容器,編號30~40的用戶放到第四個容器。
那么這個代碼就是這樣的:
static Container GetContainer(int userId, out int i) //獲取容器的算法
?{
? ? ?if (0 <= userId && userId < 10) ? ?//編號0至9的用戶 ?返回第一個容器 ?依次類推
? ? ?{
? ? ? ? ? i = 0;
? ? ? ? ? return ContainerList[0];
? ? ?}
? ? ?if (10 <= userId && userId < 20)
? ? ?{
? ? ? ? ? i = 1;
? ? ? ? ? return ContainerList[1];
? ? ?}
? ? ?if (20 <= userId && userId < 30)
? ? ?{
? ? ? ? ? i = 2;
? ? ? ? ? return ContainerList[2];
? ? ? }
? ? ? i = 3;
? ? ? return ContainerList[3];
? }
當我們的會話請求經過算法的導流之后,都必須調用一個方法,用于辨別管道數量。如果管道數量已經大于10,則請求失敗,否則成功
public static void Add(int userId)
? {
? ? ? ?if (GetContainer(userId, out int i).Add(userId))
? ? ? ? ? ? Console.WriteLine("容器" + i + " 用戶" + userId + " ?發起請求");
? ? ? ?else
? ? ? ? ? ? Console.WriteLine("容器" + i + " 用戶" + userId + " ?被攔截");
? }
接下來就是容器Container的代碼了。
這里,對容器的選型用線程安全的ConcurrentDictionary類。
線程安全:當多個線程同時讀寫同一個共享元素的時候,就會出現數據錯亂,迭代報錯等安全問提
ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0專為解決Dictionary線程安全而出的新類型
ReaderWriterLockSlim:較ReaderWriterLock優化的讀寫鎖,多個線程同時訪問讀鎖 或? 一個線程訪問寫鎖
然后當你向容器添加一條管道中的數據是通過這個方法:
public bool Add(int userId)
?{
? ? ?obj.EnterReadLock();//掛讀鎖,允許多個線程同時寫入該方法
? ? ?try
? ? ?{
? ? ? ? ?ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(), new ConcurrentList<DateTime>()); //如果不存在就新建 ConcurrentList
? ? ? ? ?return dtList.CounterAdd(10, DateTime.Now); //管道容量10,當臨界管道容量后 返回false
? ? ?}
? ? ?finally
? ? ?{
? ? ? ? ?obj.ExitReadLock();
? ? ?}
?}
?這里,為了在后面的線程遍歷刪除ConcurrentList的管道的時候保證ConcurrentList的安全性,所以此處要加讀鎖。
?而ConcurrentList,因為.Net沒有推出List集合類的線程安全(count和add加鎖),所以自己新建了一個繼承于List<T>的安全類型,在這里 封裝了3個需要使用的方法。
public class ConcurrentList<T> : List<T>
{
? ? private object obj = new object();
? ??
? ? public bool CounterAdd(int num, T value)
? ? {
? ? ? ? lock (obj)
? ? ? ? {
? ? ? ? ? ? if (base.Count >= num)
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? else
? ? ? ? ? ? ? ? base.Add(value);
? ? ? ? ? ? return true;
? ? ? ? }
? ? }
? ? public new bool Remove(T value)
? ? {
? ? ? ? lock (obj)
? ? ? ? {
? ? ? ? ? ? base.Remove(value);
? ? ? ? ? ? return true;
? ? ? ? }
? ? }
? ? public new T[] ToArray()?
? ? {
? ? ? ? lock (obj)
? ? ? ? {
? ? ? ? ? ? return base.ToArray();
? ? ? ? }
? ? }
}
最后就是線程的運行方法:
public void Run()
?{
? ? ?ThreadPool.QueueUserWorkItem(c =>
? ? ?{
? ? ? ? ?while (true)
? ? ? ? ?{
? ? ? ? ? ? ?if (dic.Count > 0)
? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ?foreach (var item in dic.ToArray())
? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ?ConcurrentList<DateTime> list = item.Value;
? ? ? ? ? ? ? ? ? ? ?foreach (DateTime dt in list.ToArray()) ??
? ? ? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ? ? ?if (DateTime.Now.AddSeconds(-3) > dt)
? ? ? ? ? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?list.Remove(dt);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Console.WriteLine("容器" + seat + " 已刪除用戶" + item.Key + "管道中的一條數據");
? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ?if (list.Count == 0)
? ? ? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ? ? ?obj.EnterWriteLock();
? ? ? ? ? ? ? ? ? ? ? ? ?try
? ? ? ? ? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if (list.Count == 0)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i))
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?{ Console.WriteLine("容器" + seat + " 已清除用戶" + item.Key + "的List管道"); }
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? ?finally
? ? ? ? ? ? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?obj.ExitWriteLock();
? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ?}
? ? ? ? ? ? ?else
? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ?Thread.Sleep(100);
? ? ? ? ? ? ?}
? ? ? ? ?}
? ? ?}
? ?);
?}
最后,是效果圖,一個是基于控制臺的,還一個是基于Signalr的。
?
分布式下Redis
上面介紹了一種頻率限制的模型,分布式與單機相比,無非就是載體不同,我們只要把這個容器的載體從程序上移植出來,來弄成一個單獨的服務或者直接借用Redis也是可行的。
這里就介紹分布式情況下,Redis的實現。
不同于Asp.Net的多線程模型,大概因為Redis的各種類型的元素非常粒度的操作導致各種加鎖的復雜性,所以在網絡請求處理這塊Redis是單線程的,基于Redis的實現則因為單線程的緣故在編碼角度不用太多考慮到與邏輯無關的問題。
簡單介紹下,Redis是一個內存數據庫,這個數據庫屬于非關系型數據庫,它的概念不同于一般的我們認知的Mysql Oracle SqlServer關系型數據庫,它沒有Sql沒有字段名沒有表名這些概念,它和HttpRunTime.Cache的概念差不多一樣,首先從操作上屬于鍵值對模式,就如 Cache["鍵名"] 這樣就能獲取到值類似,而且可以對每個Key設置過期策略,而Redis中的Key所對應的值并不是想存啥就存啥的,它支持五種數據類型:string(字符串),hash(哈希),list(列表),set(集合)及sorted set(有序集合)。
今天要說的是Sorted set有序集合,有序集合相比其它的集合類型的特殊點在于,使用有序集合的時候還能給插入的元素指定一個 積分score,我們把這個積分score理解為排序列,它內部會對積分進行排序,積分允許重復,而有序集合中的元素則是唯一。
還是同樣的思路,每當有用戶訪問的時候,都對該用戶的 管道(有序集合)中添加一個元素,然后設置該元素的積分為當前時間。接著在程序中開個線程,來對管道中積分小于約定時間的元素進行清理。因為規定有序集合中的元素只能是唯一值,所以在賦值方面只要是滿足uuid即可。
?
那么用Redis來實現的代碼那就是類似這種:
通過using語法糖實現IDisposable而包裝的Redis分布式鎖,然后里面正常的邏輯判斷。
這樣的代碼雖然也能完成功能,但不夠友好。Redis是個基于內存的數據庫,于性能而言,瓶頸在于網絡 IO 上,與Get一次發出一次請求相比,能不能通過一段腳本來實現大部分邏輯呢?
有的,Redis支持?Lua腳本:
Lua 是一種輕量小巧的腳本語言,用標準C語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。
大致意思就是,直接向Redis發送一段腳本或者讓它直接本地讀取一段腳本從而直接實現所有的邏輯。
/// <summary>
/// 如果 大于10(AccountNum) 就返回1 ? 否則就增加一條集合中的元素 并返回 空
/// </summary>
/// <param name="zcardKey"></param>
/// <param name="score"></param>
/// <param name="zcardValue"></param>
/// <param name="AccountNum"></param>
/// <returns></returns>
public string LuaAddAccoundSorted(string zcardKey, double score, string zcardValue, int AccountNum)
{
? ? string str = "local uu = redis.call('zcard',@zcardKey) if (uu >=tonumber(@AccountNum)) then return 1 else redis.call('zadd',@zcardKey,@score,@zcardValue) ?end";
? ? var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str), new { zcardKey = zcardKey, score = score, zcardValue = zcardValue, AccountNum=AccountNum });
? ? return re.ToString();
}
ocal uu就是申明一個為名uu的變量的意思,redis.call就是redis命令,這段腳本意思就是如果 大于10(AccountNum) 就返回1? ?否則就增加一條集合中的元素 并返回 空。
管道內元素處理的方法就是:
/// <summary>
?/// 遍歷當前所有前綴的有序集合,如果數量為0,那么就返回1 否則 就刪除 滿足最大分值條件區間的元素,如果該集合個數為0則消失
?/// </summary>
?/// <param name="zcardPrefix"></param>
?/// <param name="score"></param>
?/// <returns></returns>
public string LuaForeachRemove(string zcardPrefix, double score)
?{
? ? ?StringBuilder str = new StringBuilder();
? ? ?str.Append("local uu = redis.call('keys',@zcardPrefix) "); //聲明一個變量 去獲取 模糊查詢的結果集合
? ? ?str.Append("if(#uu==0) then"); ? ?//如果集合長度=0
? ? ?str.Append(" ? return 1 ");
? ? ?str.Append("else ");
? ? ?str.Append(" ? for i=1,#uu do "); ? //遍歷
? ? ?str.Append(" ? ? ? redis.call('ZREMRANGEBYSCORE',uu[i],0,@score) "); ?//刪除從0 到 該score 積分區間的元素
? ? ?str.Append(" ? ? ? if(redis.call('zcard',uu[i])==0) then "); ?//如果管道長度=0
? ? ?str.Append(" ? ? ? ? ? redis.call('del',uu[i]) "); ? //刪除
? ? ?str.Append(" ? ? ? end ");
? ? ?str.Append(" ? end ");
? ? ?str.Append("end ");
? ? ?var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str.ToString()), new { zcardPrefix = zcardPrefix + "*", score = score });
? ? ?return re.ToString();
這2段代碼通過發送Lua腳本的形式來完成了整個過程,因為Redis的網絡模型原因,所以把LuaForeachRemove方法給提出來做個服務來單獨處理即可。至于那種多容器多線程的實現,則完全可以開多個Redis的實例來實現。最后放上效果圖。
最后,我把這些都給做成了個Demo。我喜歡和我一樣的人交朋友,不被環境影響,自己是自己的老師,歡迎加群 .Net web交流群?166843154
原文地址:http://www.cnblogs.com/1996V/p/8127576.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的.Net 如何模拟会话级别的信号量,对http接口调用频率进行限制(有demo)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: g4e基础篇#1 为什么要使用版本控制系
- 下一篇: Quartz.NET 3.0 正式发布