我有两个 ReadableStreams,我想将它们pipe 到一个 WritableStream 中,然后来自 ReadableStreams 的任何数据都会直接进入 WritableStream。
我可以反其道而行之,用
ReadableStream.prototype.tee()
将一个ReadableStream一分为二,但我不知道如何将两个合二为一。
const textarea = document.querySelector("textarea");
// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
const sayMom = () => controller.enqueue("Mom! ");
setInterval(sayMom, 1000);
}});
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
const sayLois = () => controller.enqueue("Lois! ");
setInterval(sayLois, 700);
}});
// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display
loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
<textarea readonly></textarea>
手动,通过竞争每个读者的最新阅读以产生整体阅读并根据需要启动这些阅读:
const never = new Promise(() => {});
const mergeStreams = streams => {
const readers = streams.map(s => s.getReader());
const reads = streams.map(() => null);
const dones = [];
const allDone = Promise.all(streams.map(s => new Promise(resolve => {
dones.push(resolve);
})));
return new ReadableStream({
start: controller => {
allDone.then(() => {
controller.close();
});
},
pull: controller =>
Promise.race(
readers.map((r, i) =>
reads[i] ??= r.read().then(({value, done}) => {
if (done) {
dones[i]();
return never;
}
controller.enqueue(value);
reads[i] = null;
})
)
),
cancel: reason => {
for (const reader of readers) {
reader.cancel(reason);
}
},
});
};
const textarea = document.querySelector("textarea");
const never = new Promise(() => {});
const mergeStreams = streams => {
const readers = streams.map(s => s.getReader());
const reads = streams.map(() => null);
const dones = [];
const allDone = Promise.all(streams.map(s => new Promise(resolve => {
dones.push(resolve);
})));
return new ReadableStream({
start: controller => {
allDone.then(() => {
controller.close();
});
},
pull: controller =>
Promise.race(
readers.map((r, i) =>
reads[i] ??= r.read().then(({value, done}) => {
if (done) {
dones[i]();
return never;
}
controller.enqueue(value);
reads[i] = null;
})
)
),
cancel: reason => {
for (const reader of readers) {
reader.cancel(reason);
}
},
});
};
// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
const sayMom = () => controller.enqueue("Mom! ");
setInterval(sayMom, 1000);
}});
// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
const sayLois = () => controller.enqueue("Lois! ");
setInterval(sayLois, 700);
}});
// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
mergeStreams([
momReadableStream,
loisReadableStream,
]).pipeTo(writableStream).catch(console.error);
<textarea readonly></textarea>