日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Scala入门到精通——第二十六节 Scala并发编程基础

發(fā)布時間:2024/1/23 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Scala入门到精通——第二十六节 Scala并发编程基础 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

本節(jié)主要內(nèi)容

  • Scala并發(fā)編程簡介
  • Scala Actor并發(fā)編程模型
  • react模型
  • Actor的幾種狀態(tài)
  • Actor深入使用解析
  • 1. Scala并發(fā)編程簡介

    2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行業(yè)中最不可告人的一個小秘密,他明確論證了處理器在速度上的發(fā)展已經(jīng)走到了盡頭,并且將由全新的單芯片上的并行 “內(nèi)核”(虛擬 CPU)所取代。這一發(fā)現(xiàn)對編程社區(qū)造成了不小的沖擊,因為正確創(chuàng)建線程安全的代碼,在理論而非實(shí)踐中,始終會提高高性能開發(fā)人員的身價,而讓各公司難以聘用他們。看上去,僅有少數(shù)人充分理解了 Java 的線程模型、并發(fā) API 以及 “同步” 的含義,以便能夠編寫同時提供安全性和吞吐量的代碼 —— 并且大多數(shù)人已經(jīng)明白了它的困難所在(來源:http://www.ibm.com/developerworks/cn/java/j-scala02049.html)。

    在Java中,要編寫一個線程安全的程序并不是一件易事,例如:

    class Account { private int balance; synchronized public int getBalance() { return balance; } synchronized public void incrementBalance() { balance++; } }

    上面這段java代碼雖然方法前面加了synchronized ,但它仍然不是線程安全的,例如,在執(zhí)行下面兩個語句

    account.incrementBalance(); account.getBalance();

    時,有可能account.incrementBalance()執(zhí)行完成后,其它線程可能會獲取對象的鎖,修改account的balance,從而造成得不到預(yù)期結(jié)果的問題。解決問題的方法是將兩個功能結(jié)合起來形成一個方法:

    synchronized public int incrementAndGetBalance() { balance++; return balance; }

    但這可能并不是我們想要的,每次獲取balance都要將balance增加, 這顯然與實(shí)際不符。除此之外,java中的并發(fā)編程可能還會經(jīng)常遇到死鎖問題,而這個問題往往難調(diào)試,問題可能會隨機(jī)性的出現(xiàn)。總體上來看,java的并發(fā)編程模型相對較復(fù)雜,難以駕馭。

    Scala很好地解決了java并發(fā)編程的問題,要在scala中進(jìn)行并發(fā)編程,有以下幾種途徑可以實(shí)現(xiàn):
    1 actor消息模型、akka actor并發(fā)模型。

    2 Thread、Runnable

    3 java.util.concurennt

    4 第三方開源并發(fā)框架如Netty,Mina

    在上述四種途徑當(dāng)中,利用 actor消息模型、akka actor并發(fā)模型是scala并發(fā)編程的首先,本節(jié)主要介紹actor消息模型,akka actor并發(fā)模型我們將放在后面的章節(jié)中介紹。
    在scala中,通過不變對象來實(shí)現(xiàn)線程安全,涉及到修改對象狀態(tài)時,則創(chuàng)建一個新的對象來實(shí)現(xiàn),如:

    //成員balance狀態(tài)一旦被賦值,便不能更改 //因而它也是線程安全的 class Person(val age: Integer) { def getAge() = age } object Person{ //創(chuàng)建新的對象來實(shí)現(xiàn)對象狀態(tài)修改def increment(person: Person): Person{ new Person(Person.getAge() + 1) } }

    通過不變對象實(shí)現(xiàn)并發(fā)編程,可以簡化編程模型,使并發(fā)程序更容易現(xiàn)實(shí)和控制。

    2.Scala Actor并發(fā)編程模型

    java中的并發(fā)主要是通過線程來實(shí)現(xiàn),各線程采用共享資源的機(jī)制來實(shí)現(xiàn)程序的并發(fā),這里面臨競爭資源的問題,雖然采用鎖機(jī)制可以避免競爭資源的問題,但會存在死鎖問題,要開發(fā)一套健壯的并發(fā)應(yīng)用程序具有一定的難度。而scala的并發(fā)模型相比于java它更簡單,它采用消息傳遞而非資源共享來實(shí)現(xiàn)程序的并發(fā),消息傳遞正是通過Actor來實(shí)現(xiàn)的。下面的代碼給出了Actor使用示例

    //混入Actor特質(zhì),然后實(shí)現(xiàn)act方法 //如同java中的Runnable接口一樣 //各線程的run方法是并發(fā)執(zhí)行的 //Actor中的act方法也是并發(fā)執(zhí)行的 class ActorDemo extends Actor{//實(shí)現(xiàn) act()方法def act(){while(true){//receive從郵箱中獲取一條消息//然后傳遞給它的參數(shù)//該參數(shù)是一個偏函數(shù)receive{case "actorDemo" => println("receive....ActorDemo")} }} } object ActorDemo extends App{val actor=new ActorDemo//啟動創(chuàng)建的actor actor.start()//主線程發(fā)送消息給actoractor!"actorDemo" }

    下面給的是recieve方法的部分源代碼

    def receive[R](f: PartialFunction[Any, R]): R = {assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")synchronized {if (shouldExit) exit() // linksdrainSendBuffer(mailbox)}var done = falsewhile (!done) {val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {senders = replyTo :: sendersval matches = f.isDefinedAt(m)senders = senders.tailmatches}) ................

    從上述代碼中不能看出,receive方法接受的參數(shù)是一個偏函數(shù),并且是通過mailbox來實(shí)現(xiàn)消息的發(fā)送與接收。

    在前述的class ActorDemo中,receive方法的參數(shù)為

    {case "actorDemo" => println("receive....ActorDemo") }

    該代碼塊在執(zhí)行時被轉(zhuǎn)換為一個PartialFunction[Any, R]的偏函數(shù),其中R是偏函數(shù)的返回類型,對應(yīng)case 語句=> 右邊的部分,在本例子中R是Unit類型,而Any對應(yīng)的則對應(yīng)case語句的模式部分。

    前面給的是通過extends Actor的方式來創(chuàng)建一個Actor類,其實(shí)scala.actors.Actor中提供了一個actor工具方法,可以非常方便地直接創(chuàng)建Actor對象如:

    import scala.actors.Actor._object ActorFromMethod extends App{//通過工具方法actor直接創(chuàng)建Actor對象val methodActor = actor {for (i <- 1 to 5)println("That is the question.")Thread.sleep(1000)} }

    上述代碼創(chuàng)建的actor對象無需調(diào)用start方法,對象創(chuàng)建完成后會立即執(zhí)行。

    scala中本地線程也可用作Actor,下面的代碼演示了如何在REPL命令行中將本地線程當(dāng)作Actor;

    scala> import scala.actors.Actor._ import scala.actors.Actor._//self引用本地線程,并發(fā)送消息 scala> self ! "hello" //接收消息 scala> self.receive { case x:String => x } res1: String = hello

    上述代碼中,如果發(fā)送的消息不是String類型的,線程將被阻塞,為避免這個問題,可以采用receiveWithin方法,

    scala> self ! 123scala> self.receiveWithin(1000) { case x => x } res6: Any = 123scala> self.receiveWithin(1000) { case x => x } res7: Any = TIMEOUT

    3. react模型

    scala中的Actor也是構(gòu)建在java線程基礎(chǔ)之上的,前面在使用Actor時都是通過創(chuàng)建Actor對象,然后再調(diào)用act方法來啟動actor。我們知道,java中線程的創(chuàng)建、銷毀及線程間的切換是比較耗時的,因此實(shí)際中盡量避免頻繁的線程創(chuàng)建、銷毀和銷毀。Scala中提供React方法,在方法執(zhí)行結(jié)束后,線程仍然被保留。下面的代碼演示了react方法的使用:

    package cn.scala.xtwy.concurrency import scala.actors._object NameResolver extends Actor {import java.net.{ InetAddress, UnknownHostException }def act() {react {//匹配主線程發(fā)來的("www.scala-lang.org", NameResolver)case (name: String, actor: Actor) =>//向actor發(fā)送解析后的IP地址信息//由于本例中傳進(jìn)來的actor就是NameResolver自身//也即自己給自己發(fā)送消息,并存入將消息存入郵箱actor ! getIp(name)//再次調(diào)用act方法,試圖從郵箱中提取信息//如果郵箱中信息為空,則進(jìn)入等待模式act()case "EXIT" =>println("Name resolver exiting.")// quit//匹配郵箱中的單個信息,本例中會匹配郵箱中的IP地址信息case msg =>println("Unhandled message: " + msg)act()}}def getIp(name: String): Option[InetAddress] = {try {Some(InetAddress.getByName(name))} catch {case _: UnknownHostException => None}} } object Main extends App{NameResolver.start()//主線程向NameResolver發(fā)送消息("www.scala-lang.org", NameResolver)NameResolver ! ("www.scala-lang.org", NameResolver)NameResolver ! ("wwwwww.scala-lang.org", NameResolver)}

    從上述代碼中可以看到,通過在react方法執(zhí)行結(jié)束時加入act方法,方法執(zhí)行完成后沒有被銷毀,而是繼續(xù)試圖從郵箱中獲取信息,獲取不到則等待。

    4. Actor的幾種狀態(tài)

    Actor有下列幾種狀態(tài):

    • 初始狀態(tài)(New),Actor對象被創(chuàng)建,但還沒有啟動即沒有執(zhí)行start方法時的狀態(tài)
    • 執(zhí)行狀態(tài)(Runnable),正在執(zhí)行時的狀態(tài)
    • 掛起狀態(tài)(Suspended),在react方法中等待時的狀態(tài)
    • 時間點(diǎn)掛起狀態(tài)(TimedSuspended),掛起狀態(tài)的一種特殊形式,reactWithin方法中的等待時的狀態(tài)
    • 阻塞狀態(tài)(Blocked),在receive方法中阻塞等待時的狀態(tài)
    • 時間點(diǎn)阻塞狀態(tài)(TimedBlocked),在receiveWithin方法中阻塞等待時的狀態(tài)
    • 結(jié)束狀態(tài)(Terminated),執(zhí)行完成后被銷毀

    5. Actor深入使用解析

    1 receive方法單次執(zhí)行:

    object Actor2{case class Speak(line : String)case class Gesture(bodyPart : String, action : String)case class NegotiateNewContract()def main(args : Array[String]) ={val badActor =actor{//這里receive方法只會匹配一次便結(jié)束receive{case NegotiateNewContract =>System.out.println("I won't do it for less than $1 million!")case Speak(line) =>System.out.println(line)case Gesture(bodyPart, action) =>System.out.println("(" + action + "s " + bodyPart + ")")case _ =>System.out.println("Huh? I'll be in my trailer.")}}//receive方法只處理下面這條語句發(fā)送的消息badActor ! NegotiateNewContract//下面其余的消息不會被處理badActor ! Speak("Do ya feel lucky, punk?")badActor ! Gesture("face", "grimaces")badActor ! Speak("Well, do ya?")}}

    上述代碼只會輸出:
    I won’t do it for less than $1 million!
    即后面發(fā)送的消息如:
    badActor ! Speak(“Do ya feel lucky, punk?”)
    badActor ! Gesture(“face”, “grimaces”)
    badActor ! Speak(“Well, do ya?”)
    不會被處理。這是因為receive方法的單次執(zhí)行問題。

    2 能夠處理多個消息的receive方法:

    object Actor2{case class Speak(line : String);case class Gesture(bodyPart : String, action : String);case class NegotiateNewContract()//處理結(jié)束消息case class ThatsAWrap()def main(args : Array[String]) ={val badActor =actor{var done = false//while循環(huán)while (! done){receive{case NegotiateNewContract =>System.out.println("I won't do it for less than $1 million!")case Speak(line) =>System.out.println(line)case Gesture(bodyPart, action) =>System.out.println("(" + action + "s " + bodyPart + ")")case ThatsAWrap =>System.out.println("Great cast party, everybody! See ya!")done = truecase _ =>System.out.println("Huh? I'll be in my trailer.")}}}//下面所有的消息都能被處理badActor ! NegotiateNewContractbadActor ! Speak("Do ya feel lucky, punk?")badActor ! Gesture("face", "grimaces")badActor ! Speak("Well, do ya?")//消息發(fā)送后,receive方法執(zhí)行完畢badActor ! ThatsAWrap}}

    3 Actor后面實(shí)現(xiàn)原理仍然是線程的證據(jù)

    object Actor3{case class Speak(line : String);case class Gesture(bodyPart : String, action : String);case class NegotiateNewContract;case class ThatsAWrap;def main(args : Array[String]) ={def ct ="Thread " + Thread.currentThread().getName() + ": "val badActor =actor{var done = falsewhile (! done){receive{case NegotiateNewContract =>System.out.println(ct + "I won't do it for less than $1 million!")case Speak(line) =>System.out.println(ct + line)case Gesture(bodyPart, action) =>System.out.println(ct + "(" + action + "s " + bodyPart + ")")case ThatsAWrap =>System.out.println(ct + "Great cast party, everybody! See ya!")done = truecase _ =>System.out.println(ct + "Huh? I'll be in my trailer.")}}}System.out.println(ct + "Negotiating...")badActor ! NegotiateNewContractSystem.out.println(ct + "Speaking...")badActor ! Speak("Do ya feel lucky, punk?")System.out.println(ct + "Gesturing...")badActor ! Gesture("face", "grimaces")System.out.println(ct + "Speaking again...")badActor ! Speak("Well, do ya?")System.out.println(ct + "Wrapping up")badActor ! ThatsAWrap}}

    執(zhí)行結(jié)果如下:

    Thread main: Negotiating... Thread main: Speaking... Thread main: Gesturing... Thread main: Speaking again... Thread main: Wrapping up Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million! Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk? Thread ForkJoinPool-1-worker-13: (grimacess face) Thread ForkJoinPool-1-worker-13: Well, do ya? Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!

    從上述執(zhí)行結(jié)果可以看到,Actor最終的實(shí)現(xiàn)仍然是線程,只不過它提供的編程模型與java中的編程模型不一樣而已。

    4 利用!?發(fā)送同步消息,等待返回值

    import scala.actors._,Actor._object ProdConSample2{case class Message(msg : String)def main(args : Array[String]) : Unit ={val consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("Received message! -> " + msg)done = (msg == "DONE")reply("Already RECEIVED....."+msg)}}}System.out.println("Sending....")//獲取響應(yīng)值val r= consumer !? "Mares eat oats"println("replyed message"+r)System.out.println("Sending....")consumer !? "Does eat oats"System.out.println("Sending....")consumer !? "Little lambs eat ivy"System.out.println("Sending....")consumer !? "Kids eat ivy too"System.out.println("Sending....")consumer !? "DONE" }}

    代碼執(zhí)行結(jié)果:

    Sending.... Received message! -> Mares eat oats replyed messageAlready RECEIVED.....Mares eat oats Sending.... Received message! -> Does eat oats Sending.... Received message! -> Little lambs eat ivy Sending.... Received message! -> Kids eat ivy too Sending.... Received message! -> DONE

    通過上述代碼執(zhí)行結(jié)果可以看到,!?因為是同步消息,發(fā)送完返回結(jié)果后才會接著發(fā)送下一條消息。

    5 Spawn方法發(fā)送消息

    object ProdConSampleUsingSpawn{import concurrent.ops._def main(args : Array[String]) : Unit ={// Spawn Consumerval consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("MESSAGE RECEIVED: " + msg)done = (msg == "DONE")reply("RECEIVED")}}}// Spawn Producerspawn //spawn是一個定義在current.ops中的方法{val importantInfo : Array[String] = Array("Mares eat oats","Does eat oats","Little lambs eat ivy","A kid will eat ivy too","DONE");importantInfo.foreach((msg) => consumer !? msg)}}}

    6 !! 發(fā)送異步消息,返回值是 Future[Any]

    object ProdConSample3{case class Message(msg : String)def main(args : Array[String]) : Unit ={val consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("Received message! -> " + msg)done = (msg == "DONE")reply("Already RECEIVED....."+msg)}}}System.out.println("Sending....")//發(fā)送異步消息,返回val replyFuture= consumer !! "Mares eat oats"val r=replyFuture()println("replyed message*****"+r)System.out.println("Sending....")consumer !! "Does eat oats"System.out.println("Sending....")consumer !! "Little lambs eat ivy"System.out.println("Sending....")consumer !! "Kids eat ivy too"System.out.println("Sending....")consumer !! "DONE" }}

    執(zhí)行結(jié)果:

    Sending.... Received message! -> Mares eat oats replyed message*****Already RECEIVED.....Mares eat oats Sending.... Sending.... Sending.... Received message! -> Does eat oats Sending.... Received message! -> Little lambs eat ivy Received message! -> Kids eat ivy too Received message! -> DONE

    通過上述代碼的執(zhí)行結(jié)果可以看到,!!的消息發(fā)送是異步的,消息發(fā)送后無需等待結(jié)果返回便執(zhí)行下一條語句,但如果要獲取異步消息的返回值,如:

    val replyFuture= consumer !! "Mares eat oats"val r=replyFuture()

    則執(zhí)行到這兩條語句的時候,程序先被阻塞,等獲得結(jié)果之后再發(fā)送其它的異步消息。

    7 loop方法實(shí)現(xiàn)react

    object LoopReact extends App{val a1 = Actor.actor {//注意這里loop是一個方法,不是關(guān)鍵字//實(shí)現(xiàn)類型while循環(huán)的作用loop {react {//為整型時結(jié)束操作case x: Int=>println("a1 stop: " + x); exit()case msg: String => println("a1: " + msg)}}}a1!("我是搖擺少年夢")a1.!(23)}

    總結(jié)

    以上是生活随笔為你收集整理的Scala入门到精通——第二十六节 Scala并发编程基础的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。