日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入tornado中的IOStream

發(fā)布時(shí)間:2025/5/22 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入tornado中的IOStream 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

IOStream對(duì)tornado的高效起了很大的作用,他封裝了socket的非阻塞IO的讀寫操作。大體上可以這么說,當(dāng)連接建立后,服務(wù)端與客戶端的請(qǐng)求響應(yīng)的讀寫都是基于IOStream的,也就是說:IOStream是用來處理對(duì)連接的讀寫,當(dāng)然IOStream是異步的讀寫而且可以有很多花樣的讀寫。

接下來說一下有關(guān)接收請(qǐng)求的大體流程:

  當(dāng)連接建立,服務(wù)器端會(huì)產(chǎn)生一個(gè)對(duì)應(yīng)該連接的socket,同時(shí)將該socket封裝至IOStream實(shí)例中(這代表著IOStream的初始化)。

  我們知道tornado是基于IO多路復(fù)用的(就拿epoll來說),此時(shí)將socket進(jìn)行register,事件為READABLE,這一步與IOStream沒有多大關(guān)系。?

  當(dāng)該socket事件發(fā)生時(shí),也就是意味著有數(shù)據(jù)從連接發(fā)送到了系統(tǒng)緩沖區(qū)中,這時(shí)就需要將chunk讀入到我們?cè)趦?nèi)存中為其開辟的_read_buffer中,在IOStream中使用deque作為buffer。_read_buffer表示讀緩沖,當(dāng)然也有_write_buffer,并且在讀的過程中也會(huì)檢測(cè)總尺寸是否大于我們?cè)O(shè)定的最大緩沖尺寸。不管是讀緩沖還是寫緩沖本質(zhì)上就是tornado進(jìn)程開辟的一段用來存儲(chǔ)數(shù)據(jù)的內(nèi)存。

  而這些chunk一般都是客戶端發(fā)送的請(qǐng)求了,但是我們還需要對(duì)這些chunk作進(jìn)一步操作,比如這個(gè)chunk中可能包含了多個(gè)請(qǐng)求,如何把請(qǐng)求分離?(每個(gè)請(qǐng)求首部的結(jié)束符是b'\r\n\r\n'),這里就用到read_until來分離請(qǐng)求并設(shè)置callback了。同時(shí)會(huì)將被分離的請(qǐng)求數(shù)據(jù)從_read_buffer中移除。

  然后就是將callback以及他的參數(shù)(被分離的請(qǐng)求數(shù)據(jù))添加至IOLoop._callbacks中,等待下一次IOLoop的執(zhí)行,屆時(shí)會(huì)迭代_callbacks并執(zhí)行回調(diào)函數(shù)。 

  補(bǔ)充: tornado是水平觸發(fā),所以假如讀完一次chunk后系統(tǒng)緩存區(qū)中依然還有數(shù)據(jù),那么下一次的epoll.poll()依然會(huì)返回該socket。

?

在iostream中有一個(gè)類叫做:IOStream ?

有幾個(gè)較為重要的屬性:

def __init__():self.socket = socket # 封裝socket self.socket.setblocking(False) # 設(shè)置socket為非阻塞self.io_loop = io_loop or ioloop.IOLoop.current() self._read_buffer = deque() # 讀緩沖self._write_buffer = deque() # 寫緩沖 self._read_callback = None # 讀到指定字節(jié)數(shù)據(jù)時(shí),或是指定標(biāo)志字符串時(shí),需要執(zhí)行的回調(diào)函數(shù)self._write_callback = None # 發(fā)送完_write_buffer的數(shù)據(jù)時(shí),需要執(zhí)行的回調(diào)函數(shù)

有幾個(gè)較為重要的方法

class IOStream(object):def read_until(self, delimiter, callback): def read_bytes(self, num_bytes, callback, streaming_callback=None): def read_until_regex(self, regex, callback): def read_until_close(self, callback, streaming_callback=None): def write(self, data, callback=None):

