我正在使用Storm实时处理来自Kafka的消息,并使用streamparse构建我的拓扑。对于此用例,必须保证100%确保处理和确认进入Storm的任何消息。我已经使用try / catch在螺栓上实现了逻辑(请参见下文),除了将其写到Kafka中的另一个“错误”主题之外,我还希望Storm重播这些消息。
[在我的KafkaSpout中,我将tup_id分配为与我的消费者从中获取的Kafka主题的偏移ID相等。但是,当我使用错误的变量引用强制执行Bolt错误时,我看不到该消息被重播。我确实看到一个写“错误” Kafka主题的文章,但是只有一次-意味着该元组永远不会重新提交到我的螺栓中。我对TOPOLOGY_MESSAGE_TIMEOUT_SEC = 60的设置,我希望Storm会每60秒重播一次失败的消息,并使我的错误捕获永久性地写入错误主题。
KafkaSpout.py
class kafkaSpout(Spout):
def initialize(self, stormconf, context):
self.kafka = KafkaClient(str("host:6667"))#,offsets_channel_socket_timeout_ms=60000)
self.topic = self.kafka.topics[str("topic-1")]
self.consumer = self.topic.get_balanced_consumer(consumer_group=str("consumergroup"),auto_commit_enable=False,zookeeper_connect=str("host:2181"))
def next_tuple(self):
for message in self.consumer:
self.emit([json.loads(message.value)],tup_id=message.offset)
self.log("spout emitting tuple ID (offset): "+str(message.offset))
self.consumer.commit_offsets()
def fail(self, tup_id):
self.log("failing logic for consumer. resubmitting tup id: ",str(tup_id))
self.emit([json.loads(message.value)],tup_id=message.offset)
processBolt.py
class processBolt(Bolt):
auto_ack = False
auto_fail = False
def initialize(self, conf, ctx):
self.counts = Counter()
self.kafka = KafkaClient(str("host:6667"),offsets_channel_socket_timeout_ms=60000)
self.topic = self.kafka.topics[str("topic-2")]
self.producer = self.topic.get_producer()
self.failKafka = KafkaClient(str("host:6667"),offsets_channel_socket_timeout_ms=60000)
self.failTopic = self.failKafka.topics[str("topic-error")]
self.failProducer = self.failTopic.get_producer()
def process(self, tup):
try:
self.log("found tup.")
docId = tup.values[0]
url = "solrserver.host.com/?id="+str(docId)
thisIsMyForcedError = failingThisOnPurpose ####### this is what im using to fail my bolt consistent
data = json.loads(requests.get(url).text)
if len(data['response']['docs']) > 0:
self.producer.produce(json.dumps(docId))
self.log("record FOUND {0}.".format(docId))
else:
self.log('record NOT found {0}.'.format(docId))
self.ack(tup)
except:
docId = tup.values[0]
self.failProducer.produce( json.dumps(docId), partition_key=str("ERROR"))
self.log("TUP FAILED IN PROCESS BOLT: "+str(docId))
self.fail(tup)
我将很高兴为这种情况提供有关如何正确实现自定义失败逻辑的帮助。预先感谢。
我有同样的问题。 github上的一个问题已于2017年开放。我不知道为什么没人能解决它。你有吗?