我正在尝试缓存 Django REST Framework HTTP 流响应。
我的想法是 Response 子类可以在流式传输时将块写入临时文件,并在流式传输最终块后关闭时,运行可调用文件将文件复制到缓存中。
但是在下面的打印语句中,只有第一个
CachedStreamingHttpResponse init
被打印出来,而不是来自相同方法的第二个,其他的都没有,无论是在 gunicorn 还是 runserver 中运行。 (呈现流的 getvalue 也不记录调用。)
from django.http import StreamingHttpResponse
class CachedStreamingHttpResponse(StreamingHttpResponse):
def __init__(self, streaming_content=(), *args, **kwargs):
print("CachedStreamingHttpResponse init")
super().__init__(*args, **kwargs) # Edit: Missing parameter here
print("CachedStreamingHttpResponse init 2")
def _set_streaming_content(self, value):
print("CachedStreamingHttpResponse _set_streaming_content")
self._buffer = TemporaryFile()
self.buffered = False
super()._set_streaming_content(value)
def close(self):
print("CachedStreamingHttpResponse close")
self._buffer.seek(0)
self.buffered = self._buffer
return super().close()
def buffer(self, b):
print("CachedStreamingHttpResponse buffer")
print(b)
self._buffer.write(b)
return b
@property
def streaming_content(self):
print("CachedStreamingHttpResponse streaming_content")
return map(self.buffer, super().streaming_content)
# Edit: Bug 2: Needed to duplicate @streaming_content.setter from super-class
我计划让我的缓存框架将一个可调用对象传递到响应中,然后从 content_stream
finally
块调用它以将临时文件复制到 S3。
我已经检查过,响应以多个块流式传输(下面是我用来检查分块的 Javascript)。但是在
super().__init__
的某个地方,即使 CachedStreamingResponse
在 viewset
list
方法中实例化,响应也会变得沉默。
早点问,因为这可能是愚蠢的差事。
这些响应是由复杂的查询和 DRF 序列化程序和视图集生成的,我们进行流式传输以便用户更快地看到数据到达,而不是为了节省服务器资源。响应最多几兆字节,通常更少,这些将缓存在 S3 上。
async function readData(url) {
const response = await fetch(url);
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log("END");
return;
}
console.log("-> ", value);
}
}
readData(url);
发布 half 上面的答案,这样审稿人就不会浪费时间在愚蠢的语法错误上。我错过了一个参数,不得不复制属性设置器,现在获取所有日志消息。
但是我现在得到两个流,一个压缩,一个没有,都调用
close()
回调。看起来像是从不同的响应实例到同一个临时文件,但我认为这是不可能的。如果之前出现过,我会更新这个或标记一个更好的答案。
class CachedStreamingHttpResponse(StreamingHttpResponse):
def __init__(self, streaming_content=(), *args, **kwargs):
logger.info("CachedStreamingHttpResponse init")
self._post_render_callbacks = []
self._buffer = None
self.buffered = False
super().__init__(streaming_content, *args, **kwargs)
def _set_streaming_content(self, value):
logger.info("CachedStreamingHttpResponse _set_streaming_content")
self._buffer = TemporaryFile()
super()._set_streaming_content(value)
def post_render(self):
logger.info("CachedStreamingHttpResponse post_render")
self._buffer.seek(0)
self.buffered = self._buffer
retval = self
for post_callback in self._post_render_callbacks:
newretval = post_callback(retval)
if newretval is not None:
retval = newretval
def buffer(self, b):
logger.info("CachedStreamingHttpResponse buffer")
self._buffer.write(b)
return b
@staticmethod
def closing_iterator_wrapper(iterable, close):
logger.info("CachedStreamingHttpResponse closing_iterator_wrapper")
try:
yield from iterable
finally:
close()
@property
def streaming_content(self):
logger.info("CachedStreamingHttpResponse streaming_content")
gen = map(self.buffer, super().streaming_content)
return self.closing_iterator_wrapper(gen, self.post_render)
@streaming_content.setter
def streaming_content(self, value):
logger.info("CachedStreamingHttpResponse streaming_content setter")
self._set_streaming_content(value)
def add_post_render_callback(self, callback):
"""A list of callables to be run after the final chunk is returned. Used to copy the response to cache."""
logger.info("CachedStreamingHttpResponse add_post_render_callback")
if self.buffered:
callback(self)
else:
self._post_render_callbacks.append(callback)