以上所有的方法都需要一個(gè)可選的callback參數(shù),如果該參數(shù)為None則該方法會(huì)返回一個(gè)Future對(duì)象。

以上所有的讀方法本質(zhì)上都是讀取該socket所發(fā)送來的數(shù)據(jù),然后當(dāng)讀到指定分隔符或者標(biāo)記或者條件觸發(fā)的時(shí)候,停止讀,然后將該分隔符以及其前面的數(shù)據(jù)作為callback(如果沒有callback,則將數(shù)據(jù)設(shè)置為Future對(duì)象的result)的參數(shù),然后將callback添加至IOLoop._callbacks中。當(dāng)然其中所有的"讀"操作是非阻塞的!

  像read_until ?read_until_regex 這兩個(gè)方法相差不大,原理都是差不多的,都是在buffer中找指定的字符或者字符樣式。

  而read_bytes則是設(shè)置讀取字節(jié)數(shù),達(dá)到這些字節(jié)就會(huì)觸發(fā)并運(yùn)行回調(diào)函數(shù)(當(dāng)然這些回調(diào)函數(shù)不是立刻運(yùn)行,而是被送到ioloop中的_callbacks中),該方法主要是用來讀取包含content-length或者分塊傳輸編碼的具有主體信息的請(qǐng)求或者響應(yīng)。

  而read_until_close則是主要被用在非持久連接上,因?yàn)榉浅志眠B接響應(yīng)的結(jié)束標(biāo)志就是連接關(guān)閉。

read_bytes和read_until_close這兩個(gè)方法都有streaming_callback這個(gè)參數(shù),假如指定了該參數(shù),那么只要read_buffer中有數(shù)據(jù),則將數(shù)據(jù)作為參數(shù)調(diào)用該函數(shù)


就拿比較常見的read_until方法來說,下面是代碼簡(jiǎn)化版:

def read_until(self, delimiter, callback=None, max_bytes=None):future = self._set_read_callback(callback)    # 可能是Future對(duì)象,也可能是Noneself._read_delimiter = delimiter          # 設(shè)置分隔符self._read_max_bytes = max_bytes          # 設(shè)置最大讀字節(jié)數(shù)self._try_inline_read()return future

其中_set_read_callback會(huì)根據(jù)callback是否存在返回None或者Future對(duì)象(存在返回None,否則返回一個(gè)Future實(shí)例對(duì)象)

如果我們
再來看_try_inline_read方法的簡(jiǎn)化版:

def _try_inline_read(self):"""嘗試從_read_buffer中讀取所需數(shù)據(jù)"""# 查看是否我們已經(jīng)在之前的讀操作中得到了數(shù)據(jù)self._run_streaming_callback() # 檢查字符流回調(diào),如果調(diào)用read_bytes和read_until_close并指定了streaming_callback參數(shù)就會(huì)造成這個(gè)回調(diào)pos = self._find_read_pos() # 嘗試在_read_buffer中找到分隔符的位置。找到則返回分隔符末尾所處的位置,如果不能,則返回None。if pos is not None:self._read_from_buffer(pos)return
self._check_closed() # 檢查當(dāng)前IOStream是否關(guān)閉pos = self._read_to_buffer_loop() # 從系統(tǒng)緩沖中讀取一個(gè)chunk,檢查是否含有分隔符,沒有則繼續(xù)讀取一個(gè)chunk,合并兩個(gè)chunk,再次檢查是否函數(shù)分隔符…… 如果找到了分隔符,會(huì)返回分隔符末尾在_read_buffer中所處的位置if pos is not None: # 如果找到了分隔符,self._read_from_buffer(pos) # 將所需的數(shù)據(jù)從_read_buffer中移除,并將其作為callback的參數(shù),然后將callback封裝后添加至IOLoop._callbacks中 return
# 沒找到分隔符,要么關(guān)閉IOStream,要么為該socket在IOLoop中注冊(cè)事件if self.closed(): self._maybe_run_close_callback()else:self._add_io_state(ioloop.IOLoop.READ)

