以太坊事件框架
過去在學(xué)Actor模型的時候,就認為異步消息是相當?shù)闹匾?#xff0c;在華為的時候,也深扒了一下當時產(chǎn)品用的消息模型,簡單實用,支撐起了很多模塊和業(yè)務(wù),但也有一個缺點是和其他的框架有耦合,最近看到以太坊的事件框架,同樣簡單簡潔,理念很適合初步接觸事件框架的同學(xué),寫文介紹一下。
以太坊的事件框架是一個單獨的基礎(chǔ)模塊,存在于目錄go-ethereum/event中,它有2中獨立的事件框架實現(xiàn),老點的叫TypeMux,已經(jīng)基本棄用,新的叫Feed,當前正在廣泛使用。
TypeMux和Feed還只是簡單的事件框架,與Kafka、RocketMQ等消息系統(tǒng)相比,是非常的傳統(tǒng)和簡單,但是TypeMux和Feed的簡單簡潔,已經(jīng)很好的支撐以太坊的上層模塊,這是當下最好的選擇。
TypeMux和Feed各有優(yōu)劣,最優(yōu)秀的共同特點是,他們只依賴于Golang原始的包,完全與以太坊的其他模塊隔離開來,也就是說,你完全可以把這兩個事件框架用在自己的項目中。
TypeMux的特點是,你把所有的訂閱塞給它就好,事件來了它自會通知你,但有可能會阻塞,通知你不是那么及時,甚至過了一段挺長的時間。
Feed的特點是,它通常不存在阻塞的情況,會及時的把事件通知給你,但需要你為每類事件都建立一個Feed,然后不同的事件去不同的Feed上訂閱和發(fā)送,這其實挺煩人的,如果你用錯了Feed,會導(dǎo)致panic。
接下來,介紹下這種簡單事件框架的抽象模型,然后再回歸到以太坊,介紹下TypeMux和Feed。
!<--more-->
事件框架的抽象結(jié)構(gòu)
如上圖,輕量級的事件框架會把所有的被訂閱的事件收集起來,然后把每個訂閱者組合成一個列表,當事件框架收到某個事件的時候,就把訂閱該事件的所有訂閱者找出來,然后把這個事件發(fā)給他們。
它需要具有2個功能:
如果做成完善的消息系統(tǒng),就還得考慮這些特性:可用性、吞吐量、傳輸延遲、有序消息、消息存儲、過濾、重發(fā),這和事件框架相比就復(fù)雜上去了,我們專注的介紹下以太坊的事件模型怎么完成上述3個功能的。
以太坊的事件模型
TypeMux是一個以太坊不太滿意的事件框架,所以以太坊就搞了Feed出來,它解決了TypeMux效率低下,延遲交付的問題。接下來就先看下這個TypeMux。
TypeMux:同步事件框架
TypeMux是一個同步事件框架。它的實現(xiàn)和上面講的事件框架的抽象結(jié)構(gòu)是完全一樣的,它維護了一個訂閱表,表里維護了每個事件的訂閱者列表。它的特點:
看下它2個功能的實現(xiàn):
TypeMux源碼速遞
TypeMux的精簡組成:
// A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. // // The zero value is ready to use. // // Deprecated: use Feed // 本質(zhì):哈希列表,每個事件的訂閱者都存到對于的列表里 type TypeMux struct {mutex sync.RWMutex // 鎖subm map[reflect.Type][]*TypeMuxSubscription // 訂閱表:所有事件類型的所有訂閱者stopped bool }訂閱:
// Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. // 訂閱者只傳入訂閱的事件類型,然后TypeMux會返回給它一個訂閱對象 func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {sub := newsub(mux)mux.mutex.Lock()defer mux.mutex.Unlock()if mux.stopped {// set the status to closed so that calling Unsubscribe after this// call will short circuit.sub.closed = trueclose(sub.postC)} else {if mux.subm == nil {mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)}for _, t := range types {rtyp := reflect.TypeOf(t)// 在同一次訂閱中,不要重復(fù)訂閱同一個類型的事件oldsubs := mux.subm[rtyp]if find(oldsubs, sub) != -1 {panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))}subs := make([]*TypeMuxSubscription, len(oldsubs)+1)copy(subs, oldsubs)subs[len(oldsubs)] = submux.subm[rtyp] = subs}}return sub }取消訂閱:
func (s *TypeMuxSubscription) Unsubscribe() {s.mux.del(s)s.closewait() }發(fā)布事件和傳遞事件:
// Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. // 遍歷map,找到所有訂閱的人,向它們傳遞event,同一個event對象,非拷貝,運行在調(diào)用者goroutine func (mux *TypeMux) Post(ev interface{}) error {event := &TypeMuxEvent{Time: time.Now(),Data: ev,}rtyp := reflect.TypeOf(ev)mux.mutex.RLock()if mux.stopped {mux.mutex.RUnlock()return ErrMuxClosed}subs := mux.subm[rtyp]mux.mutex.RUnlock()for _, sub := range subs {sub.deliver(event)}return nil }func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {// Short circuit delivery if stale event// 不發(fā)送過早(老)的消息if s.created.After(event.Time) {return}// Otherwise deliver the events.postMu.RLock()defer s.postMu.RUnlock()select {case s.postC <- event:case <-s.closing:} }我上面指出了發(fā)送事件可能阻塞,阻塞在哪?關(guān)鍵就在下面這里:創(chuàng)建TypeMuxSubscription時,通道使用的是無緩存通道,讀寫是同步的,這里注定了TypeMux是一個同步事件框架,這是以太坊改用Feed的最大原因。
func newsub(mux *TypeMux) *TypeMuxSubscription {c := make(chan *TypeMuxEvent) // 無緩沖通道,同步讀寫return &TypeMuxSubscription{mux: mux,created: time.Now(),readC: c,postC: c,closing: make(chan struct{}),} }Feed:流式框架
Feed是一個流式事件框架。上文強調(diào)了TypeMux是一個同步框架,也正是因為此以太坊丟棄了它,難道Feed就是一個異步框架?不一定是的,這取決于訂閱者是否采用有緩存的通道,采用有緩存的通道,則Feed就是異步的,采用無緩存的通道,Feed就是同步的,把同步還是異步的選擇交給使用者。
本節(jié)強調(diào)Feed的流式特點。事件本質(zhì)是一個數(shù)據(jù),連續(xù)不斷的事件就組成了一個數(shù)據(jù)流,這些數(shù)據(jù)流不停的流向它的訂閱者那里,并且不會阻塞在任何一個訂閱者那里。
舉幾個不是十分恰當?shù)睦印?/p>
Feed和TypeMux相同的是,它們都是推模式,不同的是Feed是異步的,如果有些訂閱者阻塞了,沒關(guān)系,它會繼續(xù)向后面的訂閱者發(fā)送事件/消息。
Feed是一個一對多的事件流框架。每個類型的事件都需要一個與之對應(yīng)的Feed,訂閱者通過這個Feed進行訂閱事件,發(fā)布者通過這個Feed發(fā)布事件。
看下Feed是如何實現(xiàn)2個功能的:
Feed源碼速遞
Feed定義:
// Feed implements one-to-many subscriptions where the carrier of events is a channel. // Values sent to a Feed are delivered to all subscribed channels simultaneously. // // Feeds can only be used with a single type. The type is determined by the first Send or // Subscribe operation. Subsequent calls to these methods panic if the type does not // match. // // The zero value is ready to use. // 一對多的事件訂閱管理:每個feed對象,當別人調(diào)用send的時候,會發(fā)送給所有訂閱者 // 每種事件類型都有一個自己的feed,一個feed內(nèi)訂閱的是同一種類型的事件,得用某個事件的feed才能訂閱該事件 type Feed struct {once sync.Once // ensures that init only runs oncesendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases. 這個鎖確保了只有一個協(xié)程在使用go routineremoveSub chan interface{} // interrupts SendsendCases caseList // the active set of select cases used by Send,訂閱的channel列表,這些channel是活躍的// The inbox holds newly subscribed channels until they are added to sendCases.mu sync.Mutexinbox caseList // 不活躍的在這里etype reflect.Typeclosed bool }訂閱事件:
// Subscribe adds a channel to the feed. Future sends will be delivered on the channel // until the subscription is canceled. All channels added must have the same element type. // // The channel should have ample buffer space to avoid blocking other subscribers. // Slow subscribers are not dropped. // 訂閱者傳入接收事件的通道,feed將通道保存為case,然后返回給訂閱者訂閱對象 func (f *Feed) Subscribe(channel interface{}) Subscription {f.once.Do(f.init)// 通道和通道類型檢查chanval := reflect.ValueOf(channel)chantyp := chanval.Type()if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {panic(errBadChannel)}sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}f.mu.Lock()defer f.mu.Unlock()if !f.typecheck(chantyp.Elem()) {panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})}// 把通道保存到case// Add the select case to the inbox.// The next Send will add it to f.sendCases.cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}f.inbox = append(f.inbox, cas)return sub }發(fā)送和傳遞事件:這個發(fā)送是比較繞一點的,要想真正掌握其中的運行,最好寫個小程序練習(xí)下。
// Send delivers to all subscribed channels simultaneously. // It returns the number of subscribers that the value was sent to. // 同時向所有的訂閱者發(fā)送事件,返回訂閱者的數(shù)量 func (f *Feed) Send(value interface{}) (nsent int) {rvalue := reflect.ValueOf(value)f.once.Do(f.init)<-f.sendLock // 獲取發(fā)送鎖// Add new cases from the inbox after taking the send lock.// 從inbox加入到sendCases,不能訂閱的時候直接加入到sendCases,因為可能其他協(xié)程在調(diào)用發(fā)送f.mu.Lock()f.sendCases = append(f.sendCases, f.inbox...)f.inbox = nil// 類型檢查:如果該feed不是要發(fā)送的值的類型,釋放鎖,并且執(zhí)行panicif !f.typecheck(rvalue.Type()) {f.sendLock <- struct{}{}panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})}f.mu.Unlock()// Set the sent value on all channels.// 把發(fā)送的值關(guān)聯(lián)到每個case/channel,每一個事件都有一個feed,所以這里全是同一個事件的for i := firstSubSendCase; i < len(f.sendCases); i++ {f.sendCases[i].Send = rvalue}// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix// of sendCases. When a send succeeds, the corresponding case moves to the end of// 'cases' and it shrinks by one element.// 所有case仍然保留在sendCases,只是用過的會移動到最后面cases := f.sendCasesfor {// Fast path: try sending without blocking before adding to the select set.// This should usually succeed if subscribers are fast enough and have free// buffer space.// 使用非阻塞式發(fā)送,如果不能發(fā)送就及時返回for i := firstSubSendCase; i < len(cases); i++ {// 如果發(fā)送成功,把這個case移動到末尾,所以i這個位置就是沒處理過的,然后大小減1if cases[i].Chan.TrySend(rvalue) {nsent++cases = cases.deactivate(i)i--}}// 如果這個地方成立,代表所有訂閱者都不阻塞,都發(fā)送完了if len(cases) == firstSubSendCase {break}// Select on all the receivers, waiting for them to unblock.// 返回一個可用的,直到不阻塞。chosen, recv, _ := reflect.Select(cases)if chosen == 0 /* <-f.removeSub */ {// 這個接收方要刪除了,刪除并縮小sendCasesindex := f.sendCases.find(recv.Interface())f.sendCases = f.sendCases.delete(index)if index >= 0 && index < len(cases) {// Shrink 'cases' too because the removed case was still active.cases = f.sendCases[:len(cases)-1]}} else {// reflect已經(jīng)確保數(shù)據(jù)已經(jīng)發(fā)送,無需再嘗試發(fā)送cases = cases.deactivate(chosen)nsent++}}// 把sendCases中的send都標記為空// Forget about the sent value and hand off the send lock.for i := firstSubSendCase; i < len(f.sendCases); i++ {f.sendCases[i].Send = reflect.Value{}}f.sendLock <- struct{}{}return nsent }總結(jié)
- 上一篇: scikit_learn逻辑回归类库
- 下一篇: 微信小游戏踩坑记录(二)