日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > php >内容正文

php

coroutine php_PHP 协程实现

發(fā)布時(shí)間:2023/12/10 php 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 coroutine php_PHP 协程实现 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

多進(jìn)程/線程

最早的服務(wù)器端程序都是通過多進(jìn)程、多線程來解決并發(fā)IO的問題。進(jìn)程模型出現(xiàn)的最早,從Unix 系統(tǒng)誕生就開始有了進(jìn)程的概念。最早的服務(wù)器端程序一般都是 Accept 一個(gè)客戶端連接就創(chuàng)建一個(gè)進(jìn)程,然后子進(jìn)程進(jìn)入循環(huán)同步阻塞地與客戶端連接進(jìn)行交互,收發(fā)處理數(shù)據(jù)。

多線程模式出現(xiàn)要晚一些,線程與進(jìn)程相比更輕量,而且線程之間共享內(nèi)存堆棧,所以不同的線程之間交互非常容易實(shí)現(xiàn)。比如實(shí)現(xiàn)一個(gè)聊天室,客戶端連接之間可以交互,聊天室中的玩家可以任意的其他人發(fā)消息。用多線程模式實(shí)現(xiàn)非常簡單,線程中可以直接向某一個(gè)客戶端連接發(fā)送數(shù)據(jù)。而多進(jìn)程模式就要用到管道、消息隊(duì)列、共享內(nèi)存等等統(tǒng)稱進(jìn)程間通信(IPC)復(fù)雜的技術(shù)才能實(shí)現(xiàn)。

最簡單的多進(jìn)程服務(wù)端模型

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr)

or die("Create server failed");

![image.png](http://upload-images.jianshu.io/upload_images/4686383-a16cd123fd865f10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

while(1) {

$conn = stream_socket_accept($serv);

if (pcntl_fork() == 0) {

$request = fread($conn);

// do something

// $response = "hello world";

fwrite($response);

fclose($conn);

exit(0);

}

}

多進(jìn)程/線程模型的流程是:

創(chuàng)建一個(gè) socket,綁定服務(wù)器端口(bind),監(jiān)聽端口(listen),在 PHP 中用 stream_socket_server 一個(gè)函數(shù)就能完成上面 3 個(gè)步驟,當(dāng)然也可以使用更底層的sockets 擴(kuò)展分別實(shí)現(xiàn)。

進(jìn)入 while 循環(huán),阻塞在 accept 操作上,等待客戶端連接進(jìn)入。此時(shí)程序會(huì)進(jìn)入隨眠狀態(tài),直到有新的客戶端發(fā)起 connect 到服務(wù)器,操作系統(tǒng)會(huì)喚醒此進(jìn)程。accept 函數(shù)返回客戶端連接的 socket 主進(jìn)程在多進(jìn)程模型下通過 fork(php: pcntl_fork)創(chuàng)建子進(jìn)程,多線程模型下使用 pthread_create(php: new Thread)創(chuàng)建子線程。

下文如無特殊聲明將使用進(jìn)程同時(shí)表示進(jìn)程/線程。

子進(jìn)程創(chuàng)建成功后進(jìn)入 while 循環(huán),阻塞在 recv(php:fread)調(diào)用上,等待客戶端向服務(wù)器發(fā)送數(shù)據(jù)。收到數(shù)據(jù)后服務(wù)器程序進(jìn)行處理然后使用 send(php: fwrite)向客戶端發(fā)送響應(yīng)。長連接的服務(wù)會(huì)持續(xù)與客戶端交互,而短連接服務(wù)一般收到響應(yīng)就會(huì) close。

當(dāng)客戶端連接關(guān)閉時(shí),子進(jìn)程退出并銷毀所有資源,主進(jìn)程會(huì)回收掉此子進(jìn)程。

image.png

這種模式最大的問題是,進(jìn)程創(chuàng)建和銷毀的開銷很大。所以上面的模式?jīng)]辦法應(yīng)用于非常繁忙的服務(wù)器程序。對(duì)應(yīng)的改進(jìn)版解決了此問題,這就是經(jīng)典的 Leader-Follower 模型。

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr)

or die("Create server failed");

for($i = 0; $i < 32; $i++) {

if (pcntl_fork() == 0) {

while(1) {

$conn = stream_socket_accept($serv);

if ($conn == false) continue;

// do something

$request = fread($conn);

// $response = "hello world";

fwrite($response);

fclose($conn);

}

exit(0);

}

}

它的特點(diǎn)是程序啟動(dòng)后就會(huì)創(chuàng)建 N 個(gè)進(jìn)程。每個(gè)子進(jìn)程進(jìn)入 Accept,等待新的連接進(jìn)入。當(dāng)客戶端連接到服務(wù)器時(shí),其中一個(gè)子進(jìn)程會(huì)被喚醒,開始處理客戶端請(qǐng)求,并且不再接受新的 TCP 連接。當(dāng)此連接關(guān)閉時(shí),子進(jìn)程會(huì)釋放,重新進(jìn)入 Accept,參與處理新的連接。

這個(gè)模型的優(yōu)勢(shì)是完全可以復(fù)用進(jìn)程,沒有額外消耗,性能非常好。很多常見的服務(wù)器程序都是基于此模型的,比如 Apache、PHP-FPM。