上面的代碼被我用空行分為了三部分,每一部分順序的對(duì)應(yīng)下面每一句話

分析該方法:

  1 首先在_read_buffer第一項(xiàng)中找分隔符,找到了就將分隔符以及其前的數(shù)據(jù)從_read_buffer中移除并將其作為參數(shù)傳入回調(diào)函數(shù),沒找到就將第二項(xiàng)與第一項(xiàng)合并然后繼續(xù)找……;

  2 如果在_read_buffer所有項(xiàng)中都沒找到的話就把系統(tǒng)緩存中的數(shù)據(jù)讀取至_read_buffer,然后合并再次查找,

  3 如果把系統(tǒng)緩存中的數(shù)據(jù)都取完了都還沒找到,那么就等待下一次該socket發(fā)生READ事件后再找,這時(shí)的找則就是:將系統(tǒng)緩存中的數(shù)據(jù)讀取到_read_buffer中然后找,也就是執(zhí)行第2步。

?來看一看這三部分分別調(diào)用了什么方法:

第一部分中的_find_read_pos以及_read_from_buffer

前者主要是在_read_buffer中查找分隔符,并返回分隔符的位置,后者則是將分隔符以及分隔符前面的所有數(shù)據(jù)從_read_buffer中取出并將其作為callback的參數(shù),然后將callback封裝后添加至IOLoop._callbacks中

來看_find_read_pos方法的簡(jiǎn)化版:

def _find_read_pos(self): # 嘗試在_read_buffer中尋找分隔符。找到則返回分隔符末尾所處的位置,如果不能,則返回None。if self._read_delimiter is not None:if self._read_buffer: # 查看_read_buffer中是否有之前未處理的數(shù)據(jù)while True:loc = self._read_buffer[0].find(self._read_delimiter) # 查找分隔符所出現(xiàn)的首部位置if loc != -1: # 在_read_buffer的首項(xiàng)中找到了delimiter_len = len(self._read_delimiter)self._check_max_bytes(self._read_delimiter, loc + delimiter_len)return loc + delimiter_len # 分隔符末尾的位置if len(self._read_buffer) == 1:break_double_prefix(self._read_buffer)self._check_max_bytes(self._read_delimiter, len(self._read_buffer[0]))return None _find_read_pos def _read_from_buffer(self, pos): # 將所需的數(shù)據(jù)從_read_buffer中移除,并將其作為callback的參數(shù),然后將callback封裝后添加至IOLoop._callbacks中 self._read_bytes = self._read_delimiter = self._read_regex = Noneself._read_partial = Falseself._run_read_callback(pos, False) _read_from_buffer 來看_run_read_callback源碼簡(jiǎn)化版:def _run_read_callback(self, size, streaming):if streaming:callback = self._streaming_callbackelse:callback = self._read_callbackself._read_callback = self._streaming_callback = Noneif self._read_future is not None: # 這里將_read_future進(jìn)行set_resultassert callback is Nonefuture = self._read_futureself._read_future = Nonefuture.set_result(self._consume(size))if callback is not None:assert (self._read_future is None) or streamingself._run_callback(callback, self._consume(size)) # 將后者作為前者的參數(shù),然后將前者進(jìn)行封裝后添加至IOLoop._callbacks中 來看_consume的源碼:def _consume(self, loc): # 將self._read_buffer 的首項(xiàng)改為 原首項(xiàng)[loc:] ,然后返回 原首項(xiàng)[:loc]if loc == 0:return b""_merge_prefix(self._read_buffer, loc) # 將雙端隊(duì)列(deque)的首項(xiàng)調(diào)整為指定大小。self._read_buffer_size -= locreturn self._read_buffer.popleft()來看_run_callback源碼簡(jiǎn)化版:def _run_callback(self, callback, *args):# 將callback封裝后添加至ioloop._callbacks中def wrapper():self._pending_callbacks -= 1try:return callback(*args)finally:self._maybe_add_error_listener()with stack_context.NullContext():self._pending_callbacks += 1self.io_loop.add_callback(wrapper) # 將callback添加至IOLoop._callbacks中 _run_read_callback

