服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送
使用 SSE(Server-Sent Events) 進(jìn)行 HTTP 服務(wù)器推送
這個(gè)示例是一個(gè)類似 twitter 的 web 應(yīng)用程序,使用 Server-Sent Events 來(lái)支持實(shí)時(shí)刷新。
運(yùn)行
docker-compose up然后, 瀏覽 http://localhost:8080
您可以添加自己的帖子或點(diǎn)擊按鈕獲得隨機(jī)生成的帖子。
無(wú)論哪種方式,feeds 列表和 feed 中的帖子都應(yīng)該是最新的。嘗試使用第二個(gè)瀏覽器窗口查看更新。
它是如何工作的
可以創(chuàng)建和更新帖子。
帖子可以包含標(biāo)簽。
每個(gè)標(biāo)簽都有自己的 feed,其中包含來(lái)自該標(biāo)簽的所有帖子。
所有的帖子都存儲(chǔ)在 MySQL 中。這就是寫模型。
所有 feed 都異步更新并存儲(chǔ)在 MongoDB 中。這是讀模型。
為什么要使用單獨(dú)的寫和讀模型?
對(duì)于這個(gè)示例應(yīng)用程序,使用多語(yǔ)言持久性(兩個(gè)數(shù)據(jù)庫(kù)引擎)當(dāng)然有些過(guò)頭了。我們這樣做是為了展示這個(gè)技術(shù),以及如何很容易地將它應(yīng)用到 Watermill。
專用的讀模型對(duì)于具有高讀/寫比率的應(yīng)用程序是一種有用的模式。所有寫操作都被原子地應(yīng)用到寫模型(在我們的例子中是 MySQL)。事件處理程序異步更新讀模型(我們使用 Mongo)。
讀取模型中的數(shù)據(jù)可以按原樣使用。也可以獨(dú)立于寫模型進(jìn)行擴(kuò)展。
請(qǐng)記住,要使用此模式,應(yīng)用程序中必須接受最終的一致性。而且,在大多數(shù)用例中,您可能不需要使用它。務(wù)實(shí)!
SSE Router
SSERouter?來(lái)自 watermill-http。當(dāng)創(chuàng)建一個(gè)新的路由器時(shí),你需要傳遞一個(gè)上游訂閱者。來(lái)自該訂閱服務(wù)器的消息將觸發(fā)通過(guò) HTTP 推送更新。
在本例中,我們使用 NATS 作為 Pub/Sub,但這可以是 Watermill 支持的任何 Pub/Sub。
sseRouter, err := watermillHTTP.NewSSERouter(watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)
Stream Adapters(流適配器)
要使用?SSERouter,你需要準(zhǔn)備一個(gè)帶有兩個(gè)方法的?StreamAdapter。
GetResponse?類似于標(biāo)準(zhǔn)的 HTTP 處理程序。修改現(xiàn)有的處理程序來(lái)匹配這個(gè)簽名應(yīng)該非常容易。
Validate?是一個(gè)額外的方法,它告訴我們是否應(yīng)該為特定的?Message?推送更新。
type StreamAdapter interface {// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}
Validate?示例如下所示。它檢查消息是否來(lái)自與用戶通過(guò) HTTP 請(qǐng)求發(fā)送的相同的 post ID。
func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {postUpdated := PostUpdated{}
err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}
postID := chi.URLParam(r, "id")
return postUpdated.OriginalPost.ID == postID
}
如果你想為每條消息觸發(fā)一個(gè)更新,你可以簡(jiǎn)單地返回?true。
func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {return true
}
在開(kāi)始?SSERouter?之前,您需要添加帶有特定主題的處理程序。?AddHandler?返回一個(gè)可以在任何路由庫(kù)中使用的標(biāo)準(zhǔn) HTTP 處理程序。
postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)// ...
r.Get("/posts/{id}", postHandler)
Event handlers(事件處理程序)
該示例使用 Watermill 進(jìn)行所有異步通信,包括 SSE。
發(fā)布了以下事件:
PostCreated
將 post 添加到貼子中包含標(biāo)簽的所有 feeds 中。
FeedUpdated
將更新推送到當(dāng)前訪問(wèn) feed 頁(yè)面的所有客戶端。
PostUpdated
a) 對(duì)于現(xiàn)有標(biāo)簽,帖子內(nèi)容將在標(biāo)簽中更新。
b) 如果添加了新的標(biāo)簽,文章將被添加到標(biāo)簽的 feed 中。
c) 如果標(biāo)簽已刪除,則該帖子將從標(biāo)簽的 feed 中刪除。
將更新推送給所有當(dāng)前訪問(wèn) post 頁(yè)面的客戶端。
使用帖子中存在的標(biāo)簽更新所有 feeds 中的帖子
前端 app
前端應(yīng)用程序是使用 Vue.js 和 Bootstrap 構(gòu)建的。
最有趣的部分是?EventSource?的使用。
this.es = new EventSource('/api/feeds/' + this.feed)this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);
Refs
watermill.io
總結(jié)
以上是生活随笔為你收集整理的服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: http://xj.gsxt.gov.c
- 下一篇: 会计云课堂实名认证后怎么更改_离职了,税