kcp-go Source Code Analysis
Concepts
ARQ: Automatic Repeat-reQuest (ARQ) is one of the error-correction protocols of the data link layer in the OSI model.
RTO:Retransmission TimeOut
FEC:Forward Error Correction
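Introduction to KCP
KCP is a fast, reliable protocol that is usually run over UDP (kcp-go adds optional forward error correction on top). At the cost of 10%-20% more bandwidth than TCP, it reduces average latency by 30%-40% and maximum latency by a factor of three. KCP is a pure algorithm implementation: it is not responsible for sending or receiving on the underlying protocol (such as UDP). See the official KCP documentation for details.
kcp-go is a Go library that implements the KCP protocol. KCP resembles TCP, and much of its implementation borrows from TCP: sliding window, fast retransmit, selective retransmission, slow start, and so on.
Like TCP, KCP distinguishes a client side and a listening (server) side.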
The KCP protocol
Layer model
    +----------------------+
    |       Session        |
    +----------------------+
    |       KCP(ARQ)       |
    +----------------------+
    |    FEC(OPTIONAL)     |
    +----------------------+
    |   CRYPTO(OPTIONAL)   |
    +----------------------+
    |     UDP(Packet)      |
    +----------------------+
KCP header
KCP Header Format
          4         1   1     2    (Byte)
    +---+---+---+---+---+---+---+---+
    |     conv      |cmd|frg|  wnd  |
    +---+---+---+---+---+---+---+---+
    |      ts       |      sn       |
    +---+---+---+---+---+---+---+---+
    |      una      |      len      |
    +---+---+---+---+---+---+---+---+
    |                               |
    +             DATA              +
    |                               |
    +---+---+---+---+---+---+---+---+
Code structure
    src/vendor/github.com/xtaci/kcp-go/
    ├── LICENSE
    ├── README.md
    ├── crypt.go        encryption and decryption
    ├── crypt_test.go
    ├── donate.png
    ├── fec.go          forward error correction
    ├── frame.png
    ├── kcp-go.png
    ├── kcp.go          KCP protocol implementation
    ├── kcp_test.go
    ├── sess.go         session management
    ├── sess_test.go
    ├── snmp.go         statistics
    ├── updater.go      task scheduling
    ├── xor.go          XOR helpers
    └── xor_test.go
This analysis focuses on two files: kcp.go and sess.go.
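To make the 24-byte header layout shown earlier concrete, here is a minimal sketch that packs and unpacks the eight header fields. The `header` type and the `encode`/`decode` helpers are illustrative names, not kcp-go's own API, and the little-endian byte order and the value 81 for a data (PUSH) command are assumptions of this sketch.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// headerSize is the fixed header length: 4+1+1+2+4+4+4+4 bytes.
const headerSize = 24

// cmdPush is the command value assumed here for a data segment.
const cmdPush = 81

// header mirrors the fields of the header diagram (hypothetical type).
type header struct {
	conv   uint32 // conversation id
	cmd    uint8  // command
	frg    uint8  // fragment countdown
	wnd    uint16 // advertised receive window
	ts     uint32 // timestamp
	sn     uint32 // sequence number
	una    uint32 // next sequence number expected by the sender of this header
	length uint32 // payload length ("len" in the diagram)
}

// encode writes h into the first headerSize bytes of b (little-endian, assumed).
func (h header) encode(b []byte) {
	binary.LittleEndian.PutUint32(b[0:], h.conv)
	b[4] = h.cmd
	b[5] = h.frg
	binary.LittleEndian.PutUint16(b[6:], h.wnd)
	binary.LittleEndian.PutUint32(b[8:], h.ts)
	binary.LittleEndian.PutUint32(b[12:], h.sn)
	binary.LittleEndian.PutUint32(b[16:], h.una)
	binary.LittleEndian.PutUint32(b[20:], h.length)
}

// decode parses a header from b and reports false if b is too short.
func decode(b []byte) (header, bool) {
	if len(b) < headerSize {
		return header{}, false
	}
	return header{
		conv:   binary.LittleEndian.Uint32(b[0:]),
		cmd:    b[4],
		frg:    b[5],
		wnd:    binary.LittleEndian.Uint16(b[6:]),
		ts:     binary.LittleEndian.Uint32(b[8:]),
		sn:     binary.LittleEndian.Uint32(b[12:]),
		una:    binary.LittleEndian.Uint32(b[16:]),
		length: binary.LittleEndian.Uint32(b[20:]),
	}, true
}

func main() {
	payload := []byte("hello")
	h := header{conv: 1, cmd: cmdPush, wnd: 32, ts: 1000, sn: 7, una: 5, length: uint32(len(payload))}
	buf := make([]byte, headerSize+len(payload))
	h.encode(buf)
	copy(buf[headerSize:], payload)

	got, ok := decode(buf)
	fmt.Println(ok, got == h, string(buf[headerSize:]))
}
```

Every segment that flush writes into its output buffer starts with such a header, which is why the code compares `size+IKCP_OVERHEAD` against the MTU before appending another segment.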
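A brief look at KCP
kcp-go runs on top of UDP, and the UDP layer itself is not covered here. What KCP does is decide how outgoing data is wrapped into UDP packets and how incoming UDP packets are parsed, and then add the mechanisms needed for retransmission, congestion control, error correction, and so on. The following sections outline the overall flow of the KCP client and server. They only sketch the call flow; the detailed analysis follows in the data-flow section.
Overall client call flow
As with TCP, a KCP client has to dial before it can talk to the server. One big difference from TCP is that the dial succeeds even when the server is not running, because dialing does not send anything on the wire, whereas TCP performs its three-way handshake at this point.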
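The claim that dialing succeeds without a server can be checked with the standard library alone. The sketch below uses plain `net.DialUDP` rather than kcp-go's `DialWithOptions`; the helper name `dialNoServer` and the choice of loopback port 9 as an address with no listener are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net"
)

// dialNoServer "connects" a UDP socket to addr. No packet is sent,
// so the call does not depend on anything listening at addr.
func dialNoServer(addr string) error {
	udpaddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		return err
	}
	conn, err := net.DialUDP("udp", nil, udpaddr)
	if err != nil {
		return err
	}
	defer conn.Close()
	return nil
}

func main() {
	// Assumption: nothing is listening on 127.0.0.1:9.
	fmt.Println("dial error:", dialNoServer("127.0.0.1:9"))
}
```

A missing server therefore only shows up later, when writes go unacknowledged and the segment retransmission count reaches the dead-link threshold.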
    DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
    V
    net.DialUDP("udp", nil, udpaddr)
    V
    NewConn()
    V
    newUDPSession() {initialize the UDPSession}
    V
    NewKCP() {initialize kcp}
    V
    updater.addSession(sess) {manage sessions and schedule tasks, waking each one in turn at the user-configured interval}
    V
    go sess.readLoop()
    V
    go s.receiver(chPacket)
    V
    s.kcpInput(data)
    V
    s.fecDecoder.decodeBytes(data)
    V
    s.kcp.Input(data, true, s.ackNoDelay)
    V
    kcp.parse_data(seg) {insert the segment into the kcp.rcv_buf buffer}
    V
    notifyReadEvent()
The client flow is roughly as shown above: Dial first to create the UDP connection, wrap that connection in a session, then start a goroutine that receives UDP messages.
Overall server call flow
    ListenWithOptions()
    V
    net.ListenUDP()
    V
    ServerConn()
    V
    newFECDecoder()
    V
    go l.monitor() {receive UDP data from chPacket and feed it into kcp}
    V
    go l.receiver(chPacket) {receive data from UDP and enqueue it}
    V
    newUDPSession()
    V
    updater.addSession(sess) {manage sessions and schedule tasks, waking each one in turn at the user-configured interval}
    V
    s.kcpInput(data)
    V
    s.fecDecoder.decodeBytes(data)
    V
    s.kcp.Input(data, true, s.ackNoDelay)
    V
    kcp.parse_data(seg) {insert the segment into the kcp.rcv_buf buffer}
    V
    notifyReadEvent()
The server flow is roughly as shown above: Listen first to start the UDP listener, then use a goroutine to monitor incoming UDP packets and route each session's data to its own session, which parses and unwraps the data and hands it to the upper layer.
Detailed data-flow analysis
Both the KCP client and the server perform I/O, that is, reads and writes. Because the two sides implement them in the same way, analyzing one side is enough; here we look at the client.
Client: sending a message
    s.Write(b []byte)
    V
    s.kcp.WaitSnd() {}
    V
    s.kcp.Send(b) {split the data into segments by mss and store them in kcp.snd_queue}
    V
    s.kcp.flush(false) [flush data to output] {if writeDelay==false {flush immediately} else {flush once every `interval`}}
    V
    kcp.output(buffer, size)
    V
    s.output(buf)
    V
    s.conn.WriteTo(ext, s.remote)
    V
    s.conn..Conn.WriteTo(buf)
Both reads and writes are implemented in sess.go. The Write method:
    // Write implements net.Conn
    func (s *UDPSession) Write(b []byte) (n int, err error) {
        for {
            ...
            // api flow control
            if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
                n = len(b)
                for {
                    if len(b) <= int(s.kcp.mss) {
                        s.kcp.Send(b)
                        break
                    } else {
                        s.kcp.Send(b[:s.kcp.mss])
                        b = b[s.kcp.mss:]
                    }
                }
                if !s.writeDelay {
                    s.kcp.flush(false)
                }
                s.mu.Unlock()
                atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
                return n, nil
            }
            ...
            // wait for write event or timeout
            select {
            case <-s.chWriteEvent:
            case <-c:
            case <-s.die:
            }
            if timeout != nil {
                timeout.Stop()
            }
        }
    }
Suppose we send the message hello. Write first checks whether the send window is full. If it is, the call blocks; otherwise it calls kcp.Send("hello"). Send splits the data into segments according to mss (hello is so short that it yields a single segment) and appends them to the send queue.
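The mss-based splitting described above can be sketched in isolation. The `fragment` type and `split` function are hypothetical names for this example only; the sketch follows the message-mode rule that frg counts down to 0 on the last piece, and it simply returns nil for inputs it does not handle.

```go
package main

import "fmt"

// fragment is a hypothetical stand-in for a KCP segment.
type fragment struct {
	frg  uint8  // number of fragments still to follow (message mode)
	data []byte // payload of at most mss bytes
}

// split cuts buf into pieces of at most mss bytes. In message mode
// (stream == false) frg counts down so the last piece carries frg == 0;
// in stream mode every piece carries frg == 0.
func split(buf []byte, mss int, stream bool) []fragment {
	if mss <= 0 || len(buf) == 0 {
		return nil
	}
	count := (len(buf) + mss - 1) / mss
	if count > 255 {
		return nil // frg is a single byte, so it cannot count more pieces
	}
	out := make([]fragment, 0, count)
	for i := 0; i < count; i++ {
		size := mss
		if len(buf) < size {
			size = len(buf)
		}
		f := fragment{data: append([]byte(nil), buf[:size]...)}
		if !stream {
			f.frg = uint8(count - i - 1)
		}
		out = append(out, f)
		buf = buf[size:]
	}
	return out
}

func main() {
	for _, f := range split([]byte("hello, kcp"), 4, false) {
		fmt.Printf("frg=%d data=%q\n", f.frg, f.data)
	}
}
```

With a realistic mss (on the order of a UDP payload), hello fits in one fragment with frg 0, which matches the single-segment case in the text.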
The relevant part of Send:
    func (kcp *KCP) Send(buffer []byte) int {
        ...
        for i := 0; i < count; i++ {
            var size int
            if len(buffer) > int(kcp.mss) {
                size = int(kcp.mss)
            } else {
                size = len(buffer)
            }
            seg := kcp.newSegment(size)
            copy(seg.data, buffer[:size])
            if kcp.stream == 0 { // message mode
                seg.frg = uint8(count - i - 1)
            } else { // stream mode
                seg.frg = 0
            }
            kcp.snd_queue = append(kcp.snd_queue, seg)
            buffer = buffer[size:]
        }
        return 0
    }
Next, Write checks the writeDelay parameter. If it is false, the message is flushed immediately; otherwise sending waits until the task scheduler triggers it. The actual sending is done by the flush function.
    // flush pending data
    func (kcp *KCP) flush(ackOnly bool) {
        var seg Segment
        seg.conv = kcp.conv
        seg.cmd = IKCP_CMD_ACK
        seg.wnd = kcp.wnd_unused()
        seg.una = kcp.rcv_nxt

        buffer := kcp.buffer
        // flush acknowledges
        ptr := buffer
        for i, ack := range kcp.acklist {
            size := len(buffer) - len(ptr)
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                kcp.output(buffer, size)
                ptr = buffer
            }
            // filter jitters caused by bufferbloat
            if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
                seg.sn, seg.ts = ack.sn, ack.ts
                ptr = seg.encode(ptr)
            }
        }
        kcp.acklist = kcp.acklist[0:0]

        if ackOnly { // flush remaining ack segments
            size := len(buffer) - len(ptr)
            if size > 0 {
                kcp.output(buffer, size)
            }
            return
        }

        // probe window size (if remote window size equals zero)
        if kcp.rmt_wnd == 0 {
            current := currentMs()
            if kcp.probe_wait == 0 {
                kcp.probe_wait = IKCP_PROBE_INIT
                kcp.ts_probe = current + kcp.probe_wait
            } else {
                if _itimediff(current, kcp.ts_probe) >= 0 {
                    if kcp.probe_wait < IKCP_PROBE_INIT {
                        kcp.probe_wait = IKCP_PROBE_INIT
                    }
                    kcp.probe_wait += kcp.probe_wait / 2
                    if kcp.probe_wait > IKCP_PROBE_LIMIT {
                        kcp.probe_wait = IKCP_PROBE_LIMIT
                    }
                    kcp.ts_probe = current + kcp.probe_wait
                    kcp.probe |= IKCP_ASK_SEND
                }
            }
        } else {
            kcp.ts_probe = 0
            kcp.probe_wait = 0
        }

        // flush window probing commands
        if (kcp.probe & IKCP_ASK_SEND) != 0 {
            seg.cmd = IKCP_CMD_WASK
            size := len(buffer) - len(ptr)
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                kcp.output(buffer, size)
                ptr = buffer
            }
            ptr = seg.encode(ptr)
        }

        // flush window probing commands
        if (kcp.probe & IKCP_ASK_TELL) != 0 {
            seg.cmd = IKCP_CMD_WINS
            size := len(buffer) - len(ptr)
            if size+IKCP_OVERHEAD > int(kcp.mtu) {
                kcp.output(buffer, size)
                ptr = buffer
            }
            ptr = seg.encode(ptr)
        }

        kcp.probe = 0

        // calculate window size
        cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
        if kcp.nocwnd == 0 {
            cwnd = _imin_(kcp.cwnd, cwnd)
        }

        // sliding window, controlled by snd_nxt && snd_una+cwnd
        newSegsCount := 0
        for k := range kcp.snd_queue {
            if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
                break
            }
            newseg := kcp.snd_queue[k]
            newseg.conv = kcp.conv
            newseg.cmd = IKCP_CMD_PUSH
            newseg.sn = kcp.snd_nxt
            kcp.snd_buf = append(kcp.snd_buf, newseg)
            kcp.snd_nxt++
            newSegsCount++
            kcp.snd_queue[k].data = nil
        }
        if newSegsCount > 0 {
            kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
        }

        // calculate resent
        resent := uint32(kcp.fastresend)
        if kcp.fastresend <= 0 {
            resent = 0xffffffff
        }

        // check for retransmissions
        current := currentMs()
        var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
        for k := range kcp.snd_buf {
            segment := &kcp.snd_buf[k]
            needsend := false
            if segment.xmit == 0 { // initial transmit
                needsend = true
                segment.rto = kcp.rx_rto
                segment.resendts = current + segment.rto
            } else if _itimediff(current, segment.resendts) >= 0 { // RTO
                needsend = true
                if kcp.nodelay == 0 {
                    segment.rto += kcp.rx_rto
                } else {
                    segment.rto += kcp.rx_rto / 2
                }
                segment.resendts = current + segment.rto
                lost++
                lostSegs++
            } else if segment.fastack >= resent { // fast retransmit
                needsend = true
                segment.fastack = 0
                segment.rto = kcp.rx_rto
                segment.resendts = current + segment.rto
                change++
                fastRetransSegs++
            } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
                needsend = true
                segment.fastack = 0
                segment.rto = kcp.rx_rto
                segment.resendts = current + segment.rto
                change++
                earlyRetransSegs++
            }

            if needsend {
                segment.xmit++
                segment.ts = current
                segment.wnd = seg.wnd
                segment.una = seg.una

                size := len(buffer) - len(ptr)
                need := IKCP_OVERHEAD + len(segment.data)

                if size+need > int(kcp.mtu) {
                    kcp.output(buffer, size)
                    current = currentMs() // time update for a blocking call
                    ptr = buffer
                }

                ptr = segment.encode(ptr)
                copy(ptr, segment.data)
                ptr = ptr[len(segment.data):]

                if segment.xmit >= kcp.dead_link {
                    kcp.state = 0xFFFFFFFF
                }
            }
        }

        // flush remaining segments
        size := len(buffer) - len(ptr)
        if size > 0 {
            kcp.output(buffer, size)
        }

        // counter updates
        sum := lostSegs
        if lostSegs > 0 {
            atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
        }
        if fastRetransSegs > 0 {
            atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
            sum += fastRetransSegs
        }
        if earlyRetransSegs > 0 {
            atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
            sum += earlyRetransSegs
        }
        if sum > 0 {
            atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
        }

        // update ssthresh
        // rate halving, https://tools.ietf.org/html/rfc6937
        if change > 0 {
            inflight := kcp.snd_nxt - kcp.snd_una
            kcp.ssthresh = inflight / 2
            if kcp.ssthresh < IKCP_THRESH_MIN {
                kcp.ssthresh = IKCP_THRESH_MIN
            }
            kcp.cwnd = kcp.ssthresh + resent
            kcp.incr = kcp.cwnd * kcp.mss
        }

        // congestion control, https://tools.ietf.org/html/rfc5681
        if lost > 0 {
            kcp.ssthresh = cwnd / 2
            if kcp.ssthresh < IKCP_THRESH_MIN {
                kcp.ssthresh = IKCP_THRESH_MIN
            }
            kcp.cwnd = 1
            kcp.incr = kcp.mss
        }

        if kcp.cwnd < 1 {
            kcp.cwnd = 1
            kcp.incr = kcp.mss
        }
    }
flush is a crucial function: KCP's important parameters all tune its behavior. It takes a single parameter, ackOnly, which means "send ACKs only". When ackOnly is true, the function just walks the ACK list, sends those ACKs, and returns. Otherwise it also sends real data.
Before sending data it works out the window size. If the peer advertised a zero window (rmt_wnd == 0), flush schedules window probes (IKCP_CMD_WASK) with a growing wait between them. The number of segments that may be in flight is min(snd_wnd, rmt_wnd), so when the peer's window shrinks, the local window shrinks with it to avoid congestion. If congestion control is enabled (nc=0, i.e. nocwnd == 0), this value is additionally capped by the congestion window kcp.cwnd. If congestion control is disabled, sending is limited only by the configured send window and the peer's advertised window.
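The window calculation in flush can be isolated into two small functions. This is a sketch with hypothetical names (`effectiveWindow`, `canSend`); it mirrors the `_imin_` and `_itimediff` logic of the listing, including the signed 32-bit difference that keeps the comparison correct when sequence numbers wrap around.

```go
package main

import "fmt"

// minU32 returns the smaller of a and b.
func minU32(a, b uint32) uint32 {
	if a < b {
		return a
	}
	return b
}

// effectiveWindow returns how many segments may be in flight:
// min(sndWnd, rmtWnd), further capped by cwnd unless congestion
// control is switched off (noCwnd == true).
func effectiveWindow(sndWnd, rmtWnd, cwnd uint32, noCwnd bool) uint32 {
	w := minU32(sndWnd, rmtWnd)
	if !noCwnd {
		w = minU32(cwnd, w)
	}
	return w
}

// canSend reports whether sndNxt still lies inside the window
// [sndUna, sndUna+wnd). The signed difference handles wraparound.
func canSend(sndNxt, sndUna, wnd uint32) bool {
	return int32(sndNxt-(sndUna+wnd)) < 0
}

func main() {
	fmt.Println(effectiveWindow(32, 128, 4, false)) // congestion window limits
	fmt.Println(effectiveWindow(32, 128, 4, true))  // congestion control off
	fmt.Println(canSend(10, 8, 4), canSend(12, 8, 4))
}
```

Segments only move from snd_queue to snd_buf while canSend holds, so a small cwnd after a loss directly throttles how much new data each flush admits.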
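flush then loops over every segment in snd_buf and decides whether, and when, each one has to be sent or resent:
1. If the segment has never been sent, it is sent right away.
2. If the current time has passed the segment's own resend time, that is, its RTO has expired, the segment is retransmitted.
3. If the segment has been skipped by later ACKs at least resent times (its fastack counter), it is retransmitted. This is the fast-retransmit mechanism; resent is derived from the resend parameter.
4. If the segment has been skipped by at least one ACK and there are no new segments to send, early retransmit (ER) is triggered.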
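The four cases above can be written as a single decision function. The sketch below is a simplified model: the `segment` struct keeps only the three fields the decision depends on, and `reason` is a hypothetical helper that returns a label instead of mutating state the way flush does.

```go
package main

import "fmt"

// segment keeps only the fields the retransmission decision needs.
type segment struct {
	xmit     uint32 // number of transmissions so far
	resendts uint32 // time (ms) at which the RTO expires
	fastack  uint32 // how often later ACKs skipped this segment
}

// reason says why s should be sent now, or "" if it should wait.
// now is the current time in ms, resent the fast-retransmit threshold,
// newSegs the number of new segments admitted in this flush.
func reason(s segment, now, resent uint32, newSegs int) string {
	switch {
	case s.xmit == 0:
		return "initial"
	case int32(now-s.resendts) >= 0:
		return "rto"
	case s.fastack >= resent:
		return "fast"
	case s.fastack > 0 && newSegs == 0:
		return "early"
	}
	return ""
}

func main() {
	now, resent := uint32(100), uint32(2)
	fmt.Println(reason(segment{}, now, resent, 0))
	fmt.Println(reason(segment{xmit: 1, resendts: 90}, now, resent, 1))
	fmt.Println(reason(segment{xmit: 1, resendts: 200, fastack: 2}, now, resent, 1))
	fmt.Println(reason(segment{xmit: 1, resendts: 200, fastack: 1}, now, resent, 0))
}
```

The order of the cases matters: an expired RTO is counted as a loss and collapses cwnd to 1, while fast and early retransmits only trigger the milder ssthresh adjustment.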
Finally, the hello message is sent through kcp.output. output is a callback; its actual body is this function in sess.go:
    func (s *UDPSession) output(buf []byte) {
        var ecc [][]byte

        // extend buf's header space
        ext := buf
        if s.headerSize > 0 {
            ext = s.ext[:s.headerSize+len(buf)]
            copy(ext[s.headerSize:], buf)
        }

        // FEC stage
        if s.fecEncoder != nil {
            ecc = s.fecEncoder.Encode(ext)
        }

        // encryption stage
        if s.block != nil {
            io.ReadFull(rand.Reader, ext[:nonceSize])
            checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
            binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
            s.block.Encrypt(ext, ext)

            if ecc != nil {
                for k := range ecc {
                    io.ReadFull(rand.Reader, ecc[k][:nonceSize])
                    checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
                    binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
                    s.block.Encrypt(ecc[k], ecc[k])
                }
            }
        }

        // WriteTo kernel
        nbytes := 0
        npkts := 0
        // if mrand.Intn(100) < 50 {
        for i := 0; i < s.dup+1; i++ {
            if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
                nbytes += n
                npkts++
            }
        }
        // }

        if ecc != nil {
            for k := range ecc {
                if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
                    nbytes += n
                    npkts++
                }
            }
        }
        atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
        atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
    }
output is where the data is actually written to the kernel. Before writing, it applies FEC encoding; the FEC encoder is built on the open-source library github.com/klauspost/reedsolomon. After encoding, the hello payload is no longer identical to the original: at the very least it has grown by a few bytes.
The FEC encoder has two important parameters, dataShards and parityShards, which are passed to reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)). They determine the redundancy of the FEC: the higher the redundancy, the better the resistance to packet loss.
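To illustrate what data shards and parity shards buy, here is a deliberately simplified sketch. It does not use Reed-Solomon and is not what kcp-go does: it swaps in a single XOR parity shard, which is enough to rebuild exactly one lost shard. The function names `parity` and `recoverShard` are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// parity returns the byte-wise XOR of the given shards.
// All shards must be non-empty and of equal length.
func parity(shards [][]byte) []byte {
	p := make([]byte, len(shards[0]))
	for _, s := range shards {
		for i, b := range s {
			p[i] ^= b
		}
	}
	return p
}

// recoverShard rebuilds the single missing (nil) shard by XOR-ing the
// parity shard with every shard that did arrive.
func recoverShard(shards [][]byte, p []byte) []byte {
	out := append([]byte(nil), p...)
	for _, s := range shards {
		if s == nil {
			continue
		}
		for i, b := range s {
			out[i] ^= b
		}
	}
	return out
}

func main() {
	data := [][]byte{[]byte("hell"), []byte("o, k"), []byte("cp!!")}
	p := parity(data) // sent as one extra packet

	lost := data[1]
	data[1] = nil // simulate a dropped packet
	got := recoverShard(data, p)
	fmt.Println(bytes.Equal(got, lost))
}
```

Here three data shards plus one parity shard survive one loss at the cost of one extra packet in four. Reed-Solomon generalizes this: with dataShards data packets and parityShards parity packets per group, the group can be rebuilt as long as any dataShards of them arrive, so raising parityShards trades bandwidth for loss tolerance.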
The KCP task scheduler
The task scheduler here is a very simple implementation: a global variable, updater, manages the sessions. The code lives in updater.go, and the key function is:
    func (h *updateHeap) updateTask() {
        var timer <-chan time.Time
        for {
            select {
            case <-timer:
            case <-h.chWakeUp:
            }

            h.mu.Lock()
            hlen := h.Len()
            now := time.Now()
            if hlen > 0 && now.After(h.entries[0].ts) {
                for i := 0; i < hlen; i++ {
                    entry := heap.Pop(h).(entry)
                    if now.After(entry.ts) {
                        entry.ts = now.Add(entry.s.update())
                        heap.Push(h, entry)
                    } else {
                        heap.Push(h, entry)
                        break
                    }
                }
            }
            if hlen > 0 {
                timer = time.After(h.entries[0].ts.Sub(now))
            }
            h.mu.Unlock()
        }
    }
The scheduler is built around a heap ordered by each entry's next update time. Every new connection's session is inserted into this heap. The for loop wakes up when the earliest entry is due (or when chWakeUp fires), pops the entries whose time has passed, calls entry.s.update() on each, and pushes them back with the new deadline that update returns. entry.s.update() in turn calls s.kcp.flush(false) to send data.
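Summary
This article briefly covered KCP's overall flow and walked through the send path in detail, but it did not cover the receive path. After the client sends data, the server has to return ACKs, and the client uses those ACKs to decide whether a segment needs to be retransmitted or can be removed from the queue. Returned ACKs are handled in kcp.Input(). The detailed flow is left for a later article.
Reposted from: https://www.cnblogs.com/zhangboyu/p/34c07c3577c85e9ae5c3477d7cab5f52.html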