多進(jìn)程模型也有一些缺點(diǎn)。

這種模型嚴(yán)重依賴進(jìn)程的數(shù)量解決并發(fā)問題,一個(gè)客戶端連接就需要占用一個(gè)進(jìn)程,工作進(jìn)程的數(shù)量有多少,并發(fā)處理能力就有多少。操作系統(tǒng)可以創(chuàng)建的進(jìn)程數(shù)量是有限的。

啟動(dòng)大量進(jìn)程會(huì)帶來額外的進(jìn)程調(diào)度消耗。數(shù)百個(gè)進(jìn)程時(shí)可能進(jìn)程上下文切換調(diào)度消耗占 CPU 不到1%可以忽略不接,如果啟動(dòng)數(shù)千甚至數(shù)萬個(gè)進(jìn)程,消耗就會(huì)直線上升。調(diào)度消耗可能占到 CPU 的百分之幾十甚至 100%。

并行和并發(fā)

談到多進(jìn)程以及類似同時(shí)執(zhí)行多個(gè)任務(wù)的模型,就不得不先談?wù)劜⑿泻筒l(fā)。

并發(fā)(Concurrency)

是指能處理多個(gè)同時(shí)性活動(dòng)的能力,并發(fā)事件之間不一定要同一時(shí)刻發(fā)生。

并行(Parallesim)

是指同時(shí)發(fā)生的兩個(gè)并發(fā)事件,具有并發(fā)的含義,而并發(fā)則不一定并行。

區(qū)別

『并發(fā)』指的是程序的結(jié)構(gòu),『并行』指的是程序運(yùn)行時(shí)的狀態(tài)

『并行』一定是并發(fā)的,『并行』是『并發(fā)』設(shè)計(jì)的一種

單線程永遠(yuǎn)無法達(dá)到『并行』狀態(tài)

正確的并發(fā)設(shè)計(jì)的標(biāo)準(zhǔn)是:

使多個(gè)操作可以在重疊的時(shí)間段內(nèi)進(jìn)行。

two tasks can start, run, and complete in overlapping time periods

參考:

迭代器 & 生成器

在了解 PHP 協(xié)程前,還有 迭代器 和 生成器 這兩個(gè)概念需要先認(rèn)識(shí)一下。

迭代器

PHP5 開始內(nèi)置了 Iterator 即迭代器接口,所以如果你定義了一個(gè)類,并實(shí)現(xiàn)了Iterator 接口,那么你的這個(gè)類對(duì)象就是 ZEND_ITER_OBJECT 即可迭代的,否則就是 ZEND_ITER_PLAIN_OBJECT。

對(duì)于 ZEND_ITER_PLAIN_OBJECT 的類,foreach 會(huì)獲取該對(duì)象的默認(rèn)屬性數(shù)組,然后對(duì)該數(shù)組進(jìn)行迭代。

而對(duì)于 ZEND_ITER_OBJECT 的類對(duì)象,則會(huì)通過調(diào)用對(duì)象實(shí)現(xiàn)的 Iterator 接口相關(guān)函數(shù)來進(jìn)行迭代。

任何實(shí)現(xiàn)了 Iterator 接口的類都是可迭代的,即都可以用 foreach 語句來遍歷。

Iterator 接口

interface Iterator extends Traversable

{

// 獲取當(dāng)前內(nèi)部標(biāo)量指向的元素的數(shù)據(jù)

public mixed current()

// 獲取當(dāng)前標(biāo)量

public scalar key()

// 移動(dòng)到下一個(gè)標(biāo)量

public void next()

// 重置標(biāo)量

public void rewind()

// 檢查當(dāng)前標(biāo)量是否有效

public boolean valid()

}

常規(guī)實(shí)現(xiàn) range 函數(shù)

PHP 自帶的 range 函數(shù)原型:

range — 根據(jù)范圍創(chuàng)建數(shù)組,包含指定的元素

array range (mixed $start , mixed $end [, number $step = 1 ])

建立一個(gè)包含指定范圍單元的數(shù)組。

在不使用迭代器的情況要實(shí)現(xiàn)一個(gè)和 PHP 自帶的 range 函數(shù)類似的功能,可能會(huì)這么寫:

function range ($start, $end, $step = 1)

{

$ret = [];

for ($i = $start; $i <= $end; $i += $step) {

$ret[] = $i;

}

return $ret;

}

需要將生成的所有元素放在內(nèi)存數(shù)組中,如果需要生成一個(gè)非常大的集合,則會(huì)占用巨大的內(nèi)存。

迭代器實(shí)現(xiàn) xrange 函數(shù)

來看看迭代實(shí)現(xiàn)的 range,我們叫做 xrange,他實(shí)現(xiàn)了 Iterator 接口必須的 5 個(gè)方法:

class Xrange implements Iterator

{

protected $start;

protected $limit;

protected $step;

protected $current;

public function __construct($start, $limit, $step = 1)

{

$this->start = $start;

$this->limit = $limit;

$this->step = $step;

}

public function rewind()

{

$this->current = $this->start;

}

public function next()

{

$this->current += $this->step;

}

public function current()

{

return $this->current;

}

public function key()

{

return $this->current + 1;

}

public function valid()

{

return $this->current <= $this->limit;

}

}

