日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java调用OpenDDS(2)-理解OpenDDS自带的Messager示例

發布時間:2023/12/10 java 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java调用OpenDDS(2)-理解OpenDDS自带的Messager示例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

OpenDDS安裝好之后,下一步就是利用OpenDDS來開發通信項目了。不過在項目中應用OpenDDS之前,先消化一下OpenDDS安裝包中自帶的示例項目messenger,通過閱讀messenger的源代碼來熟悉一下OpenDDS提供的用來開發Java項目的類。



提綱
1、準備工作
2、發送消息:TestPublisher
3、接收消息:TestSubscriber & DataReaderListenerImpl
4、在IDEA中運行示例



1、準備工作

在開始看OpenDDS的示例項目之前,需要安裝好OpenDDS,并且在編譯OpenDDS的時候要開啟Java支持,這是前提條件。

第一步,在IDEA創建空的maven項目ddstest,用這個項目來看OpenDDS自帶的messenger項目的示例代碼。因為Idea無法直接打開messenger項目,所以才要創建一個ddstest新項目,把messenger的源碼拷貝到ddstest中去邊運行邊看。

第二步,ddstest項目中引入4個必要的jar包。具體方法:File - Project Structure -Libraries - 點擊"+"號 - Java ,然后定位到 %DDS_ROOT%\lib,選中3個jar包:i2jrt.jar、OpenDDS_DCPS.jar、tao_java.jar,定位到%DDS_ROOT%\java\tests\messenger下\messenger_idl下,選中messenger_idl_test.jar。
這樣以來項目建好了,所需要的的4個jar包也都加入到項目的libraries中了。詳情見下圖:

注:由于OpenDDS需要CORBA,而Java 9開始移除了CORBA,所以請在Java 8環境下開發。

然后去%DDS_ROOT%\java\tests\messenger下,將publisher和subscriber兩個目錄下的TestPublisher、TestSubscriber、DataReaderListenerImpl三個java文件復制到ddstest項目的src目錄下。

經過以上2步,項目結構如上圖所示,源代碼都在src下,jar都在external libraries下面。



2、利用TestPublisher發送消息

DomainParticipantFactory dpf = TheParticipantFactory.WithArgs(new StringSeqHolder(args)); DomainParticipant dp = dpf.create_participant(4, PARTICIPANT_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);

第一行用來創建域參與者工廠,需要讀取參數(實際上就是讀取repo.ior文件)
第二行是創建具體的域參與者,create_participant方法的定義(實際在DomainParticipantFactoryImpl.cpp文件中)為:

