OpenDDS
OpenDDS簡(jiǎn)介
Don Busch,首席軟件工程師兼合作伙伴
Object Computing,Inc.(OCI)
介紹
分布式實(shí)時(shí)應(yīng)用程序有時(shí)以數(shù)據(jù)為中心而不是以服務(wù)為中心,這意味著分布式系統(tǒng)中參與者的主要目標(biāo)是分發(fā)應(yīng)用程序數(shù)據(jù),而不是訪問共享服務(wù)。應(yīng)用程序數(shù)據(jù)的提供者和/或使用者的集合在設(shè)計(jì)時(shí)可能是未知的,并且可能會(huì)在應(yīng)用程序的整個(gè)生命周期內(nèi)發(fā)生變化。通常,以發(fā)布/訂閱通信模型而不是請(qǐng)求/響應(yīng)模型最有效地實(shí)現(xiàn)以數(shù)據(jù)為中心的范例。
用于實(shí)時(shí)系統(tǒng)的OMG 數(shù)據(jù)分發(fā)服務(wù)(DDS)解決了以數(shù)據(jù)為中心的分布式應(yīng)用程序的性能要求和實(shí)時(shí)性要求。DDS增加了分布式實(shí)時(shí)系統(tǒng)開發(fā)人員可以使用的發(fā)布/訂閱選項(xiàng)的范圍。為了方便起見,使用OMG接口定義語言(IDL)定義DDS接口。但是,大多數(shù)細(xì)節(jié)留給實(shí)現(xiàn),最重要的是如何在發(fā)布者和訂閱者之間進(jìn)行數(shù)據(jù)傳輸。DDS實(shí)現(xiàn)者決定將底層數(shù)據(jù)通過TCP,UDP,UDP多播,共享內(nèi)存等從發(fā)布者移動(dòng)到訂閱者的基礎(chǔ)通信機(jī)制。使用CORBA或IIOP協(xié)議不需要DDS規(guī)范的實(shí)現(xiàn)。將數(shù)據(jù)從發(fā)布者傳輸?shù)接嗛喺摺?/p>
OpenDDS是OMG數(shù)據(jù)分發(fā)服務(wù)規(guī)范的開源C ++實(shí)現(xiàn)。OpenDDS包含基于文件的配置機(jī)制。通過配置文件,OpenDDS用戶可以配置發(fā)布者或訂閱者的傳輸,調(diào)試輸出,內(nèi)存分配,DCPSInfoRepo代理進(jìn)程的位置以及許多其他設(shè)置。《OpenDDS開發(fā)人員指南》的“配置”一章中介紹了完整的配置設(shè)置集。
在本文中,我們涵蓋以下主題:
OMG DDS的OpenDDS實(shí)現(xiàn)
DDS架構(gòu)
股票報(bào)價(jià)員示例
IDL類型
發(fā)行人
訂戶
訂戶的聽眾
建立發(fā)布者和訂閱者
配置股票報(bào)價(jià)器
通過TCP傳輸運(yùn)行股票報(bào)價(jià)器
通過UDP傳輸運(yùn)行股票報(bào)價(jià)器
摘要
參考資料
OMG DDS的OpenDDS實(shí)現(xiàn)
OpenDDS利用可插拔的傳輸體系結(jié)構(gòu),通過應(yīng)用程序開發(fā)人員選擇的傳輸和封送實(shí)施實(shí)現(xiàn)數(shù)據(jù)傳輸。從概念上講,該體系結(jié)構(gòu)借鑒了TAO的可插入?yún)f(xié)議框架。OpenDDS當(dāng)前支持TCP和UDP點(diǎn)對(duì)點(diǎn)傳輸以及不可靠和可靠的多播,并使用高性能的封送處理實(shí)現(xiàn)。
這種可插拔的傳輸體系結(jié)構(gòu)允許DDS用戶基于所需的傳輸以及應(yīng)用程序部署的同質(zhì)或異質(zhì)性質(zhì)來優(yōu)化DDS安裝。可以做出這些選擇而不會(huì)影響應(yīng)用程序代碼本身。
封送處理代碼由專用的OpenDDS IDL編譯器生成。單個(gè)單獨(dú)的DCPS信息存儲(chǔ)庫(kù)(DCPSInfoRepo)流程充當(dāng)中央票據(jù)交換所,將發(fā)布者和訂閱者聯(lián)系在一起。在幕后,OpenDDS使用CORBA與DCPSInfoRepo流程進(jìn)行通信, 以關(guān)聯(lián)發(fā)布者和訂閱者。發(fā)布者和訂閱者之間的數(shù)據(jù)傳輸直接在發(fā)布和訂閱過程之間進(jìn)行。OpenDDS為RB和在發(fā)送或接收DDS數(shù)據(jù)時(shí)發(fā)生的非CORBA I / O創(chuàng)建自己的線程。
DDS架構(gòu)
OMG數(shù)據(jù)分發(fā)服務(wù)規(guī)范將DDS分為兩個(gè)單獨(dú)的體系結(jié)構(gòu)層。較低的層是以數(shù)據(jù)為中心的發(fā)布和訂閱(DCPS)層,其中包含到發(fā)布/訂閱通信機(jī)制的類型安全接口。上層是數(shù)據(jù)本地重構(gòu)層(DLRL),它使應(yīng)用程序開發(fā)人員可以在DCPS層之上構(gòu)建本地對(duì)象模型,從而使應(yīng)用程序免受DCPS知識(shí)的影響。每一層都有自己的概念和使用模式集,因此可以分別討論這兩層的概念和術(shù)語。
以數(shù)據(jù)為中心的發(fā)布和訂閱-DCPS
DCPS層負(fù)責(zé)有效地將數(shù)據(jù)從發(fā)布者分發(fā)到感興趣的訂閱者。它 在發(fā)送方使用發(fā)布者和數(shù)據(jù)寫入器 ,在接收方使用訂戶和數(shù)據(jù)讀取器的概念來實(shí)現(xiàn)。DCPS層由一個(gè)或多個(gè)數(shù)據(jù)域組成,每個(gè)數(shù)據(jù)域都包含一組通過DDS進(jìn)行通信的參與者(發(fā)布者和訂閱者)。每個(gè)實(shí)體(即發(fā)布者或訂閱者)都屬于一個(gè)域。每個(gè)進(jìn)程在其所屬的每個(gè)數(shù)據(jù)域中都有一個(gè)域參與者。
在任何數(shù)據(jù)域中,數(shù)據(jù)都是由主題標(biāo)識(shí)的,該主題是特定于類型的域段,允許發(fā)布者和訂閱者明確地引用數(shù)據(jù)。在域中,主題將唯一的主題名稱,數(shù)據(jù)類型和一組服務(wù)質(zhì)量(QoS)策略與數(shù)據(jù)本身相關(guān)聯(lián)。每個(gè)主題僅與一種數(shù)據(jù)類型相關(guān)聯(lián),盡管許多不同的主題可以發(fā)布相同的數(shù)據(jù)類型。發(fā)布者的行為由與特定數(shù)據(jù)源的發(fā)布者,數(shù)據(jù)寫入者和主題元素相關(guān)聯(lián)的QoS策略確定。同樣,訂戶的行為由與訂戶,數(shù)據(jù)讀取器和特定數(shù)據(jù)接收器的主題元素相關(guān)聯(lián)的QoS策略確定。
有關(guān)DCPS術(shù)語的更多信息,請(qǐng)參見《OpenDDS開發(fā)人員指南》。
DDS規(guī)范定義了許多服務(wù)質(zhì)量(QoS)策略,應(yīng)用程序可使用這些策略來指定其可靠性,資源使用,容錯(cuò)以及對(duì)服務(wù)的其他要求。參與者指定他們從服務(wù)中需要的行為;服務(wù)決定如何實(shí)現(xiàn)這些行為。這些策略可以應(yīng)用于各種DCPS實(shí)體(主題,數(shù)據(jù)寫入器,數(shù)據(jù)讀取器,發(fā)布者,訂閱者和域參與者),盡管并非所有策略對(duì)所有類型的實(shí)體都有效。
訂閱者和發(fā)布者通過報(bào)價(jià)請(qǐng)求范例協(xié)作指定QoS策略。發(fā)布者向所有訂閱者提供一套QoS策略。訂戶請(qǐng)求其所需的QoS策略集。然后,DDS實(shí)現(xiàn)會(huì)嘗試將請(qǐng)求的策略與提供的策略進(jìn)行匹配。如果策略一致,則發(fā)布和訂閱將匹配。
OpenDDS支持全套DCPS服務(wù)質(zhì)量(QoS)策略,包括:
| 活潑 | 控制活動(dòng)性檢查,以確保系統(tǒng)中預(yù)期的實(shí)體仍處于活動(dòng)狀態(tài) |
| 可靠性 | 確定是否允許該服務(wù)刪除樣本 |
| 歷史 | 控制實(shí)例的值發(fā)生變化,然后將其傳達(dá)給所有訂閱服務(wù)器,該實(shí)例發(fā)生了什么情況 |
| 資源限制 | 控制服務(wù)可用于滿足其他QoS要求的資源 |
有關(guān)服務(wù)質(zhì)量策略的更完整列表和更詳細(xì)的服務(wù)質(zhì)量定義,請(qǐng)參閱對(duì)象管理組的DDS白皮書簡(jiǎn)介附錄A。
數(shù)據(jù)本地重建層-DLRL
數(shù)據(jù)本地重建層(DLRL)是DCPS之上的面向?qū)ο髮印LRL對(duì)象是具有一個(gè)或多個(gè)共享屬性的本機(jī)語言(即C ++)對(duì)象。每個(gè)DLRL類都映射到一個(gè)或多個(gè)DCPS主題。每個(gè)共享屬性值都映射到主題數(shù)據(jù)類型的字段,并且其值通過DCPS分布在整個(gè)應(yīng)用程序中。DLRL參與者通過修改DLRL對(duì)象將數(shù)據(jù)與應(yīng)用程序的其余部分進(jìn)行通信,從而發(fā)布有關(guān)主題的數(shù)據(jù)樣本。DLRL共享屬性可以是簡(jiǎn)單的值或結(jié)構(gòu),對(duì)另一個(gè)DLRL對(duì)象的引用或這些對(duì)象的集合(列表,映射)。DLRL支持復(fù)雜的對(duì)象圖和DLRL對(duì)象之間的復(fù)雜關(guān)系。
開發(fā)人員負(fù)責(zé)確定DCPS實(shí)體如何映射到DLRL對(duì)象。使用IDL值類型在OMG接口定義語言(IDL)中指定模型。映射在概念上類似于對(duì)象關(guān)系數(shù)據(jù)庫(kù)映射,后者將對(duì)象模型映射到關(guān)系數(shù)據(jù)庫(kù)表。我們認(rèn)為每個(gè)DCPS主題都類似于關(guān)系數(shù)據(jù)庫(kù)表,每個(gè)樣本都作為該表中的一行。DDS規(guī)范具有從DCPS到DLRL的默認(rèn)映射。或者,開發(fā)人員可以選擇通過XML映射文件指定自己的自定義映射。
OpenDDS當(dāng)前未實(shí)現(xiàn)DLRL。
目錄
OpenDDS股票報(bào)價(jià)器示例
我們的示例說明了通過DDS DCPS層發(fā)布和訂閱數(shù)據(jù)樣本。該示例包含兩個(gè)DCPS主題,都與股市有關(guān)。
股票報(bào)價(jià)發(fā)布者將股票報(bào)價(jià)樣本發(fā)布給感興趣的訂閱者;每個(gè)報(bào)價(jià)均包含證券的股票代碼,其價(jià)值和時(shí)間戳。報(bào)價(jià)在整個(gè)交易日中定期發(fā)布,因?yàn)橘I賣交易會(huì)影響證券的基礎(chǔ)價(jià)值。另外,證券交易所事件發(fā)布者發(fā)布與證券交易所有關(guān)的重要事件,即,何時(shí)交易所開放,關(guān)閉,何時(shí)暫停交易或恢復(fù)交易。
我們的訂戶同時(shí)訂閱股票報(bào)價(jià)和股票交易所事件。訂戶打印其所看到的每個(gè)報(bào)價(jià)的代碼符號(hào)和值。當(dāng)訂閱者收到表明當(dāng)天股票交易所已經(jīng)關(guān)閉的事件時(shí),它將正常關(guān)閉。因此,“封閉”證券交易所事件的接收是訂戶停止預(yù)期股票報(bào)價(jià)樣本的信號(hào)。
我們將演示如何使用相同的發(fā)布者和訂閱者代碼通過TCP和UDP傳輸進(jìn)行通信。傳輸配置隔離在一組配置文件中,使我們無需更改任何代碼即可切換傳輸。
目錄
IDL類型
首先,我們?cè)贗DL中定義已發(fā)布的DDS數(shù)據(jù)類型:
#include "orbsvcs/TimeBase.idl" module StockQuoter { #pragma DCPS_DATA_TYPE "StockQuoter::Quote" #pragma DCPS_DATA_KEY "StockQuoter::Quote ticker"struct Quote {string ticker;string exchange;string full_name;double value;TimeBase::TimeT timestamp;};#pragma DCPS_DATA_TYPE "StockQuoter::ExchangeEvent" #pragma DCPS_DATA_KEY "StockQuoter::ExchangeEvent exchange"enum ExchangeEventType { TRADING_OPENED,TRADING_CLOSED,TRADING_SUSPENDED,TRADING_RESUMED };struct ExchangeEvent {string exchange;ExchangeEventType event;TimeBase::TimeT timestamp;}; };我們發(fā)布兩種數(shù)據(jù)類型:每個(gè)股票報(bào)價(jià)的報(bào)價(jià)類型,以及用于指示何時(shí)打開,關(guān)閉證券交易所以及何時(shí)暫停或恢復(fù)交易的ExchangeEvent類型。該DCPS_DATA_TYPE編譯標(biāo)記的類型與DDS使用。的 DCPS_DATA_KEY每種類型的定義是針對(duì)每個(gè)唯一標(biāo)識(shí)符 的實(shí)例中的數(shù)據(jù)類型的。我們報(bào)價(jià)類型的關(guān)鍵是股票的股票代碼。一整天,我們希望為每個(gè)股票代號(hào)發(fā)布許多值或樣本。每個(gè)股票代號(hào)的已發(fā)布樣本集屬于同一實(shí)例。在我們的示例中,我們將發(fā)布兩個(gè)股票代號(hào),并因此發(fā)布兩個(gè)實(shí)例:SPY(標(biāo)準(zhǔn)普爾存托憑證,即S&P 500)和MDY(S&P中盤存托憑證,即S&P中盤400)。
接下來,我們使用OpenDDS的opendds_idl編譯器編譯IDL 以生成 類型支持代碼。類型支持代碼包括生成的DCPS 數(shù)據(jù)寫入器和數(shù)據(jù)讀取器 C ++類以及其他IDL代碼。DDS使用類型安全的接口進(jìn)行發(fā)布和訂閱。類型安全的接口具有幾個(gè)優(yōu)點(diǎn):首先,在編譯時(shí)更容易捕獲編程錯(cuò)誤;第二,當(dāng)在編譯時(shí)已知封送數(shù)據(jù)類型時(shí),可以使生成的封送代碼非常高效。第三,我們可以避免any在數(shù)據(jù)傳輸中使用低效類型,例如CORBA 。
生成股票報(bào)價(jià)器的IDL類型的類型支持代碼的命令如下:
$ DDS_ROOT / bin / opendds_idl StockQuoter.idl此命令生成以下文件:
StockQuoterTypeSupport.idl StockQuoterTypeSupportImpl.h StockQuoterTypeSupportImpl.cpp但是,我們不需要opendds_idl手動(dòng)運(yùn)行編譯器。稍后,我們將使用Make Project Creator(MPC)項(xiàng)目為我們自動(dòng)化構(gòu)建步驟。
接下來,我們使用TAO的IDL編譯器來編譯所有三個(gè)IDL文件- StockQuoter.idl我們手動(dòng)編寫的 文件,以及由生成的類型支持文件opendds_idl。
tao_idl -I $ DDS_ROOT -I $ TAO_ROOT / orbsvcs StockQuoter.idl tao_idl -I $ DDS_ROOT -I $ TAO_ROOT / orbsvcs StockQuoterTypeSupport.idl目錄
發(fā)行人
接下來,我們編寫一個(gè)發(fā)布者,以通過DDS發(fā)布股票報(bào)價(jià)和股票交易所事件。首先,我們包括由opendds_idl編譯器生成的兩個(gè)類型支持頭文件。
#include "StockQuoterTypeSupportImpl.h"我們還包括DCPS發(fā)布者,服務(wù)參與者和QoS標(biāo)頭文件。
#include "dds/DCPS/Service_Participant.h" #include "dds/DCPS/Marked_Default_Qos.h" #include "dds/DCPS/PublisherImpl.h" #include "ace/streams.h" #include "orbsvcs/Time_Utilities.h"以下常量用于我們的域,類型名稱和主題名稱。每種類型均在單獨(dú)的主題上發(fā)布。訂戶必須為其域,類型名稱和主題名稱使用相同的值。
// constants for Stock Quoter domain Id, types, and topic DDS::DomainId_t QUOTER_DOMAIN_ID = 1066; const char* QUOTER_QUOTE_TYPE = "Quote Type"; const char* QUOTER_QUOTE_TOPIC = "Stock Quotes"; const char* QUOTER_EXCHANGE_EVENT_TYPE = "Exchange Event Type"; const char* QUOTER_EXCHANGE_EVENT_TOPIC = "Stock Exchange Events";在發(fā)布證券交易所事件(即打開,關(guān)閉,暫停或恢復(fù))時(shí),我們還將發(fā)布該事件適用的證券交易所的名稱。
const char* STOCK_EXCHANGE_NAME = "Test Stock Exchange";這是獲取當(dāng)前日期和時(shí)間的簡(jiǎn)單輔助方法。
TimeBase::TimeT get_timestamp() {TimeBase::TimeT retval;ACE_hrtime_t t = ACE_OS::gethrtime ();ORBSVCS_Time::hrtime_to_TimeT (retval, t);return retval; }發(fā)布者的源代碼文件的其余部分包含main()。我們輸入發(fā)布者的main()
int main (int argc, char *argv[]) {DDS::DomainParticipantFactory_var dpf =DDS::DomainParticipantFactory::_nil();DDS::DomainParticipant_var participant =DDS::DomainParticipant::_nil();try{首先,我們創(chuàng)建一個(gè)域參與者。DDS發(fā)布者可以在多個(gè)獨(dú)立域上發(fā)布,但是我們的示例僅在一個(gè)域上發(fā)布。我們使用TheDomainParticipantFactoryWithArgs宏將命令行參數(shù)傳遞到DCPS中,并獲得單例域參與者工廠。我們使用域參與者的默認(rèn)服務(wù)質(zhì)量策略為“ Quote”域創(chuàng)建一個(gè)域參與者。QUOTER_DOMAIN_ID傳遞給工廠的值在發(fā)布者和訂閱者中必須相同。
// Initialize, and create a DomainParticipant
dpf = TheParticipantFactoryWithArgs(argc, argv);
participant = dpf->create_participant(
QUOTER_DOMAIN_ID,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil());
if (CORBA::is_nil (participant.in ()))
{
cerr << “create_participant failed.” << endl;
ACE_OS::exit(1);
}
然后,我們通過域參與者使用默認(rèn)的服務(wù)質(zhì)量值創(chuàng)建發(fā)布者。PublisherListener 當(dāng)某些與發(fā)布相關(guān)的事件發(fā)生時(shí),我們可以附加一個(gè)DCPS調(diào)用的。但是,我們不在乎那些事件,因此我們附加了一個(gè)nil偵聽器。
// Create a publisher for the two topics// (PUBLISHER_QOS_DEFAULT is defined in// Marked_Default_Qos.h)DDS::Publisher_var pub =participant->create_publisher(PUBLISHER_QOS_DEFAULT,DDS::PublisherListener::_nil());if (CORBA::is_nil (pub.in ())){cerr << "create_publisher failed." << endl;ACE_OS::exit(1);}通過DCPS進(jìn)行發(fā)布涉及三個(gè)步驟。首先,我們?yōu)榘l(fā)布的數(shù)據(jù)樣本注冊(cè)每種類型。我們的示例發(fā)布了兩種IDL類型的示例Quote和ExchangeEvent。其次,我們創(chuàng)建一個(gè)或多個(gè)發(fā)布主題。每個(gè)主題只能綁定一種類型。因此,我們?yōu)閮煞N類型的每種類型創(chuàng)建一個(gè)主題。第三,我們?yōu)槊總€(gè)主題創(chuàng)建一個(gè)數(shù)據(jù)編寫器,并通過該數(shù)據(jù)編寫器發(fā)布示例。
我們首先向域參與者注冊(cè)IDL Quote類型,并為Quote類型傳遞生成的QuoteTypeSupportImpl類的實(shí)例。我們用于報(bào)價(jià)類型的名稱(存儲(chǔ)在常量值中QUOTER_QUOTE_TYPE)必須與訂閱服務(wù)器上使用的名稱匹配。創(chuàng)建主題時(shí),我們指定此類型名稱,從而使DCPS能夠在以后為該主題創(chuàng)建適當(dāng)類型的數(shù)據(jù)寫入器。
// Register the Quote typeStockQuoter::QuoteTypeSupport_var quote_servant= new StockQuoter::QuoteTypeSupportImpl();if (DDS::RETCODE_OK !=quote_servant->register_type(participant.in (),QUOTER_QUOTE_TYPE)){cerr << "register_type for " << QUOTER_QUOTE_TYPE<< " failed." << endl;ACE_OS::exit(1);}然后,我們使用生成的ExchangeEventTypeSupportImpl類以相同的方式向域參與者注冊(cè)IDL ExchangeEvent類型。我們的DCPS域參與者可以發(fā)布有關(guān)Quote或ExchangeEvent類型的主題。
// Register the ExchangeEvent typeStockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant= new StockQuoter::ExchangeEventTypeSupportImpl();if (DDS::RETCODE_OK !=exchange_evt_servant->register_type(participant.in (),QUOTER_EXCHANGE_EVENT_TYPE)){cerr << "register_type for "<< QUOTER_EXCHANGE_EVENT_TYPE<< " failed." << endl;ACE_OS::exit(1);}我們?yōu)閳?bào)價(jià)樣本創(chuàng)建一個(gè)主題,指示報(bào)價(jià)數(shù)據(jù)類型的主題名稱和注冊(cè)名稱,并使用默認(rèn)的服務(wù)質(zhì)量設(shè)置。同樣,引用主題和類型名稱必須在發(fā)布者和訂閱者上匹配。
// Get QoS to use for our two topics
// Could also use TOPIC_QOS_DEFAULT instead
DDS::TopicQos default_topic_qos;
participant->get_default_topic_qos(default_topic_qos);
// Create a topic for the Quote type…
DDS::Topic_var quote_topic =
participant->create_topic (QUOTER_QUOTE_TOPIC,
QUOTER_QUOTE_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (quote_topic.in ()))
{
cerr << “create_topic for "
<< QUOTER_QUOTE_TOPIC
<< " failed.” << endl;
ACE_OS::exit(1);
}
同樣,我們?yōu)镋xchangeEvent示例創(chuàng)建一個(gè)主題,指示主題名稱和ExchangeEvent類型的注冊(cè)名稱,并使用默認(rèn)的服務(wù)質(zhì)量設(shè)置。同樣,證券交易所事件主題和類型名稱必須在發(fā)布者和訂閱者上匹配。
// .. and another topic for the Exchange Event typeDDS::Topic_var exchange_evt_topic =participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,QUOTER_EXCHANGE_EVENT_TYPE,default_topic_qos,DDS::TopicListener::_nil());if (CORBA::is_nil (exchange_evt_topic.in ())){cerr << "create_topic for "<< QUOTER_EXCHANGE_EVENT_TOPIC<< " failed."<< endl;ACE_OS::exit(1);}我們創(chuàng)建了兩個(gè)數(shù)據(jù)編寫器,每個(gè)主題一個(gè)。我們傳入上面創(chuàng)建的主題;該主題知道其類型。每個(gè)數(shù)據(jù)編寫者都與一個(gè)發(fā)布者正好相關(guān)聯(lián),并就一個(gè)主題進(jìn)行發(fā)布。后來,我們的發(fā)布者通過將數(shù)據(jù)樣本寫入每個(gè)數(shù)據(jù)寫入器來發(fā)布每個(gè)主題。以下代碼為“股票報(bào)價(jià)”主題創(chuàng)建數(shù)據(jù)編寫器。
// Get QoS to use for our two DataWriters// Could also use DATAWRITER_QOS_DEFAULTDDS::DataWriterQos dw_default_qos;pub->get_default_datawriter_qos (dw_default_qos);// Create a DataWriter for the Quote topicDDS::DataWriter_var quote_base_dw =pub->create_datawriter(quote_topic.in (),dw_default_qos,DDS::DataWriterListener::_nil());if (CORBA::is_nil (quote_base_dw.in ())){cerr << "create_datawriter for "<< QUOTER_QUOTE_TOPIC<< " failed." << endl;ACE_OS::exit(1);}StockQuoter::QuoteDataWriter_var quote_dw= StockQuoter::QuoteDataWriter::_narrow(quote_base_dw.in());if (CORBA::is_nil (quote_dw.in ())){cerr << "QuoteDataWriter could not be narrowed"<< endl;ACE_OS::exit(1);}然后,我們?yōu)椤白C券交易所事件”主題創(chuàng)建一個(gè)數(shù)據(jù)編寫器。同樣,我們傳入上面創(chuàng)建的主題,該主題知道其類型。
// Create a DataWriter for the Exchange Event topicDDS::DataWriter_var exchange_evt_base_dw =pub->create_datawriter(exchange_evt_topic.in (),dw_default_qos,DDS::DataWriterListener::_nil());if (CORBA::is_nil (exchange_evt_base_dw.in ())){cerr << "create_datawriter for "<< QUOTER_EXCHANGE_EVENT_TOPIC<< " failed." << endl;ACE_OS::exit(1);}StockQuoter::ExchangeEventDataWriter_var exchange_evt_dw =StockQuoter::ExchangeEventDataWriter::_narrow(exchange_evt_base_dw.in());if (CORBA::is_nil (exchange_evt_dw.in ())){cerr << "ExchangeEventDataWriter could not "<< "be narrowed"<< endl;ACE_OS::exit(1);}我們可以選擇注冊(cè)每個(gè)數(shù)據(jù)實(shí)例。注冊(cè)每個(gè)數(shù)據(jù)實(shí)例將在編寫該實(shí)例的樣本時(shí)稍微改善延遲。
發(fā)布者可以在每個(gè)數(shù)據(jù)實(shí)例上發(fā)布許多數(shù)據(jù)樣本。數(shù)據(jù)實(shí)例由唯一鍵標(biāo)識(shí)。對(duì)于Quote類型,我們將ticker 其標(biāo)識(shí)為IDL類型定義中的關(guān)鍵字段。具有相同鍵值的每個(gè)Quote數(shù)據(jù)樣本均被視為同一數(shù)據(jù)實(shí)例的一部分。換句話說,在股票代碼“ SPY”上發(fā)布的每個(gè)Quote示例都是同一實(shí)例的一部分。
我們有兩個(gè)Quote實(shí)例,分別是股票代碼“ SPY”(標(biāo)準(zhǔn)普爾存托憑證,即S&P 500)和“ MDY”(標(biāo)準(zhǔn)普爾中型存托憑證,即S&P Midcap 400),以及一個(gè)ExchangeEvent實(shí)例,用于“測(cè)試證券交易所”。 ”。我們向適當(dāng)?shù)臄?shù)據(jù)寫入器注冊(cè)每個(gè)實(shí)例。實(shí)際調(diào)用了注冊(cè)方法,_cxx_register因?yàn)樗黵egister是C ++中的保留字。
// Register the Exchange Event and the two// Quoted securities (SPY and MDY) with the// appropriate data writerStockQuoter::Quote spy;spy.ticker = CORBA::string_dup("SPY");DDS::InstanceHandle_t spy_handle =quote_dw->_cxx_register(spy);StockQuoter::Quote mdy;mdy.ticker = CORBA::string_dup("MDY");DDS::InstanceHandle_t mdy_handle =quote_dw->_cxx_register(mdy);StockQuoter::ExchangeEvent ex_evt;ex_evt.exchange = STOCK_EXCHANGE_NAME;DDS::InstanceHandle_t exchange_handle =exchange_evt_dw->_cxx_register(ex_evt);最后,我們發(fā)布。首先,我們發(fā)布TRADING_OPENED有關(guān)“證券交易所事件”主題的事件。
// Publish...StockQuoter::ExchangeEvent opened;opened.exchange = STOCK_EXCHANGE_NAME;opened.event = StockQuoter::TRADING_OPENED;opened.timestamp = get_timestamp();cout << "Publishing TRADING_OPENED" << endl;DDS::ReturnCode_t ret =exchange_evt_dw->write(opened, exchange_handle);if (ret != DDS::RETCODE_OK){ACE_ERROR ((LM_ERROR,ACE_TEXT("(%P|%t)ERROR: OPEN write returned %d.\n"),ret));}然后,我們?cè)凇肮善眻?bào)價(jià)”主題上發(fā)布“ SPY”和“ MDY”實(shí)例的幾個(gè)股票報(bào)價(jià)數(shù)據(jù)樣本。我們簡(jiǎn)單地循環(huán),每次增加一點(diǎn)報(bào)價(jià)符號(hào)的報(bào)價(jià),以模擬在真正美好的一天中的活躍交易。
ACE_Time_Value quarterSecond( 0, 250000 );
for ( int i = 0; i < 20; ++i )
{
//
// SPY
//
StockQuoter::Quote spy_quote;
spy_quote.exchange = STOCK_EXCHANGE_NAME;
spy_quote.ticker = CORBA::string_dup(“SPY”);
spy_quote.full_name =
CORBA::string_dup(“S&P Depository Receipts”);
spy_quote.value = 1200.0 + 10.0*i;
spy_quote.timestamp = get_timestamp();
cout << "Publishing SPY Quote: "
<< spy_quote.value << endl;
ret = quote_dw->write(spy_quote, spy_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: SPY write returned %d.\n"),
ret));
}
ACE_OS::sleep( quarterSecond );
//
// MDY
//
StockQuoter::Quote mdy_quote;
mdy_quote.exchange = STOCK_EXCHANGE_NAME;
mdy_quote.ticker = CORBA::string_dup(“MDY”);
mdy_quote.full_name =
CORBA::string_dup(“S&P Midcap Depository Receipts”);
mdy_quote.value = 1400.0 + 10.0*i;
mdy_quote.timestamp = get_timestamp();
cout << "Publishing MDY Quote: "
<< mdy_quote.value << endl;
ret = quote_dw->write(mdy_quote, mdy_handle);
if (ret != DDS::RETCODE_OK)
{
ACE_ERROR ((
LM_ERROR,
ACE_TEXT("(%P|%t)ERROR: MDY write returned %d.\n"),
ret));
}
ACE_OS::sleep( quarterSecond );
}
最后,我們TRADING_CLOSED在“證券交易所事件”主題上發(fā)布事件,以指示該證券交易所當(dāng)天關(guān)閉。
```cppStockQuoter::ExchangeEvent closed;closed.exchange = STOCK_EXCHANGE_NAME;closed.event = StockQuoter::TRADING_CLOSED;closed.timestamp = get_timestamp();cout << "Publishing TRADING_CLOSED" << endl;ret = exchange_evt_dw->write(closed, exchange_handle);if (ret != DDS::RETCODE_OK){ACE_ERROR ((LM_ERROR,ACE_TEXT("(%P|%t)ERROR: CLOSED write returned %d.\n"),ret));}cout << "Exiting..." << endl;} catch (CORBA::Exception& e) {cerr << "Exception caught in main.cpp:" << endl<< e << endl;ACE_OS::exit(1);} 最后,我們?cè)陔x開之前先清理自己。// Cleanuptry {if (!CORBA::is_nil (participant.in ())) {participant->delete_contained_entities();}if (!CORBA::is_nil (dpf.in ())) {dpf->delete_participant(participant.in ());}} catch (CORBA::Exception& e) {cerr << "Exception caught in cleanup."<< endl<< e << endl;ACE_OS::exit(1);}TheServiceParticipant->shutdown ();return 0; } 這就完成了發(fā)布者的C ++代碼。目錄## 訂戶我們的訂戶訂閱股票報(bào)價(jià)和證券交易所事件,從發(fā)布者那里接收數(shù)據(jù)樣本。我們使用發(fā)布者的`TRADING_CLOSED`事件來表示當(dāng)天的交易已經(jīng)結(jié)束,從而觸發(fā)了訂閱者的正常關(guān)閉。訂戶中的許多代碼與發(fā)布者中的代碼相似。我們以與發(fā)布者中相同的方式獲得域參與者,注冊(cè)類型等。主要區(qū)別在于訂閱者是被動(dòng)的,等待接收樣本,而發(fā)布者是主動(dòng)的。訂閱服務(wù)器使用偵聽器對(duì)象從發(fā)布服務(wù)器接收樣本。首先,我們包括由dcps_ts.pl 腳本生成的兩個(gè)類型支持頭文件。我們還將這些文件包含在發(fā)布者中。但是,我們還包括兩個(gè)偵聽器頭文件,每種發(fā)布類型一個(gè)。當(dāng)在相關(guān)主題上發(fā)布數(shù)據(jù)樣本時(shí),DDS會(huì)調(diào)用偵聽器。```cpp #include "StockQuoterTypeSupportImpl.h" #include "ExchangeEventDataReaderListenerImpl.h"我們還包括DCPS訂戶,服務(wù)參與者和QoS標(biāo)頭文件。
#include "dds/DCPS/Service_Participant.h" #include "dds/DCPS/Marked_Default_Qos.h" #include "dds/DCPS/SubscriberImpl.h" #include "dds/DCPS/BuiltinTopicUtils.h" #include "ace/streams.h" #include "orbsvcs/Time_Utilities.h"以下常量用于我們的域,類型名稱和主題名稱。這些名稱必須與發(fā)布者使用的域,類型名稱和主題名稱匹配。
// constants for Stock Quoter domain Id, types, and topic // (same as publisher) DDS::DomainId_t QUOTER_DOMAIN_ID = 1066; const char* QUOTER_QUOTE_TYPE = "Quote Type"; const char* QUOTER_QUOTE_TOPIC = "Stock Quotes"; const char* QUOTER_EXCHANGE_EVENT_TYPE = "Exchange Event Type"; const char* QUOTER_EXCHANGE_EVENT_TOPIC = "Stock Exchange Events";訂戶的源代碼文件的其余部分包含main()。我們輸入訂戶的main()。
int main (int argc, char *argv[]) {DDS::DomainParticipantFactory_var dpf =DDS::DomainParticipantFactory::_nil();DDS::DomainParticipant_var participant =DDS::DomainParticipant::_nil();try {就像在發(fā)布者中一樣,我們創(chuàng)建域參與者。指定的域與發(fā)布者的域匹配。
// Initialize, and create a DomainParticipant// (same code as publisher)dpf = TheParticipantFactoryWithArgs(argc, argv);participant = dpf->create_participant(QUOTER_DOMAIN_ID,PARTICIPANT_QOS_DEFAULT,DDS::DomainParticipantListener::_nil());if (CORBA::is_nil (participant.in ())){cerr << "create_participant failed." << endl;ACE_OS::exit(1);}然后,我們通過域參與者使用默認(rèn)的服務(wù)質(zhì)量策略值創(chuàng)建一個(gè)訂戶。SubscriberListener 當(dāng)某些與訂閱相關(guān)的事件發(fā)生時(shí),我們可以附加一個(gè)DCPS調(diào)用的。但是,我們不在乎那些事件,因此我們將其設(shè)置為零。這幾乎與我們致電時(shí)在發(fā)布商中所做的相同create_publisher。
// Create a subscriber for the two topics
// (SUBSCRIBER_QOS_DEFAULT is defined
// in Marked_Default_Qos.h)
DDS::Subscriber_var sub =
participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil());
if (CORBA::is_nil (sub.in ()))
{
cerr << “create_subscriber failed.” << endl;
ACE_OS::exit(1);
}
與發(fā)布者一樣,我們必須向域參與者注冊(cè)IDL Quote和ExchangeEvent類型,才能訂閱有關(guān)這些類型的主題。
// Register the Quote type
// (same code as publisher)
StockQuoter::QuoteTypeSupport_var quote_servant
= new StockQuoter::QuoteTypeSupportImpl();
if (DDS::RETCODE_OK !=
quote_servant->register_type(participant.in (),
QUOTER_QUOTE_TYPE))
{
cerr << “register_type for " << QUOTER_QUOTE_TYPE
<< " failed.” << endl;
ACE_OS::exit(1);
}
// Register the ExchangeEvent type
// (same code as publisher)
StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant
= new StockQuoter::ExchangeEventTypeSupportImpl();
if (DDS::RETCODE_OK !=
exchange_evt_servant->register_type(
participant.in (),
QUOTER_EXCHANGE_EVENT_TYPE))
{
cerr << “register_type for "
<< QUOTER_EXCHANGE_EVENT_TYPE
<< " failed.” << endl;
ACE_OS::exit(1);
}
與發(fā)布者一樣,我們?yōu)楣善眻?bào)價(jià)創(chuàng)建一個(gè)主題,指示主題名稱和報(bào)價(jià)類型的注冊(cè)名稱,并使用默認(rèn)的服務(wù)質(zhì)量設(shè)置。同樣,股票報(bào)價(jià)主題名稱必須在發(fā)布者和訂閱者上匹配。
// Get QoS to use for our two topics
// Could also use TOPIC_QOS_DEFAULT instead
// (same code as publisher)
DDS::TopicQos default_topic_qos;
participant->get_default_topic_qos(default_topic_qos);
// Create a topic for the Quote type…
// (same code as publisher)
DDS::Topic_var quote_topic =
participant->create_topic (QUOTER_QUOTE_TOPIC,
QUOTER_QUOTE_TYPE,
default_topic_qos,
DDS::TopicListener::_nil());
if (CORBA::is_nil (quote_topic.in ()))
{
cerr << “create_topic for "
<< QUOTER_QUOTE_TOPIC
<< " failed.” << endl;
ACE_OS::exit(1);
}
同樣,我們?yōu)镋xchangeEvent示例創(chuàng)建一個(gè)主題,指示主題名稱和ExchangeEvent類型的注冊(cè)名稱,并使用默認(rèn)的服務(wù)質(zhì)量設(shè)置。同樣,證券交易所事件主題名稱必須在發(fā)布者和訂閱者上匹配。
// .. and another topic for the Exchange Event type// (same code as publisher)DDS::Topic_var exchange_evt_topic =participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,QUOTER_EXCHANGE_EVENT_TYPE,default_topic_qos,DDS::TopicListener::_nil());if (CORBA::is_nil (exchange_evt_topic.in ())){cerr << "create_topic for "<< QUOTER_EXCHANGE_EVENT_TOPIC<< " failed."<< endl;ACE_OS::exit(1);}在發(fā)布者上,我們創(chuàng)建了兩個(gè)數(shù)據(jù)編寫器,每個(gè)主題一個(gè)。在訂戶上,我們將創(chuàng)建兩個(gè)數(shù)據(jù)讀取器,每個(gè)主題一個(gè)。每個(gè)數(shù)據(jù)讀取器只有一個(gè)訂戶,并訂閱一個(gè)主題。我們還在每個(gè)數(shù)據(jù)讀取器上附加了一個(gè)偵聽器,以接收發(fā)布的數(shù)據(jù)樣本的通知。這是發(fā)布者和訂閱者代碼不同的地方。
以下代碼為“股票報(bào)價(jià)”主題創(chuàng)建一個(gè)偵聽器。偵聽器是一個(gè)本地CORBA對(duì)象,實(shí)現(xiàn)了DDS::DataReaderListenerIDL接口。我們使用OpenDDS的便捷servant_to_reference功能模板來獲取接口類型的引用。
// Create DataReaders and DataReaderListeners for the// Quote and ExchangeEvent// Create a Quote listenerQuoteDataReaderListenerImpl quote_listener_servant;DDS::DataReaderListener_var quote_listener =::OpenDDS::DCPS::servant_to_reference("e_listener_servant);if (CORBA::is_nil (quote_listener.in ())){cerr << "Quote listener is nil." << endl;ACE_OS::exit(1);}我們?yōu)椤白C券交易所事件”主題創(chuàng)建第二個(gè)偵聽器。
// Create an ExchangeEvent listener ExchangeEventDataReaderListenerImpl exchange_evt_listener_servant;DDS::DataReaderListener_var exchange_evt_listener =::OpenDDS::DCPS::servant_to_reference(&exchange_evt_listener_servant);if (CORBA::is_nil (exchange_evt_listener.in ())) {cerr << "ExchangeEvent listener is nil." << endl;ACE_OS::exit(1); }最后,我們?yōu)閮蓚€(gè)主題中的每個(gè)主題創(chuàng)建一個(gè)數(shù)據(jù)讀取器。首先,我們?yōu)椤肮善眻?bào)價(jià)”主題創(chuàng)建一個(gè)數(shù)據(jù)讀取器,并附加上面創(chuàng)建的相關(guān)偵聽器。
// Create the Quote DataReader// Get the default QoS// Could also use DATAREADER_QOS_DEFAULTDDS::DataReaderQos dr_default_qos;sub->get_default_datareader_qos (dr_default_qos);DDS::DataReader_var quote_dr =sub->create_datareader(quote_topic.in (),dr_default_qos,quote_listener.in ());然后,我們?yōu)椤白C券交易所事件”主題創(chuàng)建一個(gè)數(shù)據(jù)讀取器,并附加上面創(chuàng)建的另一個(gè)偵聽器。
// Create the ExchangeEvent DataReader
DDS::DataReader_var exchange_evt_dr =
sub->create_datareader(exchange_evt_topic.in (),
dr_default_qos,
exchange_evt_listener.in ());
OpenDDS產(chǎn)生它自己的線程來處理來自發(fā)布者的傳入事件。因此,訂戶中沒有事件循環(huán)代碼。但是,在準(zhǔn)備關(guān)閉整個(gè)訂戶進(jìn)程之前,我們必須確保不允許主線程退出。因此,我們循環(huán)處理股票報(bào)價(jià)和證券交易所事件,直到TRADING_CLOSED 在“證券交易所事件”主題上收到該事件為止。本質(zhì)上,我們希望接收已發(fā)布的數(shù)據(jù)樣本,直到證券交易所告訴我們它已經(jīng)關(guān)閉。該sleep調(diào)用使我們每秒檢查一次,以避免消耗過多的CPU。
// Wait for events from the Publisher; shut
// down when “close” received
cout << “Subscriber: waiting for events” << endl;
while ( ! exchange_evt_listener_servant.
is_exchange_closed_received() )
{
ACE_OS::sleep(1);
}
收到TRADING_CLOSED事件后,我們可以正常退出循環(huán)。
cout << "Received CLOSED event from publisher; "<< " exiting..."<< endl;} catch (CORBA::Exception& e) {cerr << "Exception caught in main.cpp:" << endl<< e << endl;ACE_OS::exit(1);}最后,我們?cè)陔x開之前先清理自己。
// Cleanuptry {if (!CORBA::is_nil (participant.in ())) {participant->delete_contained_entities();}if (!CORBA::is_nil (dpf.in ())) {dpf->delete_participant(participant.in ());}} catch (CORBA::Exception& e) {cerr << "Exception caught in cleanup."<< endl<< e << endl;ACE_OS::exit(1);}TheServiceParticipant->shutdown ();return 0; }目錄
訂閱者的“股票報(bào)價(jià)”和“證券交易所事件”偵聽器
“股票行情”數(shù)據(jù)閱讀器和“股票交易所事件”數(shù)據(jù)閱讀器均附帶一個(gè)偵聽器。每當(dāng)從發(fā)布者接收到數(shù)據(jù)樣本時(shí),DDS框架就會(huì)調(diào)用這些偵聽器。我們已決定兩個(gè)數(shù)據(jù)讀取器中的每一個(gè)都應(yīng)具有自己的偵聽器,盡管如果我們對(duì)偵聽器進(jìn)行編碼以處理兩種數(shù)據(jù)類型,則可以為兩個(gè)數(shù)據(jù)讀取器使用單個(gè)偵聽器。
每個(gè)偵聽器都實(shí)現(xiàn)DDS::DataReaderListenerIDL接口。我們?cè)谏厦娴挠啈舸a中同時(shí)使用了a QuoteDataReaderListenerImpl和an ExchangeEventDataReaderListenerImpl,但尚未定義這些類。我們現(xiàn)在將這樣做。
首先,我們?yōu)镼uote類型的數(shù)據(jù)讀取器編寫一個(gè)偵聽器。該偵聽器類實(shí)現(xiàn)DDS::DataReaderListener IDL接口,該接口重寫了七個(gè)純虛方法。它是IDL接口的CORBA本地對(duì)象實(shí)現(xiàn),繼承自IDL接口的生成類。
偵聽器類必須重寫所有七個(gè)方法,包括偵聽器的實(shí)現(xiàn)為空的方法。但是,為簡(jiǎn)單起見,我們將僅顯示該on_data_available 方法,當(dāng)有新的Quote數(shù)據(jù)樣本可用時(shí)將調(diào)用該方法。其他六個(gè)方法具有空的實(shí)現(xiàn)。我們還將使用默認(rèn)的構(gòu)造函數(shù)和析構(gòu)函數(shù)。
#include "StockQuoterTypeSupportC.h" #include "StockQuoterTypeSupportImpl.h" #include "dds/DCPS/Service_Participant.h" #include "dds/DdsDcpsSubscriptionS.h" #include "ace/streams.h"class QuoteDataReaderListenerImpl: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> { public:// DDS calls on_data_available on the listener for each// received Quote sample.virtual void on_data_available(DDS::DataReader_ptr reader)throw (CORBA::SystemException){try{我們首先將數(shù)據(jù)讀取器參數(shù)的值縮小為Quote樣本的適當(dāng)類型。
StockQuoter::QuoteDataReader_var quote_dr =StockQuoter::QuoteDataReader::_narrow(reader);if (CORBA::is_nil (quote_dr.in ())){cerr << "QuoteDataReaderListenerImpl:: "<< "on_data_available:"<< " _narrow failed." << endl;ACE_OS::exit(1);}然后,我們從數(shù)據(jù)讀取器中獲取下一個(gè)Quote示例。請(qǐng)注意QuoteDataReader接口的類型安全。
StockQuoter::Quote quote;DDS::SampleInfo si;DDS::ReturnCode_t status =quote_dr->take_next_sample(quote, si) ;收到報(bào)價(jià)樣本后,我們只需打印其內(nèi)容即可。
if (status == DDS::RETCODE_OK) {cout << "Quote: ticker = " << quote.ticker.in()<< endl<< " exchange = " << quote.exchange.in()<< endl<< " full name = " << quote.full_name.in()<< endl<< " value = " << quote.value<< endl<< " timestamp = " << quote.timestamp<< endl;cout << "SampleInfo.sample_rank = "<< si.sample_rank << endl;}else if (status == DDS::RETCODE_NO_DATA){cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!"<< endl;}else{cerr << "ERROR: read Quote: Error: "<< status << endl;}當(dāng)報(bào)價(jià)樣本超出范圍時(shí),堆棧將清除報(bào)價(jià)樣本的內(nèi)存。
} catch (CORBA::Exception& e) {cerr << "Exception caught in read:"<< endl << e << endl;ACE_OS::exit(1);}}我們沒有在DDS::DataReaderListener接口中顯示其他方法的實(shí)現(xiàn),但是即使它們的實(shí)現(xiàn)為空,我們也必須重寫它們。
// must also override:// on_requested_deadline_missed// on_requested_incompatible_qos// on_liveliness_changed// on_subscription_match// on_sample_rejected// on_sample_lost };接下來,我們?yōu)镋xchangeEvent類型的數(shù)據(jù)讀取器編寫一個(gè)偵聽器。基本結(jié)構(gòu)與相同QuoteDataReaderListenerImpl。
#include "ExchangeEventDataReaderListenerImpl.h" #include "StockQuoterTypeSupportImpl.h" #include "dds/DCPS/Service_Participant.h" #include "dds/DdsDcpsSubscriptionS.h" #include "ace/streams.h" #include "ace/Synch.h"class ExchangeEventDataReaderListenerImpl: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> { public:我們將該is_exchange_closed_received方法添加到數(shù)據(jù)讀取器中,以便訂戶的主程序可以找出何時(shí)TRADING_CLOSED 收到證券交易所事件。此方法在互斥鎖的保護(hù)下檢查布爾值。收到股票交易事件on_data_available時(shí),布爾值由偵聽器的方法設(shè)置TRADING_CLOSED。
// app-specificCORBA::Boolean is_exchange_closed_received(){ACE_Guard<ACE_Mutex> guard(this->lock_);return this->is_exchange_closed_received_;}DDS on_data_available在偵聽器上為每個(gè)接收到的ExchangeEvent示例進(jìn)行調(diào)用。
virtual void on_data_available(DDS::DataReader_ptr reader)throw (CORBA::SystemException){try{與中的一樣QuoteDataReaderListenerImpl,我們首先將數(shù)據(jù)讀取器參數(shù)的值縮小為適當(dāng)?shù)念愋?#xff0c;在這種情況下為ExchangeEventDataReader。
StockQuoter::ExchangeEventDataReader_var exchange_evt_dr =
StockQuoter::ExchangeEventDataReader::_narrow(reader);
if (CORBA::is_nil (exchange_evt_dr.in ())) {
cerr << “ExchangeEventDataReaderListenerImpl:: "
<< “on_data_available:”
<< " _narrow failed.”
<< endl;
ACE_OS::exit(1);
}
然后,我們從數(shù)據(jù)讀取器中獲取下一個(gè)ExchangeEvent示例。注意類型安全。
StockQuoter::ExchangeEvent exchange_evt;
DDS::SampleInfo si;
DDS::ReturnCode_t status =
exchange_evt_dr->take_next_sample(exchange_evt, si) ;
收到ExchangeEvent示例后,我們只需打印其內(nèi)容即可。
if (status == DDS::RETCODE_OK) {
cout << "ExchangeEvent: exchange = "
<< exchange_evt.exchange.in() << endl;
收到TRADING_CLOSED事件后,我們將設(shè)置一個(gè)標(biāo)志,指示當(dāng)天該證券交易所已經(jīng)關(guān)閉。
case StockQuoter::TRADING_CLOSED: {
cout << “TRADING_CLOSED” << endl;
}
else if (status == DDS::RETCODE_NO_DATA)
{
cerr << "ERROR: reader received "
<< “DDS::RETCODE_NO_DATA!”
<< endl;
}
else
{
cerr << "ERROR: read ExchangeEvent: Error: "
<< status
<< endl;
}
超出范圍時(shí),堆棧會(huì)清除ExchangeEvent示例。
} catch (CORBA::Exception& e) {cerr << "Exception caught in read:" << endl<< e << endl;ACE_OS::exit(1);}}// must also override:// on_requested_deadline_missed// on_requested_incompatible_qos// on_liveliness_changed// on_subscription_match// on_sample_rejected// on_sample_lost我們添加了兩個(gè)私有類屬性,以跟蹤TRADING_CLOSED事件并使用鎖保護(hù)該值。
private:CORBA::Boolean is_exchange_closed_received_;ACE_Mutex lock_; };這樣就完成了訂戶的C ++代碼。
目錄
建立發(fā)布者和訂閱者
我們使用MPC,即Make Project Creator,為發(fā)布者和訂閱者生成構(gòu)建文件。MPC提供了一種簡(jiǎn)單的語法,并且能夠?yàn)镚NU Make,Visual C ++和許多其他構(gòu)建系統(tǒng)生成構(gòu)建文件。有關(guān)MPC的更多信息,請(qǐng)參見OCI的MPC頁面,網(wǎng)址為http://www.objectcomputing.com/products/mpc。
我們創(chuàng)建兩個(gè)文件來構(gòu)建我們的股票報(bào)價(jià)器,一個(gè)工作區(qū)文件和一個(gè)項(xiàng)目文件。我們的工作區(qū)文件只是告訴MPC在哪里可以找到MPC dcps和dcpsexe基礎(chǔ)項(xiàng)目文件,我們將在以后使用它們。
// // file StockQuoter.mwc //workspace {cmdline += -relative DDS_ROOT=$DDS_ROOT }接下來,我們創(chuàng)建一個(gè)包含三個(gè)項(xiàng)目的MPC文件-一個(gè)包含IDL和TypeSupport文件的Common項(xiàng)目,一個(gè)Publisher,一個(gè)Subscriber。這三個(gè)項(xiàng)目中的每一個(gè)都繼承自dcps或的dcpsexe基礎(chǔ)項(xiàng)目(位于)$DDS_ROOT。首先,我們創(chuàng)建一個(gè)名為StockQuoterCommon的庫(kù)來保存由TAO IDL和opendds_idl編譯器生成的代碼。
// // file StockQuoter.mpc //project(*Common) : dcps {sharedname = StockQuoterCommonlibout = .includes += $(TAO_ROOT)/orbsvcsidlflags += -I$(TAO_ROOT)/orbsvcsidlflags += -Wb,export_macro=StockQuoterCommon_Exportidlflags += -Wb,export_include=StockQuoterCommon_Export.hdcps_ts_flags += --export=StockQuoterCommon_Exportdynamicflags = STOCKQUOTERCOMMON_BUILD_DLL一個(gè)dcps項(xiàng)目有一個(gè)新的部分TypeSupport_Files。本部分執(zhí)行opendds_idl 腳本以從我們的DDS數(shù)據(jù)類型生成TypeSupport文件。在這里,我們指示包含DDS數(shù)據(jù)類型的IDL文件,并指示從中生成的TypeSupport文件。
TypeSupport_Files {StockQuoter.idl}我們的IDL_Files部分包含原始IDL文件以及上一部分生成的TypeSupport IDL文件。
IDL_Files {StockQuoterTypeSupport.idlStockQuoter.idl}在Header_Files和Source_Files部分包含opendds_idl-生成TypeSupport實(shí)現(xiàn)文件。MPC會(huì)自動(dòng)添加生成的IDL存根和框架,因此我們不需要手動(dòng)添加它們。
Header_Files {StockQuoterTypeSupportImpl.h}Source_Files {StockQuoterTypeSupportImpl.cpp} }我們的發(fā)布者從上方使用StockQuoterCommon庫(kù),并添加publisher.cpp包含發(fā)布者的源文件main()。
project(*Publisher) : dcpsexe, svc_utils {after += *Commonexename = publisherincludes += $(TAO_ROOT)/orbsvcslibs += StockQuoterCommondynamicflags = STOCKQUOTERCOMMON_HAS_DLLTypeSupport_Files {}IDL_Files {}Header_Files {}Source_Files {publisher.cpp}Documentation_Files {README.txtdomain_ids} }我們的訂戶還使用StockQuoterCommon庫(kù),添加了一個(gè)subscriber.cpp包含訂戶的main()和兩個(gè)偵聽器的源文件。
project(*Subscriber) : dcpsexe {after += *Commonexename = subscriberincludes += $(TAO_ROOT)/orbsvcslibs += StockQuoterCommondynamicflags = STOCKQUOTERCOMMON_HAS_DLLTypeSupport_Files {}IDL_Files {}Header_Files {QuoteDataReaderListenerImpl.h}Source_Files {QuoteDataReaderListenerImpl.cppsubscriber.cpp}Documentation_Files {README.txtdomain_ids} }我們使用此MPC文件為我們的構(gòu)建系統(tǒng)生成構(gòu)建文件。例如,要生成GNU Makefile,我們執(zhí)行
$ ACE_ROOT / bin / mwc.pl -type gnuace StockQuoter.mwc為了生成Visual C ++ 7.1解決方案文件,我們執(zhí)行
perl%ACE_ROOT%/ bin / mwc.pl -type vc71 StockQuoter.mwc然后,我們構(gòu)建項(xiàng)目。
目錄
配置股票報(bào)價(jià)器
OpenDDS包含基于文件的配置機(jī)制。有了它,OpenDDS用戶可以配置發(fā)布者或訂閱者的傳輸,DCPSInfoRepo過程的位置以及許多其他設(shè)置。配置文件的語法類似于Windows INI文件的語法。它包含幾個(gè)部分,而這些部分又包含類似屬性的條目。基本語法如下:
[section1-name] Attribute1=value1 Attribute2=value2[section2-name] Attribute1=value1 Attribute2=value2《OpenDDS開發(fā)人員指南》的“配置”一章中介紹了完整的配置設(shè)置集。
我們基于TCP的示例dds_tcp_conf.ini對(duì)發(fā)布者和訂閱者都使用一個(gè)配置文件:
dds_tcp_conf.ini # [common] # Debug Level DCPSDebugLevel=0 # IOR of DCPSInfoRepo process. DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo # Sets the global transport configuration (used by default in the # process to config1, defined below DCPSGlobalTransportConfig=config1 # Transport configuration named config1, contains a single transport # instance named tcp1 (defined below) [config/config1] transports=tcp1 # Transport instance named tcp1, of type "tcp". Uses defaults for # all configuration paramaters. [transport/tcp1] transport_type=tcp請(qǐng)注意,有三個(gè)部分,[common],[config/config1],和[transport/tcp1]。本[common] 節(jié)包含適用于整個(gè)過程的配置值。在此配置文件中,我們指定調(diào)試級(jí)別,該DCPSInfoRepo過程的對(duì)象引用以及全局傳輸配置。在這里,我們的DCPSInfoRepo 進(jìn)程正在監(jiān)聽回送(127.0.0.1)接口,這意味著我們已將其配置為僅對(duì)在同一主機(jī)上運(yùn)行的DDS進(jìn)程可用。要使其在網(wǎng)絡(luò)上可用,請(qǐng)使用IP地址或網(wǎng)絡(luò)主機(jī)名代替localhost。我們已經(jīng)指定config1 作為我們的全局傳輸配置,這意味著具有該名稱的傳輸配置將被我們過程中所有未明確指定其他傳輸配置的讀取器和寫入器使用。
本[config/config1]節(jié)定義了名稱為的傳輸配置config1。該transports選項(xiàng)指定 tcp1為此配置中包括的唯一傳輸實(shí)例。
本[transport/tcp1]節(jié)定義了一個(gè)名為的傳輸實(shí)例, tcp1并將其傳輸類型指定為tcp。如OpenDDS文檔中所述,此部分還可以用于通過許多配置選項(xiàng)來配置傳輸。
目錄
通過TCP傳輸運(yùn)行股票報(bào)價(jià)器
要運(yùn)行該示例,我們必須啟動(dòng)一個(gè)DCPSInfoRepo過程,并至少啟動(dòng)一個(gè)發(fā)布者和一個(gè)訂閱者。要啟動(dòng)DCPSInfoRepo,我們使用以下命令行:
$ DDS_ROOT / bin / DCPSInfoRepo -ORBListenEndpoints iiop:// localhost:12345我們的DCPSInfoRepo進(jìn)程偵聽端口12345。該端口與我們?cè)贒CPSInfoRepo上面的傳輸配置文件中的對(duì)象引用中指定的端口匹配。此DCPSInfoRepo 進(jìn)程正在監(jiān)聽回送(127.0.0.1)接口,這意味著我們已將其配置為僅對(duì)在同一主機(jī)上運(yùn)行的DDS進(jìn)程可用。同樣,要使其在網(wǎng)絡(luò)上可用,請(qǐng)使用IP地址或網(wǎng)絡(luò)主機(jī)名代替localhost。
我們有兩個(gè)訂閱者和一個(gè)發(fā)布者:
訂戶-DCPSConfigFile dds_tcp_conf.ini訂戶-DCPSConfigFile dds_tcp_conf.ini發(fā)布者-DCPSConfigFile dds_tcp_conf.ini我們使用-DCPSConfigFile命令行參數(shù)來指示我們?cè)谏厦鎰?chuàng)建的配置文件的名稱。請(qǐng)注意,每個(gè)訂閱者和發(fā)布者都使用相同的傳輸配置文件。
上面的命令行用于運(yùn)行DCPSInfoRepo,帶有內(nèi)置主題的發(fā)布者和訂閱者,默認(rèn)情況下是內(nèi)置主題。我們也可以關(guān)閉內(nèi)置主題來運(yùn)行這些過程。該-NOBITS 所使用的DCPSInfoRepo關(guān)閉內(nèi)置的話題和"-DCPSBit 0" 所使用的其它應(yīng)用程序DDS。命令行如下:
$ DDS_ROOT / bin / DCPSInfoRepo -NOBITS -ORBListenEndpoints iiop:// localhost:12345訂戶-DCPS位0 -DCPSConfigFile dds_tcp_conf.ini訂戶-DCPS位0 -DCPSConfigFile dds_tcp_conf.ini發(fā)布者-DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini發(fā)行者為SPY和MDY股票代號(hào)發(fā)布20個(gè)股票報(bào)價(jià),每個(gè)訂閱者都收到它們。發(fā)布者完成后,它將發(fā)布“ TRADING_CLOSED”消息,這將導(dǎo)致訂閱者退出。
目錄
通過UDP傳輸運(yùn)行股票報(bào)價(jià)器
我們可以使用相同的代碼庫(kù),通過簡(jiǎn)單地運(yùn)行配置文件來運(yùn)行UDP傳輸上的示例,該配置文件定義了一個(gè)指定UDP傳輸實(shí)例的全局傳輸配置。
這是dds_udp_conf.ini文件:
dds_udp_conf.ini # [common] # Debug Level DCPSDebugLevel=0 # IOR of DCPSInfoRepo process. DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo # Sets the global transport configuration (used by default in the # process to config1, defined below DCPSGlobalTransportConfig=config1 # Transport configuration named config1, contains a single transport # instance named udp1 (defined below) [config/config1] transports=udp1 # Transport instance named udp1, of type "udp". Uses defaults for # all configuration paramaters.[transport/udp1] transport_type=udp然后,我們DCPSInfoRepo像以前一樣開始該過程:
$ DDS_ROOT / bin / DCPSInfoRepo -ORBListenEndpoints iiop:// localhost:12345我們使用新的傳輸配置文件啟動(dòng)兩個(gè)訂戶和發(fā)布者:
訂戶-DCPSConfigFile dds_udp_conf.ini訂戶-DCPSConfigFile dds_udp_conf.ini發(fā)布者-DCPSConfigFile dds_udp_conf.ini我們還可以在關(guān)閉內(nèi)置主題的情況下運(yùn)行每個(gè)流程。命令行如下:
$ DDS_ROOT / bin / DCPSInfoRepo -NOBITS -ORBListenEndpoints iiop:// localhost:12345訂戶-DCPS位0 -DCPSConfigFile dds_udp_conf.ini訂戶-DCPS位0 -DCPSConfigFile dds_udp_conf.ini發(fā)布者-DCPSBit 0 -DCPSConfigFile dds_udp_conf.ini和以前一樣,發(fā)布者為SPY和MDY報(bào)價(jià)器符號(hào)發(fā)布20個(gè)股票報(bào)價(jià),每個(gè)訂閱者都收到它們。發(fā)布者完成后,它將再次發(fā)布“ TRADING_CLOSED”消息,這將導(dǎo)致訂閱者退出。唯一的區(qū)別是我們用UDP傳輸代替了TCP傳輸。更改運(yùn)輸方式無需更改代碼。
目錄
摘要
用于實(shí)時(shí)系統(tǒng) 的OMG數(shù)據(jù)分發(fā)服務(wù)(DDS) 是針對(duì)高性能,類型安全,發(fā)布和訂閱通信中間件的規(guī)范。DDS解決了以數(shù)據(jù)為中心的應(yīng)用程序,即那些對(duì)應(yīng)用程序數(shù)據(jù)進(jìn)行分發(fā)非常重要的應(yīng)用程序。
OpenDDS是OMG數(shù)據(jù)分發(fā)服務(wù)規(guī)范的一個(gè)開源實(shí)現(xiàn),為用戶提供了一個(gè)有效的發(fā)布和訂閱框架,并具有開源軟件開發(fā)模型的優(yōu)點(diǎn)。
OpenDDS包含基于文件的配置機(jī)制。通過配置文件,OpenDDS用戶可以配置發(fā)布者或訂閱者的傳輸,調(diào)試輸出,內(nèi)存分配,DCPSInfoRepo 代理進(jìn)程的位置以及許多其他設(shè)置。在示例中我們已經(jīng)顯示,可以在不進(jìn)行任何代碼更改的情況下?lián)Q出OpenDDS應(yīng)用程序的基礎(chǔ)傳輸。
參考資料
示例代碼位于示例/ DCPS / IntroductionToOpenDDS的OpenDDS源代碼分發(fā)中
用于實(shí)時(shí)系統(tǒng)的OMG數(shù)據(jù)分發(fā)服務(wù)(DDS)(https://www.omg.org/spec/DDS/)
OMG DDS門戶(https://www.omg.org/spec/DDS/)
OpenDDS主頁(http://www.opendds.org)
TAO開發(fā)人員指南主頁(http://www.theaceorb.com/product/index.html
《 OpenDDS開發(fā)人員指南》(http://download.objectcomputing.com/OpenDDS/OpenDDS-latest.pdf)
MPC(https://github.com/objectcomputing/MPC)
OpenDDS官網(wǎng)文獻(xiàn):https://opendds.org/about/articles/Article-Intro.html
總結(jié)
- 上一篇: 【转发】响应式Web设计?怎样进行?
- 下一篇: ios学习笔记block回调的应用(一个