日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

ROS源码学习 二、线程池

發(fā)布時(shí)間:2023/12/20 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ROS源码学习 二、线程池 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2021SC@SDUSC

目錄

1.寫(xiě)在前面

2.ROS線程池概述

3.ROS線程池模型

4.ROS線程池源碼詳解

5.總結(jié)


1.寫(xiě)在前面

????????ROS作為一個(gè)操作系統(tǒng),其職責(zé)是協(xié)調(diào)具有不同功能的node之間的通訊與合作.要想實(shí)現(xiàn)這個(gè)目標(biāo),最重要的方面之一就是在合理利用資源的條件下實(shí)現(xiàn)較高的并發(fā)性.在ROS的內(nèi)核ros-core中,負(fù)責(zé)通訊的服務(wù)器正是通過(guò)將監(jiān)聽(tīng)到的socket連接經(jīng)過(guò)封裝后調(diào)用線程池的相關(guān)方法來(lái)實(shí)現(xiàn)并發(fā)通訊的.?

2.ROS線程池概述

? ? ? ? 在操作系統(tǒng)這門(mén)課上我們都學(xué)習(xí)了線程的概念,線程在本文中不妨理解為實(shí)現(xiàn)了部分?jǐn)?shù)據(jù)共享,可以分組統(tǒng)一管理的輕量級(jí)進(jìn)程.線程在操作系統(tǒng)上大體上有一對(duì)一、多對(duì)一和多對(duì)多三種模型,其區(qū)別在于每個(gè)用戶線程與實(shí)際處理機(jī)(內(nèi)核線程)的對(duì)應(yīng)關(guān)系.

(三種線程模型的示意圖)

? ? ? ? 一對(duì)一模型將用戶線程與內(nèi)核線程一一對(duì)應(yīng),其優(yōu)點(diǎn)是可以增加程序運(yùn)行速度,提高程序的并行能力,但是對(duì)于處理機(jī)資源不能很好的利用.多對(duì)一模型顯然并不能加速計(jì)算,但是仍然提供了并發(fā)能力使得程序能夠同時(shí)處理多個(gè)用戶的請(qǐng)求,使得分時(shí)系統(tǒng)的設(shè)計(jì)目標(biāo)得以實(shí)現(xiàn).多對(duì)多模型則是結(jié)合了兩者的優(yōu)點(diǎn),將處理機(jī)放入緩沖池中擇機(jī)分配給用戶線程,這種設(shè)計(jì)既能減少對(duì)處理機(jī)實(shí)際數(shù)量的需求,又能盡量滿足每個(gè)用戶線程的計(jì)算要求,使程序具有較高的并發(fā)性,是一種較為理想的選擇.本文中即將討論的線程池即是基于多對(duì)多模型的.

? ? ? ? 經(jīng)過(guò)查閱資料(見(jiàn)JVM線程)發(fā)現(xiàn),Java中的線程在Linux/Windows系統(tǒng)中是一對(duì)一模型,是操作系統(tǒng)可感知的,且一個(gè)Java線程對(duì)應(yīng)一個(gè)內(nèi)核線程;在Solaris中默認(rèn)使用Light Weighted Process,LWP的方式通過(guò)調(diào)度器激活等策略將Java線程映射到系統(tǒng)線程上.因此,我們可以認(rèn)為Java線程并不是多對(duì)一模型,而是一對(duì)一模型或者是仿真一對(duì)一模型.

? ? ? ? 之前已經(jīng)提到,一對(duì)一模型對(duì)資源的利用程度并不是很好,因此ROS在軟件層面通過(guò)編寫(xiě)線程池的方式將其實(shí)際使用變?yōu)槎鄬?duì)多模式,接下來(lái)將進(jìn)行詳細(xì)解讀.

3.ROS線程池模型

????????

? ? ? ? ROS線程池的類與依賴關(guān)系如圖所示,接下來(lái)對(duì)各個(gè)類的結(jié)構(gòu)的方法進(jìn)行大體的描述.

