进程间通信:同步双工管道
? ? ? ? 因為工作需要,需要設計出一個雙工的IPC。(轉載請指明出處)在一番比較后,我發現管道是比較符合我們的需求的。但是我們需求要求管道的對方是可信任的,而在vista以下系統是沒有GetNamedPipeClientProcessId、GetNamedPipeClientSessionId、GetNamedPipeServerProcessId、
? ? ? ??GetNamedPipeServerSessionId這樣的函數,于是是否可信這個操作就要求由客戶端和服務端兩方互檢來完成,至于互檢的思路,我會在之后管道的加強版中給出思路和例子。而本文只是簡單介紹一個同步雙工管道。
? ? ? ? 在工作中寫的管道模型中,服務端每次被連接上,都會啟動一個連接實例(線程)。于是如果存在多個客戶端接入的情況下,將啟動多個線程。這樣的模型比較簡單,但是效率存在問題。這些天我參考了微軟的例子,重寫了管道模型。服務端只啟動一個線程,利用該線程的APC完成所有連接的讀寫操作。因為是同步雙工,所以我設計的模型是不停的一問一答。當有消息要發向對方時,只需要向“問”列表中插入消息,底層會將這條消息發往對方;如果“問”表中不存數據,則發一條垃圾消息,對方在接受到這條消息后不做任何處理。這樣的設計也就是為了維持管道暢通,不因一個環節卡住導致其他操作不可完成。
? ? ? ? 對于管道模型,我設計成:傳輸層,數據層,邏輯層,應用層四層結構。其中傳輸層只負責管道連接和數據傳輸,不關心數據內容;數據層會將傳輸層所有取到的數據以管道句柄為依據進行分組,同時負責將各個連接要傳給對方的數據匯總供傳輸層使用;邏輯層考慮加入驗證邏輯,即驗證對方是否為可信任,同時為應用層提供方便的調用支持,比如在邏輯層啟動一個線程調用一個應用層設置的回調函數來處理接受到的消息,同時暴露一個發送數據的函數供應用層使用。這樣應用層只要實現處理消息的回調、調用發送數據的接口即可。(工作中設計的管道模型就是這樣子的。因為我準備重寫一個更穩定和高效的管道,目前只大致寫好了傳輸層代碼。)
? ? ? ? 服務端
#include "stdafx.h"
#include "PipeServerInstance.h"
#include <string>
#include <strsafe.h>CPipeServerInstance::CPipeServerInstance()
{m_hPipe = NULL;
}CPipeServerInstance::~CPipeServerInstance()
{}VOID CPipeServerInstance::StartService()
{HANDLE hConnectEvent = NULL; // 創建一個連接等待事件hConnectEvent = CreateEvent( NULL, TRUE, TRUE, NULL );if ( NULL == hConnectEvent ) {// 創建連接事件失敗MYTRACE(L"CreateEvent failed with %d.\n", GetLastError()); return;}OVERLAPPED oConnect;::ZeroMemory( &oConnect, sizeof(OVERLAPPED) );oConnect.hEvent = hConnectEvent; BOOL bPendingIO = FALSE;// 創建一個管道實例并等待客戶端接入bPendingIO = CreateAndConnectInstance( &oConnect ); // 等待事件的返回值DWORD dwWait = WAIT_TIMEOUT;while ( TRUE ) { // 等待一個客戶端的接入,或者為了讀寫例程執行dwWait = WaitForSingleObjectEx( hConnectEvent, INFINITE, TRUE );switch ( dwWait ) { case WAIT_OBJECT_0: // 一個客戶端接入if ( FALSE == RunServerInstance( &oConnect, bPendingIO) ){return;}if ( FALSE == bPendingIO ){return;}break; case WAIT_IO_COMPLETION: // 等待事件是由讀寫完成例程觸發的break; default: {// 錯誤MYTRACE(L"WaitForSingleObjectEx (%d)\n", GetLastError()); return ;}} } return;
}BOOL CPipeServerInstance::RunServerInstance( LPOVERLAPPED lpoConnect, BOOL& bPendingIO )
{DWORD cbRet = 0;// 如果一個操作被掛起,則獲取這個連接的結果if ( FALSE != bPendingIO ) { if ( FALSE == GetOverlappedResult( m_hPipe, lpoConnect, &cbRet, FALSE ) ) {MYTRACE(L"ConnectNamedPipe (%d)\n", GetLastError()); return FALSE;}} LPPIPEINST lpPipeInst; // 分配全局固定內存空間用于保存讀寫數據lpPipeInst = (LPPIPEINST) GlobalAlloc( GPTR, sizeof(PIPEINST) ); if ( NULL == lpPipeInst) {MYTRACE(L"GlobalAlloc failed (%d)\n", GetLastError()); return FALSE;}lpPipeInst->hPipeInst = m_hPipe; lpPipeInst->lpPointer = this;DealMsg( lpPipeInst );WriteFileEx( lpPipeInst->hPipeInst, lpPipeInst->cbWrite, lpPipeInst->dwWrite, (LPOVERLAPPED) lpPipeInst, (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedWriteRoutine); // 創建一個新的管道實例去等待下一個客戶端接入bPendingIO = CreateAndConnectInstance( lpoConnect ); return TRUE;
}BOOL CPipeServerInstance::StopService()
{m_CriticalPipeHandle.Lock();for ( VECHANDLEITER it = m_VecPipeHandle.begin(); it != m_VecPipeHandle.end(); it++ ){DisconnectNamedPipe( *it );}m_CriticalPipeHandle.Unlock();return TRUE;
}VOID CPipeServerInstance::DisconnectAndClose( LPPIPEINST lpPipeInst )
{if( NULL == lpPipeInst || NULL == lpPipeInst->hPipeInst ){return;}// 斷開管道連接if ( FALSE == DisconnectNamedPipe( lpPipeInst->hPipeInst ) ) {MYTRACE(L"DisconnectNamedPipe failed with %d.\n", GetLastError());}// 關閉管道CloseHandle( lpPipeInst->hPipeInst ); // 釋放掉全局分配的內存if ( NULL != lpPipeInst ) {GlobalFree(lpPipeInst); }
}BOOL CPipeServerInstance::CreateAndConnectInstance( LPOVERLAPPED lpoOverlap )
{m_hPipe = CreateNamedPipe( PIPENAME, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,PIPE_UNLIMITED_INSTANCES,sizeof(PipeMsgStruct),sizeof(PipeMsgStruct),PIPE_TIMEOUT,NULL );if ( INVALID_HANDLE_VALUE == m_hPipe ) {MYTRACE(L"CreateNamedPipe failed with %d.\n", GetLastError()); return FALSE;}m_CriticalPipeHandle.Lock();m_VecPipeHandle.push_back( m_hPipe );m_CriticalPipeHandle.Unlock();// 啟動一個新的連接等待客戶端接入return ConnectToNewClient( m_hPipe, lpoOverlap );
}BOOL CPipeServerInstance::ConnectToNewClient( HANDLE hPipe, LPOVERLAPPED lpo )
{BOOL fPendingIO = FALSE; DWORD dwLastError = ERROR_SUCCESS;// 異步命名管道連接應該失敗if ( FALSE != ConnectNamedPipe( hPipe, lpo ) ) { MYTRACE( L"ConnectNamedPipe failed with %d.\n", GetLastError() ); return FALSE;}switch ( GetLastError() ) { // If the function fails, the return value is zero // and GetLastError returns a value other than ERROR_IO_PENDING or ERROR_PIPE_CONNECTED.case ERROR_IO_PENDING: {// 正在連接fPendingIO = TRUE; }break; case ERROR_PIPE_CONNECTED: {// If a client connects before the function is called, // the function returns zero and GetLastError returns ERROR_PIPE_CONNECTED. // This can happen if a client connects in the interval // between the call to CreateNamedPipe and the call to ConnectNamedPipe.// In this situation, there is a good connection between client and server, // even though the function returns zero.// 如果客戶端已經連接上,則設置事件if ( SetEvent(lpo->hEvent) ) {break; }}// 這個地方故意不break的,因為SetEvent失敗了default: {MYTRACE(L"ConnectNamedPipe failed with %d.\n", GetLastError());}} return fPendingIO;
}VOID WINAPI CPipeServerInstance::CompletedWriteRoutine( DWORD dwErr, DWORD cbWritten,LPOVERLAPPED lpOverLap )
{LPPIPEINST lpPipeInst = NULL; BOOL fRead = FALSE; // 因為lpOverLap的內存是固定的,而其又是LPPIPEINST類型的第一個元素// 于是,這樣就可以獲得之前分配的LPPIPEINST類型的對象lpPipeInst = (LPPIPEINST) lpOverLap; // 已經異步寫完,于是再異步讀if ( ( 0 == dwErr ) && ( cbWritten == lpPipeInst->dwWrite ) ) fRead = ReadFileEx( lpPipeInst->hPipeInst, lpPipeInst->cbRead, PIPEMSGLENGTH, (LPOVERLAPPED) lpPipeInst, (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedReadRoutine); // 如果寫失敗了,就斷開連接CPipeServerInstance* pThis = static_cast<CPipeServerInstance*> (lpPipeInst->lpPointer);if ( FALSE == fRead && NULL != pThis ) {pThis->DisconnectAndClose( lpPipeInst ); }
}VOID WINAPI CPipeServerInstance::CompletedReadRoutine( DWORD dwErr, DWORD cbBytesRead, LPOVERLAPPED lpOverLap )
{LPPIPEINST lpPipeInst = NULL; BOOL fWrite = FALSE; // 因為lpOverLap的內存是固定的,而其又是LPPIPEINST類型的第一個元素// 于是,這樣就可以獲得之前分配的LPPIPEINST類型的對象lpPipeInst = (LPPIPEINST) lpOverLap; CPipeServerInstance* pThis = static_cast<CPipeServerInstance*> (lpPipeInst->lpPointer);// 已經異步讀完,于是再異步寫if ( 0 == dwErr && 0 != cbBytesRead && NULL != pThis) { pThis->DealMsg( lpPipeInst );fWrite = WriteFileEx( lpPipeInst->hPipeInst, lpPipeInst->cbWrite, lpPipeInst->dwWrite, (LPOVERLAPPED) lpPipeInst, (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedWriteRoutine); }// 如果讀失敗了,就斷開連接if ( FALSE == fWrite && NULL != pThis ) {pThis->DisconnectAndClose(lpPipeInst); }
}VOID CPipeServerInstance::DealMsg( LPPIPEINST pipe )
{MYTRACE( L"[%p] %s\n", pipe->hPipeInst, pipe->cbRead);std::wstring strCmd = L"Default answer from server";StringCchCopy( (WCHAR*)pipe->cbWrite, PIPEMSGLENGTH, strCmd.c_str() );pipe->dwWrite = ( lstrlen( strCmd.c_str() ) + 1 ) * sizeof(TCHAR);
}
? ? ? ? 客戶端
#include "stdafx.h"
#include "PipeClientInstance.h"
#include <string>
#include <strsafe.h>CPipeClientInstance::CPipeClientInstance()
{m_hPipe = NULL;m_hStopEvent = NULL;
}CPipeClientInstance::~CPipeClientInstance()
{}BOOL CPipeClientInstance::StartClient()
{LPPIPEINST lpPipeInst = NULL; // 創建一個退出事件m_hStopEvent = CreateEvent( NULL, FALSE, FALSE, NULL );if ( NULL == m_hStopEvent ) {// 創建退出事件失敗MYTRACE(L"CreateEvent failed with %d.\n", GetLastError()); return FALSE;}// 連接服務器if ( FALSE == ConnectToServer() ){return FALSE;}// 消息循環if ( FALSE == MsgLoop( lpPipeInst ) ){return FALSE;}// 關閉管道清除內存DisconnectAndClose( lpPipeInst );return TRUE;
}BOOL CPipeClientInstance::ConnectToServer()
{while ( WAIT_TIMEOUT == WaitForSingleObjectEx( m_hStopEvent, 10, TRUE ) ) { // 異步的方式打開一個已經存在的管道m_hPipe = CreateFile( PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);if ( INVALID_HANDLE_VALUE != m_hPipe ){// 打開管道成功break; }if ( ERROR_PIPE_BUSY != GetLastError() ) {// 除了發生ERROR_PIPE_BUSY錯誤,否則其他錯誤都認為打開失敗MYTRACE( L"Could not open pipe. GLE=%d\n", GetLastError() ); return FALSE;}// 所有的管道處于“忙”狀態,所以等待20秒if ( FALSE == WaitNamedPipe( PIPENAME, 20000 ) ) { MYTRACE(L"Could not open pipe: 20 second wait timed out."); return FALSE;}} return TRUE;
}BOOL CPipeClientInstance::MsgLoop( LPPIPEINST& lpPipeInst )
{DWORD dwMode = PIPE_READMODE_BYTE; BOOL bSuccess = SetNamedPipeHandleState( m_hPipe, &dwMode, NULL, NULL );if ( FALSE == bSuccess ){return FALSE;}// 分配全局固定內存空間用于保存讀寫數據lpPipeInst = (LPPIPEINST) GlobalAlloc( GPTR, sizeof(PIPEINST) ); if ( NULL == lpPipeInst) {MYTRACE(L"GlobalAlloc failed (%d)\n", GetLastError()); return FALSE;}lpPipeInst->hPipeInst = m_hPipe; lpPipeInst->lpPointer = this;lpPipeInst->dwWrite = 0;m_CriticalSectionQuit.Lock();lpPipeInst->bSuccess = TRUE;m_CriticalSectionQuit.Unlock();CompletedWriteRoutine( 0, 0, (LPOVERLAPPED) lpPipeInst ); DWORD dwWait = WAIT_TIMEOUT;bSuccess = FALSE;BOOL bExit = FALSE;do {bSuccess = FALSE;// 等待退出事件,同時也讓讀寫完成例程執行dwWait = WaitForSingleObjectEx( m_hStopEvent, INFINITE, TRUE );switch( dwWait ){case WAIT_OBJECT_0:{// 等到了終止事件,退出循環bExit = TRUE;}break;case WAIT_IO_COMPLETION:{m_CriticalSectionQuit.Lock();if ( FALSE != lpPipeInst->bSuccess ){// 讀寫完成,繼續循環bSuccess = TRUE;}else{bExit = TRUE;}m_CriticalSectionQuit.Unlock();}break;default:{// 其他類型導致退出,認為是出錯,退出循環bExit = TRUE;}}if ( FALSE != bExit ){break;}} while( FALSE != bSuccess );return TRUE;
}BOOL CPipeClientInstance::StopClient()
{if ( NULL != m_hStopEvent ){::SetEvent( m_hStopEvent );}return TRUE;
}VOID CPipeClientInstance::DisconnectAndClose( LPPIPEINST lpPipeInst )
{if ( NULL != lpPipeInst->hPipeInst ){// 關閉管道CloseHandle( lpPipeInst->hPipeInst ); }// 釋放掉全局分配的內存if ( NULL != lpPipeInst ) {GlobalFree(lpPipeInst); }
}VOID WINAPI CPipeClientInstance::CompletedWriteRoutine( DWORD dwErr, DWORD cbWritten,LPOVERLAPPED lpOverLap )
{LPPIPEINST lpPipeInst = NULL; BOOL fRead = FALSE; // 因為lpOverLap的內存是固定的,而其又是LPPIPEINST類型的第一個元素// 于是,這樣就可以獲得之前分配的LPPIPEINST類型的對象lpPipeInst = (LPPIPEINST) lpOverLap; // 已經異步寫完,于是再異步讀if ( ( 0 == dwErr ) && ( cbWritten == lpPipeInst->dwWrite ) ) fRead = ReadFileEx( lpPipeInst->hPipeInst, lpPipeInst->cbRead, PIPEMSGLENGTH, (LPOVERLAPPED) lpPipeInst, (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedReadRoutine); // 如果寫失敗了,就斷開連接CPipeClientInstance* pThis = static_cast<CPipeClientInstance*> ( lpPipeInst->lpPointer );if ( FALSE == fRead && NULL != pThis ) {pThis->NotifyExit( lpPipeInst ); }
}VOID WINAPI CPipeClientInstance::CompletedReadRoutine( DWORD dwErr, DWORD cbBytesRead, LPOVERLAPPED lpOverLap )
{LPPIPEINST lpPipeInst = NULL; BOOL fWrite = FALSE; // 因為lpOverLap的內存是固定的,而其又是LPPIPEINST類型的第一個元素// 于是,這樣就可以獲得之前分配的LPPIPEINST類型的對象lpPipeInst = (LPPIPEINST) lpOverLap; CPipeClientInstance* pThis = static_cast<CPipeClientInstance*> (lpPipeInst->lpPointer);// 已經異步讀完,于是再異步寫if ( 0 == dwErr && 0 != cbBytesRead && NULL != pThis) { pThis->DealMsg( lpPipeInst );fWrite = WriteFileEx( lpPipeInst->hPipeInst, lpPipeInst->cbWrite, lpPipeInst->dwWrite, (LPOVERLAPPED) lpPipeInst, (LPOVERLAPPED_COMPLETION_ROUTINE) CompletedWriteRoutine); }// 如果讀失敗了,就斷開連接if ( FALSE == fWrite && NULL != pThis ) {pThis->NotifyExit(lpPipeInst); }
}VOID CPipeClientInstance::DealMsg( LPPIPEINST pipe )
{MYTRACE( L"[%p] %s\n", pipe->hPipeInst, pipe->cbRead );std::wstring strCmd = L"Default answer from client";StringCchCopy( (WCHAR*)pipe->cbWrite, PIPEMSGLENGTH, strCmd.c_str() );pipe->dwWrite = ( lstrlen( strCmd.c_str() ) + 1 ) * sizeof(TCHAR);
}VOID CPipeClientInstance::NotifyExit( LPPIPEINST lpPipeInst )
{m_CriticalSectionQuit.Lock();lpPipeInst->bSuccess = FALSE;m_CriticalSectionQuit.Unlock();
}
?
? ? ? ? 這個代碼中的一些值得注意的設計:
- 在寫完成例程中調用異步讀,在讀完成例程中調用異步寫,從而實現同步雙工。(特別注意不要在完成例程中的異步操作后WaitforXXEX,否則會出現嚴重的遞歸問題,最后內存耗盡,程序掛掉)
- 對每一個接入,都分配一個不可移動的內存,其第一個元素設置成OVERLAPPED結構對象,同時讓這個結構對象就是異步操作和完成例程中都會使用的那個參數。如異步操作
BOOL WINAPI ReadFileEx(__in HANDLE hFile,__out_opt LPVOID lpBuffer,__in DWORD nNumberOfBytesToRead,__inout LPOVERLAPPED lpOverlapped,__in_opt LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
? ? ? ? 完成例程
VOID CALLBACK FileIOCompletionRoutine(__in DWORD dwErrorCode,__in DWORD dwNumberOfBytesTransfered,__inout LPOVERLAPPED lpOverlapped
);
? ? ? ? 這樣設計,就可以達到一個很重要的目的:在完成例程中獲取“讀/寫”的數據。
? ? ? ? 對應的工程地址是:CommunicatePipe工程
(轉載請指明出處)
總結
以上是生活随笔為你收集整理的进程间通信:同步双工管道的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python3编写简易统计服务器
- 下一篇: 一种精确从文本中提取URL的思路及实现