请看下面的设置
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ThriftSerializer.class.getName());
public class ThriftSerializer implements Serializer<TBase> {
private final ThreadLocal<TSerializer> serializer = new ThreadLocalTSerializer();
@Override
public void configure(Map map, boolean b) {
}
@Override
public byte[] serialize(String s, TBase event) {
try {
return serializer.get().serialize(event);
} catch (TException e) {
return new byte[0];
}
}
@Override
public void close() {
}
}
上述代码导致内存泄漏
但我不理解为什么会发生。是否卡夫卡制片创建不死线程很多?
如果上面的代码替换
@Override
public byte[] serialize(String s, TBase event) {
TSerializer serializer = new TSerializer();
try {
return serializer.serialize(event);
} catch (TException e) {
return new byte[0];
}
}
然后,内存泄漏消失了,这是有意义的,但每个事件的创建需要被垃圾收集,可能导致GC的压力,如果处理能力高的新对象
有人能指出我理解此行为的方向是什么?
据我所知的KafkaProducer是线程安全的共享跨线程一个生产者例如通常会比有多个实例更快。
但是,发送方法是异步的(除非你调用获得()的发送方法,这是不推荐的,这样你会等待每个发送,因此以同步的方式处理它们返回的Future对象上)。
由于文档制作包括持有尚未被传输到服务器,以及作为背景的I / O线程是负责将这些记录到的请求并发送它们的簇的记录缓冲空间一池。如果无法关闭生产者使用后会泄露这些资源。
看来,send方法实际上使用一个后台线程来改变你的记录,并将其发送到集群。
你实际关闭生产者在结束了吗?
producer.flush();
producer.close();
串行的close方法被调用时,卡夫卡会话将被关闭。我的猜测是,你可以试着做一些额外的清洁在串行器的close方法或将其标记为符合垃圾收集。