使用時(shí)代碼如下:

foreach (new Xrange(0, 9) as $key => $val) {

echo $key, ' ', $val, "\n";

}

輸出:

0 0

1 1

2 2

3 3

4 4

5 5

6 6

7 7

8 8

9 9

看上去功能和 range() 函數(shù)所做的一致,不同點(diǎn)在于迭代的是一個(gè) 對(duì)象(Object) 而不是數(shù)組:

var_dump(new Xrange(0, 9));

輸出:

object(Xrange)#1 (4) {

["start":protected]=>

int(0)

["limit":protected]=>

int(9)

["step":protected]=>

int(1)

["current":protected]=>

NULL

}

另外,內(nèi)存的占用情況也完全不同:

// range

$startMemory = memory_get_usage();

$arr = range(0, 500000);

echo 'range(): ', memory_get_usage() - $startMemory, " bytes\n";

unset($arr);

// xrange

$startMemory = memory_get_usage();

$arr = new Xrange(0, 500000);

echo 'xrange(): ', memory_get_usage() - $startMemory, " bytes\n";

輸出:

xrange(): 624 bytes

range(): 72194784 bytes

range() 函數(shù)在執(zhí)行后占用了 50W 個(gè)元素內(nèi)存空間,而 xrange 對(duì)象在整個(gè)迭代過程中只占用一個(gè)對(duì)象的內(nèi)存。

Yii2 Query

在喜聞樂見的各種 PHP 框架里有不少生成器的實(shí)例,比如 Yii2 中用來構(gòu)建 SQL 語句的 \yii\db\Query 類:

$query = (new \yii\db\Query)->from('user');

// yii\db\BatchQueryResult

foreach ($query->batch() as $users) {

// 每次循環(huán)得到多條 user 記錄

}

來看一下 batch() 做了什么:

/**

* Starts a batch query.

*

* A batch query supports fetching data in batches, which can keep the memory usage under a limit.

* This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface

* and can be traversed to retrieve the data in batches.

*

* For example,

*

*

* $query = (new Query)->from('user');

* foreach ($query->batch() as $rows) {

* // $rows is an array of 10 or fewer rows from user table

* }

*

*

* @param integer $batchSize the number of records to be fetched in each batch.

* @param Connection $db the database connection. If not set, the "db" application component will be used.

* @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface

* and can be traversed to retrieve the data in batches.

*/

public function batch($batchSize = 100, $db = null)

{

return Yii::createObject([

'class' => BatchQueryResult::className(),

'query' => $this,

'batchSize' => $batchSize,

'db' => $db,

'each' => false,

]);

}

實(shí)際上返回了一個(gè) BatchQueryResult 類,類的源碼實(shí)現(xiàn)了 Iterator 接口 5 個(gè)關(guān)鍵方法:

class BatchQueryResult extends Object implements \Iterator

{

public $db;

public $query;

public $batchSize = 100;

public $each = false;

private $_dataReader;

private $_batch;

private $_value;

private $_key;

/**

* Destructor.

*/

public function __destruct()

{

// make sure cursor is closed

$this->reset();

}

/**

* Resets the batch query.

* This method will clean up the existing batch query so that a new batch query can be performed.

*/

public function reset()

{

if ($this->_dataReader !== null) {

$this->_dataReader->close();

}

$this->_dataReader = null;

$this->_batch = null;

$this->_value = null;

$this->_key = null;

}

/**

* Resets the iterator to the initial state.

* This method is required by the interface [[\Iterator]].

*/

public function rewind()

{

$this->reset();

$this->next();

}

/**

* Moves the internal pointer to the next dataset.

* This method is required by the interface [[\Iterator]].

*/

public function next()

{

if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {

$this->_batch = $this->fetchData();

reset($this->_batch);

}

if ($this->each) {

$this->_value = current($this->_batch);

if ($this->query->indexBy !== null) {

$this->_key = key($this->_batch);

} elseif (key($this->_batch) !== null) {

$this->_key++;

} else {

$this->_key = null;

}

} else {

$this->_value = $this->_batch;

$this->_key = $this->_key === null ? 0 : $this->_key + 1;

}

}

/**

* Fetches the next batch of data.

* @return array the data fetched

*/

protected function fetchData()

{

// ...

}

/**

* Returns the index of the current dataset.

* This method is required by the interface [[\Iterator]].

* @return integer the index of the current row.

*/

public function key()

{

return $this->_key;

}

/**

* Returns the current dataset.

* This method is required by the interface [[\Iterator]].

* @return mixed the current dataset.

*/

public function current()

{

return $this->_value;

}

/**

* Returns whether there is a valid dataset at the current position.

* This method is required by the interface [[\Iterator]].

* @return boolean whether there is a valid dataset at the current position.

*/

public function valid()

{

return !empty($this->_batch);

}

}

以迭代器的方式實(shí)現(xiàn)了類似分頁取的效果,同時(shí)避免了一次性取出所有數(shù)據(jù)占用太多的內(nèi)存空間。

迭代器使用場(chǎng)景

使用返回迭代器的包或庫時(shí)(如 PHP5 中的 SPL 迭代器)

無法在一次調(diào)用獲取所需的所有元素時(shí)

