stream.compose(...streams)
stream.compose is experimental.
- streams <Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>
- Returns: <stream.Duplex>
Combines two or more streams into a Duplex stream that writes to the first stream and reads from the last. Each provided stream is piped into the next, using stream.pipeline. If any of the streams error then all are destroyed, including the outer Duplex stream.
Because stream.compose returns a new stream that in turn can (and should) be piped into other streams, it enables composition. In contrast, when passing streams to stream.pipeline, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit.
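Because the composed Duplex behaves like any other stream, it can later be dropped into a stream.pipeline call of its own. The following is a minimal sketch of that idea; the gzip stage and the file names 'input.txt' and 'output.txt.gz' are illustrative assumptions, not part of the documented examples below.
import { compose } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
// A reusable transform built by composing two generator functions:
// strip spaces, then upper-case. (Names and logic are illustrative.)
const normalize = compose(
  async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).replaceAll(' ', '');
    }
  },
  async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  },
);
// The composed Duplex slots into a pipeline like any other stream. If any
// stage errors, all streams (including `normalize`) are destroyed and the
// awaited pipeline rejects.
await pipeline(
  createReadStream('input.txt'),
  normalize,
  createGzip(),
  createWriteStream('output.txt.gz'),
);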
If passed a Function it must be a factory method taking a source Iterable.
import { compose, Transform } from 'node:stream';
// Transform stream that strips the space from each chunk.
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
// Async generator function that upper-cases each chunk from the source.
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD'
stream.compose can be used to convert async iterables, generators and functions into streams.
- AsyncIterable converts into a readable Duplex. Cannot yield null.
- AsyncGeneratorFunction converts into a readable/writable transform Duplex. Must take a source AsyncIterable as first parameter. Cannot yield null.
- AsyncFunction converts into a writable Duplex. Must return either null or undefined.
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
See readable.compose(stream) for stream.compose as operator.
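As a rough sketch of the operator form, the same kind of transform can be attached directly to a readable. This assumes the experimental readable.compose() and readable.toArray() helpers, which are not demonstrated above.
import { Readable } from 'node:stream';
async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}
// compose() here is the operator form on a readable; toArray() collects
// the composed stream's output into an array.
const words = await Readable.from(['hello', 'world']).compose(toUpper).toArray();
console.log(words); // prints [ 'HELLO', 'WORLD' ]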