我想将流与卸载工作结合起来给网络工作者,但我在决定在哪里关闭流时遇到了麻烦。
关闭源中的流,当没有更多块要发送时,会导致此异常:
无法将块排入已关闭或已关闭的可读流中 被要求关闭
但不关闭流会导致承诺永远无法实现。
将来我想链接多个异步转换器。我本以为会有一个系统,在这个系统中,关闭的泡沫会通过管道向下流动,当所有的都关闭时,承诺就会兑现,但我找不到这样的东西。
我可以阅读我期望的部分的编号,而不是取消流,但这有点老套。
webworker.js
onmessage = function(e) {
const chunk = e.data;
const buffer = chunk.buffer
const floatArray = new Float32Array(buffer);
for(let i = 0; i < floatArray.length; i++){
floatArray[i] += 1
}
console.log("sending chunk back")
postMessage(chunk, [chunk.buffer])
}
main.js
// A source that defines the parameters of a slice without the data
// and offers them in a stream based on objects, not bytes.
// So to use this on a ArrayBuffer, you would need to multiply the index by the size of an object in bytes
class SliceSource {
index = 0
constructor(parts, totalObjects) {
// size of a slice
this.sliceSize = Math.ceil(totalObjects / parts)
this.totalObjects = totalObjects
}
start(controller) {
// nothing to setup
}
pull(controller) {
if (this.index >= this.totalObjects) {
// TODO: who closes the stream? Because this closes the stream to early...
controller.close()
} else {
const end = Math.min(this.index + this.sliceSize, this.totalObjects)
controller.enqueue({ start: this.index, end: end })
this.index += this.sliceSize
}
}
cancel(reason) {
console.log("Cancelled because: ", reason)
}
}
// enrich a chunk with ArrayBuffer data
class EnrichStream extends TransformStream {
// data as ArrayBuffer
// object size in nr of bytes in the object
// sources is an identifier of the source data
constructor(data, objectSize) {
super({
transform(chunk, controller) {
const bytes = data.slice(chunk.start * objectSize, chunk.end * objectSize)
chunk.buffer = bytes
controller.enqueue(chunk)
}
})
}
}
// transformer that offloads work to a webworker
class ThreadedAdditionTransformer {
workers = []
index = 0
threads = 2
controller
constructor() {
// create workers
for (let i = 0; i < this.threads; i++) {
const worker = new Worker("./fiddle-worker-chunk.js", { type: "module" })
worker.onmessage = (event) => {
this.controller.enqueue(event.data)
}
this.workers.push(worker)
}
}
start(controller) {
this.controller = controller
}
transform(chunk) {
// send to next webworker and enque the data when it comes back
this.workers[this.index].postMessage(chunk, [chunk.buffer])
// up index and wrap it from 11 to 0
this.index = ++this.index % this.threads
}
}
const threadedStream = new TransformStream(new ThreadedAdditionTransformer())
// temp writable stream which puts the chunks into original array
class MergeConsumer extends WritableStream {
constructor() {
super({
write(chunk) {
const chunkView = new Float32Array(chunk.data)
view.set(chunkView, chunk.start)
}
})
}
}
let data = new ArrayBuffer(1_024);
const view = new Float32Array(data)
await new ReadableStream(new SliceSource(2, view.length))
.pipeThrough(new EnrichStream(data, view.byteLength / view.length))
.pipeThrough(threadedStream)
.pipeTo(new MergeConsumer())
.catch((error) => {
console.log("error: ", error)
})
console.log(`done: ${done}, value: ${value}`)