我正在尝试添加到我的拓扑中,以从 kafka 主题中提取消息,执行一些逻辑,然后写入 postgres。目前我只是想以最简单的方式将其连接在一起,但我对 quarkus 还很陌生。
在我的 TopologyBuilder 中,我有:
@Inject
EntityManager em;
...
StreamsBuilder streamsbuilder = new StreamsBuilder()
...
streamsBuilder.stream(rdbmsTopic, Consumed.with(Serdes.String(), QUINE_ALERT_SERDES)).process(() -> new AlertService(em));
然后该服务看起来像
@ApplicationScoped
public class AlertService implements Processor<String, QuineOutput, Object, Object> {
EntityManager em;
public AlertService(EntityManager em) {
this.em = em;
}
@Transactional
void persistAlert(Alert alert) {
em.persist(alert);
}
@Override
public void process(Record<String, QuineOutput> record) {
Log.warn("here for persist");
persistAlert(record.value().getData().getAlertNode());
}
}
但是我收到错误:
Caused by: jakarta.enterprise.context.ContextNotActiveException: Cannot use the EntityManager/Session because neither a transaction nor a CDI request context is active. Consider adding @Transactional to your method to automatically activate a transaction, or @ActivateRequestContext if you have valid reasons not to use transactions.
我尝试添加请求上下文注释,并且尝试绕过一些逻辑,我确保我有一个可用的 EntityManager,并且传递给它的数据存在,并且该类被注释为 @实体等
我希望能够在事务中写入 postgres。
拦截器不适用于通过
new
运算符实例化的 beans;您必须在 @Inject
中 AlertService
TopologyBuilder
(当然 - TopologyBuilder
必须是托管 bean,而不是通过 new
创建的 bean)。