日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java akka 实战_Akka实战:分散、聚合模式

發(fā)布時(shí)間:2025/3/15 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java akka 实战_Akka实战:分散、聚合模式 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

分散與聚合:簡(jiǎn)單說(shuō)就是一個(gè)任務(wù)需要拆分成多個(gè)小任務(wù),每個(gè)小任務(wù)執(zhí)行完后再把結(jié)果聚合在一起返回。

實(shí)例背景

本實(shí)例來(lái)自一個(gè)真實(shí)的線上產(chǎn)品,現(xiàn)將其需求簡(jiǎn)化如下:

傳入一個(gè)關(guān)鍵詞:key,根據(jù)key從網(wǎng)上抓取相關(guān)新聞

可選傳入一個(gè)超時(shí)參數(shù):duration,設(shè)置任務(wù)到期時(shí)必須反回?cái)?shù)據(jù)(返回實(shí)際已抓取數(shù)據(jù))

若超時(shí)到返回實(shí)際已爬取數(shù)據(jù),則任務(wù)應(yīng)繼續(xù)運(yùn)行直到所以數(shù)據(jù)抓取完成,并存庫(kù)

設(shè)計(jì)

根據(jù)需求,一個(gè)簡(jiǎn)化的分散、聚合模式可以使用兩個(gè)actor來(lái)實(shí)現(xiàn)。

NewsTask:接收請(qǐng)求,并設(shè)置超時(shí)時(shí)間

SearchPageTask:執(zhí)行實(shí)際的新聞抓取操作(本實(shí)例將使用TimeUnit模擬抓取耗時(shí))

實(shí)現(xiàn)

NewsTask

override def metricPreStart(): Unit = {

context.system.scheduler.scheduleOnce(doneDuration, self, TaskDelay)

}

override def metricReceive: Receive = {

case StartFetchNews =>

_receipt = sender()

(0 until NewsTask.TASK_SIZE).foreach { i =>

context.actorOf(SearchPageTask.props(self), "scatter-" + i) ! SearchPage(key)

}

case GetNewsItem(newsItem) =>

_newses ::= newsItem

if (_newses.size == NewsTask.TASK_SIZE) {

logger.debug(s"分散任務(wù),${NewsTask.TASK_SIZE}個(gè)已全部完成")

if (_receipt != null) {

_receipt ! NewsResult(key, _newses)

_receipt = null

}

self ! PoisonPill

}

case TaskDelay =>

if (_receipt != null) {

_receipt ! NewsResult(key, _newses)

_receipt = null

}

}

metricPreStart方法中設(shè)置定時(shí)方法,調(diào)用時(shí)間為從代碼運(yùn)行開(kāi)始到doneDuration時(shí)間為止。定時(shí)被觸發(fā)時(shí)將向當(dāng)前Actor發(fā)送一個(gè)TaskDelay消息。

在metricReceive方法中,分別對(duì)StartFetchNews、GetNewsItem、TaskDelay三個(gè)消息進(jìn)行操作。

在收到StartFetchNews消息時(shí),actor首先保存發(fā)送者actor的引用(結(jié)果將返回到此actor)。再根據(jù)TASK_SIZE生成相應(yīng)子任務(wù)

GetNewsItem消息的處理中,每收到一個(gè)消息就將其添加到_newses列表中。并判斷當(dāng)_newses個(gè)數(shù)等于TASK_SIZE時(shí)(所有子任務(wù)已完成)將結(jié)果發(fā)送給_receipt。

self ! PoisonPill,這句代碼停止actor自身。它將把“毒藥”發(fā)送到NewsTask Actor的接收郵箱隊(duì)列中。

TaskDelay消息被觸發(fā)時(shí),將直接返回已完成的新聞_newses。返回?cái)?shù)據(jù)后并不終止當(dāng)前還未運(yùn)行完任務(wù)。

SearchPageTask

override def metricReceive: Receive = {

case SearchPage(key) =>

// XXX 模擬抓取新聞時(shí)間

TimeUtils.sleep(Random.nextInt(20).seconds)

val item = NewsItem(

"http://newssite/news/" + self.path.name,

"測(cè)試新聞" + self.path.name,

self.path.name,

TimeUtils.now().toString,

"內(nèi)容簡(jiǎn)介", "新聞?wù)?#34;)

taskRef ! GetNewsItem(item)

context.stop(self)

}

SearchPageTask的代碼邏輯就比較易懂了,這里使用sleep來(lái)模擬實(shí)際抓取新聞時(shí)的耗時(shí)。生成結(jié)果后返回?cái)?shù)據(jù)給`taskRef`,并終止自己。

執(zhí)行測(cè)試

./sbt

akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest

總結(jié)

這是一個(gè)簡(jiǎn)單的Akka實(shí)例,實(shí)現(xiàn)了任務(wù)分發(fā)與結(jié)果聚合。提供了一種在指定時(shí)間內(nèi)返回部份有效數(shù)據(jù),同時(shí)任務(wù)繼續(xù)執(zhí)行的方式。這種分散、聚合的模式在實(shí)際生產(chǎn)中很常用,比如對(duì)多種數(shù)據(jù)源的整合,或某些需要長(zhǎng)時(shí)間運(yùn)行同時(shí)對(duì)返回?cái)?shù)據(jù)完整性無(wú)強(qiáng)制要求的情況等。

MetricActor演示了怎么自定義Actor,并為其提供一些偵測(cè)點(diǎn)的方式。以后有時(shí)間會(huì)寫(xiě)篇詳文介紹。

總結(jié)

以上是生活随笔為你收集整理的java akka 实战_Akka实战:分散、聚合模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。