日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python分布式框架_高性能分布式执行框架——Ray

發(fā)布時間:2023/12/10 python 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python分布式框架_高性能分布式执行框架——Ray 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Ray是UC Berkeley RISELab新推出的高性能分布式執(zhí)行框架,它使用了和傳統(tǒng)分布式計算系統(tǒng)不一樣的架構(gòu)和對分布式計算的抽象方式,具有比Spark更優(yōu)異的計算性能。

Ray目前還處于實驗室階段,最新版本為0.2.2版本。雖然Ray自稱是面向AI應(yīng)用的分布式計算框架,但是它的架構(gòu)具有通用的分布式計算抽象。本文對Ray進行簡單的介紹,幫助大家更快地了解Ray是什么,如有描述不當?shù)牡胤?#xff0c;歡迎不吝指正。

一、簡單開始

首先來看一下最簡單的Ray程序是如何編寫的。

# 導(dǎo)入ray,并初始化執(zhí)行環(huán)境

import ray

ray.init()

# 定義ray remote函數(shù)

@ray.remote

def hello():

return "Hello world !"

# 異步執(zhí)行remote函數(shù),返回結(jié)果id

object_id = hello.remote()

# 同步獲取計算結(jié)果

hello = ray.get(object_id)

# 輸出計算結(jié)果

print hello

在Ray里,通過Python注解@ray.remote定義remote函數(shù)。使用此注解聲明的函數(shù)都會自帶一個默認的方法remote,通過此方法發(fā)起的函數(shù)調(diào)用都是以提交分布式任務(wù)的方式異步執(zhí)行的,函數(shù)的返回值是一個對象id,使用ray.get內(nèi)置操作可以同步獲取該id對應(yīng)的對象。熟悉Java里的Future機制的話對此應(yīng)該并不陌生,或許會有人疑惑這和普通的異步函數(shù)調(diào)用沒什么大的區(qū)別,但是這里最大的差異是,函數(shù)hello是分布式異步執(zhí)行的。

remote函數(shù)是Ray分布式計算抽象中的核心概念,通過它開發(fā)者擁有了動態(tài)定制計算依賴(任務(wù)DAG)的能力。比如:

@ray.remote

def A():

return "A"

@ray.remote

def B():

return "B"

@ray.remote

def C(a, b):

return "C"

a_id = A.remote()

b_id = B.remote()

c_id = C.remote(a_id, b_id)

print ray.get(c_id)

例子代碼中,對函數(shù)A、B的調(diào)用是完全并行執(zhí)行的,但是對函數(shù)C的調(diào)用依賴于A、B函數(shù)的返回結(jié)果。Ray可以保證函數(shù)C需要等待A、B函數(shù)的結(jié)果真正計算出來后才會執(zhí)行。如果將函數(shù)A、B、C類比為DAG的節(jié)點的話,那么DAG的邊就是函數(shù)C參數(shù)對函數(shù)A、B計算結(jié)果的依賴,自由的函數(shù)調(diào)用方式允許Ray可以自由地定制DAG的結(jié)構(gòu)和計算依賴關(guān)系。另外,提及一點的是Python的函數(shù)可以定義函數(shù)具有多個返回值,這也使得Python的函數(shù)更天然具備了DAG節(jié)點多入和多出的特點。

二、系統(tǒng)架構(gòu)

Ray是使用什么樣的架構(gòu)對分布式計算做出如上抽象的呢,一下給出了Ray的系統(tǒng)架構(gòu)(來自Ray論文,參考文獻1)。

作為分布式計算系統(tǒng),Ray仍舊遵循了典型的Master-Slave的設(shè)計:Master負責全局協(xié)調(diào)和狀態(tài)維護,Slave執(zhí)行分布式計算任務(wù)。不過和傳統(tǒng)的分布式計算系統(tǒng)不同的是,Ray使用了混合任務(wù)調(diào)度的思路。在集群部署模式下,Ray啟動了以下關(guān)鍵組件:

GlobalScheduler:Master上啟動了一個全局調(diào)度器,用于接收本地調(diào)度器提交的任務(wù),并將任務(wù)分發(fā)給合適的本地任務(wù)調(diào)度器執(zhí)行。

RedisServer:Master上啟動了一到多個RedisServer用于保存分布式任務(wù)的狀態(tài)信息(ControlState),包括對象機器的映射、任務(wù)描述、任務(wù)debug信息等。