(1)Task:Task為T(mén)hreadPool的內(nèi)部接口,代表需要線程池分配線程執(zhí)行的作業(yè).也就是說(shuō),需要使用線程池中線程執(zhí)行的方法都需要封裝繼承了Task接口的類并重寫(xiě)為run方法.

(2)InterruptableTask:InterruptableTask為繼承了Task的ThreadPool內(nèi)部接口,顧名思義,InterruptableTask為接受被打斷的作業(yè),在被打斷時(shí)可以調(diào)用其中斷處理方法.

(3)Poolable:Poolable為T(mén)hreadPool的內(nèi)部類,是線程池容納的對(duì)象,即基本的工作單元.Poolable維護(hù)成員shuttingDown和thread,shuttingDown負(fù)責(zé)標(biāo)記是否應(yīng)當(dāng)停止工作,thread是用于實(shí)際執(zhí)行task的對(duì)象.

(4)ThreadPool是線程池類,負(fù)責(zé)對(duì)外提供Poolable對(duì)象并對(duì)其進(jìn)行統(tǒng)一管理,回收完成工作的Poolable對(duì)象.ThreadPool的數(shù)據(jù)成員有用于標(biāo)記線程組的ThreadGroup、標(biāo)記線程池最大容量的maxSize、標(biāo)記當(dāng)前線程池容量的num,另外還有記錄待分配Poolable對(duì)象的空閑池waitingThreads列表,管理運(yùn)行中Poolable對(duì)象的runningThrads列表,以及已提交但暫未分配Poolable對(duì)象的task列表.

