日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送

發(fā)布時(shí)間:2025/3/8 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

使用 SSE(Server-Sent Events) 進(jìn)行 HTTP 服務(wù)器推送

這個(gè)示例是一個(gè)類似 twitter 的 web 應(yīng)用程序,使用 Server-Sent Events 來(lái)支持實(shí)時(shí)刷新。

運(yùn)行

docker-compose up

然后, 瀏覽 http://localhost:8080

您可以添加自己的帖子或點(diǎn)擊按鈕獲得隨機(jī)生成的帖子。

無(wú)論哪種方式,feeds 列表和 feed 中的帖子都應(yīng)該是最新的。嘗試使用第二個(gè)瀏覽器窗口查看更新。

它是如何工作的

  • 可以創(chuàng)建和更新帖子。

  • 帖子可以包含標(biāo)簽。

  • 每個(gè)標(biāo)簽都有自己的 feed,其中包含來(lái)自該標(biāo)簽的所有帖子。

  • 所有的帖子都存儲(chǔ)在 MySQL 中。這就是寫模型。

  • 所有 feed 都異步更新并存儲(chǔ)在 MongoDB 中。這是讀模型。

為什么要使用單獨(dú)的寫和讀模型?

對(duì)于這個(gè)示例應(yīng)用程序,使用多語(yǔ)言持久性(兩個(gè)數(shù)據(jù)庫(kù)引擎)當(dāng)然有些過(guò)頭了。我們這樣做是為了展示這個(gè)技術(shù),以及如何很容易地將它應(yīng)用到 Watermill。

專用的讀模型對(duì)于具有高讀/寫比率的應(yīng)用程序是一種有用的模式。所有寫操作都被原子地應(yīng)用到寫模型(在我們的例子中是 MySQL)。事件處理程序異步更新讀模型(我們使用 Mongo)。

讀取模型中的數(shù)據(jù)可以按原樣使用。也可以獨(dú)立于寫模型進(jìn)行擴(kuò)展。

請(qǐng)記住,要使用此模式,應(yīng)用程序中必須接受最終的一致性。而且,在大多數(shù)用例中,您可能不需要使用它。務(wù)實(shí)!

SSE Router

SSERouter?來(lái)自 watermill-http。當(dāng)創(chuàng)建一個(gè)新的路由器時(shí),你需要傳遞一個(gè)上游訂閱者。來(lái)自該訂閱服務(wù)器的消息將觸發(fā)通過(guò) HTTP 推送更新。

在本例中,我們使用 NATS 作為 Pub/Sub,但這可以是 Watermill 支持的任何 Pub/Sub。

sseRouter, err := watermillHTTP.NewSSERouter(
watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)

Stream Adapters(流適配器)

要使用?SSERouter,你需要準(zhǔn)備一個(gè)帶有兩個(gè)方法的?StreamAdapter。

GetResponse?類似于標(biāo)準(zhǔn)的 HTTP 處理程序。修改現(xiàn)有的處理程序來(lái)匹配這個(gè)簽名應(yīng)該非常容易。

Validate?是一個(gè)額外的方法,它告訴我們是否應(yīng)該為特定的?Message?推送更新。

type StreamAdapter interface {
// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}

Validate?示例如下所示。它檢查消息是否來(lái)自與用戶通過(guò) HTTP 請(qǐng)求發(fā)送的相同的 post ID。

func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
postUpdated := PostUpdated{}

err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}

postID := chi.URLParam(r, "id")

return postUpdated.OriginalPost.ID == postID
}

如果你想為每條消息觸發(fā)一個(gè)更新,你可以簡(jiǎn)單地返回?true。

func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
return true
}

在開(kāi)始?SSERouter?之前,您需要添加帶有特定主題的處理程序。?AddHandler?返回一個(gè)可以在任何路由庫(kù)中使用的標(biāo)準(zhǔn) HTTP 處理程序。

postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)

// ...

r.Get("/posts/{id}", postHandler)

Event handlers(事件處理程序)

該示例使用 Watermill 進(jìn)行所有異步通信,包括 SSE。

發(fā)布了以下事件:

  • PostCreated

    • 將 post 添加到貼子中包含標(biāo)簽的所有 feeds 中。

  • FeedUpdated

    • 將更新推送到當(dāng)前訪問(wèn) feed 頁(yè)面的所有客戶端。

  • PostUpdated

    • a) 對(duì)于現(xiàn)有標(biāo)簽,帖子內(nèi)容將在標(biāo)簽中更新。

    • b) 如果添加了新的標(biāo)簽,文章將被添加到標(biāo)簽的 feed 中。

    • c) 如果標(biāo)簽已刪除,則該帖子將從標(biāo)簽的 feed 中刪除。

    • 將更新推送給所有當(dāng)前訪問(wèn) post 頁(yè)面的客戶端。

    • 使用帖子中存在的標(biāo)簽更新所有 feeds 中的帖子

前端 app

前端應(yīng)用程序是使用 Vue.js 和 Bootstrap 構(gòu)建的。

最有趣的部分是?EventSource?的使用。

this.es = new EventSource('/api/feeds/' + this.feed)

this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);

Refs

  • watermill.io

總結(jié)

以上是生活随笔為你收集整理的服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。