Node.js 流
流
稳定性: 2 - 不稳定
流用于处理Node.js中的流数据的抽象接口,在Node里被不同的对象实现。例如,对HTTP服务器的请求是流,process.stdout 是流。
流是可读的,可写的,或者是可读写的,所有的流是EventEmitter的实例。
Node.js访问流模块的方法如下所示:
const stream = require('stream'
你可以通过require('stream')
加载Stream基类。其中包括了Readable
流、Writable
流、Duplex
流和Transform
流的基类。
本文将分为3个部分进行介绍。
第一个部分解释了在你的程序中使用流时候需要了解的内容。如果你不用实现流式API,可以只看这个部分。
如果你想实现你自己的流,第二个部分解释了这部分API。这些API让你的实现更加简单。
第三个部分深入的解释了流是如何工作的,包括一些内部机制和函数,这些内容不要改动,除非你明确知道你要做什么。
面向流消费者的API
流可以是可读(Readable),可写(Writable),或者是可读可写的(Duplex,双工)。
所有的流都是事件分发器(EventEmitters),但是也有自己的方法和属性,这取决于他它们是可读(Readable),可写(Writable),或者兼具两者(Duplex,双工)的。
如果流是可读写的,则它实现了下面的所有方法和事件。因此,这个部分API完全阐述了Duplex或Transform流,即便他们的实现有所不同。
没有必要为了消费流而在你的程序里实现流的接口。如果你正在你的程序里实现流接口,请同时参考下面的流实现程序API。
基本所有的Node程序,无论多简单,都会使用到流。以下是一个使用流的例子:
javascript
var http = require('http'
var server = http.createServer(function (req, res) {
// req is an http.IncomingMessage, which is 可读流(Readable stream)
// res is an http.ServerResponse, which is a Writable Stream
var body = '';
// we want to get the data as utf8 strings
// If you don't set an encoding, then you'll get Buffer objects
req.setEncoding('utf8'
// 可读流(Readable stream) emit 'data' 事件 once a 监听器(listener) is added
req.on('data', function (chunk) {
body += chunk;
}
// the end 事件 tells you that you have entire body
req.on('end', function () {
try {
var data = JSON.parse(body
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('error: ' + er.message
}
// write back something interesting to the user:
res.write(typeof data
res.end(
}
}
server.listen(1337
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
类: stream.Readable
可读流(Readable stream)接口是对你正在读取的数据的来源的抽象。换句话说,数据来自可读流读取的某个文件、或某个实现了流接口的对象。
可读流(Readable stream)不会分发数据,直到你表明准备就绪。
可读流(Readable stream) 有2种模式: 流动模式(flowing mode)
和暂停模式(paused mode)
。流动模式(flowing mode)
时,尽快的从底层系统读取数据并提供给你的程序。暂停模式(paused mode)
时,你必须明确的调用stream.read()
来读取数据。暂停模式(paused mode)
是默认模式。
注意
: 如果没有绑定数据处理函数,并且没有pipe()
目标,流会切换到流动模式(flowing mode),并且数据会丢失。
可以通过下面几个方法,将流切换到流动模式(flowing mode)。
- 添加一个
'data'
事件事件处理器来监听数据.
可以通过以下方法来切换到暂停模式(paused mode):
- 如果没有“导流(pipe)”目标,调用
pause()
方法.
注意:为了向后兼容考虑, 移除'data'事件监听器并不会自动暂停流。同样的,当有导流目标时,调用pause()并不能保证流在那些目标排空后,请求更多数据时保持暂停状态。
可读流(Readable stream)例子包括:
事件: 'readable'
当一个数据块可以从流中读出,将会触发'readable'
事件.`
某些情况下, 如果没有准备好,监听一个'readable'
事件将会导致一些数据从底层系统读取到内部缓存。
javascript
var readble = getReadableStreamSomehow(
readable.on('readable', function() {
// there is some data to read now
}
一旦内部缓存排空,一旦有更多数据将会再次触发readable
事件。
事件: 'data'
chunk
{Buffer | String} 数据块
绑定一个data
事件的监听器(listener)到一个未明确暂停的流,会将流切换到流动模式。数据会尽额能的传递。
如果你像尽快的从流中获取数据,以下的方法是最快的:
javascript
var readable = getReadableStreamSomehow(
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length
}
事件: 'end'
如果没有更多的可读数据,将会触发这个事件。
注意:只有数据已经被完全消费,end
事件才会触发
。可以通过切换到流动模式(flowing mode)来实现,或者通过调用重复调用read()
获取数据,直到结束。
javascript
var readable = getReadableStreamSomehow(
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length
}
readable.on('end', function() {
console.log('there will be no more data.'
}
事件: 'close'
当底层资源(例如源头的文件描述符)关闭时触发。并不是所有流都会触发这个事件。
事件: 'error'
- {Error Object}
当接收数据时发生错误触发。
readable.read(size)
size
{Number} 可选参数, 需要读入的数据量
read()
方法从内部缓存中拉取数据。如果没有可用数据,将会返回null
如果传了size
参数,将会返回相当字节的数据。如果size
不可用,将会返回null
。
如果你没有指定size
参数。将会返回内部缓存的所有数据。
这个方法仅能再暂停模式(paused mode)里调用。 流动模式(flowing mode)下这个方法会被自动调用直到内存缓存排空。
javascript
var readable = getReadableStreamSomehow(
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('got %d bytes of data', chunk.length
}
}
如果这个方法返回一个数据块, 它同时也会触发'data'
事件.
readable.setEncoding(encoding)
encoding
{String} 要使用的编码.
调用此函数会使得流返回指定编码的字符串,而不是Buffer对象。例如,如果你调用readable.setEncoding('utf8')
,输出数据将会是UTF-8编码,并且返回字符串。如果你调用readable.setEncoding('hex')
,将会返回2进制编码的数据。
该方法能正确处理多字节字符。如果不想这么做,仅简单的直接拉取缓存并调buf.toString(encoding)
,可能会导致字节错位。因此,如果你想以字符串读取数据,请使用下述的方法:
javascript
var readable = getReadableStreamSomehow(
readable.setEncoding('utf8'
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string'
console.log('got %d characters of string data', chunk.length
}
readable.resume()
- 返回:
this
这个方法让可读流(Readable stream)继续触发data
事件.
这个方法会将流切换到流动模式(flowing mode)。 如果你不想从流中消费数据,而想得到end
事件,可以调用readable.resume()
来打开数据流,如下所示:
javascript
var readable = getReadableStreamSomehow(
readable.resume(
readable.on('end', function(chunk) {
console.log('got to the end, but did not read anything'
}
readable.pause()
- 返回:
this
这个方法会使得流动模式(flowing mode)的流停止触发data
事件,切换到流动模式(flowing mode)。并让后续可用数据留在内部缓冲区中。
javascript
var readable = getReadableStreamSomehow(
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length
readable.pause(
console.log('there will be no more data for 1 second'
setTimeout(function() {
console.log('now data will start flowing again'
readable.resume(
}, 1000
}
readable.isPaused()
- 返回:
Boolean
这个方法返回readable
是否被客户端代码明确
的暂停(调用readable.pause()
)。
var readable = new stream.Readable
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pipe(destination, options)
destination
{Writable Stream} 写入数据的目标
这个方法从可读流(Readable stream)拉取所有数据,并将数据写入到提供的目标中。自动管理流量,这样目标不会快速的可读流(Readable stream)淹没。
可以导流到多个目标。
javascript
var readable = getReadableStreamSomehow(
var writable = fs.createWriteStream('file.txt'
// All the data from readable goes into 'file.txt'
readable.pipe(writable
这个函数返回目标流, 因此你可以建立导流链:
javascript
var r = fs.createReadStream('file.txt'
var z = zlib.createGzip(
var w = fs.createWriteStream('file.txt.gz'
r.pipe(z).pipe(w
例如:模拟Unix的cat
命令:
javascript
process.stdin.pipe(process.stdout
默认情况下,当源数据流触发end
的时候调用end()
,所以destination
不可再写。传{ end:false }
作为options
,可以保持目标流打开状态。
这会让writer
保持打开状态,可以在最后写入"Goodbye":
javascript
reader.pipe(writer, { end: false }
reader.on('end', function() {
writer.end('Goodbye\n'
}
注意:process.stderr
和process.stdout
直到进程结束才会关闭,无论是否指定它们。
readable.unpipe(destination)
destination
{Writable Stream} 可选,指定解除导流的流
这个方法会解除之前调用pipe()
设置的钩子(pipe()
)。
如果没有指定destination
,则所有的导流(pipe)都会被移除。
如果指定了destination
,但是没有建立如果没有指定destination
,则什么事情都不会发生。
javascript
var readable = getReadableStreamSomehow(
var writable = fs.createWriteStream('file.txt'
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable
setTimeout(function() {
console.log('stop writing to file.txt'
readable.unpipe(writable
console.log('manually close the file stream'
writable.end(
}, 1000
readable.unshift(chunk)
chunk
{Buffer | String} 数据块插入到读队列中
这个方法很有用,当一个流正被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”,返回到原来的源,以便流能将它传递给其它消费者。
如果你在程序中必须经常调用stream.unshift(chunk)
,那你可以考虑实现Transform来替换(参见下文API for Stream Implementors)。
javascript
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback
stream.on('readable', onReadable
var decoder = new StringDecoder('utf8'
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk
if (str.match(/\n\n/)) {
// found the header boundary
var split = str.split(/\n\n/
header += split.shift(
var remaining = split.join('\n\n'
var buf = new Buffer(remaining, 'utf8'
if (buf.length)
stream.unshift(buf
stream.removeListener('error', callback
stream.removeListener('readable', onReadable
// now the body of the message can be read from the stream.
callback(null, header, stream
} else {
// still reading the header.
header += str;
}
}
}
}
readable.wrap(stream)
stream
{Stream} 一个旧式的可读流(Readablestream
)
v0.10版本之前的Node流并未实现现在所有流的API(更多信息详见下文“兼容性”部分)。
如果你使用的是旧的Node库,它触发'data'
事件,并拥有仅做查询用的pause()
方法,那么你能使用wrap()
方法来创建一个Readable流来使用旧版本的流,作为数据源。
你应该很少需要用到这个函数,但它会留下方便和旧版本的Node程序和库交互。
例如:
javascript
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader
myReader.on('readable', function() {
myReader.read( // etc.
}
类: stream.Writable
可写流(Writable stream )接口是你正把数据写到一个目标的抽象。
可写流(Writable stream )的例子包括:
- http requests, on the client
writable.write(chunk, encoding)
chunk
{String | Buffer} 准备写的数据
这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。
返回值表示你是否应该继续立即写入。如果数据要缓存在内部,将会返回false
。否则返回true
。
返回值仅供参考。即使返回false
,你也可能继续写。但是写会缓存在内存里,所以不要做的太过分。最好的办法是等待drain
事件后,再写入数据。
事件: 'drain'
如果调用writable.write(chunk)
返回 false,drain
事件会告诉你什么时候将更多的数据写入到流中。
javascript
// Write the data to the supplied 可写流(Writable stream ) 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write(
function write() {
var ok = true;
do {
i -= 1;
if (i === 0) {
// last time!
writer.write(data, encoding, callback
} else {
// see if we should continue, or wait
// don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding
}
} while (i > 0 && ok
if (i > 0) {
// had to stop early!
// write some more once it drains
writer.once('drain', write
}
}
}
writable.cork()
强制缓存所有写入。
调用.uncork()
或.end()
后,会把缓存数据写入。
writable.uncork()
写入所有.cork()调用之后缓存的数据。
writable.setDefaultEncoding(encoding)
encoding
{String} 新的默认编码
给写数据流设置默认编码方式,如编码有效,则返回true
,否则返回false
。
writable.end(chunk, callback)
chunk
{String | Buffer} 可选,要写入的数据
当没有更多的数据写入的时候调用这个方法。如果给出,回调会被用作finish事件的监听器。
调用end()
后调用write()
会产生错误。
javascript
// write 'hello, ' and then end with 'world!'
var file = fs.createWriteStream('example.txt'
file.write('hello, '
file.end('world!'
// writing more now is not allowed!
事件: 'finish'
调用end()
方法后,并且所有的数据已经写入到底层系统,将会触发这个事件。
javascript
var writer = getWritableStreamSomehow(
for (var i = 0; i < 100; i ++) {
writer.write('hello, #' + i + '!\n'
}
writer.end('this is the end\n'
writer.on('finish', function() {
console.error('all writes are now complete.'
}
事件: 'pipe'
src
{Readable Stream} 是导流(pipe)到可写流的源流。
无论何时在可写流(Writable stream )上调用pipe()
方法,都会触发'pipe'事件,添加这个流到目标。
javascript
var writer = getWritableStreamSomehow(
var reader = getReadableStreamSomehow(
writer.on('pipe', function(src) {
console.error('something is piping into the writer'
assert.equal(src, reader
}
reader.pipe(writer
事件: 'unpipe'
src
{Readable Stream}未写入此可写的源流。
无论何时在可写流(Writable stream )上调用unpipe()
方法,都会触发'unpipe'事件,将这个流从目标上移除。
javascript
var writer = getWritableStreamSomehow(
var reader = getReadableStreamSomehow(
writer.on('unpipe', function(src) {
console.error('something has stopped piping into the writer'
assert.equal(src, reader
}
reader.pipe(writer
reader.unpipe(writer
事件: 'error'
- {Error object}
写或导流(pipe)数据时,如果有错误会触发。
类: stream.Duplex
双工流(Duplex streams)是同时实现了Readable和Writable 接口。用法详见下文。
双工流(Duplex streams) 的例子包括:
- tcp sockets
类: stream.Transform
转换流(Transform streams)是双工Duplex流,它的输出是从输入计算得来。 它实现了Readable和Writable接口. 用法详见下文.
转换流(Transform streams)的例子包括:
- zlib streams
流实现程序API
无论实现什么形式的流,模式都是一样的:
- 在你的子类中扩展适合的父类。 (
util.inherits
方法很有帮助)
所扩展的类和要实现的方法取决于你要编写的流类。
Use-case | Class | 方法(s) to implement |
---|---|---|
Reading only | Readable | _read |
Writing only | Writable | _write |
Reading and writing | Duplex | _read, _write |
Operate on written data, then read the result | Transform | _transform, _flush |
在你的代码里,千万不要调用流实现程序API里的方法。否则可能会引起消费流的程序副作用。
类: stream.Readable
stream.Readable
是一个可被扩充的、实现了底层_read(size)
方法的抽象类。
参照之前的流实现程序API查看如何在你的程序里消费流。以下内容解释了在你的程序里如何实现可读流(Readable stream)。
Example: 计数流
这是可读流(Readable stream)的基础例子,它将从1至1,000,000递增地触发数字,然后结束:
javascript
var Readable = require('stream').Readable;
var util = require('util'
util.inherits(Counter, Readable
function Counter(opt) {
Readable.call(this, opt
this._max = 1000000;
this._index = 1;
}
Counter.prototype._read = function() {
var i = this._index++;
if (i > this._max)
this.push(null
else {
var str = '' + i;
var buf = new Buffer(str, 'ascii'
this.push(buf
}
};
Example: 简单协议 v1 (初始版)
和之前描述的parseHeader
函数类似,但它被实现为自定义流。注意这个实现不会将输入数据转换为字符串。
实际上,更好的办法是将他实现为Transform流,使用下面的实现方法会更好:
javascript
// A parser for a simple data protocol.
// "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// 注意: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal. See the
// alternative example below under Transform section.
var Readable = require('stream').Readable;
var util = require('util'
util.inherits(SimpleProtocol, Readable
function SimpleProtocol(source, options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(source, options
Readable.call(this, options;
this._inBody = false;
this._sawFirstCr = false;
// source is 可读流(Readable stream), such as a socket or file
this._source = source;
var self = this;
source.on('end', function() {
self.push(null
}
// give it a kick whenever the source is readable
// read(0) will not consume any bytes
source.on('readable', function() {
self.read(0
}
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._read = function(n) {
if (!this._inBody) {
var chunk = this._source.read(
// if the source doesn't have data, we don't have data yet.
if (chunk === null)
return this.push(''
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk
this.push(''
} else {
this._inBody = true;
var h = chunk.slice(0, split
this._rawHeader.push(h
var header = Buffer.concat(this._rawHeader).toString(
try {
this.header = JSON.parse(header
} catch (er) {
this.emit('error', new Error('invalid simple protocol data')
return;
}
// now, because we got some extra data, unshift the rest
// back into the 读取队列 so that our consumer will see it.
var b = chunk.slice(split
this.unshift(b
// and let them know that we are done parsing the header.
this.emit('header', this.header
}
} else {
// from there on, just provide the data to our consumer.
// careful not to push(null), since that would indicate EOF.
var chunk = this._source.read(
if (chunk) this.push(chunk
}
};
// Usage:
// var parser = new SimpleProtocol(source
// Now parser is 可读流(Readable stream) that will emit 'header'
// with the parsed header data.
new stream.Readable(options)
options
{Object}
Readable的扩展类中,确保调用了Readable的构造函数,这样才能正确初始化。
readable._read(size)
size
{Number} 异步读取的字节数
注意:实现这个函数,但不要直接调用。
这个函数不要直接调用。在子类里实现,仅能被内部的Readable类调用。
所有可读流(Readable stream) 的实现必须停供一个_read
方法,从底层资源里获取数据。
这个方法以下划线开头,是因为对于定义它的类是内部的,不会被用户程序直接调用。你可以在自己的扩展类中实现。
当数据可用时,通过调用readable.push(chunk)
将之放到读取队列中。再次调用_read
,需要继续推出更多数据。
size
参数仅供参考。调用“read”可以知道知道应当抓取多少数据;其余与之无关的实现,比如TCP或TLS,则可忽略这个参数,并在可用时返回数据。例如,没有必要“等到”size
个字节可用时才调用stream.push(chunk)。
readable.push(chunk, encoding)
chunk
{Buffer | null | String} 推入到读取队列的数据块
注意: 这个函数必须被 Readable 实现者调用, 而不是可读流(Readable stream)的消费者.
_read()
函数直到调用push(chunk)
后才能被再次调用。
Readable
类将数据放到读取队列,当'readable'
事件触发后,被read()
方法取出。push()
方法会插入数据到读取队列中。如果调用了null
,会触发数据结束信号 (EOF)。
这个API被设计成尽可能地灵活。比如说,你可以包装一个低级别的,具备某种暂停/恢复机制,和数据回调的数据源。这种情况下,你可以通过这种方式包装低级别来源对象:
javascript
// source is an object with readStop() and readStart() 方法s,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.
util.inherits(SourceWrapper, Readable
function SourceWrapper(options) {
Readable.call(this, options
this._source = getLowlevelSourceObject(
var self = this;
// Every time there's data, we push it into the internal buffer.
this._source.ondata = function(chunk) {
// if push() 返回 false, then we need to stop reading from source
if (!self.push(chunk))
self._source.readStop(
};
// When the source ends, we push the EOF-signaling `null` chunk
this._source.onend = function() {
self.push(null
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size 参数 is ignored in this case.
SourceWrapper.prototype._read = function(size) {
this._source.readStart(
};
类: stream.Writable
stream.Writable
是个抽象类,它扩展了一个底层的实现_write(chunk, encoding, callback)
方法.
参考上面的流实现程序API,来了解在你的程序里如何消费可写流。下面内容介绍了如何在你的程序里实现可写流。
new stream.Writable(options)
options
{Object}
请确保Writable类的扩展类中,调用构造函数以便缓冲设定能被正确初始化。
writable._write(chunk, encoding, callback)
chunk
{Buffer | String} 要写入的数据块。总是buffer, 除非decodeStrings
选项为false
。
所以可写流(Writable stream ) 实现必须提供一个_write()
方法,来发送数据给底层资源。
注意: 这个函数不能直接调用
,由子类实现,仅内部可写方法可以调用。
使用标准的callback(error)
方法调用回调函数,来表明写入完成或遇到错误。
如果构造函数选项中设定了decodeStrings
标识,则chunk
可能会是字符串而不是Buffer,encoding
表明了字符串的格式。这种设计是为了支持对某些字符串数据编码提供优化处理的实现。如果你没有明确的设置decodeStrings
为false
,这样你就可以安不管encoding
参数,并假定chunk
一直是一个缓存。
该方法以下划线开头,是因为对于定义它的类来说,这个方法是内部的,并且不应该被用户程序直接调用。你应当在你的扩充类中重写这个方法。
writable._writev(chunks, callback)
chunks
{Array} 准备写入的数据块,每个块格式如下:{ chunk: ..., encoding: ... }
.
注意: 这个函数不能直接调用。
由子类实现,仅内部可写方法可以调用.
这个函数的实现是可选的。多数情况下,没有必要实现。如果实现,将会在所有数据块缓存到写队列后调用。
类: stream.Duplex
双工流(duplex stream)同时兼具可读和可写特性,比如一个TCP socket连接。
注意stream.Duplex
可以像Readable或Writable一样被扩充,实现了底层_read(sise) 和_write(chunk, encoding, callback) 方法的抽象类。
由于JavaScript并没有多重继承能力,因此这个类继承自Readable,寄生自Writable.从而让用户在双工扩展类中同时实现低级别的_read(n)
方法和低级别的_write(chunk, encoding, callback)
方法。
new stream.Duplex(options)
options
{Object} 传递Writable and Readable构造函数,有以下的内容:
扩展自Duplex的类,确保调用了父亲的构造函数,保证缓存设置能正确初始化。
类: stream.Transform
转换流(transform class) 是双工流(duplex stream),输入输出端有因果关系,比如,zlib流或crypto流。
输入输出没有要求大小相同,块数量相同,到达时间相同。例如,一个Hash流只会在输入结束时产生一个数据块的输出;一个zlib流会产生比输入小得多或大得多的输出。
转换流(transform class) 必须实现_transform()
方法,而不是_read()
和_write()
方法,也可以实现_flush()
方法(参见如下)。
new stream.Transform(options)
options
{Object} 传递给Writable和Readable构造函数。
扩展自转换流(transform class) 的类,确保调用了父亲的构造函数,保证缓存设置能正确初始化。
transform._transform(chunk, encoding, callback)
chunk
{Buffer | String} 准备转换的数据块。是buffer,除非decodeStrings
选项设置为false
。
注意:这个函数不能直接调用。
由子类实现,仅内部可写方法可以调用.
所有的转换流(transform class) 实现必须提供 _transform
方法来接收输入,并生产输出。
_transform
可以做转换流(transform class)里的任何事,处理写入的字节,传给接口的写端,异步I/O,处理事情等等。
调用transform.push(outputChunk)
0次或多次,从这个输入块里产生输出,依赖于你想要多少数据作为输出。
仅在当前数据块完全消费后调用这个回调。
注意,输入块可能有,也可能没有对应的输出块。如果你提供了第二个参数,将会传给push方法。如下述的例子:
javascript
transform.prototype._transform = function (data, encoding, callback) {
this.push(data
callback(
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data
}
该方法以下划线开头,是因为对于定义它的类来说,这个方法是内部的,并且不应该被用户程序直接调用。你应当在你的扩充类中重写这个方法。
transform._flush(callback)
callback
{函数} 当你处理完数据后调用这个函数 (错误参数为可选参数)
注意:这个函数不能直接调用。
由子类实现,仅内部可写方法可以调用.
某些情况下,转换操作可能需要分发一点流最后的数据。例如,Zlib
流会存储一些内部状态,以便优化压缩输出。
有些时候,你可以实现_flush
方法,它可以在最后面调用,当所有的写入数据被消费后,分发end
告诉读端。和_transform
一样,当刷新操作完毕, transform.push(chunk)
为0次或更多次数。
该方法以下划线开头,是因为对于定义它的类来说,这个方法是内部的,并且不应该被用户程序直接调用。你应当在你的扩充类中重写这个方法。
事件: 'finish' and 'end'
finish
和end
事件 分别来自Writable和Readable类。.end()
事件结束后调用finish
事件,所有的数据已经被_transform
处理完毕,调用_flush
后,所有的数据输出完毕,触发end
。
Example:SimpleProtocolparser v2
上面的简单协议分析例子列子可以通过使用高级别的Transform流来实现,和parseHeader
,SimpleProtocol v1
列子类似。
在这个示例中,输入会被导流到解析器中,而不是作为参数提供。这种做法更符合Node流的惯例。
javascript
var util = require('util'
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform
function SimpleProtocol(options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(options
Transform.call(this, options
this._inBody = false;
this._sawFirstCr = false;
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
if (!this._inBody) {
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk
} else {
this._inBody = true;
var h = chunk.slice(0, split
this._rawHeader.push(h
var header = Buffer.concat(this._rawHeader).toString(
try {
this.header = JSON.parse(header
} catch (er) {
this.emit('error', new Error('invalid simple protocol data')
return;
}
// and let them know that we are done parsing the header.
this.emit('header', this.header
// now, because we got some extra data, emit this first.
this.push(chunk.slice(split)
}
} else {
// from there on, just provide the data to our consumer as-is.
this.push(chunk
}
done(
};
// Usage:
// var parser = new SimpleProtocol(
// source.pipe(parser)
// Now parser is 可读流(Readable stream) that will emit 'header'
// with the parsed header data.
类: stream.PassThrough
这是Transform流的简单实现,将输入的字节简单的传递给输出。它的主要用途是测试和演示。偶尔要构建某种特殊流时也会用到。
流: 内部细节
缓冲
可写流(Writable streams )和可读流(Readable stream)都会缓存数据到内部对象上,叫做_writableState.buffer
或_readableState.buffer
。
缓存的数据量,取决于构造函数是传入的highWaterMark
参数。
调用stream.push(chunk)
时,缓存数据到可读流(Readable stream)。在数据消费者调用stream.read()
前,数据会一直缓存在内部队列中。
调用stream.write(chunk)
时,缓存数据到可写流(Writable stream)。即使write()
返回false
。
流(尤其是pipe()
方法)得目的是限制数据的缓存量到一个可接受的水平,使得不同速度的源和目的不会淹没可用内存。
stream.read(0)
某些时候,你可能想不消费数据的情况下,触发底层可读流(Readable stream)机制的刷新。这种情况下可以调用stream.read(0),它总会返回null。
如果内部读取缓冲低于highWaterMark
,并且流当前不在读取状态,那么调用read(0)
会触发一个低级_read
调用。
虽然基本上没有必要这么做。但你在Node内部的某些地方看到它确实这么做了,尤其是在Readable流类的内部。
stream.push('')
推一个0字节的字符串或缓存 (不在Object mode时)会发送有趣的副作用。 因为它是一个对stream.push()
的调用,它将会结束reading
进程。然而,它没有添加任何数据到可读缓冲区中,所以没有东西可供用户消费。
少数情况下,你当时没有提供数据,但你的流的消费者(或你的代码的其它部分)会通过调用stream.read(0)
得知何时再次检查。在这种情况下,你可以调用 stream.push('')
。
到目前为止,这个功能唯一一个使用情景是在tls.CryptoStream类中,但它将在Node v0.12中被废弃。如果你发现你不得不使用stream.push('')
,请考虑另一种方式。
和老版本的兼容性
v0.10版本前,可读流(Readable stream)接口比较简单,因此功能和用处也小。
'data'
事件会立即开始触发,而不会等待你调用read()
方法。如果你需要进行某些I/O来决定如何处理数据,那么你只能将数据块储存到某种缓冲区中以防它们流失。
在Node v0.10中, 加入了下文所述的Readable类。为了考虑向后兼容,添加了'data'
事件监听器或resume()方法被调用时,可读流(Readable stream)会切换到 "流动模式(flowing mode)"。其作用是,即便你不使用新的read()
方法和'readable'
事件,你也不必担心丢失'data'
数据块。
大多数程序会维持正常功能。然而,下列条件下也会引入边界情况:
- 没有添加
'data'
事件处理器
例如:
javascript
// WARNING! BROKEN!
net.createServer(function(socket) {
// we add an 'end' 方法, but never consume the data
socket.on('end', function() {
// It will never get here.
socket.end('I got your message (but didnt read it)\n'
}
}).listen(1337
v0.10版本前的Node,流入的消息数据会被简单的抛弃。之后的版本,socket会一直保持暂停。
这种情形下,调用resume()
方法来开始工作:
javascript
// Workaround
net.createServer(function(socket) {
socket.on('end', function() {
socket.end('I got your message (but didnt read it)\n'
}
// start the flow of data, discarding it.
socket.resume(
}).listen(1337
可读流(Readable stream)切换到流动模式(flowing mode),v0.10 版本前,可以使用wrap()
方法将风格流包含在一个可读类里。
Object Mode
通常情况下,流仅操作字符串和缓存。
处于object mode
的流,除了缓存和字符串,还可以可以读出普通JavaScript值。
在对象模式里,可读流(Readable stream) 调用stream.read(size)
总会返回单个项目,无论是什么参数。
在对象模式里, 可写流(Writable stream ) 总会忽略传给stream.write(data, encoding)
的encoding
参数。
特殊值null
在对象模式里,依旧保持它的特殊性。也就说,对于对象模式的可读流(Readable stream),stream.read()
返回null
意味着没有更多数据,同时stream.push(null)
会告知流数据结束(EOF)。
Node核心不存在对象模式的流,这种设计只被某些用户态流式库所使用。
应该在你的子类构造函数里,设置objectMode
。在过程中设置不安全。
对于双工流(Duplex streams),objectMode
可以用readableObjectMode
和writableObjectMode
分别为读写端分别设置。这些选项,被转换流(Transform streams)用来实现解析和序列化。
javascript
var util = require('util'
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform
// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream() {
if (!(this instanceof JSONParseStream))
return new JSONParseStream(
Transform.call(this, { readableObjectMode : true }
this._buffer = '';
this._decoder = new StringDecoder('utf8'
}
JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
this._buffer += this._decoder.write(chunk
// split on newlines
var lines = this._buffer.split(/\r?\n/
// keep the last partial line buffered
this._buffer = lines.pop(
for (var l = 0; l < lines.length; l++) {
var line = lines[l];
try {
var obj = JSON.parse(line
} catch (er) {
this.emit('error', er
return;
}
// push the parsed object out to the readable consumer
this.push(obj
}
cb(
};
JSONParseStream.prototype._flush = function(cb) {
// Just handle any leftover
var rem = this._buffer.trim(
if (rem) {
try {
var obj = JSON.parse(rem
} catch (er) {
this.emit('error', er
return;
}
// push the parsed object out to the readable consumer
this.push(obj
}
cb(
};