? ? ? ? 總而言之,ROS將一對(duì)一模型映射為多對(duì)多模型的線程池運(yùn)行邏輯為:

  • 新建線程池對(duì)象
  • 將任務(wù)封裝為線程池支持的task(繼承Task接口)
  • 將task提交給線程池,線程池根據(jù)不同提交方式分配Poolable對(duì)象(下文稱為worker)運(yùn)行任務(wù)
  • 在worker運(yùn)行任務(wù)完成后由線程池負(fù)責(zé)管理與回收? ? ? ?
  • ????????

    ???????????????????????(作業(yè)處理邏輯以及worker的生命周期示意圖)?????????

    4.ROS線程池源碼詳解

    (1)Task/InterruptableTask接口

    public interface Task{void run() throws Throwable;}public interface InterruptableTask extends Task{void shutdown() throws Throwable;}

    前文已經(jīng)提到,想要提交給線程池的任務(wù)必須封裝為T(mén)ask,因此功能類需要實(shí)現(xiàn)Task接口,也就是重寫(xiě)run方法,以run方法作為業(yè)務(wù)邏輯的入口.另外還有InterruptableTask接口,如果想要在出現(xiàn)異常時(shí)中斷作業(yè),可以繼承這個(gè)接口,并重寫(xiě)shutdown方法以進(jìn)行異常處理或一些收尾操作.

    (2)Poolable類

    private class Poolable{private volatile boolean shuttingDown;private Task task;private Thread thread;Poolable(ThreadGroup pGroup, int pNum){thread = new Thread(pGroup, pGroup.getName() + "-" + pNum) {public void run(){while (!shuttingDown){final Task t = getTask(); if (t == null) {try{synchronized (this){if (!shuttingDown && getTask() == null) {wait(); }}} catch (InterruptedException e){// Do nothing}} else{try{t.run(); resetTask(); repool(Poolable.this);} catch (Throwable e){remove(Poolable.this);Poolable.this.shutdown(); resetTask(); }}}}};thread.start();}synchronized void shutdown() {shuttingDown = true;final Task t = getTask(); if (t != null && t instanceof InterruptableTask) {try{((InterruptableTask) t).shutdown();} catch (Throwable th){}}task = null;synchronized (thread){thread.notify(); }}private Task getTask(){return task;}private void resetTask(){task = null;}void start(Task pTask){task = pTask;synchronized (thread){thread.notify();}}}

    ?????????Poolable類的數(shù)據(jù)成員在上文已經(jīng)提及過(guò)了,下面將重點(diǎn)對(duì)其方法進(jìn)行分析.

    ? ? ? ? 1、首先考察其構(gòu)造方法,構(gòu)造方法接收一個(gè)線程組對(duì)象和一個(gè)整型對(duì)象.首先構(gòu)造方法使用參數(shù)創(chuàng)建一個(gè)線程對(duì)象,指定其線程組和線程號(hào).Java的Thread對(duì)象通常需要繼承自Thread或使用實(shí)現(xiàn)了Runnable接口的對(duì)象作為參數(shù)實(shí)例化以完善其run方法(注意此處的run方法與Task的run方法在本質(zhì)上并不相同!),而在此處選擇了在構(gòu)造方法后附加其實(shí)現(xiàn)方法這種語(yǔ)法糖來(lái)實(shí)例化Thread對(duì)象.

    ????????由于構(gòu)造方法實(shí)際上只完成了初始化Thread這一項(xiàng)工作,接下來(lái)我們將分析Thread類是如何完善run方法以使其能夠行使worker的職責(zé)的.在run方法內(nèi)部,首先是一個(gè)while循環(huán),當(dāng)shuttingDown為false時(shí)循環(huán)執(zhí)行.在一輪循環(huán)中,使用一個(gè)局部常變量t來(lái)引用Task對(duì)象,也就是說(shuō)在一輪while循環(huán)中,task是不允許改變的.由此我們也可以判斷,worker實(shí)際上是通過(guò)run方法內(nèi)部的一輪while循環(huán)來(lái)完成一個(gè)task的.

    ? ? ? ? 接下來(lái)進(jìn)行判斷,在任務(wù)為空時(shí),檢查是否應(yīng)被中止,若此worker尚未被設(shè)置為終止且目前暫無(wú)作業(yè)等待執(zhí)行(getTask == null),則調(diào)用wait()方法.因?yàn)榇颂幬挥趕ynchronized塊中,因此調(diào)用此方法等價(jià)于將worker的thread對(duì)象的控制流阻塞于此,其目的是在空閑時(shí)避免自旋等待消耗處理機(jī)資源.

    ? ? ? ? 若成功獲取到了task,則調(diào)用task的run方法(業(yè)務(wù)邏輯),完成且返回后調(diào)用resetTask方法,將此worker的task清空.在清空task后調(diào)用線程池的repool方法,將自身其交還給線程池,由線程池判斷最終去留.如果在此發(fā)生異常,則調(diào)用線程池的remove方法,將此worker移出線程池的引用列表(將不會(huì)再為此異常worker分配task),然后調(diào)用shuttingDown方法將狀態(tài)設(shè)置為停用,并將worker的task變量清空.

    ? ? ? ? 在初始化完成Thread對(duì)象后,立刻執(zhí)行該線程,開(kāi)始此worker被分配的作業(yè).?

    ? ? ? ? 2、注意到shuttingdown方法被synchronized修飾,即此方法只能互斥調(diào)用,具體作用在稍后分析.首先此方法將worker的shuttingDown變量設(shè)置為true,這么做的后果是在worker的thread線程執(zhí)行到while循環(huán)時(shí)或檢測(cè)到task為空時(shí)直接退出,在以后此worker變不再有任何執(zhí)行task的能力.也就是說(shuō),shuttingDown并不是結(jié)束task,而是結(jié)束線程池中一個(gè)worker的生命.

    ? ? ? ? 在設(shè)置了停用標(biāo)志后,獲取這個(gè)worker正在執(zhí)行的task,如果這個(gè)task實(shí)現(xiàn)了InterruptableTask接口,那么則會(huì)調(diào)用其shutdown方法來(lái)進(jìn)行后續(xù)處理.在完成對(duì)task的處理后,將這個(gè)worker的task設(shè)置為null.

    ? ? ? ? 考慮到不僅會(huì)對(duì)運(yùn)行的worker調(diào)用shuttingDown方法,還有可能對(duì)空閑的worker調(diào)用此方法,而空閑的worker中thread阻塞在了方法中,對(duì)其簡(jiǎn)單的移除索引列表并設(shè)置停用位不僅其無(wú)法正確響應(yīng),還會(huì)丟失引用使得資源無(wú)法快速釋放.所以方法在此處進(jìn)入synchronized代碼塊獲取阻塞的thread所釋放的鎖,在同步區(qū)調(diào)用thread的notify方法以喚醒此阻塞的線程,讓其進(jìn)入下一輪while循環(huán)判斷為假后退出.

    ? ? ? ? 講到這里,我們就可以明白為何這段代碼中既使用synchronized修飾方法也要用synchronized修飾代碼塊了,其根本原因在于防止數(shù)據(jù)不一致問(wèn)題.我們知道,鎖的用處有同步也有互斥功能,而在這個(gè)方法中我們要明白,代碼塊中的synchronized是業(yè)務(wù)邏輯要求的,其作用并非制造互斥區(qū),而僅僅在于獲取此worker的線程對(duì)象放棄的對(duì)象鎖,從而喚醒這個(gè)線程使其退出循環(huán),起到同步的效果;而修飾方法的synchronized關(guān)鍵字則是為了實(shí)現(xiàn)互斥訪問(wèn),防止數(shù)據(jù)產(chǎn)生不一致現(xiàn)象:假設(shè)方法無(wú)synchronized修飾,那么從多個(gè)地點(diǎn)同時(shí)調(diào)用這個(gè)方法,可能會(huì)有如下現(xiàn)象發(fā)生:

  • A線程占有處理機(jī),執(zhí)行此方法并調(diào)用了task的shutdown方法(比如效果為向磁盤(pán)寫(xiě)入日志),然后處理器被調(diào)度至其他線程
  • B線程獲得了處理機(jī)并調(diào)用了task的shutdown方法,也向磁盤(pán)寫(xiě)了日志,然后處理機(jī)被調(diào)度至其他線程
  • 重復(fù)這個(gè)模式
  • 磁盤(pán)寫(xiě)了入了多份相同的日志,甚至多個(gè)shutdown方法并發(fā)執(zhí)行導(dǎo)致日志信息無(wú)法閱讀?
  • 因此,使用synchronized修飾方法能夠保證方法的中斷處理階段只執(zhí)行一次,不是業(yè)務(wù)要求,而是并發(fā)編程的內(nèi)在要求.? ? ??

    ?3、getTask方法簡(jiǎn)單返回worker所承擔(dān)的任務(wù)引用、resetTask則將worker的task設(shè)為空.

    ?4、start方法用于在分配task后啟動(dòng)worker,首先使用synchronized關(guān)鍵字獲取到線程的鎖,然后調(diào)用notify喚醒,之后task便得以執(zhí)行,起到了開(kāi)始任務(wù)的功能.

    (3)ThreadPool類

    public ThreadPool(int pMaxSize, String pName){maxSize = pMaxSize;threadGroup = new ThreadGroup(pName);}

    ? ? ? ? 1、首先來(lái)分析ThreadPool的構(gòu)造方法,該方法接收整型變量pMaxSize和字符串pName,其中pMaxSize規(guī)定了線程池的最大容量,pName則用來(lái)初始化ThreadGroup并為其命名,在同一個(gè)線程池worker的Thread對(duì)象都屬于一個(gè)ThreadGroup.

    ? ??

    private synchronized void remove(Poolable pPoolable) {runningThreads.remove(pPoolable);waitingThreads.remove(pPoolable);}

    ? ? ? ? 2、remove方法調(diào)用ArrayList數(shù)據(jù)成員runningThreads和waitingThreads的remove方法,將一個(gè)worker從中刪除.

    ????????

    void repool(Poolable pPoolable){boolean discarding = false;Task task = null;Poolable poolable = null;synchronized (this) {if (runningThreads.remove(pPoolable)) {if (maxSize != 0 && runningThreads.size() + waitingThreads.size() >= maxSize) {discarding = true; } else{waitingThreads.add(pPoolable); if (waitingTasks.size() > 0) {task = (Task) waitingTasks.remove(waitingTasks.size() - 1); poolable = getPoolable(task, false);}}} else {discarding = true;}if (discarding){remove(pPoolable); }}if (poolable != null) {poolable.start(task); }if (discarding) {pPoolable.shutdown();}}

    ? ? ? ? 3、repool方法為上文中worker完成任務(wù)后調(diào)用的方法.首先聲明布爾變量discarding并設(shè)為false用于標(biāo)記是否丟棄此worker.然后聲明Task和Poolable的引用,接著進(jìn)入互斥區(qū).

    ? ? ? ? 因?yàn)榇藭r(shí)worker剛剛完成task,所以調(diào)用runningThreads的remove方法將worker從中移除,如果成功移除代表worker成功完成作業(yè),如果worker在完成作業(yè)中出現(xiàn)異常,則會(huì)通過(guò)shuttingDown方法將worker移除,此時(shí)調(diào)用remove則會(huì)返回false.

    ????????當(dāng)返回值為true時(shí),檢查線程池的worker數(shù)量是否溢出(包括空閑的和運(yùn)行的).若數(shù)量溢出,則將此worker標(biāo)記為棄用,否則將其加入waitingThreads列表,并檢查waitingTasks中是否有排隊(duì)的作業(yè).如果有則以LIFO的方式從排隊(duì)作業(yè)列表中取出一個(gè)task,然后調(diào)用getPoolable方法為嘗試其分配一個(gè)worker.當(dāng)返回值為false時(shí),則將此worker標(biāo)記為棄用.

    ? ? ? ? 接下來(lái)檢測(cè)worker是否被標(biāo)記為棄用,如果是則從兩個(gè)列表中移除.此時(shí)已退出互斥區(qū).接下來(lái)檢查poolable引用是否為null,僅當(dāng)worker成功返回且調(diào)用getPoolable嘗試為排隊(duì)作業(yè)分配worker成功時(shí)此引用不為空,然后在此處開(kāi)始這個(gè)從排隊(duì)列表中取出的作業(yè).

    ? ? ? ? 在最后,如果棄用位為true,則調(diào)用這個(gè)worker的shutdown方法.這些操作看上去與worker的thread成員的run方法中的異常處理分支重復(fù),然而要注意到的是并非只有產(chǎn)生了異常的worker才需要丟棄,溢出的worker以及線程池析構(gòu)時(shí)也需要銷(xiāo)毀worker.此處處理的正是worker數(shù)量過(guò)多時(shí)正常返回的worker.

    ? ? ? ? 注意到這個(gè)方法存在互斥區(qū),設(shè)置為互斥的目的在于引用忙碌和閑置worker的列表類別為ArrayList,并不是線程安全的,因此如果多個(gè)地點(diǎn)同時(shí)調(diào)用時(shí)會(huì)導(dǎo)致if分支被多次測(cè)試通過(guò),從而導(dǎo)致worker被多次加入線程池、作業(yè)排隊(duì)列表被多次彈棧(也并非線程安全的).

    private synchronized Poolable getPoolable(Task pTask, boolean pQueue) {if (maxSize != 0 && runningThreads.size() >= maxSize) {if (pQueue) {waitingTasks.add(pTask); }return null; }Poolable poolable;if (waitingThreads.size() > 0) {poolable = (Poolable) waitingThreads.remove(waitingThreads.size() - 1); } else{poolable = new Poolable(threadGroup, num++); }runningThreads.add(poolable); return poolable; }

    ? ? ? ? 4、getPoolable方法可以看作ThreadPool的核心方法,其作用是向線程池提交作業(yè)以運(yùn)行.這是一個(gè)互斥方法,因?yàn)樽鳂I(yè)等待隊(duì)列不是線程安全的.實(shí)際上,ThreadPool的三個(gè)列表都不是線程安全的,因此涉及到對(duì)這三個(gè)列表CRUD的操作代碼都需要考慮是否是臨界區(qū).

    ? ? ? ? 此方法接收Task對(duì)象作為提交的作業(yè),接收一個(gè)布爾變量標(biāo)記是否接受排隊(duì).讀取參數(shù)后,首先檢查目前線程池的正在運(yùn)行的作業(yè)數(shù)量是否超過(guò)上限,如果超過(guò)上限則根據(jù)排隊(duì)策略選擇進(jìn)入隊(duì)列或者離開(kāi),無(wú)論是那種排隊(duì)策略,此刻都沒(méi)有產(chǎn)生就緒狀態(tài)的worker,因此返回值為null.

    ? ? ? ? 如果線程池仍有空間,則首先檢查是否有空閑的worker,如果有則返回一個(gè),如果沒(méi)有則新建一個(gè).因?yàn)檫@個(gè)worker馬上將要執(zhí)行作業(yè),因此將其加入runningThreads列表,最后將其返回.

    ? ? ? ? 為了保證安全,這個(gè)方法也是互斥的.

    public boolean addTask(Task pTask) {final Poolable poolable = getPoolable(pTask, true);if (poolable != null){poolable.start(pTask);return true;}return false;}public boolean startTask(Task pTask) {final Poolable poolable = getPoolable(pTask, false);if (poolable == null){return false;}poolable.start(pTask);return true;}

    ? ? ? ? 5、getPoolable并public的方法,想要提交任務(wù)要通過(guò)addTask或者startTask.

    ? ? ? ? addTask調(diào)用getPoolable并允許排隊(duì),在能夠立即運(yùn)行時(shí)返回true,在稍后會(huì)執(zhí)行為false;startTask調(diào)用相同的方法但不允許排隊(duì),在能夠立即運(yùn)行時(shí)返回true,不能運(yùn)行則返回false.

    ? ? ? ? 綜合來(lái)看,startTask更加注重實(shí)時(shí)性,而addTask則要求作業(yè)提交后早晚會(huì)被處理.

    public synchronized void shutdown(){while (!waitingThreads.isEmpty()){Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size() - 1);poolable.shutdown();}while (!runningThreads.isEmpty()){Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size() - 1);poolable.shutdown();}}

    ? ? ? ? 6、需要注意此方法并非worker中的shutdown,此方法可以看做線程池的析構(gòu)函數(shù).調(diào)用此方法時(shí)會(huì)銷(xiāo)毀每一個(gè)worker.同樣是互斥方法,

    ????????

    public int getMaxThreads(){return maxSize;}public synchronized int getNumThreads(){return num;}

    ? ? ? 7、最后這兩個(gè)方法簡(jiǎn)單返回線程池的最大容量和當(dāng)前容量.

    ? ? ? ??

    5.總結(jié)

    ????????ROS的線程池是ROS并發(fā)處理RPC得以實(shí)現(xiàn)的基礎(chǔ),而后者又是ROS-Core連結(jié)各類node,進(jìn)行topic和service的調(diào)用時(shí)的底層實(shí)現(xiàn),因此了解其線程池運(yùn)行邏輯對(duì)于學(xué)習(xí)ROS是十分重要的.

    ? ? ? ? 另外,在學(xué)習(xí)線程池源代碼的同時(shí),我們看到了如何將一對(duì)一線程模型通過(guò)編程改變?yōu)槎鄬?duì)多模型,既基本保持了原有的效率又節(jié)省了資源,也體會(huì)到了這樣一種設(shè)計(jì)思想:當(dāng)操作系統(tǒng)的實(shí)現(xiàn)不滿足需求時(shí),就編程對(duì)操作系統(tǒng)進(jìn)行仿真(如Task代表程序,Poolable代表處理機(jī),Pooable+運(yùn)行的Thread代表進(jìn)程),以較低的代價(jià)達(dá)到自己的目的.最后,在并發(fā)編程對(duì)時(shí)候一定要對(duì)線程安全十分敏感,不然一定會(huì)因小失大,看似提高了效率,實(shí)則埋下巨大的隱患.

    ? ?

    總結(jié)

    以上是生活随笔為你收集整理的ROS源码学习 二、线程池的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。