使用Python中的kafka生成器发送数据的问题(Jupyter Notebook)

问题描述 投票:0回答:1

我正在尝试使用Kafka,Python和Twitter创建大数据分析。我有一个推文数据流,我只采用它们的标签。我的问题与制作人Kafka在Python中使用有关。我无法将我想要的数据发送到我创建的主题中,因为我没有看到任何选项来向生产者发送变量的内容。

https://kafka-python.readthedocs.io/en/master/usage.html中,我只能看到用b'some_string'发送精确字符串的选项。但我想发送我从Twitter Stream中获取的标签。我不太了解Python,所以如果解决方案很明显,请原谅。

进口:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import kafka
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

流式上下文:

ssc = StreamingContext(sc,60)

键:

consumer_key="consumer_key"
consumer_secret="consumer_secret"
access_token="access_token"
access_token_secret="access_token_secret"

Tweepy:

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

制片人:

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

码:

class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        for hashtag in status.entities['hashtags']:
            prueba = b'hashtag["text"]'
            producer.send('topic', prueba)
            return True
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

StreamListener:

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=MyStreamListener())

推文流:

myStream.filter(track=['some_text'])

问题是,生产者只发送prueba的文字字符串"(hashtag["text"])"。我想发送的不是确切的东西,而是它的内容。

提前致谢。

python apache-spark apache-kafka kafka-producer-api
1个回答
0
投票

producer.send('topic', hashtag)怎么样?您还需要确保将数据编码为原始字节,这是kafka存储的内容。如果hashtag是一个简单的字符串,你可以做producer.send('topic', hashtag.encode('utf-8'))。如果它是dict或更复杂的数据结构,则可能需要在编码为字节之前使用json.dumps。希望这可以帮助!

© www.soinside.com 2019 - 2024. All rights reserved.