我正在尝试使用 Spring Boot 实现多租户微服务。我已经实现了网络层和持久层。在 Web 层上,我实现了一个过滤器,它在原型 bean 中设置租户 id(使用 ThreadLocalTargetSource),在持久层上,我使用了 Hibernate 多租户配置(每个租户的架构),它们工作正常,数据保存在适当的架构。目前我正在使用 spring-kaka 库在消息传递层上实现相同的行为,到目前为止它的工作方式符合我的预期,但我想知道是否有更好的方法来做到这一点。
这是我的代码:
这是管理 KafkaMessageListenerContainer 的类:
@Component
public class MessagingListenerContainer {
private final MessagingProperties messagingProperties;
private KafkaMessageListenerContainer<String, String> container;
@PostConstruct
public void init() {
ContainerProperties containerProps = new ContainerProperties(
messagingProperties.getConsumer().getTopicsAsList());
containerProps.setMessageListener(buildCustomMessageListener());
container = createContainer(containerProps);
container.start();
}
@Bean
public MessageListener<String, String> buildCustomMessageListener() {
return new CustomMessageListener();
}
private KafkaMessageListenerContainer<String, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = consumerProps();
…
return container;
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
…
return props;
}
@PreDestroy
public void finish() {
container.stop();
}
}
这是自定义消息监听器:
@Slf4j
public class CustomMessageListener implements MessageListener<String, String> {
@Autowired
private TenantStore tenantStore; // Prototype Bean
@Autowired
private List<ServiceListener> services;
@Override
public void onMessage(ConsumerRecord<String, String> record) {
log.info(“Tenant {} | Payload: {} | Record: {}", record.key(),
record.value(), record.toString());
tenantStore.setTenantId(record.key()); // Currently tenant is been setting as key
services.stream().forEach(sl -> sl.onMessage(record.value()));
}
}
这是一项使用消息数据和租户的测试服务:
@Slf4j
@Service
public class ConsumerService implements ServiceListener {
private final MessagesRepository messages;
private final TenantStore tenantStore;
@Override
public void onMessage(String message) {
log.info("ConsumerService {}, tenant {}", message, tenantStore.getTenantId());
messages.save(new Message(message));
}
}
感谢您的宝贵时间!
需要明确的是(如果我错了,请纠正我):您为所有租户使用相同的主题。根据每个租户区分消息的方法是使用消息密钥,在您的情况下,消息密钥是租户 ID。
可以通过使用消息标头来存储租户 ID 而不是密钥来进行轻微的改进。通过这样做,您将不再局限于根据租户对消息进行分区。
尽管您描述的模型有效,但它存在重大安全问题。如果有人访问您的主题,那么您将泄露所有租户的数据。
更安全的方法是使用主题命名约定和 ACL(访问控制列表)。您可以在此处找到简短的解释。简而言之,您可以使用后缀或前缀将租户名称包含在主题名称中。 例如:orders_tenantA、orders_tenantB 或tenantA_orders、tenantB_orders
然后,使用 ACL,您可以限制哪些应用程序可以连接到这些特定主题。如果您的一个租户需要将其应用程序之一直接连接到您的 Kafka 集群,那么此场景也很有帮助。
我认为没有必要通过 CustomMessageListener 重新发送消息。
只需使用带有RecordIntercepter(或Kafka本机ConsumerInterceptor)的spring-kafka默认配置,然后使用普通的@KafkaListener注解方法来接收消息。
此外,您不必使用 ThreadLocalTargetSource 原型 bean。只需一个带有包含租户 id 的 ThreadLocal 变量的类,以及用于检索或设置它的静态方法就足够了 - 它可以在非 spring 托管组件中使用。
在生产者方面,ProducerInterceptor 可以帮助您设置租户 id 标头。
恕我直言,只要它是“访问 Kafka 集群的多租户应用程序”而不是“多租户 Kafka 集群”,安全问题并不是那么致命。