有没有更好的方法使用kafka实现多租户?

问题描述 投票:0回答:2

我正在尝试使用 Spring Boot 实现多租户微服务。我已经实现了网络层和持久层。在 Web 层上,我实现了一个过滤器,它在原型 bean 中设置租户 id(使用 ThreadLocalTargetSource),在持久层上,我使用了 Hibernate 多租户配置(每个租户的架构),它们工作正常,数据保存在适当的架构。目前我正在使用 spring-kaka 库在消息传递层上实现相同的行为,到目前为止它的工作方式符合我的预期,但我想知道是否有更好的方法来做到这一点。

这是我的代码:

这是管理 KafkaMessageListenerContainer 的类:

@Component
public class MessagingListenerContainer {

    private final MessagingProperties messagingProperties;

    private KafkaMessageListenerContainer<String, String> container;

    @PostConstruct
    public void init() {
        ContainerProperties containerProps = new ContainerProperties(
                messagingProperties.getConsumer().getTopicsAsList());

        containerProps.setMessageListener(buildCustomMessageListener());

        container = createContainer(containerProps);
        container.start();
    }

    @Bean
    public MessageListener<String, String> buildCustomMessageListener() {
        return new CustomMessageListener();
    }

    private KafkaMessageListenerContainer<String, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        …
        return container;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        …
        return props;
    }

    @PreDestroy
    public void finish() {
        container.stop();
    }

}

这是自定义消息监听器:

@Slf4j
public class CustomMessageListener implements MessageListener<String, String> {

    @Autowired
    private TenantStore tenantStore; // Prototype Bean

    @Autowired
    private List<ServiceListener> services;

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        log.info(“Tenant {} | Payload: {} | Record: {}", record.key(),
                record.value(), record.toString());

        tenantStore.setTenantId(record.key()); // Currently tenant is been setting as key

        services.stream().forEach(sl -> sl.onMessage(record.value()));

    }

}

这是一项使用消息数据和租户的测试服务:

@Slf4j
@Service
public class ConsumerService implements ServiceListener {

    private final MessagesRepository messages;
    private final TenantStore tenantStore;

    @Override
    public void onMessage(String message) {
        log.info("ConsumerService {}, tenant {}", message, tenantStore.getTenantId());
        messages.save(new Message(message));
    }

}

感谢您的宝贵时间!

java spring multi-tenant spring-kafka
2个回答
2
投票

需要明确的是(如果我错了,请纠正我):您为所有租户使用相同的主题。根据每个租户区分消息的方法是使用消息密钥,在您的情况下,消息密钥是租户 ID。

可以通过使用消息标头来存储租户 ID 而不是密钥来进行轻微的改进。通过这样做,您将不再局限于根据租户对消息进行分区。

尽管您描述的模型有效,但它存在重大安全问题。如果有人访问您的主题,那么您将泄露所有租户的数据。

更安全的方法是使用主题命名约定和 ACL(访问控制列表)。您可以在此处找到简短的解释。简而言之,您可以使用后缀或前缀将租户名称包含在主题名称中。 例如:orders_tenantA、orders_tenantB 或tenantA_orders、tenantB_orders

然后,使用 ACL,您可以限制哪些应用程序可以连接到这些特定主题。如果您的一个租户需要将其应用程序之一直接连接到您的 Kafka 集群,那么此场景也很有帮助。


0
投票

我认为没有必要通过 CustomMessageListener 重新发送消息。

只需使用带有RecordIntercepter(或Kafka本机ConsumerInterceptor)的spring-kafka默认配置,然后使用普通的@KafkaListener注解方法来接收消息。

参见这个这个

此外,您不必使用 ThreadLocalTargetSource 原型 bean。只需一个带有包含租户 id 的 ThreadLocal 变量的类,以及用于检索或设置它的静态方法就足够了 - 它可以在非 spring 托管组件中使用。

在生产者方面,ProducerInterceptor 可以帮助您设置租户 id 标头。

恕我直言,只要它是“访问 Kafka 集群的多租户应用程序”而不是“多租户 Kafka 集群”,安全问题并不是那么致命。

© www.soinside.com 2019 - 2024. All rights reserved.