ACE入门---很好的文章
轉(zhuǎn)自:http://www.cnblogs.com/dubingsky/archive/2009/07/22/1528292.html
ACE編譯
1. 設(shè)置環(huán)境變量
2. 在 ACE_wrappers\ace目錄下創(chuàng)建 config.h 文件,寫入:
#include "ace/config-win32.h"?
Code
3.?如果你希望使用標準的?C++?頭文件(例如?iostream、cstdio?等)在?#include?"ace/config-win32.h"?前加入:?
#define?ACE_HAS_STANDARD_CPP_LIBRARY?1?
4.?如果你希望使用?MFC?庫,那么?config.h?中加入:?
#define?ACE_HAS_MFC?1?
如果你希望使用?MFC?靜態(tài)庫,那么加入:?
#define?ACE_USES_STATIC_MFC?
5.?如果你希望編譯靜態(tài)版本的?ACE?庫,那么在?config.h?中加入:?
#define?ACE_AS_STATIC_LIBS?
6.?如果你希望減少靜態(tài)庫的大小,可以禁止使用?inline,在?config.h?的?#include?"ace/config-win32.h"?前加入:?
#define?ACE_NO_INLINE?
?
ACE結(jié)構(gòu)簡介
1)ACE OS adaptation層:封裝了 OS API,對上層提供 OS平臺無關(guān)的接口。
2)C++ wrapper facades層:位于 OS adaptation之上,提供了與之相似的功能,這些功能使用 C++的類封裝起來,而不是 C 語言 API。每個 wrapper facade都包含一個或者一個以上的類。我們可以有選擇的繼承、聚合這些 wrapper facade。
3)框架層(Framework layer)
框架層在 C++ wrapper facades層之上,它集成和擴充了 wrapper facade類。
<1> 事件多路分離和分發(fā)框架
ACE Reactor 和 ACE Proactor 實現(xiàn)了 Reactor 模式和 Proactor模式。
<2> 連接建立和服務(wù)初始化框架
ACE Acceptor-Connector框架實現(xiàn)了 Acceptor-Connector模式。
<3> 并發(fā)框架
ACE 提供了 Task框架實現(xiàn)了并發(fā)模式。
<4> 服務(wù)配置框架
ACE 的服務(wù)配置框架實現(xiàn)了 Component Configurator模式。
<5> 流框架
ACE 的流框架實現(xiàn)了 Pipes and Fiters模式。
4)ACE網(wǎng)絡(luò)組件層
組件(component)就是軟件系統(tǒng)中被封裝的一個部分,ACE發(fā)行包中的組件用于提供以下功能:
<1> 演示 ACE
<2> 提供常見網(wǎng)絡(luò)服務(wù)的可復(fù)用實現(xiàn)。如提供日志記錄、時間同步等服務(wù)的可復(fù)用實現(xiàn)。
?
線程的創(chuàng)建與管理
一.線程入口函數(shù)
所有線程必須從一個指定的函數(shù)開始執(zhí)行,該函數(shù)稱為線程函數(shù),它必須具有下列原型:
void* worker(void *arg) {}
該函數(shù)輸入一個void *型的參數(shù),可以在創(chuàng)建線程時傳入。
注意:所有的線程啟動函數(shù)(方法)必須是靜態(tài)的或全局的(就如同直接使用OS線程API時所要求的一樣)。
二.線程基本操作
1.創(chuàng)建一個線程
一個進程的主線程是由操作系統(tǒng)自動生成,如果你要讓一個主線程創(chuàng)建額外的線程,可以通過ACE_Thread::spawn()實現(xiàn),該函數(shù)一般的使用方式如下:
ACE_thread_t threadId;
ACE_hthread_t threadHandle;
ACE_Thread::spawn(
??? (ACE_THR_FUNC)worker,???????//線程執(zhí)行函數(shù)
??? NULL,???????????????????????//執(zhí)行函數(shù)參數(shù)
??? THR_JOINABLE | THR_NEW_LWP,
??? &threadId,
??? &threadHandle
??? );
ACE_Thread::spawn((ACE_THR_FUNC)worker)使用其默認參數(shù),來創(chuàng)建一個worker的線程。
ACE_Thread::spawn_n函數(shù)來創(chuàng)建多個線程。
2.終止線程
在線程函數(shù)體中ACE_Thread::exit()調(diào)用即可終止線程執(zhí)行。
3.設(shè)定線程的相對優(yōu)先級
當一個線程被首次創(chuàng)建時,它的優(yōu)先級等同于它所屬進程的優(yōu)先級。一個線程的優(yōu)先級是相對于其所屬的進程的優(yōu)先級而言的??梢酝ㄟ^調(diào)用ACE_Thread::setprio函數(shù)改變線程的相對優(yōu)先級,該函數(shù)的調(diào)用方式如下:
ACE_Thread::setprio(threadHandle,ACE_DEFAULT_THREAD_PRIORITY)
4.掛起及恢復(fù)線程
掛起線程可以通過來實現(xiàn),它能暫停一個線程的執(zhí)行,其調(diào)用方式如下ACE_Thread::suspend(threadHandle)。
相應(yīng)的,可以通過ACE_Thread::resume(threadHandle)恢復(fù)被掛起的線程的執(zhí)行。
5.等待線程結(jié)束
在主函數(shù)中調(diào)用ACE_Thread::join(threadHandle)可阻塞主函數(shù),直道線程結(jié)束才能繼續(xù)執(zhí)行。
6.停止線程
在主函數(shù)中調(diào)用ACE_Thread::cancel (threadHandle)可停止線程的執(zhí)行(在Unix底下可以,而在windows下好像不起作用,有待檢驗)。
三.程序示例
下面例子演示了如何用ace創(chuàng)建一個線程。
#include "ace/Thread.h"
#include "ace/Synch.h"
?
#pragma comment(lib,"ACEd.lib")
?
?
#include <iostream>
usingnamespace std;
void* worker(void *arg)
{
for(int i=0;i<10;i++)
??? {
??? ????ACE_OS::sleep(1);
??????? cout<<endl<<"hello world"<<endl;
??? }
return NULL;
}
int main(int argc,char *argv[])
{
??? ACE_thread_t threadId;
??? ACE_hthread_t threadHandle;
??? ACE_Thread::spawn(
??????? (ACE_THR_FUNC)worker,??????? //線程執(zhí)行函數(shù)
???? ???NULL,??????????????????????? //執(zhí)行函數(shù)參數(shù)
??????? THR_JOINABLE | THR_NEW_LWP,
??????? &threadId,
??????? &threadHandle
??????? );
??? ACE_Thread::join(threadHandle);
return 0;
}
在這個簡單的例子中,創(chuàng)建了1個工作者線程,執(zhí)行程序中定義的worker()函數(shù)。然后阻塞主函數(shù),待線程結(jié)束后退出程序。
ACE Lock類屬
鎖類屬包含的類包裝簡單的鎖定機制,比如互斥體、信號量、讀/寫互斥體和令牌等。這里我就以互斥體為例簡單的介紹一下其使用方法,對其它的鎖類進行一些簡單的說明。
1.互斥體 ACE_Thread_Mutex
互斥體用于保護共享的易變代碼,也就是全局或靜態(tài)數(shù)據(jù)。這樣的數(shù)據(jù)必須通過互斥體進行保護,以防止它們在多個線程同時訪問時損壞。??
#include "ace/Thread.h"
#include "ace/Synch.h"
#include <iostream>
usingnamespace std;
ACE_Thread_Mutex mutex;
void* Thread1(void *arg)
{
??? mutex.acquire();
??? ACE_OS::sleep(3);
??? cout<<endl<<"hello thread1"<<endl;
??? mutex.release();
return NULL;
}
void* Thread2(void *arg)
{
??? mutex.acquire();
??? cout<<endl<<"hello thread2"<<endl;
??? mutex.release();
return NULL;
}
int main(int argc,char *argv[])
{
??? ACE_Thread::spawn((ACE_THR_FUNC)Thread1);
//Thread2 比Thread1晚創(chuàng)建1秒鐘,故后嘗試獲取互斥體
??? ACE_OS::sleep(1);
??? ACE_Thread::spawn((ACE_THR_FUNC)Thread2);
while(true)
??????? ACE_OS::sleep(10);
return 0;
}
ACE_Thread_Mutex主要有兩個方法:
當線程要訪問共享資源時,首先調(diào)用acquire()方法獲取互斥體,從而獲取對改互斥體所保護的共享資源的唯一訪問權(quán)限,訪問結(jié)束時調(diào)用釋放互斥體,使得其它線程能獲取共享資源的訪問權(quán)限。
2.ACE Lock類屬簡介。
ACE Lock類屬列表如下:
ACE_Mutex
封裝互斥機制(根據(jù)平臺,可以是mutex_t、pthread_mutex_t等等)的包裝類,用于提供簡單而有效的機制來使對共享資源的訪問序列化。它與二元信號量(binary semaphore)的功能相類似??杀挥糜诰€程和進程間的互斥。
ACE_Thread_Mutex
可用于替換ACE_Mutex,專用于線程同步。
ACE_Process_Mutex
可用于替換ACE_Mutex,專用于進程同步。
ACE_NULL_Mutex
提供了ACE_Mutex接口的"無為"(do-nothing)實現(xiàn),可在不需要同步時用作替換。
ACE_RW_Mutex
封裝讀者/作者鎖的包裝類。它們是分別為讀和寫進行獲取的鎖,在沒有作者在寫的時候,多個讀者可以同時進行讀取。
ACE_RW_Thread_Mutex
可用于替換ACE_RW_Mutex,專用于線程同步。
ACE_RW_Process_Mutex
可用于替換ACE_RW_Mutex,專用于進程同步。
ACE_Semaphore
這些類實現(xiàn)計數(shù)信號量,在有固定數(shù)量的線程可以同時訪問一個資源時很有用。在OS不提供這種同步機制的情況下,可通過互斥體來進行模擬。
ACE_Thread_Semaphore
應(yīng)被用于替換ACE_Semaphore,專用于線程同步。
ACE_Process_Semaphore
應(yīng)被用于替換ACE_Semaphore,專用于進程同步。
ACE_Token
提供"遞歸互斥體"(recursive mutex),也就是,當前持有某令牌的線程可以多次重新獲取它,而不會阻塞。而且,當令牌被釋放時,它確保下一個正阻塞并等待此令牌的線程就是下一個被放行的線程。
ACE_Null_Token
令牌接口的"無為"(do-nothing)實現(xiàn),在你知道不會出現(xiàn)多個線程時使用。
ACE_Lock
定義鎖定接口的接口類。一個純虛類,如果使用的話,必須承受虛函數(shù)調(diào)用開銷。
ACE_Lock_Adapter
基于模板的適配器,允許將前面提到的任意一種鎖定機制適配到ACE_Lock接口。
可以簡單的分為以下幾類:
互斥鎖(通常稱為"互斥體"或"二元信號量")用于保護多線程控制并發(fā)訪問的共享資源的完整性。互斥體通過定義臨界區(qū)來序列化多線程控制的執(zhí)行,在臨界區(qū)中每一時刻只有一個線程在執(zhí)行它的代碼?;コ怏w簡單而高效(時間和空間)。
ACE線程庫提供了Mutex式的類(是一組互斥體對象,擁有類似的接口),他是一種簡單而高效的類型是"非遞歸"互斥體。非遞歸互斥體不允許當前擁有互斥體的線程在釋放它之前重新獲取它。否則,將會立即發(fā)生死鎖。遞歸互斥體在ACE Recursive_Thread_Mutex類中可移植地實現(xiàn)。
讀者/作者鎖與互斥體相類似。例如,獲取讀者/作者鎖的線程也必須釋放它。多個線程可同時獲取一個讀者/作者鎖用于讀,但只有一個線程可以獲取該鎖用于寫。當互斥體保護的資源用于讀遠比用于寫要頻繁時,讀者/作者互斥體有助于改善并發(fā)的執(zhí)行。
ACE線程庫提供了一個叫作RW_Mutex的類,在C++封裝類中可移植地實現(xiàn)了讀者/作者鎖的語義。讀者/作者鎖將優(yōu)先選擇權(quán)給作者。因而,如果有多個讀者和一個作者在鎖上等待,作者將會首先獲取它。
計數(shù)信號量
在概念上,計數(shù)信號量是可以原子地增減的整數(shù)。如果線程試圖減少一個值為零的信號量的值,它就會阻塞,直到另一個線程增加該信號量的值。
計數(shù)信號量用于追蹤共享程序狀態(tài)的變化。它們記錄某種特定事件的發(fā)生。因為信號量維護狀態(tài),它們允許線程根據(jù)該狀態(tài)來作決定,即使事件是發(fā)生在過去。
信號量比互斥體效率要低,但是,它們要更為通用,因為它們無需被最初獲取它們的同一線程獲取和釋放。這使得它們能夠用于異步的執(zhí)行上下文中(比如信號處理器)。ACE線程庫提供一個叫作Semaphore的類來可移植地在C++包裝類中實現(xiàn)信號量語義。
ACE Guard類屬
與C一級的互斥體API相比較,Mutex包裝為同步多線程控制提供了一種優(yōu)雅的接口。但是,Mutex潛在地容易出錯,因為程序員有可能忘記調(diào)用release方法(當然,C級的互斥體API更容易出錯)。這可能由于程序員的疏忽或是C++異常的發(fā)生而發(fā)生,然而,其導(dǎo)致及其嚴重的后果--死鎖。
因此,為改善應(yīng)用的健壯性,ACE同步機制有效地利用C++類構(gòu)造器和析構(gòu)器的語義來確保Mutex鎖被自動獲取和釋放。
ACE提供了一個稱為Guard、Write_Guard和Read_Guard的類族,確保在進入和退出C++代碼塊時分別自動獲取和釋放鎖。
Guard類是最基本的守衛(wèi)機制,定義可以簡化如下(實際定義比這相對要復(fù)雜而完善一點):
template <class LOCK>
class Guard
{
public:
??? Guard (LOCK &l): lock_ (&l){ lock_.acquire (); }
??? ?Guard (void) {??? lock_.release (); }
private:
??? LOCK lock_;
}
Guard類的對象定義一"塊"代碼,在其上鎖被自動獲取,并在退出塊時自動釋放,即使是程序拋異常也能保證自動解鎖。這種機制也能為Mutex、RW_Mutex和Semaphore同步封裝工作。
對于讀寫鎖,由于加鎖接口不一樣,ace也提供了相應(yīng)的Read_Guard和Write_Guard類,Read_Guard和Write_Guard類有著與Guard類相同的接口。但是,它們的acquire方法分別對鎖進行讀和寫。
缺省地, Guard類構(gòu)造器將會阻塞程序,直到鎖被獲取。會有這樣的情況,程序必須使用非阻塞的acquire調(diào)用(例如,防止死鎖)。因此,可以傳給ACE Guard的構(gòu)造器第二個參數(shù)(請參看原始代碼,而不是我這里的簡化代碼),指示它使用鎖的try_acquire方法,而不是acquire。隨后調(diào)用者可以使用Guard的locked方法來原子地測試實際上鎖是否已被獲取。
用Guard重寫上一節(jié)的Thread1方法如下(注釋了的部分是原有代碼):
void* Thread1(void *arg)
{
??? ACE_Guard<ACE_Thread_Mutex> guard(mutex);
//mutex.acquire();
??? ACE_OS::sleep(3);
??? cout<<endl<<"hello thread1"<<endl;
//mutex.release();
return NULL;
}
相比較而言,使用Guard更加簡潔,并且會自動解鎖,免除了一部分后顧之憂。
注意:
Guard是在Guard變量析構(gòu)時解鎖,如果在同一函數(shù)中兩次對同一互斥體變量使用Guard要注意其對象生命周期,否則容易造成死鎖。
?
ACE Condition類屬
ACE Condition類屬(條件變量)提供風(fēng)格與互斥體、讀者/作者鎖和計數(shù)信號量不同的鎖定機制。當持有鎖的線程在臨界區(qū)執(zhí)行代碼時,這三種機制讓協(xié)作線程進行等待。相反,條件變量通常被一個線程用于使自己等待,直到一個涉及共享數(shù)據(jù)的條件表達式到達特定的狀態(tài)。當另外的協(xié)作線程指示共享數(shù)據(jù)的狀態(tài)已發(fā)生變化,調(diào)度器就喚醒一個在該條件變量上掛起的線程。于是新喚醒的線程重新對它的條件表達式進行求值,如果共享數(shù)據(jù)已到達合適狀態(tài),就恢復(fù)處理。
ACE線程庫提供一個叫作Condition的類來可移植地在C++包裝類中實現(xiàn)條件變量語義。定義方式如下:
ACE_Thread_Mutex mutex;
ACE_Condition<ACE_Thread_Mutex> cond(mutex);
該對象有兩個常用方法。
向使用該條件變量的其它線程發(fā)送滿足條件信號。
查詢是否滿足條件,如果滿足,則繼續(xù)往下執(zhí)行;如果不滿足條件,主線程就等待在此條件變量上。條件變量隨即自動釋放互斥體,并使主線程進入睡眠。
條件變量總是與互斥體一起使用。這是一種可如下描述的一般模式:
while( expression NOT TRUE ) wait on condition variable;
條件變量不是用于互斥,往往用于線程間的協(xié)作,下面例子演示了通過條件變量實現(xiàn)線程協(xié)作。
#include "ace/Thread.h"
#include "ace/Synch.h"
#include <iostream>
usingnamespace std;
ACE_Thread_Mutex mutex;
ACE_Condition<ACE_Thread_Mutex> cond(mutex);
void* worker(void *arg)
{
??? ACE_OS::sleep(2);???????//保證eater線程的cond.wait()在worker線程的cond.signal()先執(zhí)行
??? mutex.acquire();
??? ACE_OS::sleep(1);
??? cout<<endl<<"produce"<<endl;
??? cond.signal();
??? mutex.release();
return NULL;
}
void* eater(void *arg)
{
??? mutex.acquire();
??? cond.wait();
??? cout<<endl<<"eat"<<endl;
??? mutex.release();
return NULL;
}
int main(int argc,char *argv[])
{
??? ACE_Thread::spawn((ACE_THR_FUNC)worker);
??? ACE_OS::sleep(1);
??? ACE_Thread::spawn((ACE_THR_FUNC)eater);
while(true)
??????? ACE_OS::sleep(10);
return 0;
}
這個例子中,首先創(chuàng)建了一個生產(chǎn)者線程worker和一個消費者線程eater,消費者線程執(zhí)行比生產(chǎn)者快,兩個線程不加限制并發(fā)執(zhí)行會導(dǎo)致先消費,后生產(chǎn)的情況(只是加互斥鎖也不能很好的解決,以為無法保證生產(chǎn)者一定先獲得互斥體)。所以這里通過條件變量的通知方式保證線程的順序執(zhí)行:
1.??????? 消費者線程獲取互斥體,等待條件滿足(生產(chǎn)者生產(chǎn)了食品)。同時釋放互斥體,進入休眠狀態(tài)。
2.??????? 生產(chǎn)者獲取互斥體(雖然是消費者先獲取的互斥體,但消費者調(diào)用的wait函數(shù)會釋放消費者的互斥體),生產(chǎn)商品后,通過條件變量發(fā)送信號(調(diào)用signal函數(shù))通知消費者生產(chǎn)完成,結(jié)束生產(chǎn)過程,釋放互斥體。
3.??????? 消費者收到信號后,重新獲取互斥體,完成消費過程。
使用條件變量的注意事項:
1.??????? 條件變量必須和互斥體一起使用,也就是說使用前必須加鎖(調(diào)用互斥體acquire函數(shù)),使用完后需釋放互斥體。
條件變量中的wait()和signal()成對使用的話,必須保證wait()函數(shù)在signal()之前執(zhí)行,這樣才能保證wait()能收到條件滿足通知,不至于一直等待下去,形成死鎖(worker線程中的第一句話就是起的這個作用)。
?
ACE Synchronization類
這一類并發(fā)控制對象一般也叫做雜項并發(fā)類,這類對象一般用得不多,這里我只是對其作一些簡單的介紹。
1.Atomic_Op類
ACE_Atomic_Op類用于將同步透明地參數(shù)化進基本的算術(shù)運算中。
ACE_Atomic_Op是一種模板類,鎖定機制和需要參數(shù)化的類型被作為參數(shù)傳入其中,重載所有算術(shù)操作符,并確保在操作前獲取鎖,在操作后釋放它。運算本身被委托給通過模板傳入的的類。
使用ACE_Atomic_Op進行變量封裝時,對于那些用ACE_Atomic_Op封裝了的變量操作都變成了線程安全的,而并看不到顯式的加解鎖代碼,代碼變得更簡潔,優(yōu)雅。
2.ACE中的柵欄(Barrier)
一組線程可以使用柵欄來進行共同的相互同步。組中的每個線程各自執(zhí)行,直到到達柵欄,就阻塞在那里。在所有相關(guān)線程到達柵欄后,它們就全部繼續(xù)它們的執(zhí)行。就是說,它們一個接一個地阻塞,等待其他的線程到達柵欄;一旦所有線程都到達了它們的執(zhí)行路徑中的"柵欄點",它們就一起重新啟動。
在ACE中,柵欄在ACE_Barrier類中實現(xiàn)。在柵欄對象被實例化時,它將要等待的線程的數(shù)目會作為參數(shù)傳入。一旦到達執(zhí)行路徑中的"柵欄點",每個線程都在柵欄對象上發(fā)出wait()調(diào)用。它們在這里阻塞,直到其他線程到達它們各自的"柵欄點",然后再一起繼續(xù)執(zhí)行。當柵欄從相關(guān)線程那里接收了適當數(shù)目的wait()調(diào)用時,它就同時喚醒所有阻塞的線程。
舉個簡單的例子,運動員進行賽跑比賽時,雖然他們到達終點有先后順序,但會等到所有的運動員跑完比賽后才一起領(lǐng)獎。
?
面向?qū)ο蟮木€程類ACE_Task
?
我們在前一章中使用ACE_Thread包裝時,你一定已經(jīng)注意到了一些不夠"優(yōu)雅"的地方。那一章中的大多數(shù)程序都被分解為函數(shù)、而不是對象。這是因為ACE_Thread包裝需要一個全局函數(shù)名、或是靜態(tài)方法作為參數(shù)。隨后該函數(shù)(靜態(tài)方法)就被用作所派生的線程的"啟動點"。這自然就使得程序員要為每個線程寫一個函數(shù)。如我們已經(jīng)看到的,這可能會導(dǎo)致非面向?qū)ο蟮某绦蚍纸狻?/span>
ACE_Task對常用線程處理進行了OO包裝,通過ACE_Task,能對線程進行更好的操作。
ACE_Task是ACE中的任務(wù)或主動對象“處理結(jié)構(gòu)”的基類。ACE使用此類來實現(xiàn)主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向?qū)ο蟮木€程。
每個任務(wù)都含有一或多個線程,以及一個底層消息隊列。各個任務(wù)通過消息隊列進行通信。至于消息隊列實現(xiàn)的內(nèi)在細節(jié)程序員不必關(guān)注。發(fā)送任務(wù)用putq()將消息插入到另一任務(wù)的消息隊列中,接收任務(wù)通過使用getq()將消息提取出來。這樣的體系結(jié)構(gòu)大大簡化了多線程程序的編程模型。
其主要成員如下:
open():初始化資源
close():釋放資源
activate():啟動線程,可指定線程的數(shù)目
svc():線程的啟動位置
????? putq():放置消息到任務(wù)的消息隊列中
????? getq():從任務(wù)的消息隊列中取出消息
????? thr_count():返回任務(wù)中線程的數(shù)目
????? last_thread():返回任務(wù)中將線程計數(shù)器從1降為0的線程的ID
?
要創(chuàng)建任務(wù),需要進行以下步驟:
open()方法應(yīng)該包含所有專屬于任務(wù)的初始化代碼。其中可能包括諸如連接控制塊、鎖和內(nèi)存這樣的資源。close()方法是相應(yīng)的終止方法。
在主動對象實例化后,你必須通過調(diào)用activate()啟用它。要在主動對象中創(chuàng)建的線程的數(shù)目,以及其他一些參數(shù),被傳遞給activate()方法。activate()方法會使svc()方法成為所有它生成的線程的啟動點。
如上面所提到的,在主動對象被啟用后,各個新線程在svc()方法中啟動。應(yīng)用開發(fā)者必須在子類中定義此方法。
下面的例子演示怎樣去創(chuàng)建任務(wù):
#include "ace/Task.h"
#include "ace/OS.h"
#include <iostream>
usingnamespace std;
class TaskThread:public ACE_Task<ACE_MT_SYNCH>
{
public:
virtualint svc(void)
??? {
for(int i=0;i<10;i++)
??????? {
??????????? ACE_OS::sleep(1);
??????????? cout<<endl<<"hello thread1"<<endl;
??????? }
return 0;
??? }
};
int main(int argc,char *argv[])
{
??? TaskThread task;
??? task.activate();
while(true)
??????? ACE_OS::sleep(10);
return 0;
}
ACE_Task也封裝了常用線程操作,如暫停,恢復(fù)及停止等,是不是非常簡單和方便呢。
其實ACE_Task的使用還不僅僅是這些,通過它還可實現(xiàn)一種很常用的網(wǎng)絡(luò)編程模式--主動對象模式,其具體功能在后續(xù)的設(shè)計模式部分將作詳細的介紹。
ACE_Message_Queue
在Windows和Linux的config文件中都沒有定義"ACE_HAS_TIMED_MESSAGE_BLOCKS"這個宏,所以msg_deadline_time和msg_execution_time都不起任何作用.
ACE_Message_Queue_Factory這個工廠提供三個靜態(tài)函數(shù)分別用來創(chuàng)建靜態(tài)消息隊列和兩種類型的動態(tài)消息隊列。靜態(tài)消息隊列的消息也支持優(yōu)先級,但是消息的優(yōu)先級是靜態(tài)的,不需要通過動態(tài)計算而來。水位用來控制消息隊列中數(shù)據(jù)的大小,高水位(high_water_mark)用于控制消息隊列的上限,它用于控制生產(chǎn)者往里面放數(shù)據(jù)的量,如果消息隊列中數(shù)據(jù)量已經(jīng)達到高水位,而用使用了鎖,既使“ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();”創(chuàng)建消息隊列,那么生產(chǎn)者將被阻塞。高水位很容易理解,但是低水位是用來做什么的呢?
只要消息隊列中還有數(shù)據(jù)消費者就不會被阻塞的,而當數(shù)據(jù)量超過高水位時,生產(chǎn)者會被阻塞,既然會被阻塞,那么它肯定需要被喚醒,那么什么時候由誰來喚醒生產(chǎn)者呢?這就是低水位的作用,消費者一直消費數(shù)據(jù),當數(shù)據(jù)低于低水位時它就喚醒生產(chǎn)者。
下面的代碼很好的展示了靜態(tài)消息隊列的使用。
#include "ace/Message_Queue.h"
#include "ace/Get_Opt.h"
#include "ace/OS.h"
#include? <ace/Thread_Manager.h>
#include <ace/Synch.h>
//消息隊列指針
ACE_Message_Queue<ACE_MT_SYNCH>* mq;
constchar S1[] ="C++";
constchar S2[] ="Java";
constchar S3[] ="PHP";
constchar S4[] ="C#";
//四個消息指針
ACE_Message_Block* mb1, * mb2, * mb3, * mb4;
//生產(chǎn)者
staticvoid* produce(void *arg)
{
?staticint loop = 1;
??? while(true)
??? {
? ACE_OS::sleep(2);
? ACE_DEBUG((LM_DEBUG, "(%P : %t) producer...\n"));
? while(true)
? {
?? if(loop == 1)
?? {
??? //將高水位設(shè)置為10, S1+S2的長度為3+4+2=9<10,因此可以將S3放進去
??? //但是再放入S4時生產(chǎn)者將會被阻塞
??? //需要注意的是水位的大小并不是消息的個數(shù),而是消息隊列中消息里面的數(shù)據(jù)量之和
??? //如果也能以消息的個數(shù)作為高低水位的值就好了
??? mq->high_water_mark(10);
??? mq->enqueue_prio (mb1);
??? mq->enqueue_prio (mb2);
??? mq->enqueue_prio (mb3);
??? ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending!!\n"));
??? //因為消費者在睡眠6秒之后才會調(diào)用deactivate,因此生產(chǎn)者會在這兒阻塞幾秒鐘
??? //可以不斷地將msg_bytes打印出來觀察觀察
??? int ret = mq->enqueue_prio (mb4);
??? ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by?deactivate, ret = %d!!\n", ret));
??? ++loop;
?? }
?? if(loop == 2)
?? {
??? ACE_OS::sleep(6);
??? //將低水位設(shè)置為5,因為高水位仍然為10,當前的數(shù)據(jù)量又超過了10,
??? //所以下面的入隊操作仍會將生產(chǎn)者阻塞
??? //這樣消費者消費消息,當數(shù)據(jù)量小于5時,將喚醒生產(chǎn)者
??? //生產(chǎn)者在此處等待被消費者喚醒
??? mq->low_water_mark(5);
??? ACE_DEBUG((LM_DEBUG, "(%P : %t) producer will pending again!!\n"));
??? mq->enqueue_prio (mb4);
??? ACE_DEBUG((LM_DEBUG, "(%P : %t) producer waken up by consumer!!\n"));
??? ++loop;
?? }
? }
??? }
??? return NULL;
}
//消費者
void* consume(void *arg)
{
?staticint loop = 1;
??? while(true)
??? {
? ACE_OS::sleep(2);
? ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer...\n"));
? if(loop == 1)
? {
?? //等待6秒,此時生產(chǎn)者和消費者都將被阻塞
?? ACE_OS::sleep(6);
?? //deactivate會喚醒所有的線程,將消息隊列設(shè)置為不可用
?? //以后所存取操作都會返回-1
?? //這個操作會喚醒生產(chǎn)者
?? mq->deactivate();
?? ++loop;
? }
? if(loop == 2)
? {
?? ACE_OS::sleep(2);
?? //將消息隊列的狀態(tài)設(shè)置成ACTIVATED
?? //消息又可以使用了
?? mq->activate();
?? ++loop;
? }
? if(loop == 3)
? {
?? ACE_OS::sleep(10);
?? //消費兩個消息之后,數(shù)據(jù)量就小于5了,低于低水位將喚醒生產(chǎn)者
?? ACE_Message_Block *mb;
?? mq->dequeue_head (mb);
?? mq->dequeue_head (mb);
?? ACE_DEBUG((LM_DEBUG, "(%P : %t) consumer wake up producer!!\n"));
?? ++loop;
? }
??? }
??? return NULL;
}
int main(int argc,char* argv[])
{
?mq = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue();
?int priority;
?//使用隨機數(shù)作為消息的優(yōu)先級
?//數(shù)字越高,優(yōu)先級越高
?priority = ACE_OS::rand() % 100;
?mb1 = new ACE_Message_Block(S1,sizeof S1, priority);
?priority = ACE_OS::rand() % 100;
?mb2 = new ACE_Message_Block(S2,sizeof S2, priority);
?priority = ACE_OS::rand() % 100;
?mb3 = new ACE_Message_Block(S3,sizeof S3, priority);
?priority = ACE_OS::rand() % 100;
?mb4 = new ACE_Message_Block(S4,sizeof S4, priority);
?//將消息壓入隊列中,enqueue_prio根據(jù)消息的優(yōu)先級將消息放到適當?shù)奈恢蒙?span style="color:black">
?//enqueue_head只是簡單地將數(shù)據(jù)存入隊列中,而不考慮消息的優(yōu)先級
?//使用enqueue_prio壓入消息后,可以簡單通過dequeue_head和dequeue_tail
?//分別按優(yōu)先級從高到低和從低到高取消息
?//如果使用enqueue_head和enqueue_tail壓入消息
?//則需要通過dequeue_prio來按照消息的優(yōu)先級依次將消息出隊列
?//沒有必要既使用enqueue_prio壓入消息,又實用dequeue_prio來取消息
?mq->enqueue_prio (mb1);
?mq->enqueue_prio (mb2);
?mq->enqueue_prio (mb3);
?mq->enqueue_prio (mb4);
?//輸出靜態(tài)消息隊列的相關(guān)信息
?//高低水位默認值均為16384
?ACE_DEBUG((LM_DEBUG, "count : %d, bytes : %d, length : %d, high_water_mark : %d, low_water_mark : %d, status : %d\n",
? mq->message_count(), mq->message_bytes(), mq->message_length(),
? mq->high_water_mark(), mq->low_water_mark(),
? mq->state()));
?ACE_Message_Block *mb;
?//使用next遍歷消息,遍歷的順序為高優(yōu)先級到底優(yōu)先級
?ACE_DEBUG((LM_DEBUG, "===========next=============\n"));
?//peek一下,并不彈出消息,類似Windows的PeekMessage
?mq->peek_dequeue_head(mb);
?do
?{
? ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
?}while(mb = mb->next());
?//使用迭代器遍歷消息隊列,遍歷的順序為高優(yōu)先級到底優(yōu)先級
?ACE_DEBUG((LM_DEBUG, "=========iterator=============\n"));
?ACE_Message_Queue<ACE_MT_SYNCH>::ITERATOR iterator (*mq);
??? for (ACE_Message_Block *entry = 0;
???????? iterator.next (entry) != 0;
???????? iterator.advance ())
?{
? ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", entry->rd_ptr(), entry->msg_priority()));
?}
?ACE_DEBUG((LM_DEBUG, "============dequeue_head==========\n"));
?while(mq->dequeue_head (mb) != -1)
?{
? ACE_DEBUG((LM_DEBUG, "message: %s, priority: %d\n", mb->rd_ptr(), mb->msg_priority()));
? //這里如果不判斷的話,消息隊列空時會導(dǎo)致主線程被阻塞
? if(mq->is_empty())
?? break;
?}
?ACE_DEBUG((LM_DEBUG, "\n\n"));
?//測試高低水位和隊列的state使用,進行測試之前mq隊列已空///
?//產(chǎn)生一個生產(chǎn)者線程
?ACE_Thread_Manager::instance()->spawn_n
???????? (
????????? 1,
????????? (ACE_THR_FUNC) produce
??? );
?產(chǎn)生兩個消費者線程
?ACE_Thread_Manager::instance()->spawn_n
???????? (
????????? 1,
????????? (ACE_THR_FUNC) consume
??? );
?//掛起主線程
?ACE_Thread_Manager::instance()->wait();
?
?return 0;
}
?
?
?
ACE中TCP通信
Tcp通信過程一般為如下步驟:
1.??????? 服務(wù)器綁定端口,等待客戶端連接。
2.??????? 客戶端通過服務(wù)器的ip和服務(wù)器綁定的端口連接服務(wù)器。
3.??????? 服務(wù)器和客戶端通過網(wǎng)絡(luò)建立一條數(shù)據(jù)通路,通過這條數(shù)據(jù)通路進行數(shù)據(jù)交互。
常用API:
1. ACE_INET_Addr類。
ACE"地址"類ACE_Addr的子類,表示TCP/IP和UDP/IP的地址。它通常包含機器的ip和端口信息,通過它可以定位到所通信的進程。
定義方式:
ACE_INET_Addr addInfo(3000,"192.168.1.100");
常用方法:
1.??????? get_host_name??? 獲取主機名
2.??????? get_ip_address??? 獲取ip地址
3.??????? get_port_number??? 獲取端口號
2. ACE_SOCK_Acceptor類。
服務(wù)期端使用,用于綁定端口和被動地接受連接。
常用方法:
1.??????? open 綁定端口
2.??????? accept建立和客戶段的連接
3.ACE_SOCK_Connector類。
客戶端使用,用于主動的建立和服務(wù)器的連接。
常用方法:
1.??????? connect()??? 建立和服務(wù)期的連接。
4. ACE_SOCK_Stream類。
客戶端和服務(wù)器都使用,表示客戶段和服務(wù)器之間的數(shù)據(jù)通路。
常用方法:
1.??????? send ()??? 發(fā)送數(shù)據(jù)
2.??????? recv ()??? 接收數(shù)據(jù)
3.??????? close()??? 關(guān)閉連接(實際上就是斷開了socket連接)。
代碼示例:
下面例子演示了如何如何用ACE創(chuàng)建TCP通信的Server端。
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/OS.h"
?
#include <string>
#include <iostream>
usingnamespace std;
?
int main(int argc,char *argv[])
{
??? ACE_INET_Addr port_to_listen(3000);???????//綁定的端口
??? ACE_SOCK_Acceptor acceptor;
??? if (acceptor.open (port_to_listen, 1) == -1)????//綁定端口
??? {
??????? cout<<endl<<"bind port fail"<<endl;
??????? return -1;
??? }
?
??? while(true)
??? {
??????? ACE_SOCK_Stream peer; ???????//和客戶端的數(shù)據(jù)通路
??????? ACE_Time_Value timeout (10, 0);
?
??????? if (acceptor.accept (peer) != -1)???//建立和客戶端的連接
??????? {
??????????? cout<<endl<<endl<<"client connect. "<<endl;
??????????? char buffer[1024];
??????????? ssize_t bytes_received;
?
???? ???????ACE_INET_Addr raddr;
??????????? peer.get_local_addr(raddr);
??????????? cout<<endl<<"local port\t"<<raddr.get_host_name()<<"\t"<<raddr.get_port_number()<<endl;
?
??????????? while ((bytes_received =
???????????????peer.recv (buffer, sizeof(buffer))) != -1)???//讀取客戶端發(fā)送的數(shù)據(jù)
??????????? {
???????????????peer.send(buffer, bytes_received);??? //對客戶端發(fā)數(shù)據(jù)
??????????? }
??????????? peer.close ();
??????? }
??? }
?
??? return 0;
}
這個例子實現(xiàn)的功能很簡單,服務(wù)器端綁定3000號端口,等待一個客戶端的連接,然后將從客戶端讀取的數(shù)據(jù)再次轉(zhuǎn)發(fā)給客戶端,也就是實現(xiàn)了一個EchoServer的功能。
相應(yīng)的客戶端程序也比較簡單,代碼如下:
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Connector.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>
?
#include <string>
#include <iostream>
usingnamespace std;
?
int main(int argc,char *argv[])
{
??? ACE_INET_Addr addr(3000,"127.0.0.1");
?
??? ACE_SOCK_Connector connector;???
??? ACE_Time_Value timeout(5,0);
??? ACE_SOCK_Stream peer;
?
??? if(connector.connect(peer,addr,&timeout) != 0)
??? {
??????? cout<<"connection failed !"<<endl;
??????? return 1;
??? }
??? cout<<"conneced !"<<endl;
?
??? string s="hello world";
??? peer.send(s.c_str(),s.length());???//發(fā)送數(shù)據(jù)
??? cout<<endl<<"send:\t"<<s<<endl;
?
??? ssize_t bc=0;???????????//接收的字節(jié)數(shù)
?
??? char buf[1024];
??? bc=peer.recv(buf,1024,&timeout);???//接收數(shù)據(jù)
? ??if(bc>=0)
??? {
??????? buf[bc]='\0';
??????? cout<<endl<<"rev:\t"<<buf<<endl;
??? }
??? peer.close();
?
??? return 0;
}
?
ACE中UDP通信
udp是一種無連接的協(xié)議,提供無連接不可靠的服務(wù)。
在ace中,通過ACE_SOCK_Dgram類提供udp通信服務(wù),ACE_SOCK_Dgram和ACE_SOCK_Stream的API非常類似,一樣提供了send,recv及close等常用操作,這里就不再累述了。
udp通信時無需像tcp那樣建立連接和關(guān)閉連接,tcp編程時需要通過accept和connect來建立連接,而udp通信省略了這一步驟,相對來說編程更為簡單。
由于udp通信時無建立連接,服務(wù)器端不能像Tcp通信那樣在建立連接的時候就獲得客戶端的地址信息,故服務(wù)器端不能主動對客戶端發(fā)送信息(不知道客戶端的地址),只有等到收到客戶端發(fā)送的udp信息時才能確定客戶端的地址信息,從而進行通信。
udp通信過程如下:
下面代碼為EchoServer的udp版:
//server.cpp
#include <ace/SOCK_Dgram.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>
#include <string>
#include <iostream>
usingnamespace std;
int main(int argc,char *argv[])
{
??? ACE_INET_Addr port_to_listen(3000);???//綁定的端口
??? ACE_SOCK_Dgram peer(port_to_listen);???//通信通道
char buf[100];
while(true)
??? {
??????? ACE_INET_Addr remoteAddr;??? //所連接的遠程地址
int bc = peer.recv(buf,100,remoteAddr);???//接收消息,獲取遠程地址信息
if( bc != -1)
??????? {
??????????? string s(buf,bc);
??????????? cout<<endl<<"rev:\t"<<s<<endl;
??????? }
??????? peer.send(buf,bc,remoteAddr);??? //和遠程地址通信
??? }
return 0;
}
相應(yīng)的客戶端程序如下:
//client.cpp
#include <ace/SOCK_Dgram.h>
#include <ace/INET_Addr.h>
#include <ace/Time_Value.h>
#include <string>
#include <iostream>
usingnamespace std;
int main(int argc,char *argv[])
{
??? ACE_INET_Addr remoteAddr(3000,"127.0.0.1");???//所連接的遠程地址
??? ACE_INET_Addr localAddr;???//本地地址信息
??? ACE_SOCK_Dgram peer(localAddr);???//通信通道
??? peer.send("hello",5,remoteAddr);???//發(fā)送消息
char buf[100];
int bc = peer.recv(buf,100,remoteAddr);???//接收消息
if( bc != -1)
??? {
??????? string s(buf,bc);
??????? cout<<endl<<"rev:\t"<<s<<endl;
??? }
return 0;
}
和tcp編程相比,udp無需通過acceptor,connector來建立連接,故代碼相對tcp編程來說要簡單許多。另外,由于udp是一種無連接的通信方式,ACE_SOCK_Dgram的實例對象中無法保存遠端地址信息(保存了本地地址信息),故通信的時候需要加上遠端地址信息。
?
ACE主動對象模式
主動對象模式用于降低方法執(zhí)行和方法調(diào)用之間的耦合。該模式描述了另外一種更為透明的任務(wù)間通信方法。
傳統(tǒng)上,所有的對象都是被動的代碼段,對象中的代碼是在對它發(fā)出方法調(diào)用的線程中執(zhí)行的,當方法被調(diào)用時,調(diào)用線程將阻塞,直至調(diào)用結(jié)束。而主動對象卻不一樣。這些對象具有自己的命令執(zhí)行線程,主動對象的方法將在自己的執(zhí)行線程中執(zhí)行,不會阻塞調(diào)用方法。
例如,設(shè)想對象"A"已在你的程序的main()函數(shù)中被實例化。當你的程序啟動時,OS創(chuàng)建一個線程,以從main()函數(shù)開始執(zhí)行。如果你調(diào)用對象A的任何方法,該線程將"流過"那個方法,并執(zhí)行其中的代碼。一旦執(zhí)行完成,該線程返回調(diào)用該方法的點并繼續(xù)它的執(zhí)行。但是,如果"A"是主動對象,事情就不是這樣了。在這種情況下,主線程不會被主動對象借用。相反,當"A"的方法被調(diào)用時,方法的執(zhí)行發(fā)生在主動對象持有的線程中。另一種思考方法:如果調(diào)用的是被動對象的方法(常規(guī)對象),調(diào)用會阻塞(同步的);而另一方面,如果調(diào)用的是主動對象的方法,調(diào)用不會阻塞(異步的)。
由于主動對象的方法調(diào)用不會阻塞,這樣就提高了系統(tǒng)響應(yīng)速度,在網(wǎng)絡(luò)編程中是大有用武之地的。
在這里我們將一個"Logger"(日志記錄器)對象為例來介紹如何將一個傳統(tǒng)對象改造為主動對象,從而提高系統(tǒng)響應(yīng)速度。
Logger的功能是將一些系統(tǒng)事件的記錄在存儲器上以備查詢,由于Logger使用慢速的I/O系統(tǒng)來記錄發(fā)送給它的消息,因此對Logger的操作將會導(dǎo)致系統(tǒng)長時間的等待。
其功能代碼簡化如下:
class Logger: public ACE_Task<ACE_MT_SYNCH>
{
public:
void LogMsg(const string& msg)
??? {
??????? cout<<endl<<msg<<endl;
??????? ACE_OS::sleep(2);
??? }
};
為了實現(xiàn)記錄日志操作的主動執(zhí)行,我們需要用命令模式將其封裝,從而使得記錄日志的方法能在合適的時間和地方主動執(zhí)行,封裝方式如下:
class LogMsgCmd:public ACE_Method_Object
{
public:
??? LogMsgCmd(Logger *plog,conststring& msg)
??? {
??????? this->log=plog;
??????? this->msg=msg;
??? }
?
??? int call()
??? {
??????? this->log->LogMsg(msg);
??????? return 0;
??? }
?
private:
??? Logger *log;
??? string msg;
};
?
class Logger:public ACE_Task<ACE_MT_SYNCH>
{
public:
??? void LogMsg(conststring& msg)
??? {
??????? cout<<endl<<msg<<endl;
??????? ACE_OS::sleep(2);
??? }
?
??? LogMsgCmd *LogMsgActive(conststring& msg)
??? {
??????? new LogMsgCmd(this,msg);
??? }
};
?
?
這里對代碼功能做一下簡單的說明:
ACE_Method_Object是ACE提供的命令模式借口,命令接口調(diào)用函數(shù)為int call(),在這里通過它可以把每個操作日志的調(diào)用封裝為一個LogMsgCmd對象,這樣,當原來需要調(diào)用LogMsg的方法的地方只要調(diào)用LogMsgActive即可生成一個LogMsgCmd對象,由于調(diào)用LogMsgActive方法,只是對命令進行了封裝,并沒有進行日志操作,所以該方法會立即返回。然后再新開一個線程,將LogMsgCmd對象作為參數(shù)傳入,在該線程中執(zhí)行LogMsgCmd對象的call方法,從而實現(xiàn)無阻塞調(diào)用。
然而,每次對一個LogMsg調(diào)用都開啟一個新線程,無疑是對資源的一種浪費,實際上我們往往將生成的LogMsgCmd對象插入一個命令隊列中,只新開一個命令執(zhí)行線程依次執(zhí)行命令隊列中的所有命令。并且,為了實現(xiàn)對象的封裝,命令隊列和命令執(zhí)行線程往往也封裝到Logger對象中,代碼如下所示:
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Method_Object.h"
#include "ace/Activation_Queue.h"
#include "ace/Auto_Ptr.h"
?
#include <string>
#include <iostream>
usingnamespace std;
?
class Logger:public ACE_Task<ACE_MT_SYNCH>
{
public:
??? Logger()
??? {
??????? this->activate();
??? }
?
??? int svc();
??? void LogMsg(conststring& msg);
??? void LogMsgActive (conststring& msg);
?
private:
??? ACE_Activation_Queue cmdQueue;???//命令隊列
};
?
class LogMsgCmd:public ACE_Method_Object
{
public:
??? LogMsgCmd(Logger *plog,conststring& msg)
??? {
??????? this->log=plog;
??????? this->msg=msg;
??? }
?
??? int call()
??? {
??????? this->log->LogMsg(msg);
??????? return 0;
??? }
?
private:
??? Logger *log;
??? string msg;
};
?
void Logger::LogMsg(conststring& msg)
{
??? cout<<endl<<msg<<endl;
??? ACE_OS::sleep(2);
}
?
//以主動的方式記錄日志
void Logger::LogMsgActive(conststring& msg)
{
??? //生成命令對象,插入到命令隊列中
??? cmdQueue.enqueue(new LogMsgCmd(this,msg));
}
?
int Logger::svc()
{
??? while(true)
??? {
??????? //遍歷命令隊列,執(zhí)行命令
??????? auto_ptr<ACE_Method_Object> mo
??????????? (this->cmdQueue.dequeue ());
?
??????? if (mo->call () == -1)
??????????? break;
??? }
??? return 0;
}
?
int main (int argc, ACE_TCHAR *argv[])
{
??? Logger log;
??? log. LogMsgActive ("hello");
?
??? ACE_OS::sleep(1);
??? log.LogMsgActive("abcd");
?
??? while(true)
??????? ACE_OS::sleep(1);
?
??? return 0;
}
在這里需要注意一下命令隊列ACE_Activation_Queue對象,它是線程安全的,使用方法比較簡單,這里我也不多介紹了。
主動對象的基本結(jié)構(gòu)就是這樣,然而,由于主動對象是異步調(diào)用的,又引出了如下兩個新問題:
5.? #include"ace/Future.h"
6.? ?
7.? #include <string>
8.? #include <iostream>
9.? usingnamespace std;
10.?
11.void get_info(ACE_Future<string> &fu)
12.{
13.???string state = fu.ready()?"ready":"not ready";
14.???cout<<endl<<state<<endl;
15.???if(fu.ready())
16.???{
17.???????stringvalue;
18.???????fu.get(value);
19.???????cout<<"value:\t"<<value<<endl;
20.???}
21.}
22.?
23.int main(int argc,char *argv[])
24.{
25.???ACE_Future<string> fu;
26.???get_info(fu);
27.???fu.set("12345");
28.???get_info(fu);
29.?
30.???return 0;
}
通過ACE_Future對象來解決上述兩個問題的方法如下:
·???????? 首先創(chuàng)建ACE_Future對象用以保留返回值。
·???????? 調(diào)用主動命令時將ACE_Future對象作為參數(shù)傳入,生成的命令對象中保存ACE_Future對象的指針。
·???????? 命令執(zhí)行線程執(zhí)行完命令后,將返回值通過set()函數(shù)設(shè)置到ACE_Future對象中。
·???????? 調(diào)用線程可以通過ACE_Future對象的ready()函數(shù)查詢該命令是否執(zhí)行完成,如果命令執(zhí)行完成,則可通過get()函數(shù)來獲取返回值。
使用的時候要注意一下ACE_Future對象的生命周期。
為了演示了如何獲取主動命令的執(zhí)行狀態(tài)和結(jié)果,我將上篇文章中的代碼改動了一下,日志類記錄日志后,會將記錄的內(nèi)容作為返回值返回,該返回值會通過ACE_Future對象返回,代碼如下
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Method_Object.h"
#include "ace/Activation_Queue.h"
#include "ace/Auto_Ptr.h"
?
#include "ace/Future.h"
?
#include <string>
#include <iostream>
usingnamespace std;
?
class Logger:public ACE_Task<ACE_MT_SYNCH>
{
public:
??? Logger()
??? {
??????? this->activate();
??? }
?
??? int svc();
??? string LogMsg(conststring& msg);
??? void LogMsgActive (conststring& msg,ACE_Future<string> *result);
?
private:
??? ACE_Activation_Queue cmdQueue;//命令隊列
};
?
class LogMsgCmd:public ACE_Method_Object
{
public:
??? LogMsgCmd(Logger *plog,conststring& msg,ACE_Future<string> *result)
??? {
??????? this->log=plog;
??????? this->msg=msg;
??????? this->result=result;
??? }
?
??? int call()
??? {
??????? string reply = this->log->LogMsg(msg);
??????? result->set(reply);
??????? return 0;
??? }
?
private:
??? ACE_Future<string> *result;
??? Logger *log;
??? string msg;
};
?
string Logger::LogMsg(conststring& msg)
{
??? ACE_OS::sleep(2);
??? cout<<endl<<msg<<endl;
??? return msg;
}
?
//以主動的方式記錄日志
void Logger::LogMsgActive(conststring& msg,ACE_Future<string> *result)
{
??? //生成命令對象,插入到命令隊列中
??? cmdQueue.enqueue(new LogMsgCmd(this,msg,result));
}
?
int Logger::svc()
{
??? while(true)
??? {
??????? //遍歷命令隊列,執(zhí)行命令
??????? auto_ptr<ACE_Method_Object> mo
??????????? (this->cmdQueue.dequeue ());
?
??????? if (mo->call () == -1)
??????????? break;
??? }
??? return 0;
}
?
void get_info(ACE_Future<string> &fu)
{
??? string state = fu.ready()?"ready":"not ready";
??? cout<<endl<<state<<endl;
??? if(fu.ready())
??? {
??????? stringvalue;
??????? fu.get(value);
??????? cout<<"value:\t"<<value<<endl;
??? }
}
?
int main (int argc, ACE_TCHAR *argv[])
{
??? ACE_Future<string> result;
??? Logger log;
??? log.LogMsgActive ("hello",&result);
?
??? while(true)
??? {
??????? get_info(result);
??????? if(result.ready())
??????????? break;
??????? ACE_OS::sleep(1);
??? }
?
??? cout<<endl<<"cmd end"<<endl;
?
??? while(true)
??????? ACE_OS::sleep(1);
?
??? return 0;
}
這種查詢模式比較簡單有效,但存在一個問題:調(diào)用線程必須不斷輪詢ACE_Future對象以獲取返回值,這樣的效率比較低??梢酝ㄟ^觀察者模式解決這個問題:在ACE_Future對象上注冊一個觀察者,當ACE_Future對象的值發(fā)生改變(異步命令執(zhí)行完成)時主動通知該觀察者,從而獲取返回值。
ACE中的觀察者模式可以通過ACE_Future_Observer來實現(xiàn),使用方法如下:
通過觀察者模式,可以更有效,及時的獲取異步命令的返回值,但同時也增加了程序結(jié)構(gòu)的復(fù)雜度并且難以調(diào)試,使用的時候應(yīng)該根據(jù)需要選取合適的方式。
#include "ace/Future.h"
?
#include <string>
#include <iostream>
usingnamespace std;
?
class MyObserver:public ACE_Future_Observer<string>
{
??? virtualvoid update (const ACE_Future<string> &future)
??? {
??????? stringvalue;
??????? future.get(value);
??????? cout<<endl<<"change:\t"<<value<<endl;
??? }
};
?
int main(int argc,char *argv[])
{
??? MyObserver obv;
??? ACE_Future<string> fu;
?
??? fu.attach(&obv);
???
??? ACE_OS::sleep(3);
??? fu.set("12345");
?
??? while(true)
??????? ACE_OS::sleep(3);
?
??? return 0;
}
?
ACE反應(yīng)器(Reactor)模式
反應(yīng)器(Reactor):用于事件多路分離和分派的體系結(jié)構(gòu)模式
通常的,對一個文件描述符指定的文件或設(shè)備,有兩種工作方式: 阻塞與非阻塞。所謂阻塞方式的意思是指, 當試圖對該文件描述符進行讀寫時, 如果當時沒有東西可讀,或者暫時不可寫,程序就進入等待狀態(tài), 直到有東西可讀或者可寫為止。而對于非阻塞狀態(tài),如果沒有東西可讀, 或者不可寫, 讀寫函數(shù)馬上返回,而不會等待。
在前面的章節(jié)中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:當接收tcp數(shù)據(jù)時,如果遠端沒有數(shù)據(jù)可以讀,則會一直阻塞到讀到需要的數(shù)據(jù)為止。這種方式的傳輸和傳統(tǒng)的被動方法的調(diào)用類似,非常直觀,并且簡單有效,但是同樣也存在一個效率問題,如果你是開發(fā)一個面對著數(shù)千個連接的服務(wù)器程序,對每一個客戶端都采用阻塞的方式通信,如果存在某個非常耗時的讀寫操作時,其它的客戶端通信將無法響應(yīng),效率非常低下。
一種常用做法是:每建立一個Socket連接時,同時創(chuàng)建一個新線程對該Socket進行單獨通信(采用阻塞的方式通信)。這種方式具有很高的響應(yīng)速度,并且控制起來也很簡單,在連接數(shù)較少的時候非常有效,但是如果對每一個連接都產(chǎn)生一個線程的無疑是對系統(tǒng)資源的一種浪費,如果連接數(shù)較多將會出現(xiàn)資源不足的情況。
另一種較高效的做法是:服務(wù)器端保存一個Socket連接列表,然后對這個列表進行輪詢,如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可讀時(讀就緒),則調(diào)用該socket連接的相應(yīng)讀操作;如果發(fā)現(xiàn)某個Socket端口上有數(shù)據(jù)可寫時(寫就緒),則調(diào)用該socket連接的相應(yīng)寫操作;如果某個端口的Socket連接已經(jīng)中斷,則調(diào)用相應(yīng)的析構(gòu)方法關(guān)閉該端口。這樣能充分利用服務(wù)器資源,效率得到了很大提高。
在Socket編程中就可以通過select等相關(guān)API實現(xiàn)這一方式。但直接用這些API控制起來比較麻煩,并且也難以控制和移植,在ACE中可以通過Reactor模式簡化這一開發(fā)過程。
反應(yīng)器本質(zhì)上提供一組更高級的編程抽象,簡化了事件驅(qū)動的分布式應(yīng)用的設(shè)計和實現(xiàn)。除此而外,反應(yīng)器還將若干不同種類的事件的多路分離集成到易于使用的API中。特別地,反應(yīng)器對基于定時器的事件、信號事件、基于I/O端口監(jiān)控的事件和用戶定義的通知進行統(tǒng)一地處理。
ACE中的反應(yīng)器與若干內(nèi)部和外部組件協(xié)同工作。其基本概念是反應(yīng)器框架檢測事件的發(fā)生(通過在OS事件多路分離接口上進行偵聽),并發(fā)出對預(yù)登記事件處理器(event handler)對象中的方法的"回調(diào)"(callback)。該方法由應(yīng)用開發(fā)者實現(xiàn),其中含有應(yīng)用處理此事件的特定代碼。
使用ACE的反應(yīng)器,只需如下幾步:
1. 創(chuàng)建事件處理器,以處理他所感興趣的某事件。
2.??????? 在反應(yīng)器上登記,通知說他有興趣處理某事件,同時傳遞他想要用以處理此事件的事件處理器的指針給反應(yīng)器。
隨后反應(yīng)器框架將自動地:
1.??????? 在內(nèi)部維護一些表,將不同的事件類型與事件處理器對象關(guān)聯(lián)起來。
2.??????? 在用戶已登記的某個事件發(fā)生時,反應(yīng)器發(fā)出對處理器中相應(yīng)方法的回調(diào)。
反應(yīng)器模式在ACE中被實現(xiàn)為ACE_Reactor類,它提供反應(yīng)器框架的功能接口。
如上面所提到的,反應(yīng)器將事件處理器對象作為服務(wù)提供者使用。反應(yīng)器內(nèi)部記錄某個事件處理器的特定事件的相關(guān)回調(diào)方法。當這些事件發(fā)生時,反應(yīng)器會創(chuàng)建這種事件和相應(yīng)的事件處理器的關(guān)聯(lián)。
事件處理器就是需要通過輪詢發(fā)生事件改變的對象列表中的對象,如在上面的例子中就是連接的客戶端,每個客戶端都可以看成一個事件處理器。
就是反應(yīng)器支持的事件,如Socket讀就緒,寫就緒。拿上面的例子來說,如果某個客戶端(事件處理器)在反應(yīng)器中注冊了讀就緒事件,當客戶端給服務(wù)器發(fā)送一條消息的時候,就會觸發(fā)這個客戶端的數(shù)據(jù)可讀的回調(diào)函數(shù)。
在反應(yīng)器框架中,所有應(yīng)用特有的事件處理器都必須由ACE_Event_Handler的抽象接口類派生??梢酝ㄟ^重載相應(yīng)的"handle_"方法實現(xiàn)相關(guān)的回調(diào)方法。
使用ACE_Reactor基本上有三個步驟:
下面我就以一個Socket客戶端的例子為例簡單的說明反應(yīng)器的基本用法。
#include <ace/OS.h>
#include <ace/Reactor.h>
#include <ace/SOCK_Connector.h>
?
#include <string>
#include <iostream>
usingnamespace std;
?
class MyClient:public ACE_Event_Handler
{
public:
??? bool open()
??? {
??????? ACE_SOCK_Connector connector;
??????? ACE_INET_Addr addr(3000,"127.0.0.1");
??????? ACE_Time_Value timeout(5,0);
??????? if(connector.connect(peer,addr,&timeout) != 0)
??????? {
??????????? cout<<endl<<"connecetd fail";
??????????? returnfalse;
??????? }
??????? ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
??????? cout<<endl<<"connecetd ";
??????? returntrue;
??? }
?
??? ACE_HANDLE get_handle(void)const
??? {
??????? return peer.get_handle();
??? }
?
??? int handle_input (ACE_HANDLE fd)
??? {
??????? int rev=0;
??????? ACE_Time_Value timeout(5,0);
??????? if((rev=peer.recv(buffer,1000,&timeout))>0)
??????? {
??????????? buffer[rev]='\0';
??????????? cout<<endl<<"rev:\t"<<buffer<<endl;
??????? }
??????? return 3;
??? }
?
private:
??? ACE_SOCK_Stream peer;
??? char buffer[1024];
};
?
int main(int argc,char *argv[])
{
??? MyClient client;
??? client.open();
?
??? while(true)
??? {
??????? ACE_Reactor::instance()->handle_events();
??? }
?
??? return 0;
}
在這個例子中,客戶端連接上服務(wù)器后,通過ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注冊了一個讀就緒的回調(diào)函數(shù),當服務(wù)器端給客戶端發(fā)消息的時候,會自動觸發(fā)handle_input()函數(shù),將接收到的信息打印出來。
在Socket編程中,常見的事件就是"讀就緒","寫就緒",通過對這兩個事件的捕獲分發(fā),可以實現(xiàn)Socket中的異步操作。
Socket編程中的事件處理器
在前面我們已經(jīng)介紹過,在ACE反應(yīng)器框架中,任何都必須派生自ACE_Event_Handler類,并通過重載其相應(yīng)會調(diào)事件處理函數(shù)來實現(xiàn)相應(yīng)的回調(diào)處理的。在Socket編程中,我們通常需要重載的函數(shù)有
當I/O句柄(比如UNIX中的文件描述符)上的輸入可用時,反應(yīng)器自動回調(diào)該方法。
當I/O設(shè)備的輸出隊列有可用空間時,反應(yīng)器自動回調(diào)該方法。
當事件處理器中的事件從Reactor中移除的時候調(diào)用。
此外,為了使Reactor能通過I/O句柄找到對應(yīng)的事件處理器,還必須重載其get_handle()方法以使得Reactor建立起I/O句柄和事件處理器的關(guān)聯(lián)。
使用Reactor框架。
下面我們將以一個客戶端的程序為例,介紹如何在Socket編程中使用Reactor框架。
一.建立一個客戶端對象(事件處理器)。
客戶端對象就是一個事件處理器,其聲明如下:
class Client:public ACE_Event_Handler
{
public:
??? ACE_HANDLE get_handle(void)const;
??? int handle_input (ACE_HANDLE fd);
??? int handle_close (ACE_HANDLE handle,
??? ACE_Reactor_Mask close_mask);
??? ACE_SOCK_Stream& Peer();
private:
??? ACE_SOCK_Stream peer;
};
在Client端中我只關(guān)心"讀就緒"事件,故只重載了handle_input函數(shù)(大多數(shù)應(yīng)用下只需要重載handle_input函數(shù))。另外,在客戶端還保存了一個ACE_SOCK_Stream的peer對象用來進行Socket通信,同時封裝了一個Peer()函數(shù)返回它的引用。
二.重載相應(yīng)回調(diào)處理函數(shù)
ACE_SOCK_Stream& Client::Peer()
{
??? return peer;
}
ACE_HANDLE Client::get_handle(void)const
{
??? return peer.get_handle();
}
int Client::handle_input (ACE_HANDLE fd)
{
??? int rev=0;
??? if((rev = peer.recv(buffer,1000))>0)
??? {
??????? buffer[rev]='\0';
??????? cout<<endl<<"rev:\t"<<buffer<<endl;
??????? return 0;
??? }
??? else//Socket連接發(fā)生錯誤,返回-1,在Reactor中注銷事件,觸發(fā)handle_close函數(shù)
??? {
??????? return -1;
??? }
}
int Client::handle_close (ACE_HANDLE handle,
????????????????????????ACE_Reactor_Mask close_mask)
{
??? cout<<endl<<"connecetd closed";
??? return ACE_Event_Handler::handle_close(handle,close_mask);
}
?
三.在Reactor中注冊事件
首先讓我們來看看相應(yīng)的main函數(shù)的代碼:
int main(int argc,char *argv[])
{
??? Client client;
??? ACE_SOCK_Connector connector;
??? ACE_INET_Addr addr(3000,"127.0.0.1");
??? ACE_Time_Value timeout(5,0);
??? if(connector.connect(client.Peer(),addr,&timeout) != 0)
??? {
??????? cout<<endl<<"connecetd fail";
??????? return 0;
??? }
?
??? ACE_Reactor::instance()->register_handler(&client,ACE_Event_Handler::READ_MASK);
?
while(true)
{
ACE_Reactor::instance()->handle_events();
}
?
return 0;
}
在這里可以看到,使用Reactor框架后,依然首先通過ACE_SOCK_Connector的connect函數(shù)來建立連接。建立連接后,可以通過ACE_Reactor::instance()->register_handler函數(shù)來實現(xiàn)Reactor的注冊,實現(xiàn)I/O事件和Client對象的handle_input方法相關(guān)聯(lián),它的第一個參數(shù)是事件處理器的地址,第二個參數(shù)是事件類型,由于這里只關(guān)心讀就緒事件,故注冊的事件類型是ACE_Event_Handler::READ_MASK。
四.啟動Reactor事件循環(huán)
通過如上設(shè)置后,我們就可以通過ACE_Reactor::instance()->handle_events()啟動Reactor循環(huán)了,這樣,每當服務(wù)器端有數(shù)據(jù)發(fā)送給客戶端時,當客戶端的數(shù)據(jù)就緒時,就回觸發(fā)Client對象的handle_input函數(shù),將接收的數(shù)據(jù)打印出來。
通常的做法是,將Reactor事件循環(huán)作為一個單獨的線程來處理,這樣就不會阻塞main函數(shù)。
五.注銷Reactor事件
Reactor事件的注銷一般有兩種方式,顯式和隱式,下面將分別給予介紹。
當Reactor捕獲事件后,會觸發(fā)相應(yīng)的"handle_"處理函數(shù),當"handle_"處理函數(shù)返回值大于或等于0時,表示處理事件成功,當返回值小于0時,表示處理事件失敗,這時Reactor會自動注銷該句柄所注冊的所有事件,并觸發(fā)handle_close函數(shù),以執(zhí)行相應(yīng)的資源清理工作。
在本例中,當handle_input函數(shù)里的recv函數(shù)接收到0個數(shù)時,說明socket發(fā)生錯誤(大多為Socket連接中斷),此時返回-1,則系統(tǒng)自動注銷相應(yīng)事件。
調(diào)用Reactor對象的remove_handler方法移除,它有兩個參數(shù),第一個是所注冊的事件反應(yīng)器對象,第二個是所要注銷的事件。
在這個示例程序里,連接方只有一個Socket連接,Reactor的優(yōu)勢并沒有體現(xiàn)出來,但在一些網(wǎng)絡(luò)管理系統(tǒng)里,連接方需要對多個需要管理的設(shè)備(服務(wù)器端)進行連接,在這種情況下使用Reactor模式,只需要多開一個Reactor事件循環(huán)線程就能實現(xiàn)事件多路分發(fā)復(fù)用,并且不會阻塞,通過面向?qū)ο蟮幕卣{(diào)方式管理,使用起來非常方便。
Reactor框架的另外一個常用的地方就是服務(wù)器端,一般是一個服務(wù)器端對應(yīng)多個客戶端,這樣用Reactor模式能大幅提高并發(fā)能力,這方面的編程方法將在下一章給與介紹。
?
在服務(wù)器端使用Reactor框架
使用Reactor框架的服務(wù)器端結(jié)構(gòu)如下:
服務(wù)器端注冊兩種事件處理器,ClientAcceptor和ClientService,ClientService類負責(zé)和客戶端的通信,每一個ClientService對象對應(yīng)一個客戶端的Socket連接。 ClientAcceptor專門負責(zé)被動接受客戶端的連接,并創(chuàng)建ClientService對象。這樣,在一個N個Socket連接的服務(wù)器程序中,將存在1個ClientAcceptor對象和N個ClientService對象。
整個服務(wù)器端流程如下:
1.??????? 首先創(chuàng)建一個ClientAcceptor對象,該對象在Reactor上注冊ACCEPT_MASK事件,Reactor將自動在監(jiān)聽端口建立Socket監(jiān)聽。
2.??????? 如果有對該端口的Socket連接時,Reactor將自動回調(diào)handle_input方法,ClientAcceptor重載此方法,并創(chuàng)建一個ClientService對象,用于處理和Client的通信。
3.??????? ClientService對象根據(jù)服務(wù)器的具體功能實現(xiàn),其處理過程和客戶端程序類似,注冊相應(yīng)的回調(diào)事件并分發(fā)即可。
代碼如下:
#include <ace/OS.h>
#include <ace/Reactor.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Auto_Ptr.h>
?
class ClientService :public ACE_Event_Handler
{
public:
??? ACE_SOCK_Stream &peer (void) {returnthis->sock_; }
??? int open (void)
??? {
??????? //注冊讀就緒回調(diào)函數(shù)
??????? returnthis->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
??? }
??? virtual ACE_HANDLE get_handle (void)const { returnthis->sock_.get_handle (); }
??? virtualint handle_input (ACE_HANDLE fd )
??? {
??????? //一個簡單的EchoServer,將客戶端的信息返回
??????? int rev = peer().recv(buf,100);
??????? if(rev<=0)
??????????? return -1;
??????? peer().send(buf,rev);
??????? return 0;
??? }
??? //釋放相應(yīng)資源
??? virtualint handle_close (ACE_HANDLE, ACE_Reactor_Mask mask)
??? {
?????? ?if (mask == ACE_Event_Handler::WRITE_MASK)
??????????? return 0;
??????? mask = ACE_Event_Handler::ALL_EVENTS_MASK |
??????????? ACE_Event_Handler::DONT_CALL;
??????? this->reactor ()->remove_handler (this, mask);
??????? this->sock_.close ();
???? ???delete this;??? //socket出錯時,將自動刪除該客戶端,釋放相應(yīng)資源
??????? return 0;
??? }
protected:
??? char buf[100];
??? ACE_SOCK_Stream sock_;
};
class ClientAcceptor :public ACE_Event_Handler
{
public:
??? virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);}
??? int open (const ACE_INET_Addr &listen_addr)
??? {
??????? if (this->acceptor_.open (listen_addr, 1) == -1)
??????? {
??????????? ACE_OS::printf("open port fail");
??????????? return -1;
??????? }
??????? //注冊接受連接回調(diào)事件
????? ??returnthis->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
??? }
??? virtual ACE_HANDLE get_handle (void)const
??? {returnthis->acceptor_.get_handle (); }
??? virtualint handle_input (ACE_HANDLE fd )
??? {
??????? ClientService *client = new ClientService();
??????? auto_ptr<ClientService> p (client);
??????? if (this->acceptor_.accept (client->peer ()) == -1)
??????? {
??????????? ACE_OS::printf("accept client fail");
??????????? return -1;
??????? }
??????? p.release ();
??????? client->reactor (this->reactor ());
??????? if (client->open () == -1)
??????????? client->handle_close (ACE_INVALID_HANDLE, 0);
??????? return 0;
??? }
??? virtualint handle_close (ACE_HANDLE handle,
??????? ACE_Reactor_Mask close_mask)
??? {
??????? if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)
??????? {
??????????? ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |
???????????????ACE_Event_Handler::DONT_CALL;
??????????? this->reactor ()->remove_handler (this, m);
??????????? this->acceptor_.close ();
??????? }
??????? return 0;
??? }
protected:
??? ACE_SOCK_Acceptor acceptor_;
};
int main(int argc,char *argv[])
{
??? ACE_INET_Addr addr(3000,"192.168.1.142");
??? ClientAcceptor server;
??? server.reactor(ACE_Reactor::instance());
??? server.open(addr);
??? while(true)
??? {
??????? ACE_Reactor::instance()->handle_events();
??? }
??? return 0;
}
代碼功能比較簡單,需要注意以下幾點:
1.??????? 這里注冊事件的方式和前面的文章中方式不一樣,是通過ACE_Event_Handler類的reactor()方法設(shè)置和獲取reactor的指針,比較直觀和方便。前面的文章是通過ACE_Reactor::instance()來獲取的一個單體reactor的指針。
定時器的實現(xiàn)
通過Reactor機制,還可以很容易的實現(xiàn)定時器的功能,使用方式如下。
1.??????? 編寫一個事件反應(yīng)器,重載handle_timeout()方法,該方法是定時器的觸發(fā)時間到時,會自動觸發(fā)該方法。
2.??????? 通過Reactor的schedule_timer()方法注冊定時器。
3.??????? 啟動reacotr的handle_events()事件分發(fā)循環(huán)。
4.??????? 當不想使用定時器時,可以通過Reactor的cancel_timer()方法注銷定時器。
下面的代碼簡單的實現(xiàn)了一個定時器,并具有基本的開啟,關(guān)閉功能。
#include <ace/OS.h>
#include <ace/Reactor.h>
class MyTimerHandler :public ACE_Event_Handler
{
private:
int inteval;???//執(zhí)行時間間隔
int delay;???????//延遲執(zhí)行時間
int timerid;
public:
??? MyTimerHandler(int delay,int inteval)
??? {
this->delay=delay;
this->inteval=inteval;
??? }
int open()???//注冊定時器
? ??{
??????? ACE_Time_Value delaytime(inteval);
??????? ACE_Time_Value intevaltime(inteval);
??????? timerid = reactor()->schedule_timer(this,
??????????? 0,??? //傳遞handle_timeout給的參數(shù)
??????????? delaytime,
??????????? intevaltime);
return timerid;
??? }
int close()???//取消定時器
??? {
return reactor()->cancel_timer(timerid);
??? }
//定時器回調(diào)函數(shù)
int handle_timeout (const ACE_Time_Value ¤t_time,
constvoid * = 0)
??? {
??????? time_t epoch = ((timespec_t)current_time).tv_sec;
??????? ACE_DEBUG ((LM_INFO,
??????????? ACE_TEXT ("handle_timeout: %s\n"),
??????????? ACE_OS::ctime (&epoch)));
return 0;
??? }
};
int main(int argc,char *argv[])
{
??? MyTimerHandler * timer =new MyTimerHandler (3,5);
??? timer->reactor(ACE_Reactor::instance());
??? timer->open();
for(int i=0;i<2;i++)???//觸發(fā)次handle_timeout事件
??? {
??????? ACE_OS::printf("\n%d\n",i);
??????? ACE_Reactor::instance()->handle_events();
??? }
??? timer->close();
??? ACE_OS::printf("cancel timer");
while(true)
? ??????ACE_Reactor::instance()->handle_events();
return 0;
}
代碼功能比較簡單,這里就不多做介紹了。
總結(jié)
以上是生活随笔為你收集整理的ACE入门---很好的文章的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ACE源代码目录结构
- 下一篇: TCP close_wait 状态的解释