要處理數(shù)量巨大的元素時(shí)(數(shù)據(jù)庫中要處理的結(jié)果集內(nèi)容超過內(nèi)存)

...

生成器

需要 PHP 5 >= 5.5.0 或 PHP 7

雖然迭代器僅需繼承接口即可實(shí)現(xiàn),但畢竟需要定義一整個(gè)類然后實(shí)現(xiàn)接口的所有方法,實(shí)在是不怎么方便。

生成器則提供了一種更簡單的方式來實(shí)現(xiàn)簡單的對(duì)象迭代,相比定義類來實(shí)現(xiàn) Iterator 接口的方式,性能開銷和復(fù)雜度大大降低。

生成器允許在 foreach 代碼塊中迭代一組數(shù)據(jù)而不需要?jiǎng)?chuàng)建任何數(shù)組。一個(gè)生成器函數(shù),就像一個(gè)普通的有返回值的自定義函數(shù)類似,但普通函數(shù)只返回一次, 而生成器可以根據(jù)需要通過 yield 關(guān)鍵字返回多次,以便連續(xù)生成需要迭代返回的值。

一個(gè)最簡單的例子就是使用生成器來重新實(shí)現(xiàn) xrange() 函數(shù)。效果和上面我們用迭代器實(shí)現(xiàn)的差不多,但實(shí)現(xiàn)起來要簡單的多。

生成器實(shí)現(xiàn) xrange 函數(shù)

function xrange($start, $limit, $step = 1) {

for ($i = 0; $i < $limit; $i += $step) {

yield $i + 1 => $i;

}

}

foreach (xrange(0, 9) as $key => $val) {

printf("%d %d \n", $key, $val);

}

// 輸出

// 1 0

// 2 1

// 3 2

// 4 3

// 5 4

// 6 5

// 7 6

// 8 7

// 9 8

實(shí)際上生成器生成的正是一個(gè)迭代器對(duì)象實(shí)例,該迭代器對(duì)象繼承了 Iterator 接口,同時(shí)也包含了生成器對(duì)象自有的接口,具體可以參考 Generator 類的定義以及語法參考。

同時(shí)需要注意的是:

一個(gè)生成器不可以返回值,這樣做會(huì)產(chǎn)生一個(gè)編譯錯(cuò)誤。然而 return 空是一個(gè)有效的語法并且它將會(huì)終止生成器繼續(xù)執(zhí)行。

yield 關(guān)鍵字

需要注意的是 yield 關(guān)鍵字,這是生成器的關(guān)鍵。通過上面的例子可以看出,yield 會(huì)將當(dāng)前產(chǎn)生的值傳遞給 foreach,換句話說,foreach 每一次迭代過程都會(huì)從 yield 處取一個(gè)值,直到整個(gè)遍歷過程不再能執(zhí)行到 yield 時(shí)遍歷結(jié)束,此時(shí)生成器函數(shù)簡單的退出,而調(diào)用生成器的上層代碼還可以繼續(xù)執(zhí)行,就像一個(gè)數(shù)組已經(jīng)被遍歷完了。

yield 最簡單的調(diào)用形式看起來像一個(gè) return 申明,不同的是 yield 暫停當(dāng)前過程的執(zhí)行并返回值,而 return 是中斷當(dāng)前過程并返回值。暫停當(dāng)前過程,意味著將處理權(quán)轉(zhuǎn)交由上一級(jí)繼續(xù)進(jìn)行,直到上一級(jí)再次調(diào)用被暫停的過程,該過程又會(huì)從上一次暫停的位置繼續(xù)執(zhí)行。這像是什么呢?如果之前已經(jīng)在鳥哥的文章中粗略看過,應(yīng)該知道這很像操作系統(tǒng)的進(jìn)程調(diào)度,多個(gè)進(jìn)程在一個(gè) CPU 核心上執(zhí)行,在系統(tǒng)調(diào)度下每一個(gè)進(jìn)程執(zhí)行一段指令就被暫停,切換到下一個(gè)進(jìn)程,這樣外部用戶看起來就像是同時(shí)在執(zhí)行多個(gè)任務(wù)。

但僅僅如此還不夠,yield 除了可以返回值以外,還能接收值,也就是可以在兩個(gè)層級(jí)間實(shí)現(xiàn)雙向通信。

來看看如何傳遞一個(gè)值給 yield:

function printer()

{

while (true) {

printf("receive: %s\n", yield);

}

}

$printer = printer();

$printer->send('hello');

$printer->send('world');

// 輸出

receive: hello

receive: world

根據(jù) PHP 官方文檔的描述可以知道 Generator 對(duì)象除了實(shí)現(xiàn) Iterator 接口中的必要方法以外,還有一個(gè) send 方法,這個(gè)方法就是向 yield 語句處傳遞一個(gè)值,同時(shí)從 yield 語句處繼續(xù)執(zhí)行,直至再次遇到 yield 后控制權(quán)回到外部。

既然 yield 可以在其位置中斷并返回或者接收一個(gè)值,那能不能同時(shí)進(jìn)行接收和返回呢?當(dāng)然,這也是實(shí)現(xiàn)協(xié)程的根本。對(duì)上述代碼做出修改:

function printer()

{

$i = 0;

while (true) {

printf("receive: %s\n", (yield ++$i));

}

}

