日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SequoiaDB 系列之六 :源码分析之coord节点

發布時間:2023/12/20 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SequoiaDB 系列之六 :源码分析之coord节点 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

好久不見。

?

在上一篇SequoiaDB 系列之五?? :源碼分析之main函數,有講述進程開始運行時,會根據自身的角色,來初始化不同的CB(控制塊,control block)。

在之前的一篇SequoiaDB 系列之四?? :架構簡析中,我們簡單過了一遍SequoiaDB的架構和各個節點的角色。

今天來看看SequoiaDB的coord角色。

首先,需要有個大致的輪廓:

coord節點主要承擔代理的角色。作為SequoiaDB集群對外的接頭人,它轉發消息給其它節點,組合(combine)不同節點返回的數據,把結果返回給client。

catalog節點主要存儲meta數據,比如集群中有哪些組,每個組的狀態;每個組上有哪些節點,有哪些集合(Collection),哪些集合是主子表等等。

data節點主要是管理存儲的數據,它接受coord轉發過來的CRUD等操作,并記錄同步日志(最終一致性)。

?

在注冊CB的函數中:

void _pmdController::registerCB( SDB_ROLE dbrole ) {if ( SDB_ROLE_DATA == dbrole ){...}else if ( SDB_ROLE_COORD == dbrole ){PMD_REGISTER_CB( sdbGetTransCB() ) ; // TRANSPMD_REGISTER_CB( sdbGetCoordCB() ) ; // COORDPMD_REGISTER_CB( sdbGetFMPCB () ) ; // FMP}...// 每個節點都會注冊的控制塊PMD_REGISTER_CB( sdbGetDMSCB() ) ; // DMSPMD_REGISTER_CB( sdbGetRTNCB() ) ; // RTNPMD_REGISTER_CB( sdbGetSQLCB() ) ; // SQLPMD_REGISTER_CB( sdbGetAggrCB() ) ; // AGGRPMD_REGISTER_CB( sdbGetPMDController() ) ; // CONTROLLER }

?coord注冊這幾個CB之后,就開始注冊和啟動服務:

具體函數在_KRCB::init()中,不再表述。_KRCB::init()會根據節點的角色,啟動不同的服務。

客戶端連接到coord,coord便會啟動一個線程,為該連接服務。

1 INT32 pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *pData ) 2 { 3 ... 4 5 while ( !cb->isDisconnected() && !pListerner->isClosed() ) 6 { 7 SOCKET s ; 8 rc = pListerner->accept ( &s, NULL, NULL ) ; 9 if ( SDB_TIMEOUT == rc || SDB_TOO_MANY_OPEN_FD == rc ) 10 { 11 rc = SDB_OK ; 12 continue ; 13 } 14 if ( rc && PMD_IS_DB_DOWN ) 15 { 16 rc = SDB_OK ; 17 goto done ; 18 } 19 else if ( rc ) 20 { 21 PD_LOG ( PDERROR, "Failed to accept socket in TcpListener(rc=%d)", 22 rc ) ; 23 if ( pListerner->isClosed() ) 24 { 25 break ; 26 } 27 else 28 { 29 continue ; 30 } 31 } 32 33 cb->incEventCount() ; 34 ++mondbcb->numConnects ; 35 void *pData = NULL ; 36 *((SOCKET *) &pData) = s ; 37 if ( !krcb->isActive() ) 38 { 39 ossSocket newsock ( &s ) ; 40 newsock.close () ; 41 continue ; 42 } 43 44 rc = eduMgr->startEDU ( EDU_TYPE_AGENT, pData, &agentEDU ) ; 45 if ( rc ) 46 { 47 PD_LOG( ( rc == SDB_QUIESCED ? PDWARNING : PDERROR ), 48 "Failed to start edu, rc: %d", rc ) ; 49 ossSocket newsock ( &s ) ; 50 newsock.close () ; 51 continue ; 52 } 53 } //while ( ! cb->isDisconnected() ) 54 55 ... 56 }

服務線程監聽到client的連接,啟動一個EDU_TYPE_AGENT類型的線程,單獨為client服務。

?

下面講述coord節點的最主要的功能——消息轉發

coord的啟動初,會初始化一些必要的全局變量。在SequoiaDB中,會初始化很多command,拿創建集合空間來說,在文件SequoiaDB/engine/rtn/rtnCoord.cpp 中

1 RTN_COORD_CMD_BEGIN 2 ... 3 4 RTN_COORD_CMD_ADD( COORD_CMD_LISTCOLLECTIONSPACES, rtnCoordCMDListCollectionSpace ) 5 6 ... 7 RTN_COORD_OP_END

?嗯,上面的代碼有點MFC中消息映射的感覺。

來看看 RTN_COORD_CMD_ADD 宏:

1 #define RTN_COORD_CMD_ADD( cmdName, cmdClass ) {\ 2 rtnCoordCommand *pObj = SDB_OSS_NEW cmdClass();\ 3 _cmdMap.insert ( COORD_CMD_MAP::value_type (cmdName, pObj ));}

宏主要是new一個對象,再把對象插入到_cmdMap中,這樣在程序初始化時候,便會有一系列的command對象存儲在_cmdMap中。

另外,對SequoiaDB而言,所有的command操作,都是在查詢操作的基礎上做的,服務端用一些方法區別開是真正的查詢,還是command。SequoiaDB的命令,是以$開頭的字符串。

前提簡述完畢,現在假設client連接上了coord,coord也創建了一個線程,為這個client服務。

1 INT32 _pmdLocalSession::run() 2 { 3 INT32 rc = SDB_OK ; 4 UINT32 msgSize = 0 ; 5 CHAR *pBuff = NULL ; 6 INT32 buffSize = 0 ; 7 pmdEDUMgr *pmdEDUMgr = NULL ; 8 9 if ( !_pEDUCB ) 10 { 11 rc = SDB_SYS ; 12 goto error ; 13 } 14 15 pmdEDUMgr = _pEDUCB->getEDUMgr() ; 16 17 while ( !_pEDUCB->isDisconnected() && !_socket.isClosed() ) 18 { 19 _pEDUCB->resetInterrupt() ; 20 _pEDUCB->resetInfo( EDU_INFO_ERROR ) ; 21 _pEDUCB->resetLsn() ; 22 23 rc = recvData( (CHAR*)&msgSize, sizeof(UINT32) ) ; // 收取數據包的前四個字節,代表該數據包有多大 24 if ( rc ) 25 { 26 if ( SDB_APP_FORCED != rc ) 27 { 28 PD_LOG( PDERROR, "Session[%s] failed to recv msg size, " 29 "rc: %d", sessionName(), rc ) ; 30 } 31 break ; 32 } 33 34 if ( msgSize == (UINT32)MSG_SYSTEM_INFO_LEN ) // 如果包長度是 MSG_SYSTEM_INFO_LEN (-1),則這是一個請求系統信息包,coord會返回本機的字節序列給client 35 { // 每個連接的第一個包,一定是長度標記為 MSG_SYSTEM_INFO_LEN 的包,否則字節序不正確,所有的數據都不能保證能正確解析(萬一數據庫運行在大端機上呢) 36 rc = _recvSysInfoMsg( msgSize, &pBuff, buffSize ) ; 37 if ( rc ) 38 { 39 break ; 40 } 41 rc = _processSysInfoRequest( pBuff ) ; 42 if ( rc ) 43 { 44 break ; 45 } 46 47 _setHandshakeReceived() ; 48 } 49 else if ( msgSize < sizeof(MsgHeader) || msgSize > SDB_MAX_MSG_LENGTH ) // 對包的大小做出了限制,包長超過某值或者小于某值的包,都會導致連接中斷 50 { 51 PD_LOG( PDERROR, "Session[%s] recv msg size[%d] is less than " 52 "MsgHeader size[%d] or more than max msg size[%d]", 53 sessionName(), msgSize, sizeof(MsgHeader), 54 SDB_MAX_MSG_LENGTH ) ; 55 rc = SDB_INVALIDARG ; 56 break ; 57 } 58 else 59 { 60 pBuff = getBuff( msgSize + 1 ) ; 61 if ( !pBuff ) 62 { 63 rc = SDB_OOM ; 64 break ; 65 } 66 buffSize = getBuffLen() ; 67 *(UINT32*)pBuff = msgSize ; 68 rc = recvData( pBuff + sizeof(UINT32), 69 msgSize - sizeof(UINT32), 70 PMD_RECV_DATA_AFTER_LENGTH_TIMEOUT ) ; // 到此處,說明程序可以愉快的接受client的發送的數據包了 71 if ( rc ) 72 { 73 if ( SDB_APP_FORCED != rc ) 74 { 75 PD_LOG( PDERROR, "Session[%s] failed to recv msg[len: %u], " 76 "rc: %d", sessionName(), msgSize - sizeof(UINT32), 77 rc ) ; 78 } 79 break ; 80 } 81 82 _pEDUCB->incEventCount() ; 83 pBuff[ msgSize ] = 0 ; 84 if ( SDB_OK != ( rc = pmdEDUMgr->activateEDU( _pEDUCB ) ) ) 85 { 86 PD_LOG( PDERROR, "Session[%s] activate edu failed, rc: %d", 87 sessionName(), rc ) ; 88 break ; 89 } 90 rc = _processMsg( (MsgHeader*)pBuff ) ; // 收到數據包,開始處理,該函數在結合代碼講解 91 if ( rc ) 92 { 93 break ; 94 } 95 if ( SDB_OK != ( rc = pmdEDUMgr->waitEDU( _pEDUCB ) ) ) 96 { 97 PD_LOG( PDERROR, "Session[%s] wait edu failed, rc: %d", 98 sessionName(), rc ) ; 99 break ; 100 } 101 } 102 } // end while 103 104 done: 105 disconnect() ; 106 return rc ; 107 error: 108 goto done ; 109 }

?_processMsg方法:

1 INT32 _pmdLocalSession::_processMsg( MsgHeader * msg ) 2 { 3 INT32 rc = SDB_OK ; 4 const CHAR *pBody = NULL ; 5 INT32 bodyLen = 0 ; 6 rtnContextBuf contextBuff ; 7 INT32 opCode = msg->opCode ; 8 9 rc = _onMsgBegin( msg ) ; // 對數據包做前期處理,例如改數據包是不是需要返回,(若出錯)需不需要回滾,并初始化好回復的數據包頭部 10 if ( SDB_OK == rc ) 11 { 12 rc = _processor->processMsg( msg, contextBuff, // 我是項目經理,這個包就交給processor處理去吧,我要的是結果。 13 _replyHeader.contextID, // processor在不同的節點中,指向不同的對象(咦,這不是多態么?),因此也有不同的處理方式 14 _needReply ) ; 15 pBody = contextBuff.data() ; // pBody指向要返回的數據,避免拷貝(提高執行效率) 16 bodyLen = contextBuff.size() ; // 數據長度,不表 17 _replyHeader.numReturned = contextBuff.recordNum() ; // 返回的數據共有多少條記錄 18 _replyHeader.startFrom = (INT32)contextBuff.getStartFrom() ; // 應該從哪一條開始讀 19 if ( SDB_OK != rc ) 20 { 21 if ( _needRollback ) // 當執行過程中例如(insert, delete等),出錯了,需要把數據復原 22 { 23 INT32 rcTmp = rtnTransRollback( eduCB(), getDPSCB() ) ; 24 if ( rcTmp ) 25 { 26 PD_LOG( PDERROR, "Session[%s] failed to rollback trans " 27 "info, rc: %d", sessionName(), rcTmp ) ; 28 } 29 _needRollback = FALSE ; 30 } 31 } 32 } 33 34 if ( _needReply ) // 需要回復,那就再處理一下把 35 { 36 if ( rc && bodyLen == 0 ) // 執行過程出錯,那就返回出錯信息 37 { 38 _errorInfo = utilGetErrorBson( rc, _pEDUCB->getInfo( 39 EDU_INFO_ERROR ) ) ; 40 pBody = _errorInfo.objdata() ; 41 bodyLen = (INT32)_errorInfo.objsize() ; 42 _replyHeader.numReturned = 1 ; 43 } 44 _replyHeader.header.opCode = MAKE_REPLY_TYPE(opCode) ; // 填充回復數據包中的字段 45 _replyHeader.flags = rc ; 46 _replyHeader.header.messageLength = sizeof( _replyHeader ) + 47 bodyLen ; 48 49 INT32 rcTmp = _reply( &_replyHeader, pBody, bodyLen ) ; // 把包發送給client 50 if ( rcTmp ) 51 { 52 PD_LOG( PDERROR, "Session[%s] failed to send response, rc: %d", 53 sessionName(), rcTmp ) ; 54 disconnect() ; 55 } 56 } 57 58 _onMsgEnd( rc, msg ) ; 59 rc = SDB_OK ; 60 61 return rc ; 62 }

?coord節點上的processor,是pmdCoordProcessor的一個實例,是用來做數據轉發的,不同于真正做數據處理的pmdDataProcessor。

1 INT32 _pmdCoordProcessor::processMsg( MsgHeader *msg, 2 rtnContextBuf &contextBuff, 3 INT64 &contextID, 4 BOOLEAN &needReply ) 5 { 6 ... 7 8 rc = _processCoordMsg( msg, _replyHeader, contextBuff ) ; // 轉給另一個函數(_processCoordMsg)處理,下面講述 9 if ( SDB_COORD_UNKNOWN_OP_REQ == rc ) 10 { 11 contextBuff.release() ; 12 rc = _pmdDataProcessor::processMsg( msg, contextBuff, // 如果上一個函數處理后,返回的錯誤是一個 SDB_COORD_UNKNOWN_OP_REQ類型,則交給pmdDataProcessor處理 13 contextID, needReply ) ; 14 } 15 ... 16 }

?pmdCoordProcessor的處理過程

1 INT32 _pmdCoordProcessor::_processCoordMsg( MsgHeader *msg, 2 MsgOpReply &replyHeader, 3 rtnContextBuf &contextBuff ) 4 { 5 INT32 rc = SDB_OK ; 6 if ( NULL != _pErrorObj ) 7 { 8 SDB_OSS_DEL _pErrorObj ; 9 _pErrorObj = NULL ; 10 } 11 if ( NULL != _pResultBuff ) 12 { 13 _pResultBuff = NULL ; 14 } 15 CoordCB *pCoordcb = _pKrcb->getCoordCB(); 16 rtnCoordProcesserFactory *pProcesserFactory 17 = pCoordcb->getProcesserFactory(); 18 19 if ( MSG_AUTH_VERIFY_REQ == msg->opCode ) 20 { 21 rc = SDB_COORD_UNKNOWN_OP_REQ ; 22 goto done ; 23 } 24 else if ( MSG_BS_INTERRUPTE == msg->opCode || 25 MSG_BS_INTERRUPTE_SELF == msg->opCode || 26 MSG_BS_DISCONNECT == msg->opCode ) 27 { 28 } 29 else if ( !getClient()->isAuthed() ) // 沒有用用戶和密碼登錄,就收到了數據包的,就先嘗試用默認的用戶名和密碼,先取得數據庫的授權,否則無法做操作 30 { 31 rc = getClient()->authenticate( "", "" ) ; 32 if ( rc ) 33 { 34 goto done ; 35 } 36 } 37 38 switch ( msg->opCode ) // 開始檢查client要做什么樣的操作了 39 { 40 case MSG_BS_GETMORE_REQ : // get more操作,coord不做處理,先標記成 SDB_COORD_UNKNOWN_OP_REQ,交給其它地方處理 41 rc = SDB_COORD_UNKNOWN_OP_REQ ; 42 break ; 43 case MSG_BS_QUERY_REQ: // 查詢操作,這個是重點。所有的command 44 { 45 MsgOpQuery *pQueryMsg = ( MsgOpQuery * )msg ; 46 CHAR *pQueryName = pQueryMsg->name ; 47 SINT32 queryNameLen = pQueryMsg->nameLength ; 48 if ( queryNameLen > 0 && '$' == pQueryName[0] ) // 如果查詢的name字段,是用$開頭的字符串,則認為這個是command,要走command處理 49 { 50 rtnCoordCommand *pCmdProcesser = 51 pProcesserFactory->getCommandProcesser( pQueryMsg ) ; // 找到command的對象,上文中有描述所有的command都在初始化的時候,存入_cmdMap中 52 if ( NULL != pCmdProcesser ) 53 { 54 rc = pCmdProcesser->execute( ( CHAR *)msg, // 找到了,就開始command處理了 55 msg->messageLength, 56 eduCB(), 57 replyHeader, 58 &contextBuff ) ; 59 break ; 60 } 61 } 62 // 如果沒有找到,則走入 default代碼塊 63 } 64 default: 65 { 66 rtnContextBase *pContext = NULL ; 67 rtnCoordOperator *pOperator = 68 pProcesserFactory->getOperator( msg->opCode ) ; // 交給operator處理,operator是類似于command的幾個特殊的處理對象,數量比較少,此處不表 69 rc = pOperator->execute( ( CHAR* )msg, // 轉發給對應的operator類實例 70 msg->messageLength, 71 eduCB(), 72 replyHeader, 73 &contextBuff ) ; 74 ... 75 } 76 }

?以創建集合空間的command為例,看看 rtnCoordCMDListCollectionSpace 的 execute做了什么:

INT32 rtnCoordCMDCreateCollectionSpace::execute( CHAR *pReceiveBuffer,SINT32 packSize,pmdEDUCB *cb,MsgOpReply &replyHeader,rtnContextBuf *buf ){...MsgOpQuery *pCreateReq = (MsgOpQuery *)pReceiveBuffer; // 構造一個 MSG_CAT_CREATE_COLLECTION_SPACE_REQ 的數據包pCreateReq->header.routeID.value = 0;pCreateReq->header.TID = cb->getTID();pCreateReq->header.opCode = MSG_CAT_CREATE_COLLECTION_SPACE_REQ; // 數據包的類型 rc = executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent,cb, NULL, NULL ) ;if ( rc ){PD_LOG ( PDERROR, "create collectionspace failed, rc = %d", rc ) ;goto error ;}done :replyHeader.flags = rc ;PD_TRACE_EXITRC ( SDB_RTNCOCMDCRCS_EXE, rc ) ;return rc;error :goto done ;}

?該函數的主體,構造了另外一個數據包,然后執行 executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent, cb, NULL, NULL ) ;這一句上。跟進這一函數:

1 INT32 rtnCoordCommand::executeOnCataGroup ( CHAR *pBuffer, 2 netMultiRouteAgent *pRouteAgent, 3 pmdEDUCB *cb, 4 rtnContextCoord *pContext, 5 CoordGroupList *pGroupList, 6 std::vector<BSONObj> *pReplyObjs ) 7 { 8 INT32 rc = SDB_OK; 9 ... 10 retry : 11 rc = rtnCoordGetCatGroupInfo( cb, isNeedRefresh, catGroupInfo ); // 查詢catalog的信息,主要是獲取到catalog組的主節點的服務地址 12 if ( rc ) 13 { 14 probe = 100 ; 15 goto error ; 16 PD_LOG ( PDERROR, "Execute on catalogue node failed, failed to get " 17 "catalogue group info(rc=%d)", rc ); 18 } 19 rc = rtnCoordSendRequestToPrimary( pBuffer, catGroupInfo, sendNodes, // 跟了這么久,做了那么多的準備,這一句才是真開始了,有興趣可以自己看一下 :) 20 pRouteAgent, MSG_ROUTE_CAT_SERVICE, 21 cb ); 22 if ( rc ) 23 { 24 probe = 200 ; 25 goto error ; 26 } 27 rc = rtnCoordGetReply( cb, sendNodes, replyQue, // 等待并收取遠程節點處理的返回信息 28 MAKE_REPLY_TYPE(((MsgHeader*)pBuffer)->opCode) ) ; 29 ... 30 }

?

?rtnCoordSendRequestToPrimary就不再詳細跟進描述了,根據函數名,大致就可以了解一個大概,是把數據發送到指定組(此處是catalog組)的主節點。

coord上的其它command或者operator也是采用類似的方法來轉發消息給其它節點,就不再一一贅述了。

?

綜合全文的講述,coord處理client請求的流程

發送請求給coord節點

?? coord先揪出這個請求是做什么

????? 交給對應的command處理

??????? ?查詢(本地緩存或者遠程獲取的)catalog信息

???????? 把消息轉成節點間的內部消息

???????? 轉發給目標節點

???????? 然后等待返回數據

???? 再把返回數據交給處理線程

線程把返回結果發送給client

?

=====>THE END<=====?

?

轉載于:https://www.cnblogs.com/tynia/p/coord.html

總結

以上是生活随笔為你收集整理的SequoiaDB 系列之六 :源码分析之coord节点的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。