日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Go Concurrency Patterns: Pipelines and cancellation

發布時間:2024/7/23 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go Concurrency Patterns: Pipelines and cancellation 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文地址: https://blog.golang.org/pipelines

簡介

Go 語言提供的并發原語使得可以很方便的構建數據流 pipeline,使用這樣的 pipeline 可以高效的利用 I/O 和多 cpu 的優勢. 這篇文章我們將展示如何構建并使用 pipeline.

什么是 pipeline ?

在 go 語言中沒有正式的定義什么是 pipeline. 它只是眾多并發程序類型中的一種. 非正式的說,pipeline 是一系列通過 channel 聯系起來的 stage. 每個 stage 包含多個執行相同功能的 goroutine. 在每個 stage 中, goroutine 執行以下操作:

  • 從輸入 channel 中讀取數據
  • 處理數據,產生新的數據
  • 將數據發送到輸出 channel

除了第一個和最后一個 stage,每個 stage 可以擁有任意數量的 輸入channel 和 輸出channel。 第一個和最后一個 stage 只能有一個輸入channel一個輸出channel. 第一個 stage 也被稱為 Source 或 Producer, 最后一個 stage 被稱為 Sink 或 Consumer

接下來,我們通過一個簡單的示例來說明.

平方數

假設我們的 pipeline 有三個 stage.

第一個 stage 是 gen, 用來將與一組數字轉化為一個 channel.

func gen(nums ...int) <-chan int {out := make(chan int)go func() {for _, n := range nums {out <- n}close(out)}()return out }

第二個 stage 是 sq, 從 輸入channel 中接收數字,計算數字的平方數,并將數字寫入輸出channel中.

func sq(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * n}close(out)}()return out }

main 函數中建立該 pipeline,并運行最后最后一個 stage. 最后一個 stage 從第二個 stage 中接收平方數,并將接收到的數據打印出來.