$printer = printer();

printf("%d\n", $printer->current());

$printer->send('hello');

printf("%d\n", $printer->current());

$printer->send('world');

printf("%d\n", $printer->current());

// 輸出

1

receive: hello

2

receive: world

3

這是另一個(gè)例子:

function gen() {

$ret = (yield 'yield1');

var_dump($ret);

$ret = (yield 'yield2');

var_dump($ret);

}

$gen = gen();

var_dump($gen->current()); // string(6) "yield1"

var_dump($gen->send('ret1')); // string(4) "ret1" (第一個(gè) var_dump)

// string(6) "yield2" (繼續(xù)執(zhí)行到第二個(gè) yield,吐出了返回值)

var_dump($gen->send('ret2')); // string(4) "ret2" (第二個(gè) var_dump)

// NULL (var_dump 之后沒有其他語句,所以這次 ->send() 的返回值為 null)

current 方法是迭代器 Iterator 接口必要的方法,foreach 語句每一次迭代都會(huì)通過其獲取當(dāng)前值,而后調(diào)用迭代器的 next 方法。在上述例子里則是手動(dòng)調(diào)用了 current 方法獲取值。

上述例子已經(jīng)足以表示 yield 能夠作為實(shí)現(xiàn)雙向通信的工具,也就是具備了后續(xù)實(shí)現(xiàn)協(xié)程的基本條件。

上面的例子如果第一次接觸并稍加思考,不免會(huì)疑惑為什么一個(gè) yield 既是語句又是表達(dá)式,而且這兩種情況還同時(shí)存在:

對(duì)于所有在生成器函數(shù)中出現(xiàn)的 yield,首先它都是語句,而跟在 yield 后面的任何表達(dá)式的值將作為調(diào)用生成器函數(shù)的返回值,如果 yield 后面沒有任何表達(dá)式(變量、常量都是表達(dá)式),那么它會(huì)返回 NULL,這一點(diǎn)和 return 語句一致。

yield 也是表達(dá)式,它的值就是 send 函數(shù)傳過來的值(相當(dāng)于一個(gè)特殊變量,只不過賦值是通過 send 函數(shù)進(jìn)行的)。只要調(diào)用send方法,并且生成器對(duì)象的迭代并未終結(jié),那么當(dāng)前位置的 yield 就會(huì)得到 send 方法傳遞過來的值,這和生成器函數(shù)有沒有把這個(gè)值賦值給某個(gè)變量沒有任何關(guān)系。

這個(gè)地方可能需要仔細(xì)品味上面兩個(gè) send() 方法的例子才能理解。但可以簡單的記住:

任何時(shí)候 yield 關(guān)鍵詞即是語句:可以為生成器函數(shù)返回值;

也是表達(dá)式:可以接收生成器對(duì)象發(fā)過來的值。

除了 send() 方法,還有一種控制生成器執(zhí)行的方法是 next() 函數(shù):

Next(),恢復(fù)生成器函數(shù)的執(zhí)行直到下一個(gè) yield

Send(),向生成器傳入一個(gè)值,恢復(fù)執(zhí)行直到下一個(gè) yield

協(xié)程

對(duì)于單核處理器,多進(jìn)程實(shí)現(xiàn)多任務(wù)的原理是讓操作系統(tǒng)給一個(gè)任務(wù)每次分配一定的 CPU 時(shí)間片,然后中斷、讓下一個(gè)任務(wù)執(zhí)行一定的時(shí)間片接著再中斷并繼續(xù)執(zhí)行下一個(gè),如此反復(fù)。由于切換執(zhí)行任務(wù)的速度非常快,給外部用戶的感受就是多個(gè)任務(wù)的執(zhí)行是同時(shí)進(jìn)行的。

多進(jìn)程的調(diào)度是由操作系統(tǒng)來實(shí)現(xiàn)的,進(jìn)程自身不能控制自己何時(shí)被調(diào)度,也就是說:

進(jìn)程的調(diào)度是由外層調(diào)度器搶占式實(shí)現(xiàn)的

而協(xié)程要求當(dāng)前正在運(yùn)行的任務(wù)自動(dòng)把控制權(quán)回傳給調(diào)度器,這樣就可以繼續(xù)運(yùn)行其他任務(wù)。這與『搶占式』的多任務(wù)正好相反, 搶占多任務(wù)的調(diào)度器可以強(qiáng)制中斷正在運(yùn)行的任務(wù), 不管它自己有沒有意愿。『協(xié)作式多任務(wù)』在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不過它們后來都切換到『搶占式多任務(wù)』了。理由相當(dāng)明確:如果僅依靠程序自動(dòng)交出控制的話,那么一些惡意程序?qū)?huì)很容易占用全部 CPU 時(shí)間而不與其他任務(wù)共享。

協(xié)程的調(diào)度是由協(xié)程自身主動(dòng)讓出控制權(quán)到外層調(diào)度器實(shí)現(xiàn)的

回到剛才生成器實(shí)現(xiàn) xrange 函數(shù)的例子,整個(gè)執(zhí)行過程的交替可以用下圖來表示:

image.png

