[在接收Azure Databricks并发作业中的eventhub消息以及在并发作业中使用eventhub消息的正确方法时,请帮助我们实现分区/分组。
在Azure Databricks中创建了3个并发作业,将以scala编写的使用者代码作为jar文件上传。在这种情况下,在所有3个并发作业中接收相同的消息。为了解决这个问题,尝试通过分区消耗事件,但在所有3个分区中接收相同的消息。
并且还尝试通过基于分区键发送消息,并尝试在eventhub中创建使用者组,即使在所有组中均收到相同的消息也是如此。我们不确定在并发作业中处理eventhub消息
EventHub配置:
分区数为3,消息保留为1EventHub生产者:使用.NET(C#)将消息发送到Eventhub正常。EventHub Consumer:能够通过Scala程序接收消息,没有任何问题。
问题:在Azure Databricks中创建了3个并发作业,并以jar文件的形式上传了用Scala编写的使用者代码。在这种情况下,所有3个并发作业都收到相同的消息。为了克服这个问题,尝试通过分区来消耗事件,但在所有3个分区中接收相同的消息。还尝试通过基于分区键发送消息,并尝试在eventhub中创建使用者组,即使在所有组中都接收到相同的消息也是如此。我们不确定在并发作业中处理eventhub消息。
生产者C#代码:
string eventHubName = ConfigurationManager.AppSettings["eventHubname"];
string connectionString = ConfigurationManager.AppSettings["eventHubconnectionstring"];
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
for (var i = 0; i < 100; i++)
{
var sender = "event hub message 1" + i;
var data = new EventData(Encoding.UTF8.GetBytes(sender));
Console.WriteLine($"Sending message: {sender}");
eventHubClient.SendAsync(data);
}
eventHubClient.CloseAsync();
Console.WriteLine("Press ENTER to exit.");
Console.ReadLine();
消费者Scala代码:
object ReadEvents {
val spark = SparkSession.builder()
.appName("eventhub")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
def main(args : Array[String]) : Unit = {
val connectionString = ConnectionStringBuilder("ConnectionString").setEventHubName("eventhub1").build
val positions = Map(new NameAndPartition("eventhub1", 0) -> EventPosition.fromStartOfStream)
val position2 = Map(new NameAndPartition("eventhub1", 1) -> EventPosition.fromEnqueuedTime(Instant.now()))
val position3 = Map(new NameAndPartition("eventhub1", 2) -> EventPosition.fromEnqueuedTime(Instant.now()))
val ehConf = EventHubsConf(connectionString).setStartingPositions(positions)
val ehConf2 = EventHubsConf(connectionString).setStartingPositions(position2)
val ehConf3 = EventHubsConf(connectionString).setStartingPositions(position3)
val stream = org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc, ehConf)
println("Before the loop")
stream.foreachRDD(rdd => {
rdd.collect().foreach(rec => {
println(String.format("Message is first stream ===>: %s", new String(rec.getBytes(), Charset.defaultCharset())))
})
})
val stream2 = org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc, ehConf2)
stream2.foreachRDD(rdd2 => {
rdd2.collect().foreach(rec2 => {
println(String.format("Message second stream is ===>: %s", new String(rec2.getBytes(), Charset.defaultCharset())))
})
})
val stream3 = org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc, ehConf)
stream3.foreachRDD(rdd3 => {
println("Inside 3rd stream foreach loop")
rdd3.collect().foreach(rec3 => {
println(String.format("Message is third stream ===>: %s", new String(rec3.getBytes(), Charset.defaultCharset())))
})
})
ssc.start()
ssc.awaitTermination()
}
}
希望在使用scala程序运行的并发作业上接收到eventhub消息时,对其进行适当的分区。
下面的代码帮助迭代所有部分
eventHubsStream.foreachRDD { rdd =>
rdd.foreach { message =>
if (message != null) {
callYouMethod(new String(message.getBytes()))
}
}
}