nodejs-stream部分
參考:
https://blog.csdn.net/eeewwwddd/article/details/81042225
http://nodejs.cn/api/stream.html#stream_writable_write_chunk_encoding_callback
?
流(stream)是 Node.js 中處理流式數據的抽象接口。?stream?模塊提供了一些 API,用于構建實現了流接口的對象。
Node.js 提供了多種流對象。 例如,HTTP 服務器的請求和?process.stdout?都是流的實例。
流可以是可讀的、可寫的、或者可讀可寫的。 所有的流都是?EventEmitter?的實例,即可以通過事件的監聽得以觸發事件并執行一定的操作,如:
stream?模塊可以通過以下方式使用:
const stream = require('stream');盡管理解流的工作方式很重要,但是?stream?模塊本身主要用于開發者創建新類型的流實例。
對于以消費流對象為主的開發者,極少需要直接使用?stream?模塊。
?
Node.js 中有四種基本的流類型:
Writable - 可寫入數據的流(例如 fs.createWriteStream())。Readable - 可讀取數據的流(例如 fs.createReadStream())。Duplex - 可讀又可寫的流(例如 net.Socket)。Transform - 在讀寫過程中可以修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())?
兩種模式
?二進制模式
每個分塊都是buffer、string對象
對象模式
Node.js 創建的流都是運作在字符串和?Buffer(或?Uint8Array)上。 當然,流的實現也可以使用其它類型的 JavaScript 值(除了?null)。 這些流會以“對象模式”進行操作。
當創建流時,可以使用?objectMode?選項把流實例切換到對象模式。 將已存在的流切換到對象模式是不安全的。
?
?比如如果想創建一個的可以壓入任意形式數據的可讀流,只要在創建流的時候設置參數objectMode為true即可,例如:Readable({ objectMode: true })
如果readable stream寫入的是字符串,那么字符串會默認轉換為Buffer,如果在創建流的時候設置Writable({ decodeStrings: false })參數,那么不會做轉換。
如果readable stream寫入的數據是對象,那么需要這樣創建writable stream,Writable({ objectMode: true })
??就是如果輸入的數據并不是Buffer(或?Uint8Array)格式的時候,那么在創建這個流的時候就要將其設置為對象模式,即設置其的objectMode: true,舉例:
const DuplexStream = require('readable-stream').Duplex const inherits = require('util').inheritsmodule.exports = PostMessageStreaminherits(PostMessageStream, DuplexStream)function PostMessageStream (opts) {DuplexStream.call(this, { objectMode: true, }) ... }?
緩沖
可寫流和可讀流都會在內部的緩沖器中存儲數據,可以分別使用的?writable.writableBuffer?或?readable.readableBuffer?來獲取。
可緩沖的數據大小取決于傳入流構造函數的?highWaterMark?選項。 對于普通的流,highWaterMark?指定了字節的總數。 對于對象模式的流,highWaterMark?指定了對象的總數。
當調用?stream.push(chunk)?時,數據會被緩沖在可讀流中。 如果流的消費者沒有調用?stream.read(),則數據會保留在內部隊列中直到被消費。
一旦內部的可讀緩沖的總大小達到?highWaterMark?指定的閾值時,流會暫時停止從底層資源讀取數據,直到當前緩沖的數據被消費 (也就是說,流會停止調用內部的用于填充可讀緩沖的?readable._read())。
當調用?writable.write(chunk)?時,數據會被緩沖在可寫流中。 當內部的可寫緩沖的總大小小于?highWaterMark?設置的閾值時,調用?writable.write()?會返回?true。 一旦內部緩沖的大小達到或超過?highWaterMark?時,則會返回?false。
stream?API 的主要目標,特別是?stream.pipe(),是為了限制數據的緩沖到可接受的程度,也就是讀寫速度不一致的源頭與目的地不會壓垮內存。
因為?Duplex?和?Transform?都是可讀又可寫的,所以它們各自維護著兩個相互獨立的內部緩沖器用于讀取和寫入, 這使得它們在維護數據流時,讀取和寫入兩邊可以各自獨立地運作。 例如,net.Socket?實例是?Duplex?流,它的可讀端可以消費從 socket 接收的數據,而可寫端則可以將數據寫入到 socket。 因為數據寫入到 socket 的速度可能比接收數據的速度快或者慢,所以在讀寫兩端獨立地進行操作(或緩沖)就顯得很重要了。
?
【1】用于消費流的 API(即讀取流中數據)
test.js
const http = require('http');const server = http.createServer((req, res) => {// req 是一個 http.IncomingMessage 實例,它是可讀流。// res 是一個 http.ServerResponse 實例,它是可寫流。 let body = '';// 接收數據為 utf8 字符串,// 如果沒有設置字符編碼,則會接收到 Buffer 對象。req.setEncoding('utf8');// 如果添加了監聽器,則可讀流會觸發 'data' 事件。req.on('data', (chunk) => {body += chunk;});// 'end' 事件表明整個請求體已被接收。 req.on('end', () => {try {const data = JSON.parse(body);// 響應信息給用戶。res.write(typeof data);res.end();//end()表示寫結束} catch (er) {// json 解析失敗。res.statusCode = 400;return res.end(`錯誤: ${er.message}`);}}); });server.listen(1337);然后在終端使用node test.js運行該服務器
然后在另一個終端使用curl localhost:1337 -d "{}" 連接服務器localhost:1337 ,-d即post數據data為{} ,返回object
curl localhost:1337 -d "{}" 返回object curl localhost:1337 -d "\"foo\"" 返回string curl localhost:1337 -d "not json" 返回 錯誤: Unexpected token o in JSON at position 1?
可寫流(比如例子中的?res)會暴露了一些方法,比如?write()?和?end()?用于寫入數據到流。
當數據可以從流讀取時,可讀流會使用?EventEmitter?API 來通知應用程序。 從流讀取數據的方式有很多種。
可寫流和可讀流都通過多種方式使用?EventEmitter?API 來通訊流的當前狀態。
Duplex?流和?Transform?流都是可寫又可讀的。
對于只需寫入數據到流或從流消費數據的應用程序,并不需要直接實現流的接口,通常也不需要調用?require('stream')
?
《1》可寫流
可寫流是對數據要被寫入的目的地的一種抽象。
可寫流的例子包括:
- 客戶端的 HTTP 請求
- 服務器的 HTTP 響應
- fs 的寫入流
- zlib 流
- crypto 流
- TCP socket
- 子進程 stdin
- process.stdout、process.stderr
上面的一些例子事實上是實現了可寫流接口的?Duplex?流。
所有可寫流都實現了?stream.Writable?類定義的接口。
盡管可寫流的具體實例可能略有差別,但所有的可寫流都遵循同一基本的使用模式,如以下例子所示:
const myStream = getWritableStreamSomehow(); myStream.write('一些數據'); myStream.write('更多數據'); myStream.end('完成寫入數據');//說明完成寫入?
stream.Writable 類
下面介紹幾類事件:
'close' 事件
當流或其底層資源(比如文件描述符)被關閉時觸發。 表明不會再觸發其他事件,也不會再發生操作。
不是所有可寫流都會觸發?'close'?事件。
'drain' 事件
如果調用?stream.write(chunk)?返回?false,可能緩沖區已滿,需要等待,則當有空間可以繼續寫入數據到流時會觸發?'drain'?事件。
// 向可寫流中寫入數據一百萬次。 // 留意背壓(back-pressure)。 function writeOneMillionTimes(writer, data, encoding, callback) {let i = 1000000;write();function write() {let ok = true;do {i--;if (i === 0) {// 最后一次寫入。 writer.write(data, encoding, callback);} else {// 檢查是否可以繼續寫入。 // 不要傳入回調,因為寫入還沒有結束。ok = writer.write(data, encoding);}} while (i > 0 && ok);if (i > 0) {// 被提前中止。// 當觸發 'drain' 事件時繼續寫入,繼續運行write()函數。writer.once('drain', write);}} }'error' 事件
當寫入數據發生錯誤時觸發。
當觸發?'error'?事件時,流還未被關閉
'finish' 事件
調用?stream.end()?且緩沖數據都已傳給底層系統之后觸發。
const http = require('http');const server = http.createServer((req, res) => {// req 是一個 http.IncomingMessage 實例,它是可讀流。// res 是一個 http.ServerResponse 實例,它是可寫流。 let body = '';// 接收數據為 utf8 字符串,// 如果沒有設置字符編碼,則會接收到 Buffer 對象。req.setEncoding('utf8');// 如果添加了監聽器,則可讀流會觸發 'data' 事件。req.on('data', (chunk) => {body += chunk;});// 'end' 事件表明整個請求體已被接收。 req.on('end', () => {try {const data = JSON.parse(body);// 響應信息給用戶。res.write(typeof data);res.end();//會觸發finish事件 res.on('finish', () => {console.error('寫入已完成');});} catch (er) {// json 解析失敗。res.statusCode = 400;return res.end(`錯誤: ${er.message}`);}}); });server.listen(1337);運行結果:
'pipe' 事件
- src?<stream.Readable>?通過管道流入到可寫流的來源流。
當在可讀流上調用?stream.pipe()?時觸發。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => {console.error('有數據正通過管道流入寫入器');assert.equal(src,reader);//兩者相等console.log(src); }); reader.pipe(writer);返回:
有數據正通過管道流入寫入器 ReadStream {connecting: false,_hadError: false,_handle:TTY { owner: [Circular], onread: [Function: onread], reading: false },_parent: null,_host: null,_readableState:ReadableState {objectMode: false,//非對象模式highWaterMark: 0,buffer: BufferList { length: 0 },length: 0,pipes:WriteStream {connecting: false,_hadError: false,_handle: [TTY],_parent: null,_host: null,_readableState: [ReadableState],readable: false,_events: [Object],_eventsCount: 7,_maxListeners: undefined,_writableState: [WritableState],writable: true,allowHalfOpen: false,_sockname: null,_writev: null,_pendingData: null,_pendingEncoding: '',server: null,_server: null,columns: 80,rows: 24,_type: 'tty',fd: 1,_isStdio: true,destroySoon: [Function: destroy],_destroy: [Function],[Symbol(asyncId)]: 2,[Symbol(lastWriteQueueSize)]: 0,[Symbol(timeout)]: null,[Symbol(kBytesRead)]: 0,[Symbol(kBytesWritten)]: 0 },pipesCount: 1,flowing: true,ended: false,endEmitted: false,reading: false,sync: false,needReadable: true,emittedReadable: false,readableListening: false,resumeScheduled: true,emitClose: false,destroyed: false,defaultEncoding: 'utf8',awaitDrain: 0,readingMore: false,decoder: null,encoding: null },readable: true,_events:{ end: [ [Function: onReadableStreamEnd], [Function] ],pause: [Function],data: [Function: ondata] },_eventsCount: 3,_maxListeners: undefined,_writableState:WritableState {objectMode: false,highWaterMark: 0,finalCalled: false,needDrain: false,ending: false,ended: false,finished: false,destroyed: false,decodeStrings: false,defaultEncoding: 'utf8',length: 0,writing: false,corked: 0,sync: true,bufferProcessing: false,onwrite: [Function: bound onwrite],writecb: null,writelen: 0,bufferedRequest: null,lastBufferedRequest: null,pendingcb: 0,prefinished: false,errorEmitted: false,emitClose: false,bufferedRequestCount: 0,corkedRequestsFree:{ next: null,entry: null,finish: [Function: bound onCorkedFinish] } },writable: false,allowHalfOpen: false,_sockname: null,_writev: null,_pendingData: null,_pendingEncoding: '',server: null,_server: null,isRaw: false,isTTY: true,fd: 0,[Symbol(asyncId)]: 5,[Symbol(lastWriteQueueSize)]: 0,[Symbol(timeout)]: null,[Symbol(kBytesRead)]: 0,[Symbol(kBytesWritten)]: 0 } View Code?
'unpipe' 事件
- src?<stream.Readable>?被移除可寫流管道的來源流。
當在可讀流上調用?stream.unpipe()?時觸發。
當可讀流通過管道流向可寫流發生錯誤時,也會觸發?'unpipe'?事件。
var assert = require('assert'); const writer = process.stdout; const reader = process.stdin; writer.on('pipe', (src) => {console.error('有數據正通過管道流入寫入器');assert.equal(src,reader);// console.log(src); }); writer.on('unpipe', (src) => {console.error('已移除可寫流管道');assert.equal(src, reader); }); reader.pipe(writer);//觸發'pipe'事件 reader.unpipe(writer);//觸發'unpipe'事件返回:
userdeMacBook-Pro:stream-learning user$ node test.js 有數據正通過管道流入寫入器 已移除可寫流管道?
?
下面是可使用的方法:
writable.write(chunk[, encoding][, callback])
- chunk?<string>?|?<Buffer>?|?<Uint8Array>?|?<any>?要寫入的數據。 ?對于非對象模式的流chunk?必須是字符串、Buffer?或?Uint8Array。 對于對象模式的流,chunk?可以是任何 JavaScript 值,除了?null。
- encoding?<string>?如果?chunk?是字符串,則指定字符編碼。
- callback?<Function>?當數據塊被輸出到目標后的回調函數。
- 返回:?<boolean>?如果流需要等待?'drain'?事件觸發才能繼續寫入更多數據,則返回?false,否則返回?true。
writable.write()?寫入數據到流,并在數據被完全處理之后調用?callback。 如果發生錯誤,則?callback?可能被調用也可能不被調用。 為了可靠地檢測錯誤,可以為?'error'?事件添加監聽器。
在接收了?chunk?后,如果內部的緩沖小于創建流時配置的?highWaterMark,則返回?true?。 如果返回?false?,則應該停止向流寫入數據,直到?'drain'?事件被觸發。
當流還未被排空時,調用?write()?會緩沖?chunk,并返回?false。 一旦所有當前緩沖的數據塊都被排空了(被操作系統接收并傳輸),則觸發?'drain'?事件。 建議一旦?write()?返回 false,則不再寫入任何數據塊,直到?'drain'?事件被觸發。 當流還未被排空時,也是可以調用?write(),Node.js 會緩沖所有被寫入的數據塊,直到達到最大內存占用,這時它會無條件中止。 甚至在它中止之前, 高內存占用將會導致垃圾回收器的性能變差和 RSS 變高(即使內存不再需要,通常也不會被釋放回系統)。 如果遠程的另一端沒有讀取數據,TCP 的 socket 可能永遠也不會排空,所以寫入到一個不會排空的 socket 可能會導致遠程可利用的漏洞。?
對于?Transform, 寫入數據到一個不會排空的流尤其成問題,因為?Transform?流默認會被暫停,直到它們被 pipe 或者添加了?'data'?或?'readable'?事件句柄。?
如果要被寫入的數據可以根據需要生成或者取得,建議將邏輯封裝為一個可讀流并且使用?stream.pipe()。 如果要優先調用?write(),則可以使用?'drain'?事件來防止背壓與避免內存問題:
var assert = require('assert'); const writer = process.stdout; // const reader = process.stdin; function write(data, cb) { if (!writer.write(data)) { writer.once('drain', cb); } else { process.nextTick(cb); } } // 在回調函數被執行后再進行其他的寫入。 write('hello', () => { console.log('完成寫入,可以進行更多的寫入'); });返回:
node test.js hello完成寫入,可以進行更多的寫入?
舉一個例子說明write和drain:
參考https://blog.csdn.net/eeewwwddd/article/details/81042225
- 如果文件不存在會創建,如果有內容會被清空
- 讀取到highWaterMark的時候就會輸出
- 第一次是真的寫到文件 后面就是寫入緩存區 再從緩存區里面去取
?
let fs = require('fs') let ws = fs.createWriteStream('./foo1.txt',{flags: 'w',encoding: 'utf8',start: 0,//write的highWaterMark只是用來觸發是不是干了highWaterMark: 19 //寫是默認16k,當這里設置的長度小于或者等于我一下子要寫入的字符串長度時,會觸發一次drain,也僅觸發一次,然后將剩余部分的所有內容放入緩存,后面將不會再觸發drain了 }) //返回boolean 每當write一次都會在ws中吃下一個饅頭 當吃下的饅頭數量達到highWaterMark時 就會返回false 吃不下了會把其余放入緩存 其余狀態返回true //write只能放string或者buffer var flag = ws.write('today is a good day','utf8',()=>{console.log('write'); }); ws.on('drain', ()=>{console.log('drain'); });返回:
node test.js drain write如果改為highWaterMark: 20,大于輸入內容,則不會觸發drain
則返回:
?
node test.js write?
?
?
writable.end([chunk][, encoding][, callback])
- chunk?<string>?|?<Buffer>?|?<Uint8Array>?|?<any>?要寫入的數據。 對于非對象模式的流chunk?必須是字符串、Buffer、或?Uint8Array。 對于對象模式的流,?chunk?可以是任何 JavaScript 值,除了?null。
- encoding?<string>?如果?chunk?是字符串,則指定字符編碼。
- callback?<Function>?當流結束時的回調函數。
- 返回:?<this>
調用?writable.end()?表明已沒有數據要被寫入可寫流。 可選的?chunk?和?encoding?參數可以在關閉流之前再寫入一塊數據。 如果傳入了?callback?函數,則會做為監聽器添加到?'finish'?事件。
調用?stream.end()?之后再調用?stream.write()?會導致錯誤
writable.cork()
強制把所有寫入的數據都緩沖到內存中。 當調用?stream.uncork()?或?stream.end()?時,緩沖的數據才會被輸出。
當寫入大量小塊數據到流時,內部緩沖可能失效,從而導致性能下降,writable.cork()?主要用于避免這種情況。 對于這種情況,實現了?writable._writev()?的流可以用更優的方式對寫入的數據進行緩沖。
writable.uncork()
將調用?stream.cork()?后緩沖的所有數據輸出到目標。
當使用?writable.cork()?和?writable.uncork()?來管理流的寫入緩沖時,建議使用?process.nextTick()?來延遲調用?writable.uncork()。 通過這種方式,可以對單個 Node.js 事件循環中調用的所有?writable.write()?進行批處理。
?
擴展:?process.nextTick()
process.nextTick(callback[, ...args])
- callback?<Function>
- ...args?<any>?調用?callback時傳遞給它的額外參數
process.nextTick()方法將?callback?添加到"next tick 隊列"。 一旦當前事件輪詢隊列的任務全部完成,在next tick隊列中的所有callbacks會被依次調用。
這種方式不是setTimeout(fn, 0)的別名。它更加有效率。事件輪詢隨后的ticks 調用,會在任何I/O事件(包括定時器)之前運行。
舉例:
console.log('start'); process.nextTick(() => {console.log('nextTick callback'); }); console.log('scheduled'); // Output: // start // scheduled // nextTick callback?
回到writable.uncork(),舉例:
var assert = require('assert'); const writer = process.stdout;writer.cork(); writer.write('一些 '); writer.write('數據 '); process.nextTick(() => writer.uncork());如果沒有這一句,運行時沒有輸出結果的返回:
node test.js 一些 數據如果一個流上多次調用?writable.cork(),則必須調用同樣次數的?writable.uncork()?才能輸出緩沖的數據。
var assert = require('assert'); const writer = process.stdout; writer.cork(); writer.write('一些 '); writer.cork(); writer.write('數據 '); process.nextTick(() => {writer.uncork();// 數據不會被輸出,直到第二次調用 uncork()。writer.uncork();//注釋掉這一句就不會有輸出,正確輸出為一些 數據 });writable.destroy([error])
- error?<Error>
- 返回:?<this>
銷毀流,并觸發?'error'?事件且傳入?error?參數。 調用該方法后,可寫流就結束了,之后再調用?write()?或?end()?都會導致?ERR_STREAM_DESTROYED?錯誤。 實現流時不應該重寫這個方法,而是重寫?writable._destroy()
?
writable.setDefaultEncoding(encoding)
- encoding?<string>?默認的字符編碼。
- 返回:?<this>
為可寫流設置默認的?encoding。
?
轉自https://blog.csdn.net/eeewwwddd/article/details/81042225
let fs = require('fs') let EventEmitter = require('events') //只有第一次write的時候直接用_write寫入文件 其余都是放到cache中 但是len超過了highWaterMark就會返回false告知需要drain 很占緩存 //從第一次的_write開始 回去一直通過clearBuffer遞歸_write寫入文件 如果cache中沒有了要寫入的東西 會根據needDrain來判斷是否觸發干點 class WriteStream extends EventEmitter{constructor(path,options = {}){super()this.path = paththis.highWaterMark = options.highWaterMark || 64*1024this.flags = options.flags || 'r'this.start = options.start || 0this.pos = this.startthis.autoClose = options.autoClose || truethis.mode = options.mode || 0o666//默認null就是bufferthis.encoding = options.encoding || null//打開這個文件this.open()//寫文件的時候需要哪些參數//第一次寫入的時候 是給highWaterMark個饅頭 他會硬著頭皮寫到文件中 之后才會把多余吃不下的放到緩存中this.writing = false//緩存數組this.cache = []this.callbackList = []//數組長度this.len = 0//是否觸發drain事件this.needDrain = false}clearBuffer(){//取緩存中最上面的一個let buffer = this.cache.shift()if(buffer){//有buffer的情況下this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)}else{//沒有的話 先看看需不需要drainif(this.needDrain){//觸發drain 并初始化所有狀態this.writing = falsethis.needDrain = falsethis.callbackList.shift()()this.emit('drain')}this.callbackList.map(v=>{v()})this.callbackList.length = 0}}_write(chunk,encoding,clearBuffer,callback){//因為write方法是同步調用的 所以可能還沒獲取到fdif(typeof this.fd != 'number'){//直接在open的時間對象上注冊一個一次性事件 當open被emit的時候會被調用return this.once('open',()=>this._write(chunk,encoding,clearBuffer,callback))}fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{this.pos += byteWrite//每次寫完 相應減少內存中的數量this.len -= byteWriteif(callback) this.callbackList.push(callback)//第一次寫完 clearBuffer()})}//寫入方法write(chunk,encoding=this.encoding,callback){//判斷chunk必須是字符串或者buffer 為了統一都變成bufferchunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding)//維護緩存的長度 3this.len += chunk.lengthlet ret = this.len < this.highWaterMarkif(!ret){//表示要觸發drain事件this.needDrain = true}//正在寫入的應該放到內存中if(this.writing){this.cache.push({chunk,encoding,callback})}else{//這里是第一次寫的時候this.writing = true//專門實現寫的方法this._write(chunk,encoding,()=>this.clearBuffer(),callback)}// console.log(ret)//能不能繼續寫了 false代表下次寫的時候更占內存return ret}destory(){if(typeof this.fd != 'number'){return this.emit('close')}//如果文件被打開過 就關閉文件并且觸發close事件fs.close(this.fd,()=>{this.emit('close')})}open(){//fd表示的就是當前this.path的這個文件,從3開始(number類型)fs.open(this.path,this.flags,(err,fd)=>{//有可能fd這個文件不存在 需要做處理if(err){//如果有自動關閉 則幫他銷毀if(this.autoClose){//銷毀(關閉文件,出發關閉文件事件)this.destory()}//如果有錯誤 就會觸發error事件this.emit('error',err)return}//保存文件描述符this.fd = fd//當文件打開成功時觸發open事件this.emit('open',this.fd)})} }?
自定義可寫流
因為createWriteStream內部調用了WriteStream類,WriteStream又實現了Writable接口,WriteStream實現了_write()方法,所以我們通過自定義一個類繼承stream模塊的Writable,并在原型上自定義一個_write()就可以自定義自己的可寫流
返回:
node test.js <Buffer 79 65 73> ok?
?
《2》可讀流
可讀流是對提供數據的來源的一種抽象。
可讀流的例子包括:
- 客戶端的 HTTP 響應
- 服務器的 HTTP 請求
- fs 的讀取流
- zlib 流
- crypto 流
- TCP socket
- 子進程 stdout 與 stderr
- process.stdin
所有可讀流都實現了?stream.Readable?類定義的接口。
?
兩種讀取模式
可讀流運作于兩種模式之一:流動模式(flowing)或暫停模式(paused)。
- 在流動模式中,數據自動從底層系統讀取,并通過?EventEmitter?接口的事件盡可能快地被提供給應用程序。
- 在暫停模式中,必須顯式調用?stream.read()?讀取數據塊。
所有可讀流都開始于暫停模式,可以通過以下方式切換到流動模式:
- 添加?'data'?事件句柄。
- 調用?stream.resume()。
- 調用?stream.pipe()。
可讀流可以通過以下方式切換回暫停模式:
- 如果沒有管道目標,則調用?stream.pause()。
- 如果有管道目標,則移除所有管道目標。調用?stream.unpipe()?可以移除多個管道目標。
只有提供了消費或忽略數據的機制后,可讀流才會產生數據。 如果消費的機制被禁用或移除,則可讀流會停止產生數據。
為了向后兼容,移除?'data'?事件句柄不會自動地暫停流。 如果有管道目標,一旦目標變為?drain?狀態并請求接收數據時,則調用?stream.pause()?也不能保證流會保持暫停模式。
如果可讀流切換到流動模式,且沒有可用的消費者來處理數據,則數據將會丟失。 例如,當調用?readable.resume()?時,沒有監聽?'data'?事件或?'data'?事件句柄已移除。
添加?'readable'?事件句柄會使流自動停止流動,并通過?readable.read()?消費數據。 如果?'readable'?事件句柄被移除,且存在?'data'?事件句柄,則流會再次開始流動。
?
三種狀態
可讀流的兩種模式是對發生在可讀流中更加復雜的內部狀態管理的一種簡化的抽象。
在任意時刻,可讀流會處于以下三種狀態之一:
- readable.readableFlowing === null
- readable.readableFlowing === false
- readable.readableFlowing === true
當?readable.readableFlowing?為?null?時,沒有提供消費流數據的機制,所以流不會產生數據。 在這個狀態下,監聽?'data'?事件、調用?readable.pipe()、或調用?readable.resume()?都會使?readable.readableFlowing?切換到?true,可讀流開始主動地產生數據并觸發事件。
調用?readable.pause()、readable.unpipe()、或接收到背壓,則?readable.readableFlowing?會被設為?false,暫時停止事件流動但不會停止數據的生成。 在這個狀態下,為?'data'?事件綁定監聽器不會使?readable.readableFlowing?切換到?true。
const { PassThrough, Writable } = require('stream'); const pass = new PassThrough(); const writable = new Writable(); pass.pipe(writable); pass.unpipe(writable); // readableFlowing 現在為 false。 pass.on('data', (chunk) => { console.log(chunk.toString()); }); pass.write('ok'); // 不會觸發 'data' 事件。 pass.resume(); // 必須調用它才會觸發 'data' 事件。如果注釋掉它則不會返回結果ok當?readable.readableFlowing?為?false?時,數據可能會堆積在流的內部緩沖中。
?
選擇一種接口風格
可讀流的 API 貫穿了多個 Node.js 版本,且提供了多種方法來消費流數據。 ??開發者通常應該選擇其中一種方法來消費數據,不要在單個流使用多種方法來消費數據。 混合使用?on('data')、on('readable')、pipe()?或異步迭代器,會導致不明確的行為。
對于大多數用戶,建議使用?readable.pipe(),因為它是消費流數據最簡單的方式。 如果開發者需要精細地控制數據的傳遞與產生,可以使用?EventEmitter、readable.on('readable')/readable.read()?或?readable.pause()/readable.resume()。
?
stream.Readable 類
下面是事件的介紹:
'error' 事件
- <Error>
當流因底層內部出錯而不能產生數據、或推送無效的數據塊時觸發。
'close' 事件
當流或其底層資源(比如文件描述符)被關閉時觸發。 表明不會再觸發其他事件,也不會再發生操作。
不是所有可讀流都會觸發?'close'?事件。
'data' 事件
- chunk?<Buffer>?|?<string>?|?<any>?數據塊。 對于非對象模式的流,?chunk?可以是字符串或?Buffer。 對于對象模式的流,chunk?可以是任何 JavaScript 值,除了?null。
當流將數據塊傳送給消費者后觸發。 當調用?readable.pipe(),readable.resume()?或綁定監聽器到?'data'?事件時,流會轉換到流動模式。 當調用?readable.read()?且有數據塊返回時,也會觸發?'data'?事件。
如果使用?readable.setEncoding()?為流指定了默認的字符編碼,則監聽器回調傳入的數據為字符串,否則傳入的數據為?Buffer。
const fs = require('fs'); const rr = fs.createReadStream('data.txt');//hello data rr.on('data', (chunk) => {//readable不行,報錯TypeError: Cannot read property 'length' of undefined console.log(`接收到 ${chunk.length} 個字節的數據`); //chunk為undefined });返回:
node test.js 接收到 10 個字節的數據?
process.stdin.setEncoding('utf8'); process.stdin.on('data', (chunk) => {//readable不行,會閃退??????? console.log(`接收到 ${chunk.length} 個字節的數據`); });返回:
node test.js 今天天氣好 //自己輸入并回車,這個內容就會被process.stdin收到 接收到 6 個字節的數據?
之前有試一個例子一直沒有成功:
process.stdin.setEncoding('utf8'); // process.stdout.write("請輸入用戶名:"); process.stdin.on('data', (chunk) => { // var chunk = process.stdin.read(); console.log(chunk); if (chunk !== null) { process.stdout.write(`data: ${chunk}`); } }); process.stdin.on('end', () => { process.stdout.write('end'); });?
'end' 事件
當流中沒有數據可供消費時觸發。
'end'?事件只有在數據被完全消費掉后才會觸發。 要想觸發該事件,可以將流轉換到流動模式,或反復調用?stream.read()?直到數據被消費完。
'readable' 事件
當流中有數據可供讀取時觸發
當到達流數據尾部時,?'readable'?事件也會觸發。觸發順序在?'end'?事件之前。
事實上,?'readable'?事件表明流有了新的動態:要么是有了新的數據,要么是到了流的尾部。 對于前者,?stream.read()?將返回可用的數據。而對于后者,?stream.read()?將返回?null。 例如,下面的例子中的?foo.txt?是一個空文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt'); rr.on('readable', () => { console.log(`讀取的數據: ${rr.read()}`); }); rr.on('end', () => { console.log('結束'); });返回:
node test.js 讀取的數據: null 結束?
有問題:
const fs = require('fs'); const rr = fs.createReadStream('data.txt'); rr.on('readable', function(){//不能是'data'事件,為什么,如果是data,返回只有null和end,明天好好查查這兩者的對比 var chunk = rr.read(); // 獲取到輸入的信息 console.log(chunk); if(chunk === ''){ rr.emit('end'); // 觸發end事件 return } if (chunk !== null) { process.stdout.write('data: '+ chunk +'\n'); } // rr.emit('end'); }); rr.on('end', function() { process.stdout.write('end'+'\n'); //也輸出了,只是被擋住了,加上+'\n'就看出來了 });返回:
node test.js <Buffer 68 65 6c 6c 6f 20 64 61 74 61> data: hello data null end?上面標明的錯誤都是因為一開始沒能弄清楚data和readable的區別,看了博客https://blog.csdn.net/eeewwwddd/article/details/81042225?utm_source=copy后終于明白
參數
path:讀取的文件的路徑
?
data與readable的區別:
- readable和讀流的data的區別就是,readable可以控制自己從緩存區讀多少和控制讀的次數,而data是每次讀取都清空緩存,讀多少輸出多少
- readable是暫停模式,data是流動模式;就是readable需要使用read()來讀取數據,data則是從回調中就能夠得到數據
//因為上面的data事件把數據讀了,清空緩存區。所以導致下面的readable讀出為null rs.on('readable',() => {console.log('readable');console.log(rs.read()); });
返回:
node test.jsdata
he
data
ll
data
o
readable
null
如果把'data'監聽去掉,那么返回結果就是:
node test.js readable he readable ll readable o readable null?
舉例說明readable的使用情況:
(1)
let rs = fs.createReadStream('./foo.txt', {//內容為 Today is a good day.i want to go out for fun.//每次讀7個highWaterMark: 7,encoding: 'utf8' }) //如果讀流第一次全部讀下來并且小于highWaterMark,就會再讀一次(再觸發一次readable事件) rs.on('readable', () => {let result = rs.read(2);console.log(result) })返回:
node test.js To da(2)
//如果rs.read()不加參數,一次性讀完,會從緩存區再讀一次,為null rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js Today i s a goo d day.iwant t o go ou t for f un. null(3)
//如果readable每次都剛好讀完(即rs.read()的參數剛好和highWaterMark相等),就會一直觸發readable事件,如果最后不足他想喝的數,他就會先觸發一次null,最后把剩下的喝完 rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js Today i s a goo d day.iwant t o go ou t for f null un.(4)
//一開始緩存區為0的時候也會默認調一次readable事件,將foo.txt內容清零 rs.on('readable', () => {let result = rs.read();console.log(result) })返回:
node test.js null?
實戰:行讀取器(平常我們的文件可能有回車、換行,此時如果要每次想讀一行的數據,就得用到readable)
let EventEmitter = require('events') //如果要將內容全部讀出就用on('data'),精確讀取就用on('readable') class LineReader extends EventEmitter {constructor(path) {super()this.rs = fs.createReadStream(path)//回車符的十六進制let RETURN = 0x0d//換行符的十六進制let LINE = 0x0alet arr = []this.on('newListener', (type) => {//每次使用 on 監聽事件時觸發'newListener'事件if (type === 'newLine') {//自定義的一個事件'newLine',觸發后就調用'readable',然后自行設定一次讀取一行的操作this.rs.on('readable', () => {let char//每次讀一個,當讀完的時候會返回null,終止循環while (char = this.rs.read(1)) {//讀到文件最后char = nullswitch (char[0]) {case RETURN:break;//Mac下只有換行符,windows下是回車符和換行符,需要根據不同的轉換。因為我這里是Maccase LINE://如果是換行符就把數組轉換為字符串let r = Buffer.from(arr).toString('utf8')//把數組清空arr.length = 0//觸發newLine事件,把得到的一行數據輸出this.emit('newLine', r)break;default://如果不是換行符,就放入數組中arr.push(char[0])}}})}})//以上只能取出換行符之前的代碼,最后一行的后面沒有換行符,所以需要特殊處理。當讀流讀完需要觸發end事件時this.rs.on('end', () => {//取出最后一行數據,轉成字符串let r = Buffer.from(arr).toString('utf8')arr.length = 0this.emit('newLine', r)})} }let lineReader = new LineReader('./foo.txt') lineReader.on('newLine', function (data) {console.log('a line');console.log(data); })返回:
node test.js //可見一次是只讀取一行的 a line if the truth is : a line I a line Am a line A a line boy一般是將整個文件讀取完的:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('readable', () => {console.log('one time');console.log(rr.read()); }); rr.on('end', () => {console.log('結束'); });返回:
node test.js one time if the truth is : I Am A boy one time null 結束?
下面接著方法的介紹:
readable.destroy([error])
銷毀流,并且觸發error事件。然后,可讀流將釋放所有的內部資源。
開發者不應該覆蓋這個方法,應該覆蓋readable._destroy方法。
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('open', function () {console.log('文件被打開'); }); rr.destroy('something wrong');//有參數則為出現的錯誤,會觸發error事件 rr.on('data', function (data) {console.log('data');console.log(data);}); rr.on('error', function (err) {console.log('error');console.log(err); }); rr.on('close', function (err) {console.log('close'); }); rr.on('end', () => {console.log('end'); });返回:
node test.js 文件被打開 error something wrong如果rr.destroy();參數為空,則不會觸發error事件,而是觸發close事件,那么返回為:
?
node test.js 文件被打開 closereadable.isPaused()
- 返回:?<boolean>
readable.isPaused()?方法返回可讀流的當前操作狀態。 該方法主要是在?readable.pipe()?方法的底層機制中用到。大多數情況下,沒有必要直接使用該方法
readable.pause()
- 返回:?this
readable.pause()?方法將會使 flowing 模式的流停止觸發?'data'?事件, 進而切出 flowing 模式。任何可用的數據都將保存在內部緩存中。
?
readable.read([size])
- size?<number>?可選參數,確定讀取數據的大小.
- 返回?<string>?|?<Buffer>?|?<null>
readable.read()方法從內部緩沖區中抽出并返回一些數據。 如果沒有可讀的數據,返回null。readable.read()方法默認數據將作為“Buffer”對象返回 ,除非已經使用readable.setEncoding()方法設置編碼或流運行在對象模式。
可選的size參數指定要讀取的特定數量的字節。如果size字節不可讀,將返回null除非流已經結束,在這種情況下所有保留在內部緩沖區的數據將被返回。
如果沒有指定size參數,則內部緩沖區包含的所有數據將返回。
readable.read()方法只應該在暫停模式下的可讀流上運行。在流模式下,readable.read()自動調用直到內部緩沖區的數據完全耗盡。
一般來說,建議開發人員避免使用'readable'事件和readable.read()方法,使用readable.pipe()或'data'事件代替。
無論size參數的值是什么,對象模式中的可讀流將始終返回調用readable.read(size)的單個項目。
注意:如果readable.read()方法返回一個數據塊,那么一個'data'事件也將被發送。
注意:在已經被發出的'end'事件后調用stream.read([size])事件將返回null。不會拋出運行時錯誤。
?
//fd文件描述符,一般通過fs.open中獲取 //buffer是讀取后的數據放入的緩存目標 //0,從buffer的0位置開始放入 //BUFFER_SIZE,每次放BUFFER_SIZE這么長的長度 //index,每次從文件的index的位置開始讀 //bytesRead,真實讀到的個數 fs.read(fd,buffer,0,BUFFER_SIZE,index,function(err,bytesRead){})?
?
?
?
readable.resume()
- 返回:?this
readable.resume()?方法會重新觸發?'data'?事件, 將暫停模式切換到流動模式。
readable.resume()?方法可以用來充分使用流中的數據,而不用實際處理任何數據,如以下示例所示:
getReadableStreamSomehow().resume().on('end', () => {console.log('Reached the end, but did not read anything.');});readable.setEncoding(encoding)
- encoding?<string>?要使用的編碼
- Returns:?this
readble.setEncoding()?方法會為從可讀流讀入的數據設置字符編碼
默認返回Buffer對象。設置編碼會使得該流數據返回指定編碼的字符串而不是Buffer對象。例如,調用readable.setEncoding('utf8')會使得輸出數據作為UTF-8數據解析,并作為字符串返回。調用readable.setEncoding('hex')使得數據被編碼成16進制字符串格式。
可讀流會妥善處理多字節字符,如果僅僅直接從流中取出Buffer對象,很可能會導致錯誤解碼。
?
?
舉例說明上面的事件和方法的使用:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.on('open', function () {//1 先響應openconsole.log('文件被打開'); }); rr.on('data', function (data) {//2 console.log('data');console.log(rr.isPaused()); //falserr.pause();//3 改為暫停模式,不讀取數據了console.log(rr.isPaused());//true console.log(data);}); setTimeout(function () {//7 兩秒后恢復成流動模式繼續讀取數據console.log('resume');console.log(rr.isPaused());//true rr.resume();console.log(rr.isPaused());//true,因為添加 'readable' 事件句柄會使流自動停止流動,并通過 readable.read() 消費數據。 如果 'readable' 事件句柄被移除,且存在 'data' 事件句柄,則流會再次開始流動 },1000); //注釋掉readable后,結果就為false rr.on('error', function (err) {console.log(err); }); rr.on('readable', () => {//4 因為data將所有數據都讀完并將緩存清空,所以readable只輸出nullconsole.log('readable');console.log(rr.read()); }); rr.on('close', function (err) {//6 關閉console.log('close'); }); rr.on('end', () => {//5 結束console.log('end'); });返回:
node test.js 文件被打開 data false true if the truth is : I Am A boy readable null end close resume true true注釋掉readable返回:
node test.js 文件被打開 data false true if the truth is : I Am A boy resume true false end close?
readable.pipe(destination[, options])
- destination?<stream.Writable>?數據寫入目標
-
options?<Object>?Pipe 選項
- end?<boolean>?在 reader 結束時結束 writer 。默認為?true。
readable.pipe()?綁定一個 [Writable][] 到?readable?上, 將可寫流自動切換到 flowing 模式并將所有數據傳給綁定的 [Writable][]。數據流將被自動管理。這樣,即使是可讀流較快,目標可寫流也不會超負荷(overwhelmed)。
下面例子將?readable?中的所有數據通過管道傳遞給名為?foo.txt?的文件:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); rr.pipe(process.stdout);返回:
node test.js if the truth is : I Am A boy可以在單個可讀流上綁定多個可寫流。
readable.pipe()?方法返回?目標流?的引用,這樣就可以對流進行鏈式地管道操作:
const fs = require('fs'); const zlib = require('zlib'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); const z = zlib.createGzip(); const w = fs.createWriteStream('foo.txt.gz'); rr.pipe(z).pipe(w); //運行后,文件夾中果然出現了一個壓縮文件默認情況下,當源可讀流(the source Readable stream)觸發?'end'?事件時,目標流也會調用?stream.end()?方法從而結束寫入。要禁用這一默認行為,?end?選項應該指定為?false, 這將使目標流保持打開, 如下面例子所示:
const fs = require('fs'); const rr = fs.createReadStream('foo.txt',{encoding: 'utf8'}); const writer = fs.createWriteStream('foo2.txt'); rr.pipe(writer,{end:false}); rr.on('end', () => {console.log('end reader'); }); setTimeout(function(){writer.write('請輸入num1的值:');writer.end(); },2000);返回:
node test.jsend reader
且foo2.txt文件中內容為:
if the truth is : I Am A boy請輸入num1的值:如果去掉{ end: false },則出錯:
node test.js end reader events.js:167throw er; // Unhandled 'error' event^Error [ERR_STREAM_WRITE_AFTER_END]: write after end //這就是因為當源可讀流觸發?'end'?事件時,目標流也會調用?stream.end()?方法從而結束寫入這里有一點要警惕,如果可讀流在處理時發生錯誤,目標可寫流?不會?自動關閉。 如果發生錯誤,需要?手動?關閉所有流以避免內存泄漏。
注意:不管對?process.stderr?和?process.stdout?指定什么選項,它們都是直到 Node.js 進程退出才關閉。
?
readable.unpipe([destination])
- destination?<stream.Writable>?可選的,指定需要分離的目標流
readable.unpipe()?方法將之前通過stream.pipe()方法綁定的流分離
如果?destination?沒有傳入, 則所有綁定的流都會被分離.
如果傳入?destination, 但它沒有被pipe()綁定過,則該方法不作為.
const readable = getReadableStreamSomehow(); const writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt', // but only for the first second readable.pipe(writable); setTimeout(() => {console.log('Stop writing to file.txt');readable.unpipe(writable);console.log('Manually close the file stream');writable.end(); }, 1000);?
readable源碼實現,轉自https://blog.csdn.net/eeewwwddd/article/details/81042225
let fs = require('fs') let EventEmitter = require('events') class ReadStream extends EventEmitter{constructor(path,options = {}){super()this.path = paththis.highWaterMark = options.highWaterMark || 64*1024this.flags = options.flags || 'r'this.start = options.start || 0this.pos = this.start //會隨著讀取的位置改變this.autoClose = options.autoClose || truethis.end = options.end || null//默認null就是bufferthis.encoding = options.encoding || null//參數的問題this.reading = false //非流動模式//創建個buffer用來存儲每次讀出來的數據this.buffers = []//緩存區長度this.len = 0//是否要觸發readable事件this.emittedReadable = false//觸發open獲取文件的fd標識符this.open()//此方法默認同步調用 每次設置on監聽事件時都會調用之前所有的newListener事件this.on('newListener',(type)=>{// 等待著他監聽data事件if(type === 'readable'){//開始讀取 客戶已經監聽的data事件this.read()}})}//readable真正的源碼中的方法,計算出和n最接近的2的冪次數 computeNewHighWaterMark(n) {n--;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;n++;return n;}read(n){//當讀的數量大于水平線,會通過取2的冪次取比他大和最接近的數if(this.len < n){this.highWaterMark = this.computeNewHighWaterMark(n)//重新觸發readbale的callback,所以第一次會觸發nullthis.emittedReadable = true//重新讀新的水位線this._read()}//真正讀取到的let buffer = null//說明緩存里有這么多,取出來if(n>0 && n<=this.len){//定義一個bufferbuffer = Buffer.alloc(n)let buflet flag = truelet index = 0//[buffer<1,2,3,4>,buffer<1,2,3,4>,buffer<1,2,3,4>]//每次取出緩存前的第一個bufferwhile(flag && (buf = this.buffers.shift())){for(let i=0;i<buf.length;i++){//把取出的一個buffer中的數據放入新定義的buffer中buffer[index++] = buf[i]//當buffer的長度和n(參數)長度一樣時,停止循環if(index === n){flag = false//維護緩存,因為可能緩存中的buffer長度大于n,當取出n的長度時,還會剩下其余的buffer,我們需要切割buf并且放到緩存數組之前this.len -= nlet r = buf.slice(i+1)if(r.length){this.buffers.unshift(r)}break}}}}//如果緩存區沒有東西,等會讀完需要觸發readable事件//這里會有一種狀況,就是如果每次Readable讀取的數量正好等于highWaterMark(流讀取到緩存的長度),就會每次都等于0,每次都觸發Readable事件,就會每次讀,讀到沒有為止,最后還會觸發一下nullif(this.len === 0){this.emittedReadable = true}if(this.len < this.highWaterMark){//默認,一開始的時候開始讀取if(!this.reading){this.reading = true//真正多讀取操作this._read()}}return buffer&&buffer.toString()}_read(){if(typeof this.fd != 'number'){//等待著觸發open事件后fd肯定拿到了 再去執行read方法return this.once('open',()=>{this._read()})}//先讀這么多bufferlet buffer = Buffer.alloc(this.highWaterMark)fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{if(byteRead > 0){//當第一次讀到數據后,改變reading的狀態,如果觸發read事件,可能還會在觸發第二次_readthis.reading = false//每次讀到數據增加緩存取得長度this.len += byteRead//每次讀取之后,會增加讀取的文件的讀取開始位置this.pos += byteRead//將讀到的buffer放入緩存區buffers中this.buffers.push(buffer.slice(0,byteRead))//觸發readableif(this.emittedReadable){this.emittedReadable = false//可以讀取了,默認開始的時候杯子填滿了this.emit('readable')}}else{//沒讀到就出發end事件this.emit('end')}})}destory(){if(typeof this.fd != 'number'){return this.emit('close')}//如果文件被打開過 就關閉文件并且觸發close事件fs.close(this.fd,()=>{this.emit('close')})}open(){//fd表示的就是當前this.path的這個文件,從3開始(number類型)fs.open(this.path,this.flags,(err,fd)=>{//有可能fd這個文件不存在 需要做處理if(err){//如果有自動關閉 則幫他銷毀if(this.autoClose){//銷毀(關閉文件,觸發關閉文件事件)this.destory()}//如果有錯誤 就會觸發error事件this.emit('error',err)return}//保存文件描述符this.fd = fd//當文件打開成功時觸發open事件this.emit('open',this.fd)})} }?
自定義可讀流
因為createReadStream內部調用了ReadStream類,ReadStream又實現了Readable接口,ReadStream實現了_read()方法,所以我們通過自定義一個類繼承stream模塊的Readable,并在原型上自定義一個_read()就可以自定義自己的可讀流
返回:
node test.js 100?
?
pipe——管道 可以控制速率,因為讀快寫慢
let fs = require('fs') //pipe方法叫管道 可以控制速率 let rs = fs.createReadStream('./foo.txt',{highWaterMark: 4 }) let ws = fs.createWriteStream('./foo1.txt',{highWaterMark: 1 }) //會監聽rs的on('data')將讀取到的數據,通過ws.write的方法寫入文件 //調用寫的一個方法 返回boolean類型 //如果返回false就調用rs的pause方法 暫停讀取 //等待可寫流 寫入完畢在監聽drain resume rs rs.pipe(ws) //會控制速率 防止淹沒可用內存?
let fs = require('fs') //這兩個是上面自己寫的ReadStream和WriteStream let { Readable } = require('stream');class MyRead extends Readable{//流需要一個_read方法,方法中push什么,外面就接收什么 _read(){//push方法就是上面_read方法中的push一樣,把數據放入緩存區中this.push('100');//如果push了null就表示沒有東西可讀了,停止(如果不寫,就會一直push上面的值,死循環)this.push(null);} }let writer = fs.createWriteStream('./foo1.txt',{highWaterMark: 1 });//如果用原來的讀寫,因為寫比較耗時,所以會多讀少寫,耗內存 MyRead.prototype.pipe = function(dest){this.on('data',(data)=>{let flag = dest.write(data)//如果寫入的時候嘴巴吃滿了就不繼續讀了,暫停if(!flag){this.pause()}});//如果寫的時候嘴巴里的吃完了,就會繼續讀dest.on('drain',()=>{this.resume()});this.on('end',()=>{this.destroy()//銷毀ReadStream//清空緩存中的數據fs.fsync(1,()=>{//fs.fsync作用是同步磁盤緩存,1代表的是文件描述符,0,1,2 文件描述符代表標準輸入設備(比如鍵盤),標準輸出設備(顯示器)和標準錯誤dest.destroy()//銷毀WriteStream,之前dest設的是,但是報錯process.stdout cannot be closed });}); } var reader = new MyRead(); reader.pipe(writer);//結果就是將100寫到了文件foo1.txt中上面的文件描述符處本來寫的是dest.fd,但是報錯:
TypeError [ERR_INVALID_ARG_TYPE]: The "fd" argument must be of type number. Received type object
查看writer的fd為null,不知原因,待查明???????????
?
stream.pipeline(...streams[, callback])
- ...streams?<Stream>?兩個或多個要用管道連接的流
- callback?<Function>?一個回調函數,可以帶有一個錯誤信息參數
該模塊方法用于在多個流之間架設管道,可以自動傳遞錯誤和完成掃尾工作,并且可在管道架設完成時提供一個回調函數:
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib');// 使用 pipeline API 輕松連接多個流 // 并在管道完成時獲得通知// 使用pipeline高效壓縮一個可能很大的tar文件: pipeline(fs.createReadStream('foo.txt'),zlib.createGzip(),fs.createWriteStream('foo.tar.gz'),//運行后成功壓縮并返回 管道架設成功 信息(err) => {if (err) {console.error('管道架設失敗', err);} else {console.log('管道架設成功');}} );pipeline?API 也可做成承諾:
const util = require('util'); const stream = require('stream'); const fs = require('fs'); const zlib = require('zlib'); const pipeline = util.promisify(stream.pipeline);async function run() {await pipeline(fs.createReadStream('foo.txt'),zlib.createGzip(),fs.createWriteStream('foo.tar.gz')////運行后成功壓縮并返回 管道架設成功 信息 );console.log('管道架設成功'); }run().catch(console.error);?
?
用于實現流的 API
其實就是覆寫下面的這些方法來實現自己的流操作:
新的流類必須實現一個或多個特定的方法,根據所創建的流類型,如下圖所示:
| 只讀流 | Readable | _read |
| 只寫流 | writable | _write?,_writev,_final |
| 可讀可寫流 | Duplex | _read?,_write?,_writev,_final |
| 操作寫數據,然后讀結果 | Transform | _transform,_flush,_final |
注意:實現流的代碼里面不應該出現調用“public”方法的地方因為這些方法是給使用者使用的(流使用者部分的API所述)。這樣做可能會導致使用流的應用程序代碼產生不利的副作用。
const { Writable } = require('stream');class MyWritable extends Writable {constructor(options) {super(options);// ... } }?
?
雙工流
有了雙工流,我們可以在同一個對象上同時實現可讀和可寫,就好像同時繼承這兩個接口。 重要的是雙工流的可讀性和可寫性操作完全獨立于彼此。這僅僅是將兩個特性組合成一個對象。
let { Duplex } = require('stream') //雙工流,可讀可寫 class MyDuplex extends Duplex{_read(){this.push('hello Duplex')this.push(null)}_write(chunk,encoding,clearBuffer){console.log(chunk)clearBuffer()} }let myDuplex = new MyDuplex() //process.stdin是node自帶的process進程中的可讀流,會監聽命令行的輸入 //process.stdout是node自帶的process進程中的可寫流,會監聽并輸出在命令行中 //所以這里的意思就是在命令行先輸出hello,然后我們輸入什么他就出來對應的buffer(先作為可讀流出來) process.stdin.pipe(myDuplex).pipe(process.stdout)返回:
node test.js hello Duplex?
?
轉換流
在讀寫過程中可以修改或轉換數據的?Duplex?流(例如?zlib.createDeflate())
轉換流的輸出是從輸入中計算出來的。對于轉換流,我們不必實現read或write的方法,我們只需要實現一個transform方法,將兩者結合起來。它有write方法的意思,我們也可以用它來push數據。
let { Transform } = require('stream');class MyTransform extends Transform{_transform(chunk,encoding,callback){//5 myTransform2 push時則觸發myTransform的_transformconsole.log(chunk.toString().toUpperCase());//6 然后輸出from MyTransform2的大寫內容callback();} } let myTransform = new MyTransform();class MyTransform2 extends Transform{_transform(chunk,encoding,callback){//2 觸發myTransform2的_transformconsole.log(chunk.toString().toUpperCase());//3 輸出input的大寫內容INPUTthis.push('from MyTransform2');//4 將from MyTransform2內容寫入myTransformthis.push(null);callback();} } let myTransform2 = new MyTransform2();//此時myTransform2被作為可寫流觸發_transform,輸出輸入的大寫字符后,會通過可讀流push字符到下一個轉換流中 //當寫入的時候才會觸發transform的值,此時才會push,所以后面的pipe拿到的chunk是前面的push的值 process.stdin.pipe(myTransform2).pipe(myTransform);返回:
node test.js input //1 輸入回車 INPUTFROM MYTRANSFORM2?
總結
可讀流
在 flowing 模式下, 可讀流自動從系統底層讀取數據,并通過 EventEmitter 接口的事件盡快將數據提供給應用。
在 paused 模式下,必須顯式調用 stream.read() 方法來從流中讀取數據片段。
所有初始工作模式為 paused 的 Readable 流,可以通過下面三種途徑切換到 flowing 模式:
- 監聽 ‘data’ 事件
- 調用 stream.resume() 方法
- 調用 stream.pipe() 方法將數據發送到 Writable
可讀流可以通過下面途徑切換到 paused 模式:
- 如果不存在管道目標(pipe destination),可以通過調用 stream.pause() 方法實現。
- 如果存在管道目標,可以通過取消 ‘data’ 事件監聽,并調用 stream.unpipe() 方法移除所有管道目標來實現。
可寫流
需要知道只有在嘴真正的吃滿了,并且等到把嘴里的和地上的饅頭(緩存中的)都吃下了才會觸發drain事件
第一次寫入會直接寫入文件中,后面會從緩存中一個個取
雙工流
只是對可寫可讀流的一種應用,既可作為可讀流,也能作為可寫流,并且作為可讀或者可寫時是隔離的
轉換流
一般轉換流是邊輸入邊輸出的,而且一般只有觸發了寫入操作時才會進入_transform方法中。跟雙工流的區別就是,他的可讀可寫是在一起的
?
轉載于:https://www.cnblogs.com/wanghui-garcia/p/9798158.html
總結
以上是生活随笔為你收集整理的nodejs-stream部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ghost配置1——删除社交Link
- 下一篇: kafka channle的应用案例