協(xié)程可以理解為純用戶態(tài)的線程,通過協(xié)作而不是搶占來進(jìn)行任務(wù)切換。相對(duì)于進(jìn)程或者線程,協(xié)程所有的操作都可以在用戶態(tài)而非操作系統(tǒng)內(nèi)核態(tài)完成,創(chuàng)建和切換的消耗非常低。

簡單的說 Coroutine(協(xié)程) 就是提供一種方法來中斷當(dāng)前任務(wù)的執(zhí)行,保存當(dāng)前的局部變量,下次再過來又可以恢復(fù)當(dāng)前局部變量繼續(xù)執(zhí)行。

我們可以把大任務(wù)拆分成多個(gè)小任務(wù)輪流執(zhí)行,如果有某個(gè)小任務(wù)在等待系統(tǒng) IO,就跳過它,執(zhí)行下一個(gè)小任務(wù),這樣往復(fù)調(diào)度,實(shí)現(xiàn)了 IO 操作和 CPU 計(jì)算的并行執(zhí)行,總體上就提升了任務(wù)的執(zhí)行效率,這也便是協(xié)程的意義。

PHP 協(xié)程和 yield

PHP 從 5.5 開始支持生成器及 yield 關(guān)鍵字,而 PHP 協(xié)程則由 yield 來實(shí)現(xiàn)。

要理解協(xié)程,首先要理解:代碼是代碼,函數(shù)是函數(shù)。函數(shù)包裹的代碼賦予了這段代碼附加的意義:不管是否顯式的指明返回值,當(dāng)函數(shù)內(nèi)的代碼塊執(zhí)行完后都會(huì)返回到調(diào)用層。而當(dāng)調(diào)用層調(diào)用某個(gè)函數(shù)的時(shí)候,必須等這個(gè)函數(shù)返回,當(dāng)前函數(shù)才能繼續(xù)執(zhí)行,這就構(gòu)成了后進(jìn)先出,也就是 Stack。

而協(xié)程包裹的代碼,不是函數(shù),不完全遵守函數(shù)的附加意義,協(xié)程執(zhí)行到某個(gè)點(diǎn),協(xié)會(huì)協(xié)程會(huì) yield 返回一個(gè)值然后掛起,而不是 return 一個(gè)值然后結(jié)束,當(dāng)再次調(diào)用協(xié)程的時(shí)候,會(huì)在上次 yield 的點(diǎn)繼續(xù)執(zhí)行。

所以協(xié)程違背了通常操作系統(tǒng)和 x86 的 CPU 認(rèn)定的代碼執(zhí)行方式,也就是 Stack 的這種執(zhí)行方式,需要運(yùn)行環(huán)境(比如 php,python 的 yield 和 golang 的 goroutine)自己調(diào)度,來實(shí)現(xiàn)任務(wù)的中斷和恢復(fù),具體到 PHP,就是靠 yield 來實(shí)現(xiàn)。

堆棧式調(diào)用 和 協(xié)程調(diào)用的對(duì)比:

image.png

結(jié)合之前的例子,可以總結(jié)一下 yield 能做的就是:

實(shí)現(xiàn)不同任務(wù)間的主動(dòng)讓位、讓行,把控制權(quán)交回給任務(wù)調(diào)度器。

通過 send() 實(shí)現(xiàn)不同任務(wù)間的雙向通信,也就可以實(shí)現(xiàn)任務(wù)和調(diào)度器之間的通信。

yield 就是 PHP 實(shí)現(xiàn)協(xié)程的方式。

協(xié)程多任務(wù)調(diào)度

首先是一個(gè)任務(wù)類:

Task

class Task

{

// 任務(wù) ID

protected $taskId;

// 協(xié)程對(duì)象

protected $coroutine;

// send() 值

protected $sendVal = null;

// 是否首次 yield

protected $beforeFirstYield = true;

public function __construct($taskId, Generator $coroutine) {

$this->taskId = $taskId;

$this->coroutine = $coroutine;

}

public function getTaskId() {

return $this->taskId;

}

public function setSendValue($sendVal) {

$this->sendVal = $sendVal;

}

public function run() {

// 如之前提到的在send之前, 當(dāng)?shù)鞅粍?chuàng)建后第一次 yield 之前,一個(gè) renwind() 方法會(huì)被隱式調(diào)用

// 所以實(shí)際上發(fā)生的應(yīng)該類似:

// $this->coroutine->rewind();

// $this->coroutine->send();

// 這樣 renwind 的執(zhí)行將會(huì)導(dǎo)致第一個(gè) yield 被執(zhí)行, 并且忽略了他的返回值.

// 真正當(dāng)我們調(diào)用 yield 的時(shí)候, 我們得到的是第二個(gè)yield的值,導(dǎo)致第一個(gè)yield的值被忽略。

// 所以這個(gè)加上一個(gè)是否第一次 yield 的判斷來避免這個(gè)問題

if ($this->beforeFirstYield) {

$this->beforeFirstYield = false;

return $this->coroutine->current();

} else {

$retval = $this->coroutine->send($this->sendVal);

$this->sendVal = null;

return $retval;

}

}

public function isFinished() {

return !$this->coroutine->valid();

}

}

接下來是調(diào)度器,比 foreach 是要復(fù)雜一點(diǎn),但好歹也能算個(gè)正兒八經(jīng)的 Scheduler :)