這里面還用到一個(gè)很有意思的函數(shù):_merge_prefix ,這個(gè)函數(shù)的作用就是將deque的首項(xiàng)調(diào)整為指定大小

def _merge_prefix(deque, size):"""Replace the first entries in a deque of strings with a singlestring of up to size bytes.>>> d = collections.deque(['abc', 'de', 'fghi', 'j'])>>> _merge_prefix(d, 5); print(d)deque(['abcde', 'fghi', 'j'])Strings will be split as necessary to reach the desired size.>>> _merge_prefix(d, 7); print(d)deque(['abcdefg', 'hi', 'j'])>>> _merge_prefix(d, 3); print(d)deque(['abc', 'defg', 'hi', 'j'])>>> _merge_prefix(d, 100); print(d)deque(['abcdefghij'])"""if len(deque) == 1 and len(deque[0]) <= size:returnprefix = []remaining = sizewhile deque and remaining > 0:chunk = deque.popleft()if len(chunk) > remaining:deque.appendleft(chunk[remaining:])chunk = chunk[:remaining]prefix.append(chunk)remaining -= len(chunk)if prefix:deque.appendleft(type(prefix[0])().join(prefix))if not deque:deque.appendleft(b"") _merge_prefix

第二部分的_read_to_buffer_loop

來看_read_to_buffer_loop簡(jiǎn)化版:系統(tǒng)緩沖中的data可能十分長(zhǎng),為了查找指定的字符,我們應(yīng)該先讀一個(gè)chunk,檢查其中是否有指定的字符,若有則返回分隔符末尾所處的位置若沒有則繼續(xù)讀第二個(gè)chunk,然后將這兩個(gè)chunk合并(多字節(jié)分隔符(例如“\ r \ n”)可能跨讀取緩沖區(qū)中的兩個(gè)塊),重復(fù)查找過程def _read_to_buffer_loop(self):try:next_find_pos = 0self._pending_callbacks += 1while not self.closed():if self._read_to_buffer() == 0: # 從系統(tǒng)緩沖中讀一個(gè)chunk并將其添加至_read_buffer中,然后返回chunk的大小,如果無數(shù)據(jù)則返回0breakself._run_streaming_callback() if self._read_buffer_size >= next_find_pos: # _read_buffer_size 表示_read_buffer的大小pos = self._find_read_pos() # 嘗試在_read_buffer中尋找分隔符。找到則返回分隔符末尾所處的位置,如果不能,則返回None。 if pos is not None:return posnext_find_pos = self._read_buffer_size * 2return self._find_read_pos()finally:self._pending_callbacks -= 1 _read_to_buffer_loop

第三部分_add_io_state,該函數(shù)和ioloop異步相關(guān)

def _add_io_state(self, state):if self.closed(): # 連接已經(jīng)關(guān)閉returnif self._state is None:self._state = ioloop.IOLoop.ERROR | state with stack_context.NullContext():self.io_loop.add_handler(self.fileno(), self._handle_events, self._state) # 為對(duì)應(yīng)socket的文件描述符添加事件及其處理函數(shù),elif not self._state & state:self._state = self._state | stateself.io_loop.update_handler(self.fileno(), self._state)# self._handle_events 是根據(jù)events選擇對(duì)應(yīng)的處理函數(shù),在這里我們假設(shè)處理函數(shù)是_handle_readdef _handle_read(self):try:pos = self._read_to_buffer_loop()except UnsatisfiableReadError:raiseexcept Exception as e:gen_log.warning("error on read: %s" % e)self.close(exc_info=True)returnif pos is not None:self._read_from_buffer(pos)returnelse:self._maybe_run_close_callback() _add_io_state

?

參考:

  http://www.nowamagic.net/academy/detail/13321051

轉(zhuǎn)載于:https://www.cnblogs.com/MnCu8261/p/6703778.html

總結(jié)

以上是生活随笔為你收集整理的深入tornado中的IOStream的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。