How to close a stream with an async transformer


I want to combine streams with offloading work to web workers, but I'm having trouble deciding where to close the stream.

Closing the stream in the source, when there are no more chunks to send, causes this exception:

Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed

But not closing the stream means the promise never resolves.
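To make the timing issue concrete, here is a stripped-down sketch of what I believe is happening (a plain setTimeout stands in for the worker round trip; the values and delay are made up):

// stripped-down version of the race: the source closes right after the last
// chunk, but the transformer only enqueues its result later, asynchronously
const source = new ReadableStream({
  start(controller) {
    controller.enqueue(1)
    controller.close() // close as soon as the last chunk is enqueued
  }
})

const asyncDouble = new TransformStream({
  transform(chunk, controller) {
    // transform() returns immediately, so the pipe thinks this chunk is done
    setTimeout(() => {
      // by now the readable side has already been asked to close,
      // so this enqueue fails with the error quoted above
      controller.enqueue(chunk * 2)
    }, 10)
  }
})

await source
  .pipeThrough(asyncDouble)
  .pipeTo(new WritableStream({ write(value) { console.log(value) } }))
// the pipe settles here without ever seeing the doubled value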

In the future I want to chain multiple async transformers. I had expected some system where the close bubbles down the pipe and the promise resolves once everything has closed, but I couldn't find anything like that.
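For what it's worth, this is roughly the behaviour I was hoping for: count the in-flight chunks and only let the close propagate once they have all been enqueued. The pending counter and resolve wiring below are just my own sketch, doAsyncWork() is a stand-in for the worker round trip, and as far as I understand a transformer's flush() may return a promise to delay the close:

// sketch of a transformer that delays the close until its async work is done
const doAsyncWork = chunk => new Promise(resolve => setTimeout(() => resolve(chunk), 10))

const waitingTransformer = {
  pending: 0,
  resolveFlush: null,
  transform(chunk, controller) {
    this.pending++
    doAsyncWork(chunk).then(result => {
      controller.enqueue(result)
      this.pending--
      // tell a waiting flush() that the last in-flight chunk has arrived
      if (this.pending === 0 && this.resolveFlush) this.resolveFlush()
    })
  },
  flush() {
    // flush() runs when the writable side is closing; returning a promise here
    // keeps the readable side open until every in-flight chunk is enqueued
    if (this.pending === 0) return
    return new Promise(resolve => { this.resolveFlush = resolve })
  }
}

const waitingStream = new TransformStream(waitingTransformer)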

I could count the number of parts I expect and cancel the stream instead of closing it (roughly like the sketch after main.js below), but that feels a bit hacky.

webworker.js

onmessage = function(e) {
  const chunk = e.data;
  const buffer = chunk.buffer
  const floatArray = new Float32Array(buffer);
  // add 1 to every float in the slice
  for(let i = 0; i < floatArray.length; i++){
    floatArray[i] += 1
  }
  console.log("sending chunk back")
  // transfer the buffer back so the main thread owns it again
  postMessage(chunk, [chunk.buffer])
}

main.js

// A source that defines the parameters of a slice without the data
// and offers them in a stream based on objects, not bytes.
// So to use this on an ArrayBuffer, you would need to multiply the index by the size of an object in bytes
class SliceSource {

  index = 0
  constructor(parts, totalObjects) {
    // size of a slice
    this.sliceSize = Math.ceil(totalObjects / parts)
    this.totalObjects = totalObjects
  }

  start(controller) {
    // nothing to set up
  }

  pull(controller) {
    if (this.index >= this.totalObjects) {
      // TODO: who closes the stream? Because this closes the stream too early...
      controller.close()
    } else {
      const end = Math.min(this.index + this.sliceSize, this.totalObjects)
      controller.enqueue({ start: this.index, end: end })
      this.index += this.sliceSize
    }
  }

  cancel(reason) {
    console.log("Cancelled because: ", reason)
  }
}

// enrich a chunk with ArrayBuffer data
class EnrichStream extends TransformStream {
  // data: the source data as an ArrayBuffer
  // objectSize: the size of one object in bytes
  constructor(data, objectSize) {
    super({
      transform(chunk, controller) {
        const bytes = data.slice(chunk.start * objectSize, chunk.end * objectSize)
        chunk.buffer = bytes
        controller.enqueue(chunk)
      }
    })
  }
}

// transformer that offloads work to a webworker
class ThreadedAdditionTransformer {
  workers = []
  index = 0
  threads = 2
  controller
  constructor() {
    // create workers
    for (let i = 0; i < this.threads; i++) {
      const worker = new Worker("./fiddle-worker-chunk.js", { type: "module" })
      worker.onmessage = (event) => {
        this.controller.enqueue(event.data)
      }
      this.workers.push(worker)
    }
  }
  start(controller) {
    this.controller = controller
  }
  transform(chunk) {
    // send the chunk to the next web worker and enqueue the data when it comes back
    this.workers[this.index].postMessage(chunk, [chunk.buffer])
    // advance the index and wrap it back to 0 after the last worker
    this.index = ++this.index % this.threads
  }
}

const threadedStream = new TransformStream(new ThreadedAdditionTransformer())

// temporary writable stream that writes the chunks back into the original array
class MergeConsumer extends WritableStream {
  constructor() {
    super({
      write(chunk) {
        // copy the processed floats back into the original view at the slice offset
        const chunkView = new Float32Array(chunk.buffer)
        view.set(chunkView, chunk.start)
      }
    })
  }
}

let data = new ArrayBuffer(1_024);
const view = new Float32Array(data)

await new ReadableStream(new SliceSource(2, view.length))
  .pipeThrough(new EnrichStream(data, view.byteLength / view.length))
  .pipeThrough(threadedStream)
  .pipeTo(new MergeConsumer())
  .catch((error) => {
    console.log("error: ", error)
  })  

console.log("done")
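For completeness, this is roughly what the counting workaround mentioned above would look like: replace the MergeConsumer / pipeTo at the end of main.js with a consumer that counts the parts and aborts the pipe itself once everything has arrived. It assumes the controller.close() call in SliceSource.pull() is removed; the part count and AbortController wiring are my own additions:

// counting workaround: the final consumer counts chunks and aborts the pipe
// once all expected parts have arrived, instead of the source closing
const parts = 2
const abort = new AbortController()
let received = 0

const countingConsumer = new WritableStream({
  write(chunk) {
    view.set(new Float32Array(chunk.buffer), chunk.start)
    // once every expected part is in, cancel the (never-closing) source ourselves
    if (++received === parts) abort.abort("all parts received")
  }
})

await new ReadableStream(new SliceSource(parts, view.length))
  .pipeThrough(new EnrichStream(data, view.byteLength / view.length))
  .pipeThrough(new TransformStream(new ThreadedAdditionTransformer()))
  .pipeTo(countingConsumer, { signal: abort.signal })
  .catch(() => { /* the abort we triggered ourselves ends up here */ })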
Tags: javascript, stream, web-worker