Scheduler

class Scheduler

{

protected $maxTaskId = 0;

protected $taskMap = []; // taskId => task

protected $taskQueue;

public function __construct() {

$this->taskQueue = new SplQueue();

}

// (使用下一個(gè)空閑的任務(wù)id)創(chuàng)建一個(gè)新任務(wù),然后把這個(gè)任務(wù)放入任務(wù)map數(shù)組里. 接著它通過把任務(wù)放入任務(wù)隊(duì)列里來實(shí)現(xiàn)對(duì)任務(wù)的調(diào)度. 接著run()方法掃描任務(wù)隊(duì)列, 運(yùn)行任務(wù).如果一個(gè)任務(wù)結(jié)束了, 那么它將從隊(duì)列里刪除, 否則它將在隊(duì)列的末尾再次被調(diào)度。

public function newTask(Generator $coroutine) {

$tid = ++$this->maxTaskId;

$task = new Task($tid, $coroutine);

$this->taskMap[$tid] = $task;

$this->schedule($task);

return $tid;

}

public function schedule(Task $task) {

// 任務(wù)入隊(duì)

$this->queue->enqueue($task);

}

public function run() {

while (!$this->queue->isEmpty()) {

// 任務(wù)出隊(duì)

$task = $this->queue->dequeue();

$task->run();

if ($task->isFinished()) {

unset($this->taskMap[$task->getTaskId()]);

} else {

$this->schedule($task);

}

}

}

}

隊(duì)列可以使每個(gè)任務(wù)獲得同等的 CPU 使用時(shí)間,

Demo

function task1() {

for ($i = 1; $i <= 10; ++$i) {

echo "This is task 1 iteration $i.\n";

yield;

}

}

function task2() {

for ($i = 1; $i <= 5; ++$i) {

echo "This is task 2 iteration $i.\n";

yield;

}

}

$scheduler = new Scheduler;

$scheduler->newTask(task1());

$scheduler->newTask(task2());

$scheduler->run();

輸出:

This is task 1 iteration 1.

This is task 2 iteration 1.

This is task 1 iteration 2.

This is task 2 iteration 2.

This is task 1 iteration 3.

This is task 2 iteration 3.

This is task 1 iteration 4.

This is task 2 iteration 4.

This is task 1 iteration 5.

This is task 2 iteration 5.

This is task 1 iteration 6.

This is task 1 iteration 7.

This is task 1 iteration 8.

This is task 1 iteration 9.

This is task 1 iteration 10.

結(jié)果正是我們期待的,最初的 5 次迭代,兩個(gè)任務(wù)是交替進(jìn)行的,而在第二個(gè)任務(wù)結(jié)束后,只有第一個(gè)任務(wù)繼續(xù)執(zhí)行到結(jié)束。

協(xié)程非阻塞 IO

若想真正的發(fā)揮出協(xié)程的作用,那一定是在一些涉及到阻塞 IO 的場(chǎng)景,我們都知道 Web 服務(wù)器最耗時(shí)的部分通常都是 socket 讀取數(shù)據(jù)等操作上,如果進(jìn)程對(duì)每個(gè)請(qǐng)求都掛起的等待 IO 操作,那處理效率就太低了,接下來我們看個(gè)支持非阻塞 IO 的 Scheduler:

class Scheduler

{

protected $maxTaskId = 0;

protected $tasks = []; // taskId => task

protected $queue;

// resourceID => [socket, tasks]

protected $waitingForRead = [];

protected $waitingForWrite = [];

public function __construct() {

// SPL 隊(duì)列

$this->queue = new SplQueue();

}

public function newTask(Generator $coroutine) {

$tid = ++$this->maxTaskId;

$task = new Task($tid, $coroutine);

$this->tasks[$tid] = $task;

$this->schedule($task);

return $tid;

}

public function schedule(Task $task) {

// 任務(wù)入隊(duì)

$this->queue->enqueue($task);

}

public function run() {

while (!$this->queue->isEmpty()) {

// 任務(wù)出隊(duì)

$task = $this->queue->dequeue();

$task->run();

if ($task->isFinished()) {

unset($this->tasks[$task->getTaskId()]);

} else {

$this->schedule($task);

}

}

}

public function waitForRead($socket, Task $task)

{

if (isset($this->waitingForRead[(int)$socket])) {

$this->waitingForRead[(int)$socket][1][] = $task;

} else {

$this->waitingForRead[(int)$socket] = [$socket, [$task]];

}

}

public function waitForWrite($socket, Task $task)

{

if (isset($this->waitingForWrite[(int)$socket])) {

$this->waitingForWrite[(int)$socket][1][] = $task;

} else {

$this->waitingForWrite[(int)$socket] = [$socket, [$task]];

}

}

/**

* @param $timeout 0 represent

*/

protected function ioPoll($timeout)

{

$rSocks = [];

foreach ($this->waitingForRead as list($socket)) {

$rSocks[] = $socket;

}

$wSocks = [];

foreach ($this->waitingForWrite as list($socket)) {

$wSocks[] = $socket;

}

$eSocks = [];

// $timeout 為 0 時(shí), stream_select 為立即返回,為 null 時(shí)則會(huì)阻塞的等,見 http://php.net/manual/zh/function.stream-select.php

if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {

return;

}

foreach ($rSocks as $socket) {

list(, $tasks) = $this->waitingForRead[(int)$socket];

unset($this->waitingForRead[(int)$socket]);

foreach ($tasks as $task) {

$this->schedule($task);

}

}

foreach ($wSocks as $socket) {

list(, $tasks) = $this->waitingForWrite[(int)$socket];

unset($this->waitingForWrite[(int)$socket]);

foreach ($tasks as $task) {

$this->schedule($task);

}

}

}

/**

* 檢查隊(duì)列是否為空,若為空則掛起的執(zhí)行 stream_select,否則檢查完 IO 狀態(tài)立即返回,詳見 ioPoll()

* 作為任務(wù)加入隊(duì)列后,由于 while true,會(huì)被一直重復(fù)的加入任務(wù)隊(duì)列,實(shí)現(xiàn)每次任務(wù)前檢查 IO 狀態(tài)

* @return Generator object for newTask

*

*/

protected function ioPollTask()

{

while (true) {

if ($this->taskQueue->isEmpty()) {

$this->ioPoll(null);

} else {

$this->ioPoll(0);

}

yield;

}

}

/**

* $scheduler = new Scheduler;

* $scheduler->newTask(Web Server Generator);

* $scheduler->withIoPoll()->run();

*

* 新建 Web Server 任務(wù)后先執(zhí)行 withIoPoll() 將 ioPollTask() 作為任務(wù)入隊(duì)

*

* @return $this

*/

public function withIoPoll()

{

$this->newTask($this->ioPollTask());

return $this;

}

}