LocalScheduler:每個Slave上啟動了一個本地調(diào)度器,用于提交任務(wù)到全局調(diào)度器,以及分配任務(wù)給當前機器的Worker進程。

Worker:每個Slave上可以啟動多個Worker進程執(zhí)行分布式任務(wù),并將計算結(jié)果存儲到ObjectStore。

ObjectStore:每個Slave上啟動了一個ObjectStore存儲只讀數(shù)據(jù)對象,Worker可以通過共享內(nèi)存的方式訪問這些對象數(shù)據(jù),這樣可以有效地減少內(nèi)存拷貝和對象序列化成本。ObjectStore底層由Apache Arrow實現(xiàn)。

Plasma:每個Slave上的ObjectStore都由一個名為Plasma的對象管理器進行管理,它可以在Worker訪問本地ObjectStore上不存在的遠程數(shù)據(jù)對象時,主動拉取其它Slave上的對象數(shù)據(jù)到當前機器。

需要說明的是,Ray的論文中提及,全局調(diào)度器可以啟動一到多個,而目前Ray的實現(xiàn)文檔里討論的內(nèi)容都是基于一個全局調(diào)度器的情況。我猜測可能是Ray尚在建設(shè)中,一些機制還未完善,后續(xù)讀者可以留意此處的細節(jié)變化。

Ray的任務(wù)也是通過類似Spark中Driver的概念的方式進行提交的,有所不同的是:

Spark的Driver提交的是任務(wù)DAG,一旦提交則不可更改。

而Ray提交的是更細粒度的remote function,任務(wù)DAG依賴關(guān)系由函數(shù)依賴關(guān)系自由定制。

論文給出的架構(gòu)圖里并未畫出Driver的概念,因此我在其基礎(chǔ)上做了一些修改和擴充。

Ray的Driver節(jié)點和和Slave節(jié)點啟動的組件幾乎相同,不過卻有以下區(qū)別:

Driver上的工作進程DriverProcess一般只有一個,即用戶啟動的PythonShell。Slave可以根據(jù)需要創(chuàng)建多個WorkerProcess。

Driver只能提交任務(wù),卻不能接收來自全局調(diào)度器分配的任務(wù)。Slave可以提交任務(wù),也可以接收全局調(diào)度器分配的任務(wù)。

Driver可以主動繞過全局調(diào)度器給Slave發(fā)送Actor調(diào)用任務(wù)(此處設(shè)計是否合理尚不討論)。Slave只能接收全局調(diào)度器分配的計算任務(wù)。

三、核心操作

基于以上架構(gòu),我們簡單討論一下Ray中關(guān)鍵的操作和流程。

1. ray.init()

在PythonShell中,使用ray.init()可以在本地啟動ray,包括Driver、HeadNode(Master)和若干Slave。

import ray

ray.init()

如果是直連已有的Ray集群,只需要指定RedisServer的地址即可。

ray.init(redis_address="")

本地啟動Ray得到的輸出如下:

>>> ray.init()

Waiting for redis server at 127.0.0.1:58807 to respond...

Waiting for redis server at 127.0.0.1:23148 to respond...

Allowing the Plasma store to use up to 13.7439GB of memory.

Starting object store with directory /tmp and huge page support disabled

Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================

View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5

======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}

>>>

本地啟動Ray時,可以看到Ray的WebUI的訪問地址。

2. ray.put()

使用ray.put()可以將Python對象存入本地ObjectStore,并且異步返回一個唯一的ObjectID。通過該ID,Ray可以訪問集群中任一個節(jié)點上的對象(遠程對象通過查閱Master的對象表獲得)。

對象一旦存入ObjectStore便不可更改,Ray的remote函數(shù)可以將直接將該對象的ID作為參數(shù)傳入。使用ObjectID作為remote函數(shù)參數(shù),可以有效地減少函數(shù)參數(shù)的寫ObjectStore的次數(shù)。

@ray.remote

def f(x):

pass

x = "hello"

# 對象x往ObjectStore拷貝里10次

[f.remote(x) for _ in range(10)]

# 對象x僅往ObjectStore拷貝1次

x_id = ray.put(x)

