原来 goim 是这样实现高并发
本章將從架構(gòu)和程序設(shè)計(jì)兩個方面來闡述goim 高并發(fā)的實(shí)現(xiàn)原理。
架構(gòu)
首先從架構(gòu)來說 goim 分為三層 comet、logic 和 job。
comet 屬于接入層,非常容易擴(kuò)展,直接開啟多個 comet 節(jié)點(diǎn),前端接入可以使用 LVS 或者 DNS來轉(zhuǎn)發(fā)。
logic 屬于無狀態(tài)的邏輯層,可以隨意增加節(jié)點(diǎn),使用 nginx upstream 來擴(kuò)展 http 接口,內(nèi)部 rpc 部分,可以使用 LVS 四層轉(zhuǎn)發(fā)。
job 用于解耦 comet 和 logic。
系統(tǒng)使用 kafka 作為消息隊(duì)列,可以通過 kafka 使用多個 broker 或者多個 partition 來擴(kuò)展隊(duì)列。使用 redis 作為元數(shù)據(jù)、節(jié)點(diǎn)心跳信息等維護(hù)。
?
程序設(shè)計(jì)
其次在程序設(shè)計(jì)上,一是盡可能的拆分鎖的粒度,來減少資源競態(tài)。二是在內(nèi)存管理方面,通過申請一個大內(nèi)存,然后拆成所需的數(shù)據(jù)類型,自己進(jìn)行管理,來減少頻繁申請與銷毀內(nèi)存操作對性能的損耗。三是充分利用 goroutine 和 channel 實(shí)現(xiàn)高并發(fā)。四是合理應(yīng)用緩沖,提供讀寫性能。
拆分鎖的粒度
比如 comet 模塊,通過 bucket 來拆分 TCP 鏈接,每個 TCP 鏈接根據(jù)一定的規(guī)則劃分到不同的? bucket 中進(jìn)行管理,而不是集中到單個大而全 bucket中,這樣鎖的粒度更小,資源競態(tài)幾率就更低,性能也能更好的提升,不需要將時間花費(fèi)的等鎖上。
//internal/comet/server.go //初始化 Server,生成多個 bucket. func NewServer(c *conf.Config) *Server { ....s.buckets = make([]*Bucket, c.Bucket.Size)s.bucketIdx = uint32(c.Bucket.Size)for i := 0; i < c.Bucket.Size; i++ {//生成多個buckets.buckets[i] = NewBucket(c.Bucket)}... } //根據(jù) subKey 獲取 bucket,將不同的 TCP 分配到不同的 bucket 進(jìn)行管理。 func (s *Server) Bucket(subKey string) *Bucket {idx := cityhash.CityHash32([]byte(subKey), uint32(len(subKey))) % s.bucketIdxif conf.Conf.Debug {log.Infof("%s hit channel bucket index: %d use cityhash", subKey, idx)}return s.buckets[idx] } /*廣播消息通過循環(huán) Bukets, 每個 bucket 有自己的鎖,通過拆分鎖的粒度,來減少鎖的競態(tài),挺高性能。 */ func (s *server) Broadcast(ctx context.Context, req *pb.BroadcastReq) (*pb.BroadcastReply, error) { ....go func() {for _, bucket := range s.srv.Buckets() {bucket.Broadcast(req.GetProto(), req.ProtoOp)if req.Speed > 0 {t := bucket.ChannelCount() / int(req.Speed)time.Sleep(time.Duration(t) * time.Second)}}}()... }?
內(nèi)存管理
在 comet 模塊中,在 round(internal/comet/round.go) 中,會一次性申請足夠的讀緩沖和寫緩存以及定時器,通過一個空閑鏈表中進(jìn)行維護(hù)。每個 TCP 鏈接需要的時候,從這些空閑鏈表獲得,使用完之后放回去。對于 TCP 讀 goroutine 中,每個 TCP 有一個 proto 緩沖(ring),通過環(huán)形數(shù)組實(shí)現(xiàn)。
//internal/comet/server.go //NewRound 根據(jù)配置,提前申請各種數(shù)據(jù)類型,以備使用。 func NewServer(c *conf.Config) *Server {s := &Server{c: c,round: NewRound(c),rpcClient: newLogicClient(c.RPCClient),} ... } //每個 tcp 連接從 round 獲取一個 Timer、Reader、Writer func serveTCP(s *Server, conn *net.TCPConn, r int) {var (tr = s.round.Timer(r)rp = s.round.Reader(r)wp = s.round.Writer(r) ...)s.ServeTCP(conn, rp, wp, tr) }//每個 tcp 連接通過 ring(internal/comet/ring.go) 生成一個 proto 類型的環(huán)形數(shù)組,用于讀取數(shù)據(jù)。 func (s *Server) ServeTCP(conn *net.TCPConn, rp, wp *bytes.Pool, tr *xtime.Timer) {... var ( ch = NewChannel(s.c.Protocol.CliProto, s.c.Protocol.SvrProto) //環(huán)形數(shù)組) ... //讀取數(shù)據(jù)過程,先從環(huán)形數(shù)組中獲得一個 proto,然后將數(shù)據(jù)寫入 proto if p, err = ch.CliProto.Set(); err != nil {break } if err = p.ReadTCP(rr); err != nil {break } }//internal/comet/round.go type Round struct {readers []bytes.Poolwriters []bytes.Pooltimers []time.Timeroptions RoundOptions }func NewRound(c *conf.Config) (r *Round) { ....// readerr.readers = make([]bytes.Pool, r.options.Reader) //生成 N 個緩存池for i = 0; i < r.options.Reader; i++ { r.readers[i].Init(r.options.ReadBuf, r.options.ReadBufSize)}// writerr.writers = make([]bytes.Pool, r.options.Writer)for i = 0; i < r.options.Writer; i++ {r.writers[i].Init(r.options.WriteBuf, r.options.WriteBufSize)}// timerr.timers = make([]time.Timer, r.options.Timer)for i = 0; i < r.options.Timer; i++ {r.timers[i].Init(r.options.TimerSize)}... }goroutine 和 channel 實(shí)現(xiàn)高并發(fā)
比如 comet 對于推送 room 消息, 每個 bucket 將推送通道分成32個,每個通道1024長度。每個通道由一個 goroutine 進(jìn)行消費(fèi)處理。推送 room? 消息的時候,依次推送到這32個通道中。這樣做提高 bucket 內(nèi)部的并發(fā)度,不至于一個通道堵塞,導(dǎo)致全部都在等待。?
//internal/comet/bucket.go //每個bucket 生成 RoutineAmount 個 Channel, 每個 Channel 由一個 roomproc 處理。 func NewBucket(c *conf.Bucket) (b *Bucket) {b.routines = make([]chan *pb.BroadcastRoomReq, c.RoutineAmount)for i := uint64(0); i < c.RoutineAmount; i++ {c := make(chan *pb.BroadcastRoomReq, c.RoutineSize)b.routines[i] = cgo b.roomproc(c)}return } func (b *Bucket) roomproc(c chan *pb.BroadcastRoomReq) {for {arg := <-cif room := b.Room(arg.RoomID); room != nil {room.Push(arg.Proto)}} }//將消息輪詢發(fā)送到 routines 中。 func (b *Bucket) BroadcastRoom(arg *pb.BroadcastRoomReq) {num := atomic.AddUint64(&b.routinesNum, 1) % b.c.RoutineAmountb.routines[num] <- arg }同時 Job?中也充分利用 goroutine?和 channel,在 Job?中 每個 comet 區(qū)分不同的消息推送通道。
1.pushChan:推送單聊消息的通道,分為 N 組,依次將消息推送的 N 組中,每個組有自己的 goroutine, 來提高并發(fā)性
2.roomChan:推送群聊消息的通道,分為 N 組,依次將消息推送的 N 組中,每個組有自己的 goroutine, 來提高并發(fā)性
3.broadcastChan:廣播消息
4.開啟 N?個 goroutine, 每個 goroutine,接收單聊、群聊和廣播消息。
合理應(yīng)用緩沖,提供讀寫性能
在 job 推送 room 消息時,并非在收到消息的時候就推送到 comet,是是通過一定的策略實(shí)現(xiàn)批量推送,來提高讀寫性能。?
對于某個 room 消息而言,會開啟一個 goroutine 來處理并開啟了寫緩沖機(jī)制,按批次進(jìn)行發(fā)送(消息個數(shù))。接收到消息之后,不是馬上發(fā)送,而是進(jìn)行緩沖,等待一段時間,看看還有沒有消息。推送的條件一是已經(jīng)達(dá)到了最大批次數(shù),二是超時。如果長時間沒有消息,會銷毀這個 room。
具體參考 internal/job/room.go pushproc 實(shí)現(xiàn)。
總結(jié)
閱讀了 goim 源碼之后,對與如何設(shè)計(jì)一個高并發(fā)的服務(wù),個人認(rèn)為主要體現(xiàn)在幾個方面,一是拆分職能、可以做到各個模塊的擴(kuò)縮容。拆分粒度,減少競態(tài)和性能損耗。二是更巧妙的方式去使用內(nèi)存來減少頻繁申請和銷毀內(nèi)存的性能損耗。三是充分利用語言特性實(shí)現(xiàn)高并發(fā)。
總結(jié)
以上是生活随笔為你收集整理的原来 goim 是这样实现高并发的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode 155.最小栈
- 下一篇: eqep t法_TMS5704357BZ