這個(gè)版本的 Scheduler 里加入一個(gè)永不退出的任務(wù),并且通過 stream_select 支持的特性來實(shí)現(xiàn)快速的來回檢查各個(gè)任務(wù)的 IO 狀態(tài),只有 IO 完成的任務(wù)才會(huì)繼續(xù)執(zhí)行,而 IO 還未完成的任務(wù)則會(huì)跳過,完整的代碼和例子可以戳這里。

也就是說任務(wù)交替執(zhí)行的過程中,一旦遇到需要 IO 的部分,調(diào)度器就會(huì)把 CPU 時(shí)間分配給不需要 IO 的任務(wù),等到當(dāng)前任務(wù)遇到 IO 或者之前的任務(wù) IO 結(jié)束才再次調(diào)度 CPU 時(shí)間,以此實(shí)現(xiàn) CPU 和 IO 并行來提升執(zhí)行效率,類似下圖:

image.png

單任務(wù)改造

如果想將一個(gè)單進(jìn)程任務(wù)改造成并發(fā)執(zhí)行,我們可以選擇改造成多進(jìn)程或者協(xié)程:

多進(jìn)程,不改變?nèi)蝿?wù)執(zhí)行的整體過程,在一個(gè)時(shí)間段內(nèi)同時(shí)執(zhí)行多個(gè)相同的代碼段,調(diào)度權(quán)在 CPU,如果一個(gè)任務(wù)能獨(dú)占一個(gè) CPU 則可以實(shí)現(xiàn)并行。

協(xié)程,把原有任務(wù)拆分成多個(gè)小任務(wù),原有任務(wù)的執(zhí)行流程被改變,調(diào)度權(quán)在進(jìn)程自己,如果有 IO 并且可以實(shí)現(xiàn)異步,則可以實(shí)現(xiàn)并行。

多進(jìn)程改造

image.png

協(xié)程改造

image.png

協(xié)程(Coroutines)和 Go 協(xié)程(Goroutines)

PHP 的協(xié)程或者其他語言中,比如 Python、Lua 等都有協(xié)程的概念,和 Go 協(xié)程有些相似,不過有兩點(diǎn)不同:

Go 協(xié)程意味著并行(或者可以以并行的方式部署,可以用 runtime.GOMAXPROCS() 指定可同時(shí)使用的 CPU 個(gè)數(shù)),協(xié)程一般來說只是并發(fā)。

Go 協(xié)程通過通道 channel 來通信;協(xié)程通過 yield 讓出和恢復(fù)操作來通信。

Go 協(xié)程比普通協(xié)程更強(qiáng)大,也很容易從協(xié)程的邏輯復(fù)用到 Go 協(xié)程,而且在 Go 的開發(fā)中也使用的極為普遍,有興趣的話可以了解一下作為對(duì)比。

結(jié)束

個(gè)人感覺 PHP 的協(xié)程在實(shí)際使用中想要徒手實(shí)現(xiàn)和應(yīng)用并不方便而且場(chǎng)景有限,但了解其概念及實(shí)現(xiàn)原理對(duì)更好的理解并發(fā)不無裨益。

如果想更多的了解協(xié)程的實(shí)際應(yīng)用場(chǎng)景不妨試試已經(jīng)大名鼎鼎的 Swoole,其對(duì)多種協(xié)議的 client 做了底層的協(xié)程封裝,幾乎可以做到以同步編程的寫法實(shí)現(xiàn)協(xié)程異步 IO 的效果。

參考

關(guān)注 NewtonIO - 創(chuàng)造者們的技術(shù)與工具

image.png

總結(jié)

以上是生活随笔為你收集整理的coroutine php_PHP 协程实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。