使用浮流传输访问rpc回复中的kafka标头

问题描述 投票:0回答:1

是否有可能在常见的rpc回复中访问kafka标头?这是两个浮士德代理的示例。一个(pow)调用另一个(mul),并接收结果作为值。但是如何知道回复主题中的kafka标头?

#!/usr/bin/env python
from typing import AsyncIterable
import faust
from faust import StreamT

app = faust.App('RPC99', reply_create_topic=True)
pow_topic = app.topic('RPC__pow')
mul_topic = app.topic('RPC__mul')

@app.agent(pow_topic)
async def pow(stream: StreamT[float]) -> AsyncIterable[float]:
    async for value in stream:
        yield await mul.ask(value=value ** 2)
        # Headers for the returning result here?

@app.agent(mul_topic)
async def mul(stream: StreamT[float]) -> AsyncIterable[float]:
    async for value in stream:
        yield value * 100.0
faust
1个回答
0
投票

似乎流事件的属性为headers类型的Union[List[Tuple[str, bytes]]

docs在这里有,但目前在这一点上还不十分详细。

看起来您可以执行类似操作

@app.agent(topic)
async def process(stream):
    async for value in stream:
        do_something(value.headers)
© www.soinside.com 2019 - 2024. All rights reserved.