日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

使用高性能Pipelines构建.NET通讯程序

發布時間:2023/12/4 asp.net 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用高性能Pipelines构建.NET通讯程序 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

.NET Standard支持一組新的API,System.Span, System.Memory,還有System.IO.Pipelines。這幾個新的API極大了提升了.NET程序的效能,將來.NET很多基礎API都會使用它們進行重寫。

Pipelines旨在解決.NET編寫Socket通信程序時的很多困難,相信讀者也對此不勝其煩,使用stream模型進行編程,就算能夠解決,也是實在麻煩。

System.IO.Pipelines使用簡單的內存片段來管理數據,可以極大的簡化編寫程序的過程。關于Pipelines的詳細介紹,可以看看這里。現在ASP.NET Core中使用的Kestrel已經在使用這個API。(話說這個東西貌似就是Kestrel團隊搞出來的。)

可能是直接需要用Socket場景有限(物聯網用的還挺多的),Pipelines相關的資料感覺不是很多。官方給出的示例是基于ASCII協議的,有固定結尾的協議,這里我以物聯網設備常用的BINARY二進制自定義協議為例,講解基于Pipelines的程序套路。

與基于Stream的方式不同,pipelines提供一個pipe,用于存儲數據,pipe中間存儲的數據有點鏈表的感覺,可以基于SequencePosition進行slice操作,這樣就能得到一個ReadOnlySequence<T>對象。reader可以進行自定義操作,并在操作完成之后告訴pipe已經處理了多少數據,整個過程是不需要進行內存復制操作的,因此性能得到了提升,還少了很多麻煩。可以簡單理解作為服務器端,流程:

接受數據循環:接到數據->放pipe里面->告訴pipe放了多少數據
處理數據循環:在pipe里面找一條完整數據->交給處理流程->告訴pipe處理了多少數據

有一款設備,binary協議,數據包開頭0x75, 0xbd, 0x7e, 0x97一共4個字節,隨后跟數據包長度2個字節(固定2400字節,不固定長度也可以參照),隨后是數據區。在設備連接成功之后,數據主動從設備發送到PC。

雖然是.NET Core平臺的,但是.NET FRAMEWORK 4.6.1上面也可以nuget安裝,直接

install-package system.io.pipelines

進行安裝就可以了。Socket相關處理的代碼不再寫了,只列關鍵的。

代碼第一步是聲明pipe。

private async void InitPipe(Socket socket)
{
Pipe pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(socket, pipe.Reader);

await Task.WhenAll(reading, writing);
}

pipe有reader還有一個writer,reader負責讀取pipe數據,主要用在數據處理循環,writer負責將數據寫入pipe,主要用在數據接受循環。


private async Task FillPipeAsync(Socket socket, PipeWriter writer)
{

const int minimumBufferSize = 1024 * 1024;

while (running)
{
try
{

Memory<byte> memory = writer.GetMemory(minimumBufferSize);


if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}

int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);
if (bytesRead == 0)
{
break;
}


writer.Advance(bytesRead);
}
catch
{
break;
}


FlushResult result = await writer.FlushAsync();

if (result.IsCompleted)
{
break;
}
}


writer.Complete();
}


private async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
while (running)
{

ReadResult result = await reader.ReadAsync();

ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;

do
{

position = buffer.PositionOf((byte)0x75);
if (position != null)
{


var headtoCheck = buffer.Slice(position.Value, 4).ToArray();

if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
{

if (buffer.Slice(position.Value).Length >= 2400)
{

var mes = buffer.Slice(position.Value, 2400);

await ProcessMessage(mes.ToArray());

var next = buffer.GetPosition(2400, position.Value);

buffer = buffer.Slice(next);
}
else
{

break;
}
}
else
{

var next = buffer.GetPosition(1, position.Value);
buffer = buffer.Slice(next);
}
}
}
while (position != null);


reader.AdvanceTo(buffer.Start, buffer.End);

if (result.IsCompleted)
{
break;
}
}

reader.Complete();
}

以上代碼基本解決了以下問題:

  • 數據接收不完整,找不到開頭結尾,導致數據大量丟棄,或者自己維護一個queue的代碼復雜性

  • 數據接收與處理的同步問題

  • 一次性收到多條數據的情況

本文只是解釋了pipeline處理的模式,對于茫茫多的ToArray方法,可以使用基于Span的操作進行優化(有時間就來填坑)。另外,如果在await ProcessMessage(mes.ToArray());這里,直接使用Task.Run(()=>ProcessMessage(mes);代替的話,實測會出現莫名其妙的問題,很有可能是pipe運行快,在系統調度Task之前,已經將內存釋放導致的,如果需要優化這一塊的話,需要格外注意。

原文地址:https://www.cnblogs.com/podolski/p/10807204.html

.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?

總結

以上是生活随笔為你收集整理的使用高性能Pipelines构建.NET通讯程序的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。