DDS::DomainParticipant_ptr DomainParticipantFactoryImpl::create_participant(DDS::DomainId_t domainId,const DDS::DomainParticipantQos & qos,DDS::DomainParticipantListener_ptr a_listener,DDS::StatusMask mask) {DDS::DomainParticipantQos par_qos = qos;if (par_qos == PARTICIPANT_QOS_DEFAULT) {get_default_participant_qos(par_qos);}if (!Qos_Helper::valid(par_qos)) {ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) ERROR: ")ACE_TEXT("DomainParticipantFactoryImpl::create_participant, ")ACE_TEXT("invalid qos.\n")));return DDS::DomainParticipant::_nil();}if (!Qos_Helper::consistent(par_qos)) {ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) ERROR: ")ACE_TEXT("DomainParticipantFactoryImpl::create_participant, ")ACE_TEXT("inconsistent qos.\n")));return DDS::DomainParticipant::_nil();}RcHandle<DomainParticipantImpl> dp =make_rch<DomainParticipantImpl>(this, domainId, par_qos, a_listener, mask);if (qos_.entity_factory.autoenable_created_entities) {if (dp->enable() != DDS::RETCODE_OK) {ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t) ERROR: ")ACE_TEXT("DomainParticipantFactoryImpl::create_participant, ")ACE_TEXT("unable to enable DomainParticipant.\n")));return DDS::DomainParticipant::_nil();}}ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,tao_mon,this->participants_protector_,DDS::DomainParticipant::_nil());participants_[domainId].insert(dp);return dp._retn(); }

主要就是進行Qos的驗證,然后創建DomainParticipantImpl對象,調用其enable方法以激活,然后添加到參與者組中。參與者組實質是個Map,以域ID作為key。

接下來是創建主題:

MessageTypeSupportImpl servant = new MessageTypeSupportImpl(); Topic top = dp.create_topic("Movie Discussion List",servant.get_type_name(),TOPIC_QOS_DEFAULT.get(),null,DEFAULT_STATUS_MASK.value);

需要注意的是,OpenDDS的主題和發送的消息,必須在發送時就全部確定,不能用Scanner接受用戶輸入然后動態創建

create_topic方法實際調用的是create_topic_i方法,添加了一個topic_mask參數(值為0),整個方法有150+行,僅挑些重點:

首先還是Qos檢查,略過。

接下來是根據主題名檢查主題有沒有創建過(假如開啟了主題過濾,并且不允許重復主題,則直接退出):

TopicMap::mapped_type* entry = 0;bool found = false;{ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,tao_mon,this->topics_protector_,DDS::Topic::_nil());#if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)if (topic_descrs_.count(topic_name)) {if (DCPS_debug_level > 3) {…… //一些報錯信息}return 0;} #endifif (Util::find(topics_, topic_name, entry) == 0) {found = true;}}

假如同名主題已經創建過了,則進行type和qos的比較,如果有一個不符合,則返回空,創建失敗,如果都符合,說明主題已經存在,就直接返回主題對象

假如同名主題不存在,則創建新主題:

首先要檢查欲創建的主題類型有沒有注冊過:

if (0 == topic_mask) {// creating a topic with compile time typetype_support = Registered_Data_Types->lookup(this, type_name);if (CORBA::is_nil(type_support)) {if (DCPS_debug_level >= 1) {…… //報錯}return DDS::Topic::_nil();}has_keys = type_support->has_dcps_key();}

接下來就是對topic信息的檢查,以及初始化Topic對象的過程,如果設置了TopicListener的話,創建完畢后會進行回調

接下來是創建Publisher對象;

Publisher pub = dp.create_publisher(PUBLISHER_QOS_DEFAULT.get(), null, DEFAULT_STATUS_MASK.value);

其C++代碼流程和create_participant類似,不同的是,保存Publisher對象的是個集合而非Map,還有Publisher的ID是用一個內置的ID生成器自動產生的

接下來是創建DataWriter對象, 為之配置了非常復雜的Qos策略,并等待Publisher匹配成功

到這里準備工作才剛剛結束,下面開始正式的數據發送

MessageDataWriter mdw = MessageDataWriterHelper.narrow(dw);Message msg = new Message();msg.subject_id = 99;int handle = mdw.register_instance(msg);msg.from = "OpenDDS-Java";msg.subject = "Review";msg.text = "Worst. Movie. Ever.";msg.count = 0;int ret = RETCODE_TIMEOUT.value;for (; msg.count < N_MSGS; ++msg.count) {while ((ret = mdw.write(msg, handle)) == RETCODE_TIMEOUT.value) {}if (ret != RETCODE_OK.value) {System.err.println("ERROR " + msg.count +" write() returned " + ret);}try {Thread.sleep(100);} catch(InterruptedException ie) {}}

MessageDataWriter是messenger_idl_test包含的DataWriter子類,實際執行Message發送任務,Message的格式由Messenger.idl定義。for循環一共發送40條消息。

假如啟動時加了-w參數,則會等待響應,否則等待1秒后直接進行清理、退出



3、接收消息:TestSubscriber & DataReaderListenerImpl

TestSubscriber的整個流程和TestPublisher幾乎完全一致,不同的是,創建DataReader時,會將DataReaderListenerImpl實例傳入。

當Publisher向域中輸入消息后,就會觸發DATA_AVAILABLE事件,通知DataReaderListenerImpl處理

public synchronized void on_data_available(DataReader reader) {initialize_counts();MessageDataReader mdr = MessageDataReaderHelper.narrow(reader);if (mdr == null) {System.err.println("ERROR: read: narrow failed.");return;}MessageHolder mh = new MessageHolder(new Message());SampleInfoHolder sih = new SampleInfoHolder(new SampleInfo(0, 0, 0,new Time_t(), 0, 0, 0, 0, 0, 0, 0, false, 0));int status = mdr.take_next_sample(mh, sih);if (status == RETCODE_OK.value) {System.out.println("SampleInfo.sample_rank = "+ sih.value.sample_rank);System.out.println("SampleInfo.instance_state = "+ sih.value.instance_state);if (sih.value.valid_data) {String prefix = "";boolean invalid_count = false;if (mh.value.count < 0 || mh.value.count >= counts.size()) {invalid_count = true;}else {if (counts.get(mh.value.count) == false){counts.set(mh.value.count, true);}else {prefix = "ERROR: Repeat ";}}…… //消息內容輸出}…… //一些報錯} else if (status == RETCODE_NO_DATA.value) {System.err.println("ERROR: reader received DDS::RETCODE_NO_DATA!");} else {System.err.println("ERROR: read Messenger.Message: Error: " + status);}}

程序使用MessageHolder存儲讀到的信息,每次讀取一條。



4、在IDEA中運行TestPublisher、TestSubscriber

運行run_test.pl腳本時,可以看到類似:

“C:\Program Files\Java\bin\java.exe” -Xcheck:jni -ea -cp classes;E:\OpenDDS-3.13/lib/i2jrt.jar;E:\OpenDDS-3.13/lib/OpenDDS_DCPS.jar;E:\OpenDDS-3.13/java/tests/messenger/messenger_idl/messenger_idl_test.jar;publisher/classes -Dopendds.native.debug=true TestPublisher -DCPSBit 0 -DCPSConfigFile tcp.ini -r -w
的命令輸出,可以參照這個配置運行條件。

1)首先配置虛擬機選項:

Run菜單 - Edit Configurations,在 vm options 一欄填寫:

-ea -Dopendds.native.debug=true -Djava.library.path=E:\dds\OpenDDS-3.13/java/tests/messenger/messenger_idl;E:\OpenDDS-3.13\lib

請按照自己的實際情況修改路徑。由于已經配置了jar包,所以不需要再寫-cp選項。

-Djava.library.path是用來配置JNI庫路徑,不寫的話會提示找不到dll

去除 -Xcheck:jni 是為了屏蔽掉過多的JNI Warning

2)然后配置主程序參數:

在同一個窗口的 Program arguments一欄填寫:

-DCPSBit 0 -DCPSConfigFile E:\dds\OpenDDS-3.13/java/tests/messenger/tcp.ini

同樣按照自己的實際情況修改路徑。這一行如果寫在vm options中,就會提示”ERROR: Domain Participant Factory not found“,因為參數傳遞給了虛擬機而不是程序,導致無法創建DomainParticipantFactory

3)運行實例

按照以上配置,分別配置TestPublisher的run configuration和TestSubscriber的run configuration,配置兩個,因為運行的時候,它們兩個會同時運行,各自使用自己的run configuration。

單獨啟動一個Visual Studio開發人員命令行,在其中運行:
%DDS_ROOT%/bin/DCPSInfoRepo -o repo.ior

接著修改tcp.ini,將common塊DCPSInfoRepo項的值修改成repo.ior所在位置,不要去掉"file://"前綴

然后按照先subscriber,后publisher的順序啟動程序即可。



參考資料:
1、https://blog.csdn.net/u010670411/article/details/86552739



總結

以上是生活随笔為你收集整理的Java调用OpenDDS(2)-理解OpenDDS自带的Messager示例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。