muduo 笔记
學習陳碩寫的網絡庫muduo,照著實現了一遍,項目地址為learn_muduo.
文章目錄
- base庫
- copyable、noncopyable
- Atomic
- Timestamp
- Date
- Mutex
- Condition
- CountDownLatch
- Thread
- CurrentThread
- Exception
- BlockingQueue
- StringPiece
- LogStream
- Logging
- net庫
- Endian
- InetAddress
- SocketsOps
- Socket
- Channel
- Poller
- EventLoop
- TimerQueue
- Acceptor
- TcpConnection
- Connector
- Buffer
- TcpServer
- TcpClient
- epoll
- HttpServer
base庫
copyable、noncopyable
刪除默認的拷貝構造和賦值來實現類的不可拷貝的屬性。
noncopyable(const noncopyable&) = delete; void operator=(const noncopyable&) = delete;Atomic
Atomic是原子操作類,它是一個模板類,使用GCC提供的加減和邏輯原子操作來實現。
private:volatile T value_;// 原子比較和交換,先判斷*ptr是否和oldval相等, 如果相等將值設置為newval __sync_val_compare_add_swap(type *ptr, type oldval, newval);// value_ += x; __sync_fetch_and_add(&value_, x);// value_ = newValue; __sync_lock_test_and_set(&value_, newValue);通過以上代碼,封裝自增、自減和賦值等原子操作。
Atomic的類模板定義在namespace detail中,在namespace muduo使用模板創建了兩個類。
typedef detail::AtomicIntegerT<int32_t> AtomicInt32; typedef detail::AtomicIntegerT<int64_t> AtomicInt64;Timestamp
Timestamp使用微妙來計算時間,提供toString和格式化的接口,可以獲取當前時間,返回一個對應的Timestamp對象,也可以返回一個
static Timestamp now(); // 返回當前時間 static Timestamp invalid(); // 返回一個空對象 string toString() const; // 返回(秒.微妙)格式 string toFormattedString(bool showMicroseconds = true) const; // 格式化時間,年月日 時:分:秒.微妙Timestamp繼承boost::less_than_comparable,只需提供<的實現,自動實現>、<=、>=,繼承boost::equality_comparable只需提供==自動實現!=。
class Timestamp : public boost::equality_comparable<Timestamp>,public boost::less_than_comparable<Timestamp>跨平臺,int64_t在32位系統是long long int(%lld), 在64位系統是long int(%ld)
printf("%" PRId64 "\n", value);Date
使用julianDayNumber來計算年月日。距離公元前4713年1月1日的天數。和Timestamp類似,提供一些常用的接口。
YearMonthDay getYearMonthDay(int julianDayNumber); // 獲取對應的年月日 int getJulianDayNumber(int year, int month, int day)// 獲取julian day string Date::toIsoString(); // 格式化(年-月-日)Mutex
Mutex封裝鎖,提供加鎖解鎖等操作。
// 屬性 pthread_mutex_t mutex_; // 定義一把鎖 pid_t holder_; // 記錄加鎖的線程IDMutex中一共有3個類:MutexLock、UnassignGuard、MutexLockGuard。
MutexLock使用pthread_mutex_函數封裝初始化鎖、加鎖、解鎖、銷毀鎖的操作。
UnassignGuard是MutexLock的內部類,他的特點是在構造函數中清除鎖的持有者ID,析構的時候設置鎖的持有者ID,這是供``Condition的wait()`使用。
MutexLockGuard采用RAII,構造的時候申請資源lock,析構的時候釋放資源unlock,解放雙手。
Condition
Condition實現條件變量功能,使用pthread_cond_函數來封裝wait(),notify(),notifyAll功能。也是采用RAII的機制,在構造中初始化條件變量pcond_,在析構函數中銷毀條件變量。
Condition是MutexLock的友元,可以使用MutexLock的內部類UnassignGuard來實現wait()的功能。
pthread_cond_wait內部的機制時在線程進入阻塞前釋放資源,當函數返回,重新持有鎖。
void wait() {MutexLock::UnassignGuard ug(mutex_);int ret = pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()); // 將線程添加到條件變量assert(ret == 0); }CountDownLatch
倒計時類,將MutexLock和Condition封裝在一起。
mutable MutexLock mutex_; Condition condition_; int count_;void wait(); // 調用Condition等待 void countDown(); // 技術減1 int getCount(); // 獲取當前的計數Thread
線程類,在namespace muduo中定義了ThreadNameInitializer類負責主線程的初始化操作,指定如果fork之后再之進程中執行after函數。
ThreadNameInitializer() {muduo::CurrentThread::t_threadName = "main";CurrentThread::tid();pthread_atfork(NULL, NULL, &afterFork); }設置ThreadData的結構體,保存線程的回調函數,名字,id,計數等信息。在ThreadData中定義了runInThread函數,用來執行回調函數。
線程執行的流程是:
-
Thread構造,設置回調函數func_,默認CountDownLatch為1。
-
pthread_create創建線程,綁定回調函數startThread,將ThreadData作為參數,創建成功之后主線程阻塞在latch_上,等待子線程的退出。
-
在startThread中調用ThreadData的runInThread函數
-
latch_-1喚醒主線程,同時執行回調函數func_,同時對異常進行處理。
CurrentThread
主線程類,提供stackTrace()用于查看堆棧的信息,同時包括線程的一些基本屬性,id、名字等。
Exception
異常處理類,繼承std::exception,封裝CurrentThread類的stackTrace()和重寫what()方法。
BlockingQueue
無界阻塞隊列,底層是deque,利用條件變量實現一個生產者消費者模型,另外還有一個有界的阻塞隊列(BoundedBlockingQueue),底層是circular_buffer。
StringPiece
C++支持兩種字符串:string和char*,當char*傳入函數,會構造一個臨時的string變量,這就發生了內存的拷貝。StringPiece就是為了減少這種內存的拷貝,統一使用char*記錄字符串。重載了[]、等于、比較等操作。重載<<支持logged的使用。
LogStream
muduo的日志庫采用C++的stream風格,有個好處是輸出日志級別高于語句的日志級別的時候,打印是個空操作。muduo沒有使用標準庫中的iostream,而是自己封裝的LogStream,不同于iostream,LogStream的<<操作是將數據放到緩沖區(FixedBuffer)中,外部程序可以重定向到任何文件中。
Logging
日志類,muduo日志信息一共有5個級別,TRACE,DEBUG,INFO,WARN,ERROR,FATAL。通過宏定義創建Logger的臨時對象,調用stream()函數返回LogStream對象。在Logging中定義了Impl類和SourceFile類,Impl類保存日志需要數據,SourceFile中LOG_函數所在的源文件和行號。
net庫
Endian
提供字節序的轉化。
本地字節序 <–> 網絡字節序
InetAddress
InetAddress是對sockaddr_in和sockaddr_in6的封裝。
// 設置本地端口 InetAddress(uint16_t port = 0, bool loopbackOnly = false, bool ipv6 = false); // 設置一個指定的ip和端口 InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);sa_family_t family(); // 返回協議類型 string toIpPort() const; // 獲取ip和port string toIp() const; // 獲取ip uint16_t toPort() const; // 獲取portSocketsOps
封裝對socket的常用操作。
int createNonblockingOrDie(sa_family_t family); // 創建非阻塞的socket int connect(int sockfd, const struct sockaddr* addr); void bindOrDie(int sockfd, const struct sockaddr* addr); void listenOrDie(int sockfd); int accept(int sockfd, struct sockaddr_in6* addr); // 包含錯誤處理 void close(int sockfd); void shutdownWrite(int sockfd);void toIpPort(char* buf, size_t size, const struct sockaddr* addr); // 獲取ip+port void toIp(char* buf, size_t size, const struct sockaddr* addr); // 獲取ip // 根據ip和port得到對應的sockaddr_in void fromIpPort(const char* ip, uint16_t port, struct sockaddr_in* addr); void fromIpPort(const char* ip, uint16_t port, struct sockaddr_in6* addr); // sockaddr和sockaddr_in(ip和端口分開存儲)的轉換 int getSocketError(int sockfd); const struct sockaddr* sockaddr_cast(const struct sockaddr_in* addr); const struct sockaddr* sockaddr_cast(const struct sockaddr_in6* addr); struct sockaddr* sockaddr_cast(struct sockaddr_in6* addr); const struct sockaddr_in* sockaddr_in_cast(const struct sockaddr* addr); const struct sockaddr_in6* sockaddr_in6_cast(const struct sockaddr* addr);struct sockaddr_in6 getLocalAddr(int sockfd); struct sockaddr_in6 getPeerAddr(int sockfd); bool isSelfConnect(int sockfd); // 判斷子連接ssize_t read(int sockfd, void *buf, size_t count); ssize_t readv(int sockfd, const struct iovec *iov, int iovcnt); ssize_t write(int sockfd, const void *buf, size_t count);Socket
Socket是對socket fd的封裝,通過調用SocketsOps來實現。
// 獲取tcp的信息 bool getTcpInfo(struct tcp_info*) const; bool getTcpInfoString(char* buf, int len) const;void bindAddress(const InetAddress& localaddr); // 綁定地址 void listen(); // 監聽湍口 int accept(InetAddress* peeraddr); // 獲取連接 void shutdownWrite(); // 關閉寫端而不是直接close// TCP_NODELAY void setTcpNoDelay(bool on); // SO_REUSEADDR void setReuseAddr(bool on); // SO_REUSEPORT void setReusePort(bool on); // SO_KEEPALIVE void setKeepAlive(bool on);muduo在斷開連接時,不是直接close socket,而是關閉寫端,意味著還可以讀,這樣可以完整接受對方的數據。
Channel
Channel類負責注冊每個fd的事件回調函數,每個Channel只負責一個fd的事件分發,不擁有fd,不會在析構的時候關閉fd,Channel不是基類,不需要繼承,一般作為其他類的成員。
Poller
Poller是IO復用的封裝,在muduo中是一個抽象基類,作為poll和epoll兩種IO復用機制的父類。Poller是EventLoop的間接成員,只供owner EventLoop在IO線程中調用。poll返回之后,通過遍歷pollfds_數組,找到對應的活動事件,復雜度O(n),在Poller中有一個map<int, Channel*>的映射channels_,
插入新的Channel的復雜度是O(logN),更新已有的Channel的復雜度是O(1),因為Channel記錄了它的pollfds_數組中的下標,可以快速定位。刪除Channel的復雜度也是O(n)。
EventLoop
EventLoop中的loop不斷的調用poll,來獲取當前的活動事件,然后調用每個channel的handleEvent()方法,來處理事件。
EventLoop的runInLoop()函數,可以在IO線程中執行某個用戶的任務回調,如果當前IO線程調用runInLoop()直接執行,否則放入到隊列中等待執行,queueInLoop(),這樣可以將TimerQueue的成員函數移動到其他IO線程,這樣可以在不加鎖的情況下保證線程安全。
IO線程一般阻塞在poll調用,為了讓IO線程可以立即執行用戶回調,muduo的做法是通過調用wakeup來喚醒IO線程,具體是向wakeupfd_中讀寫一個字節來實現,通過wakeup()和handleRead()對wakeupFd_讀寫數據。
queuInLoop()的具體實現是,將cb放到隊列中,在必要時喚醒IO線程,喚醒的條件有兩個:調用queueInLoop的不是IO線程、正在執行隊列中的回調函數doPendingFunctors(),原因是執行回調的函數有可能也會執行queueInLoop(),這樣就要wakeup喚醒IO線程及時做處理,否則新添加的回調函數cb就不能及時被調用。
Reactor模型核心內容時序圖。
TimerQueue
TimerQueue定時器,一般通過select、poll的等待時間來實現定時,在muduo中使用timerfd,將對時間的處理和IO事件統一起來。
muduo的定時器由三個類:TimerId、Timer、TimerQueue。
TimerQueue的接口有addTimer()和cancel(),addTimer()是供EventLoop使用,EventLoop封裝為更好用的runAt()、runAfter()、runEvery()。
TimerQueue使用set管理Timer,set中的key是pair<Timestamp,Timer*>,這樣可以方便處理相同到期時間的Timer。
TimerQueue使用一個Channel來官差timerfd_上的可讀事件。
TimerQueue目前有一個不理性的地方,Timer使用裸指針的方法管理,需要手動delete,在C++11中可以改為unique_ptr,避免手動釋放資源。
通過TimerQueue的getExpired()來獲取超時事件。
TimerQueue回調用戶代碼onTimer()的時序圖。
一次事件循環是從poll返回到再次調用poll阻塞。
循環中的各種回調發生的順序。
Acceptor
Acceptor用于accept()新的TCP連接,通過回調函數通知使用者,供TcpServer使用,生命期由TcpServer控制。
成員函數包括Socket、Channel。Socket封裝了socket文件描述符生命期,Channel用于觀察socket上的可讀事件,回調handleRead(),accept來接受新的連接,并回調用戶的callback。
TcpConnection
TcpConnection是唯一默認使用shared_ptr來管理的class,是muduo最復雜的class。
TcpConnection使用Channel來獲取socket上的IO事件,自己處理可寫事件,把可讀事件通過MessageCallback傳給用戶,TcpConnection擁有Tcp socket,在析構中會close fd。
TcpConnection關閉連接的方式是被動關閉,對方先關閉連接,read返回0,觸發關閉邏輯。
Tcp的關閉流程,X表示TcpConnection通常在這里析構。
TcpConnection增加CloseCallback事件回調,提供給TcpServer和TcpClient使用,通知移除TcpConnectionPtr,普通用戶使用ConnectionCallback。
TcpConnection的狀態圖。
Connector
socket是一次性的,一旦出錯(對方拒絕連接),就無法恢復,只能重來。但是Connector是可以反復使用的,每次嘗試連接都要使用新的socket文件描述符和新的Channel對象。
重試的間隔應該逐漸延長,例如0.5s、1s、2s、4s直到30秒,對于對象的生命期管理方面,如果使用EventLoop::runAfter()定時,而Connector在定時器到期之前析構了怎么辦?可以在Connector的析構函數中注銷定時器。
對于自連接的問題的處理,在發起連接時,首先在本地選擇IP(由路由表確定)和隨機選擇端口,如果目標IP剛好是主機而且端口也相同,這就發生了自連接,處理辦法就是斷開連接重試。
Buffer
muduo在讀取數據時,采用cantter/gatherIO(分散聚集IO),在一次系統調用可以對多個緩沖區進行輸入輸出,而且一部分的緩沖區來自stack,這樣緩沖區足夠大,通常一次readv調用就可以取完數據,
muduo采用的是水平觸發,這樣做不會丟失數據或消息,每次讀取數據只需要一次系統調用,照顧了多個連接的公平性,不會因為某個連接上的數據量過大而影響其他連接處理消息。
發送數據的邏輯是,先嘗試發送數據,如果只發送了部分數據,把剩余的數據放到outputBuffer_,開始關注writable事件,在handleWrite()中記錄發送數據,如果outputBuffer_中已經有待發送的的數據,就不能嘗試發送,否則造成數據錯亂。
TcpServer
TcpServer用于處理新建TcpConnection。
TcpServer新建連接的函數調用。
TcpServer用來管理accpet獲得的TcpConnection,供用戶使用,生命期由用戶控制。使用Accpetor獲取新連接的fd,保存用戶提供的ConnectionCallback和MessageCallback,在新建TcpConnection之后,將這兩個回調函數傳遞給后者。
隨機選擇pool中的EventLoop,不允許TcpConnection在運行中更換EventLoop,每個TcpServer有自己的EventLoopThreadPool。
TcpClient
TcpClient主要使用Connector來進行連接,Connector具備反復嘗試連接的功能,因此客戶端和服務端啟動的順序就無關緊要了。
連接斷開后初次嘗試連接應該具有隨機性,如果服務端崩潰大量客戶端重連,同時重連也會發生丟包,每個TcpClient應該 等待一段隨機時間(0.5-2s)再嘗試連接避免擁塞。
發起連接的時候如果發生TCP SYN丟包,那么系統默認的重試間隔是3s,職期間不會發生錯誤碼。
epoll
epoll是linux獨有的高效的IO復用機制,它與poll的不同之處主要在于poll每次返回整個文件描述符數組,用戶代碼需要遍歷數組以找到哪些文件描述符上有IO事件,epoll_wait()返回活動fd的列表,需要遍歷的數組通常會小的多,再并發連接較大而活動連接比例不高時,epoll比poll更高效。
muduo定義Poller基類并提供兩份實現PollPoller和EPollPoller。
HttpServer
HttpRequest封裝了HTTP請求包的基本格式。
HttpResponse封裝了HTTP的響應包的基本格式。
HttpContext主要是對HTTP請求包的解析,將解析的結果存在對象HttpRequest中。
HttpServer封裝了HTTP的對請求內容的響應,通過用戶指定回調函數來處理請求,使用TcpServer來進行對連接進行處理。
總結
- 上一篇: muduo学习笔记 日志类
- 下一篇: docsify管理学习笔记