func main() {// Set up the pipeline.c := gen(2, 3)out := sq(c)// Consume the output.fmt.Println(<-out) // 4fmt.Println(<-out) // 9 }

因為 gen 的輸入channel 和輸出 channel具有相同的輸入和輸出類型,因此我們可以重復的使用他們任意次.

我們可以將 main 方法重寫為如下形式:

func main() {// Set up the pipeline and consume the output.for n := range sq(sq(gen(2, 3))) {fmt.Println(n) // 16 then 81} }

扇入,扇出

多個函數可以從一個channel中讀取數據,直到這個channel關閉,這叫做 扇出(fan-out). 通過這種方式,我們可以將一些列任務分派給多個 woker,這些 worker 可以在多個 CPU 上執行或者進行 I/O 操作.

一個函數可以從多個輸入 channel 中讀取并處理數據,直到所有的 channel 被關閉. 并將輸出寫入到同一個輸出channel 上,處理完數據后關閉輸出 channel. 這叫做 扇入(fan-in).

舉個例子,我們可以運行兩個 sq 方法,這兩個方法均從同一個輸入 channel 上讀取數據. 這里我們再引入另外一個方法 merge, 該方法用于將兩個 sq 的輸出整合到通過一個輸出channel中.

func main() {in := gen(2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(in)c2 := sq(in)// Consume the merged output from c1 and c2.for n := range merge(c1, c2) {fmt.Println(n) // 4 then 9, or 9 then 4} } func merge(cs ...<-chan int) <-chan int {var wg sync.WaitGroupout := make(chan int)// Start an output goroutine for each input channel in cs. output// copies values from c to out until c is closed, then calls wg.Done.output := func(c <-chan int) {for n := range c {out <- n}wg.Done()}wg.Add(len(cs))for _, c := range cs {go output(c)}// Start a goroutine to close out once all the output goroutines are// done. This must start after the wg.Add call.go func() {wg.Wait()close(out)}()return out }

盡快停止

截至目前,我們將所有的 pipeline 函數設計為如下模式:

  • 當前 stage 應該關閉 輸出channel,當我們處理完了所有的輸入數據,并且所有的輸出數據已經發送到了 輸出channel 之后.
  • 當前 stage 應該持續接收數據直到 輸入channel 被關閉.

這樣設計使得我們可以再接收stage 中使用 range 循環來處理所有的數據,當所有數據被處理并發送到輸出channel之后,我們的循環為自動退出.

但是在真實情況下,我們往往不會接收從輸入channel中接收所有的數據. 有時,我們僅僅需要讀取輸入數據的一個子集便可以繼續往下進行了. 更通常的情況下,stage 提前退出,因為上流 stage 發生了錯誤. 在這種情況下,我們不應該等待所有的數據到來,并且我們希望上流 stage 直接退出而不是繼續產生哪些我們已經不在需要的數據.

在我們的例子中,如果當前 stage 無法正確的處理所有的 輸入數據,那么上流嘗試繼續發送數據到 stage 會被永久的阻塞住.

// Consume the first value from the output.out := merge(c1, c2)fmt.Println(<-out) // 4 or 9return// Since we didn't receive the second value from out,// one of the output goroutines is hung attempting to send it.

這會導致資源泄露. goroutine 會消耗內存和運行時資源, goroutine 堆棧中的對該 channel 的引用會阻止垃圾回收器回收該 channel 所占的資源,直到它自己退出.

我們需要我們 pipeline 中的上流 stage 總是能自動退出即使下流 stage 無法接收該stage 所產生的所有數據. 一種方案是給輸出channel設置 buffer. buffer 中可以保存指定數量的數據,只要buffer沒有滿,往這樣的channel 中發送數據的操作總是能立馬返回.

c := make(chan int, 2) // buffer size 2 c <- 1 // succeeds immediately c <- 2 // succeeds immediately c <- 3 // blocks until another goroutine does <-c and receives 1

如果我們在創建一個輸出channel的時候,便直到需要發送多少數據,那么使用 buffer 會簡化我們的代碼.

func gen(nums ...int) <-chan int {// 這里,對于每個輸入數字,我們均會產生一個輸出,// 因此我們便可以將輸出 channel 的buffer 大小設置為輸入 nums 的大小// 這樣我們往 out channel 中發送數據的操作永遠不會阻塞當前方法out := make(chan int, len(nums))for _, n := range nums {out <- n}close(out)return out }

另外一種方案是,下流 stage 通知上流stage,它已經停止接收數據了.

取消接收

當我們在 main 方法中決定不再從 out channel 中接收數據,直接退出的時候,我們必須通知上流 stage,我們已經不再從該 channel 中接受數據了. 我們可以通過一個 done channel 來實現.

func main() {in := gen(2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(in)c2 := sq(in)// 因為當前 stage 有兩個上流 channel,因此我們將 done 的 buffer 大小初始化為 2done := make(chan struct{}, 2)out := merge(done, c1, c2)fmt.Println(<-out) // 4 or 9// Tell the remaining senders we're leaving.done <- struct{}{}done <- struct{}{} }

上流 stage 需要做如下修改:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {var wg sync.WaitGroupout := make(chan int)// Start an output goroutine for each input channel in cs. output// copies values from c to out until c is closed or it receives a value// from done, then output calls wg.Done.output := func(c <-chan int) {for n := range c {// 這里使用 select 語句代替原先的單純發送數據的操作// 以便當下流 stage 停止接收,往 done channel 上發送停止接收的信號select {case out <- n:// 當我們在 main 方法中往 done channel 發送數據后,我們便會在這里接收到該數據// 我們便可以結束當前 stage 了case <-done: }}wg.Done()}// ... the rest is unchanged ... }

這種方法存在一個問題,那就是對于每個下流 stage,都得知道上流 stage 的數量,這樣我們才能確定 done channel 的大小. 這看起來并不是一個優雅的解決方案.

我們需要一種解決方案,這個解決方案不需要知道上流和下流的 stage 數量.

在 go 中,我們可以通過關閉 channel 來實現. 因為試圖從一個已經關閉的 channel 上接收數據總是會直接返回,返回值是一個對應數據類型的 zero 值.

這意味著,我們只需要在 main 函數中關閉 done channel,然后所有嘗試從 done 中接收信號的上流stage 都會收到一個零值,這樣他們便可以直接退出了.

修改 main 函數,使用這種方案. 我們需要給每個上流 stage 增加一個done channel 參數,這樣,當 在main 中,我們關閉 done 之后,所有上流 stage 都能收到信號,并退出. 上流stage 的實現類似與 merge 的實現,略.

func main() {// Set up a done channel that's shared by the whole pipeline,// and close that channel when this pipeline exits, as a signal// for all the goroutines we started to exit.done := make(chan struct{}) // 注意,這里 done 不要 bufferdefer close(done) // 使用 defer,在 main 函數退出時,該 channel 會被關閉in := gen(done, 2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(done, in)c2 := sq(done, in)// Consume the first value from output.out := merge(done, c1, c2)fmt.Println(<-out) // 4 or 9// done will be closed by the deferred call. }

計算文件 MD5 checksum

接下來,我們看一個更加真實的例子.

MD5 經常被用來計算文件的 checksum. md5sum 命令可以輸出一組文件的 checksum.

% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go

在這個例子中,我們來實現 md5sum 命令. 不同的是我們的md5sum 命令接收一個目錄,輸出這個目錄下所有文件的 checksum,按照路徑排序.

func main() {// Calculate the MD5 sum of all files under the specified directory,// then print the results sorted by path name.m, err := MD5All(os.Args[1])if err != nil {fmt.Println(err)return}var paths []stringfor path := range m {paths = append(paths, path)}sort.Strings(paths)for _, path := range paths {fmt.Printf("%x %s\n", m[path], path)} }

MD5All 的實現如下

// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) {m := make(map[string][md5.Size]byte)err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}data, err := ioutil.ReadFile(path)if err != nil {return err}m[path] = md5.Sum(data)return nil})if err != nil {return nil, err}return m, nil }

并行化計算 MD5 checksum

在這節中,我們將 MD5All 拆分為兩個有兩個 stage 的 pipeline. 第一個stage sumFiles 遍歷文件目錄,計算文件 checksum,并將結果發送到輸出 channel 中, 計算結果的類型為 result.

type result struct {path stringsum [md5.Size]byteerr error } func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {// For each regular file, start a goroutine that sums the file and sends// the result on c. Send the result of the walk on errc.c := make(chan result)errc := make(chan error, 1)// 主線程開啟一個 goroutine, 在goroutine 中遍歷文件,并計算checksum,將結果輸出到 c channel,如果發生錯誤,將錯誤信息發送到 errc channelgo func() {var wg sync.WaitGrouperr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}wg.Add(1)// 為每個文件使用一個單獨的 goroutine 來計算文件 checksumgo func() {data, err := ioutil.ReadFile(path)// 嘗試往 channel c 中發送計算結果,如果發送操作被阻塞且 done 已經被關閉// select 語句便會進入 done 對應的 case,程序得以繼續往下進行select {case c <- result{path, md5.Sum(data), err}:case <-done:}wg.Done()}()// Abort the walk if done is closed.select {case <-done:return errors.New("walk canceled")default:return nil}})// Walk has returned, so all calls to wg.Add are done. Start a// goroutine to close c once all the sends are done.// 等待所有計算文件 checksum 的 goroutine 退出go func() { wg.Wait()close(c) // 結束時,關閉 channel c}()// No select needed here, since errc is buffered.errc <- err}()return c, errc }

MD5All 用來接收 checksum 或者 sumfiles 中發生的錯誤.

func MD5All(root string) (map[string][md5.Size]byte, error) {// MD5All closes the done channel when it returns; it may do so before// receiving all the values from c and errc.done := make(chan struct{})defer close(done)c, errc := sumFiles(done, root)m := make(map[string][md5.Size]byte)// 從 c 上讀取數據,無論 sumFiles 是否正常結束,// range c 都確保我們不會阻塞在這個 for 循環處for r := range c {if r.err != nil {return nil, r.err}m[r.path] = r.sum}// 檢查是否發生錯誤if err := <-errc; err != nil {return nil, err}return m, nil }

限制并行數量

在上一節中,我們給每個文件創建一個 goroutine 用來計算文件的 MD5 checksum. 這里有一個問題,如果某個目錄下有很多文件,那么我們便需要創建大量個 goroutine,這可能會超出實際的物理內存大小.

我們可以通過限制并行處理的文件數量來解決這個問題. 這里,我們通過創建指定數量的 goroutine 來讀取文件. 此時,我們的 pipeline 就需要有三個stage 了: 遍歷文件目錄,讀取數據并計算 MD5 checksum, 收集計算結果.

第一個 stage walkFiles 讀取文件并將結果寫入輸出 channel 中

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {paths := make(chan string)errc := make(chan error, 1)go func() {// Close the paths channel after Walk returns.defer close(paths)// No select needed for this send, since errc is buffered.errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}select {case paths <- path:case <-done:return errors.New("walk canceled")}return nil})}()return paths, errc }

第二個 stage 啟用指定數量個 goroutine 執行 digester 方法. 這個 goroutine 從 paths channel 中讀取文件路徑并計算 MD5 checksum,將結果輸出到 channel c 上

// 注意,這里我們不關閉 channel c,因為我們有多個 goroutine 往 c 中發送數據 func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {for path := range paths {data, err := ioutil.ReadFile(path)select {case c <- result{path, md5.Sum(data), err}:case <-done:return}} } // Start a fixed number of goroutines to read and digest files.c := make(chan result)var wg sync.WaitGroupconst numDigesters = 20wg.Add(numDigesters)for i := 0; i < numDigesters; i++ {go func() {digester(done, paths, c)wg.Done()}()}go func() {wg.Wait()close(c)}()

最后一個 stage 從 channel c 上接收計算結果或者錯誤信息.

m := make(map[string][md5.Size]byte)for r := range c {if r.err != nil {return nil, r.err}m[r.path] = r.sum}// Check whether the Walk failed.if err := <-errc; err != nil {return nil, err}return m, nil

END!!!

總結

以上是生活随笔為你收集整理的Go Concurrency Patterns: Pipelines and cancellation的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 娇妻之欲海泛舟无弹窗笔趣阁 | 国产女人呻吟高潮抽搐声 | 一区二区欧美在线 | 二区视频在线观看 | 精品久久蜜桃 | 久久性感美女视频 | 天天做天天爽 | 日韩免费黄色片 | 美女免费看片 | 痴汉电车在线播放 | 夜夜久久久 | 成人手机在线免费视频 | 丰满熟妇人妻中文字幕 | 日韩高清一二三区 | 羽月希奶水一区二区三区 | 日本成人社区 | 欧洲亚洲精品 | 免费看片亚洲 | 日本黄色大片视频 | 日本少妇全体裸体洗澡 | 欧美日韩精品一区二区三区 | 桃色网站在线观看 | 奇米色777| 丁香色欲久久久久久综合网 | 99免费在线观看 | 久久国产精品精品国产 | 亚洲精品国产精品国自产网站按摩 | 国内视频一区二区三区 | 九九啪| 欧美成人精品一区二区男人小说 | 麻豆精品视频在线观看 | 国产精品久久午夜夜伦鲁鲁 | 在线激情小视频 | 国产调教视频 | av在线资源播放 | 成人免费超碰 | 一边摸一边抽搐一进一出视频 | 亚洲欧美日本国产 | 久一区二区三区 | 欧美视频网站 | 中文字幕 自拍偷拍 | jizzjizz国产 | 男人天堂va | 国产精品女教师 | 热99视频| 精品无码久久久久国产 | 亚洲影视在线观看 | 91精品色| 精品国产av无码一区二区三区 | 欧美三级韩国三级日本三斤 | 在线观看国产精品入口男同 | 一级黄色淫片 | 久久久久久国产精品视频 | 中文字幕一区二区三区人妻四季 | 欧美精品一区二区三区在线播放 | 在线天堂视频 | 成人你懂的 | 国产夫妻自拍小视频 | 人妻体内射精一区二区三区 | 亚洲天堂午夜 | 日韩视频在线观看一区二区 | av手机天堂网| 日韩无遮挡 | 日韩av网站在线 | 日日好av | 日日干影院| 亚洲精品白浆高清久久久久久 | 天堂√在线 | 浪漫樱花在线观看高清动漫 | 精品福利视频一区二区 | 最新中文字幕免费 | 国产午夜一级 | 情侣作爱视频网站 | 麻豆区1免费| 日日日干 | 请用你的手指扰乱我吧 | 色99视频 | 精品国产一区二区三区久久久蜜月 | 国产成人精品免费看视频 | 欧美一级淫 | 成人精品一区二区三区电影 | 就去色av | 中文有码av | 免费色av | v在线| 涩涩涩涩涩涩涩涩涩 | 99精品久久久久久中文字幕 | 久久香视频 | 国产又粗又猛又爽又黄91精品 | 奇米精品一区二区三区在线观看一 | 男女操网站| 亚洲在线一区二区三区 | 99热精品在线 | 一区二区三区四区五区在线视频 | 99精品在线免费观看 | 自拍偷拍第 | 精品人妻在线播放 | 美女福利视频导航 | 亚洲丁香花色 |