[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通過ObjectID獲取ObjectStore內(nèi)的對象并將之轉(zhuǎn)換為Python對象。對于數(shù)組類型的對象,Ray使用共享內(nèi)存機制減少數(shù)據(jù)的拷貝成本。而對于其它對象則需要將數(shù)據(jù)從ObjectStore拷貝到進程的堆內(nèi)存中。

如果調(diào)用ray.get()操作時,對象尚未創(chuàng)建好,則get操作會阻塞,直到對象創(chuàng)建完成后返回。get操作的關(guān)鍵流程如下:

Driver或者Worker進程首先到ObjectStore內(nèi)請求ObjectID對應(yīng)的對象數(shù)據(jù)。

如果本地ObjectStore沒有對應(yīng)的對象數(shù)據(jù),本地對象管理器Plasma會檢查Master上的對象表查看對象是否存儲其它節(jié)點的ObjectStore。

如果對象數(shù)據(jù)在其它節(jié)點的ObjectStore內(nèi),Plasma會發(fā)送網(wǎng)絡(luò)請求將對象數(shù)據(jù)拉到本地ObjectStore。

如果對象數(shù)據(jù)還沒有創(chuàng)建好,Master會在對象創(chuàng)建完成后通知請求的Plasma讀取。

如果對象數(shù)據(jù)已經(jīng)被所有的ObjectStore移除(被LRU策略刪除),本地調(diào)度器會根據(jù)任務(wù)血緣關(guān)系執(zhí)行對象的重新創(chuàng)建工作。

一旦對象數(shù)據(jù)在本地ObjectStore可用,Driver或者Worker進程會通過共享內(nèi)存的方式直接將對象內(nèi)存區(qū)域映射到自己的進程地址空間中,并反序列化為Python對象。

另外,ray.get()可以一次性讀取多個對象的數(shù)據(jù):

result_ids = [ray.put(i) for i in range(10)]

ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用注解@ray.remote可以聲明一個remote function。remote函數(shù)時Ray的基本任務(wù)調(diào)度單元,remote函數(shù)定義后會立即被序列化存儲到RedisServer中,并且分配了一個唯一的ID,這樣就保證了集群的所有節(jié)點都可以看到這個函數(shù)的定義。

不過,這樣對remote函數(shù)定義有了一個潛在的要求,即remote函數(shù)內(nèi)如果調(diào)用了其它的用戶函數(shù),則必須提前定義,否則remote函數(shù)無法找到對應(yīng)的函數(shù)定義內(nèi)容。

remote函數(shù)內(nèi)也可以調(diào)用其它的remote函數(shù),Driver和Slave每次調(diào)用remote函數(shù)時,其實都是向集群提交了一個計算任務(wù),從這里也可以看到Ray的分布式計算的自由性。

Ray中調(diào)用remote函數(shù)的關(guān)鍵流程如下:

調(diào)用remote函數(shù)時,首先會創(chuàng)建一個任務(wù)對象,它包含了函數(shù)的ID、參數(shù)的ID或者值(Python的基本對象直接傳值,復(fù)雜對象會先通過ray.put()操作存入ObjectStore然后返回ObjectID)、函數(shù)返回值對象的ID。

任務(wù)對象被發(fā)送到本地調(diào)度器。

本地調(diào)度器決定任務(wù)對象是在本地調(diào)度還是發(fā)送給全局調(diào)度器。如果任務(wù)對象的依賴(參數(shù))在本地的ObejctStore已經(jīng)存在且本地的CPU和GPU計算資源充足,那么本地調(diào)度器將任務(wù)分配給本地的WorkerProcess執(zhí)行。否則,任務(wù)對象被發(fā)送給全局調(diào)度器并存儲到任務(wù)表(TaskTable)中,全局調(diào)度器根據(jù)當前的任務(wù)狀態(tài)信息決定將任務(wù)發(fā)給集群中的某一個本地調(diào)度器。

本地調(diào)度器收到任務(wù)對象后(來自本地的任務(wù)或者全局調(diào)度分配的任務(wù)),會將其放入一個任務(wù)隊列中,等待計算資源和本地依賴滿足后分配給WorkerProcess執(zhí)行。

Worker收到任務(wù)對象后執(zhí)行該任務(wù),并將函數(shù)返回值存入ObjectStore,并更新Master的對象表(ObjectTable)信息。

@ray.remote注解有一個參數(shù)num_return_vals用于聲明remote函數(shù)的返回值個數(shù),基于此實現(xiàn)remote函數(shù)的多返回值機制。

@ray.remote(num_return_vals=2)

def f():

return 1, 2

x_id, y_id = f.remote()

ray.get(x_id) # 1

ray.get(y_id) # 2

@ray.remote注解的另一個參數(shù)num_gpus可以為任務(wù)指定GPU的資源。使用內(nèi)置函數(shù)ray.get_gpu_ids()可以獲取當前任務(wù)可以使用的GPU信息。

@ray.remote(num_gpus=1)

def gpu_method():

return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操作支持批量的任務(wù)等待,基于此可以實現(xiàn)一次性獲取多個ObjectID對應(yīng)的數(shù)據(jù)。

# 啟動5個remote函數(shù)調(diào)用任務(wù)

results = [f.remote(i) for i in range(5)]

# 阻塞等待4個任務(wù)完成,超時時間為2.5s

ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5個ObjectID,使用ray.wait操作可以一直等待有4個任務(wù)完成后返回,并將完成的數(shù)據(jù)對象放在第一個list類型返回值內(nèi),未完成的ObjectID放在第二個list返回值內(nèi)。如果設(shè)置了超時時間,那么在超時時間結(jié)束后仍未等到預(yù)期的返回值個數(shù),則已超時完成時的返回值為準。

6. ray.error_info()

使用ray.error_info()可以獲取任務(wù)執(zhí)行時產(chǎn)生的錯誤信息。

>>> import time

>>> @ray.remote

>>> def f():

>>> time.sleep(5)

>>> raise Exception("This task failed!!")

>>> f.remote()

Remote function __main__.f failed with:

Traceback (most recent call last):

File "", line 4, in f

Exception: This task failed!!

You can inspect errors by running

ray.error_info()

If this driver is hanging, start a new one with

ray.init(redis_address="127.0.0.1:65452")

>>> ray.error_info()

[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函數(shù)只能處理無狀態(tài)的計算需求,有狀態(tài)的計算需求需要使用Ray的Actor實現(xiàn)。在Python的class定義前使用@ray.remote可以聲明Actor。

@ray.remote

class Counter(object):

def __init__(self):

self.value = 0

def increment(self):

self.value += 1

return self.value

使用如下方式創(chuàng)建Actor對象。

a1 = Counter.remote()

a2 = Counter.remote()

Ray創(chuàng)建Actor的流程為:

Master選取一個Slave,并將Actor創(chuàng)建任務(wù)分發(fā)給它的本地調(diào)度器。

創(chuàng)建Actor對象,并執(zhí)行它的構(gòu)造函數(shù)。

從流程可以看出,Actor對象的創(chuàng)建時并行的。

通過調(diào)用Actor對象的方法使用Actor。

a1.increment.remote() # ray.get returns 1

a2.increment.remote() # ray.get returns 1

調(diào)用Actor對象的方法的流程為:

首先創(chuàng)建一個任務(wù)。

該任務(wù)被Driver直接分配到創(chuàng)建該Actor對應(yīng)的本地執(zhí)行器執(zhí)行,這個操作繞開了全局調(diào)度器(Worker是否也可以使用Actor直接分配任務(wù)尚存疑問)。

返回Actor方法調(diào)用結(jié)果的ObjectID。

為了保證Actor狀態(tài)的一致性,對同一個Actor的方法調(diào)用是串行執(zhí)行的。

四、安裝Ray

如果只是使用Ray,可以使用如下命令直接安裝。

pip intall ray

如果需要編譯Ray的最新源碼進行安裝,按照如下步驟進行(MaxOS):

# 更新編譯依賴包

brew update

brew install cmake pkg-config automake autoconf libtool boost wget

pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six

# 下載源碼編譯安裝

git clone https://github.com/ray-project/ray.git

cd ray/python

python setup.py install

# 測試

python test/runtest.py

# 安裝WebUI需要的庫[可選]

pip install jupyter ipywidgets bokeh

# 編譯Ray文檔[可選]

cd ray/doc

pip install -r requirements-doc.txt

make html

open _build/html/index.html

我在MacOS上安裝jupyter時,遇到了Python的setuptools庫無法升級的情況,原因是MacOS的安全性設(shè)置問題,可以使用如下方式解決:

重啟電腦,啟動時按住Command+R進入Mac保護模式。

打開命令行,輸入命令csrutils disable關(guān)閉系統(tǒng)安全策略。

重啟電腦,繼續(xù)安裝jupyter。

安裝完成后,重復(fù)如上的方式執(zhí)行csrutils enable,再次重啟即可。

進入PythonShell,輸入代碼本地啟動Ray:

import ray

ray.init()

瀏覽器內(nèi)打開WebUI界面如下:

參考資料

總結(jié)

以上是生活随笔為你收集整理的python分布式框架_高性能分布式执行框架——Ray的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。