是否有可能在常见的rpc回复中访问kafka标头?这是两个浮士德代理的示例。一个(pow
)调用另一个(mul
),并接收结果作为值。但是如何知道回复主题中的kafka标头?
#!/usr/bin/env python
from typing import AsyncIterable
import faust
from faust import StreamT
app = faust.App('RPC99', reply_create_topic=True)
pow_topic = app.topic('RPC__pow')
mul_topic = app.topic('RPC__mul')
@app.agent(pow_topic)
async def pow(stream: StreamT[float]) -> AsyncIterable[float]:
async for value in stream:
yield await mul.ask(value=value ** 2)
# Headers for the returning result here?
@app.agent(mul_topic)
async def mul(stream: StreamT[float]) -> AsyncIterable[float]:
async for value in stream:
yield value * 100.0
似乎流事件的属性为headers
类型的Union[List[Tuple[str, bytes]]
。
docs在这里有,但目前在这一点上还不十分详细。
看起来您可以执行类似操作
@app.agent(topic)
async def process(stream):
async for value in stream:
do_something(value.headers)