Kafka 的 Rest API

问题描述 投票:0回答:2

我需要为kafka编写一个REST API,它可以分别从消费者/生产者读取或写入数据。 我怎样才能做到这一点?

rest hadoop apache-kafka
2个回答
0
投票

这是来自 Confluence 的示例 Rest API(Rest Proxy)代码。我必须输入它,因此它可能包含一些拼写错误。我希望这对你有一点帮助。

(使用Python编写的REST API的生产者)

import requests
import base64
import json

url = "http://restproxy:8082/topics/my_topic"
headers = {
"Content-Type" : "application/vnd.kafka.binary.v1 + json"
   }
# Create one or more messages
payload = {"records":
       [{
           "key":base64.b64encode("firstkey"),
           "value":base64.b64encode("firstvalue")
       }]}
# Send the message
r = requests.post(url, data=json.dumps(payload), headers=headers)
if r.status_code != 200:
   print "Status Code: " + str(r.status_code)
   print r.text

(使用Python编写的Rest API的消费者)

import requests
import base64
import json
import sys

#Base URL for interacting with REST server
baseurl = "http://restporxy:8082/consumers/group1"

#Create the Consumer instance
print "Creating consumer instance"
payload {
    "format": "binary"
    }
headers = {
"Content-Type" : "application/vnd.kafka.v1+json"
    }
r = requests.post(baseurl, data=json.dumps(payload), headers=headers)

if r.status_code !=200:
    print "Status Code: " + str(r.status_code)
    print r.text
    sys.exit("Error thrown while creating consumer")

# Base URI is used to identify the consumer instance
base_uri = r.json()["base_uri"]

#Get the messages from the consumer
headers = {
    "Accept" : "application/vnd.kafka.binary.v1 + json"
    }

# Request messages for the instance on the Topic
r = requests.get(base_uri + "/topics/my_toopic", headers = headers, timeout =20)

if r.status_code != 200: 
    print "Status Code: " + str(r.status_code)
    print r.text
    sys.exit("Error thrown while getting message")

#Output all messages
for message in r.json():
    if message["key"] is not None:
        print "Message Key:" + base64.b64decode(message["key"])
    print "Message Value:" + base64.b64decode(message["value"])

# When we're done, delete the consumer
headers = {
    "Accept" : "application/vnd.kafka.v1+json"
    }

r = requests.delete(base_uri, headers=headers)

if r.status_code != 204: 
    print "Status Code: " + str(r.status_code)
    print r.text

0
投票

我猜你的问题更多是关于如何编写“服务器”REST API 接口而不是客户端(最终只是发出 HTTP 请求)。 例如,您可以使用 Strimzi HTTP 桥 (https://github.com/strimzi/strimzi-kafka-bridge),它可以独立运行,甚至可以在 Kubernetes 中运行,如果您愿意在那里部署集群(那么您可以使用 Strimzi 项目,https://strimzi.io/)。

© www.soinside.com 2019 - 2024. All rights reserved.