Azure Databricks并发作业-避免在所有作业中使用相同的eventhub消息

问题描述 投票:1回答:1

[在接收Azure Databricks并发作业中的eventhub消息以及在并发作业中使用eventhub消息的正确方法时,请帮助我们实现分区/分组。

在Azure Databricks中创建了3个并发作业,将以scala编写的使用者代码作为jar文件上传。在这种情况下,在所有3个并发作业中接收相同的消息。为了解决这个问题,尝试通过分区消耗事件,但在所有3个分区中接收相同的消息。

并且还尝试通过基于分区键发送消息,并尝试在eventhub中创建使用者组,即使在所有组中均收到相同的消息也是如此。我们不确定在并发作业中处理eventhub消息

EventHub配置:

分区数为3,消息保留为1EventHub生产者:使用.NET(C#)将消息发送到Eventhub正常。EventHub Consumer:能够通过Scala程序接收消息,没有任何问题。

问题:在Azure Databricks中创建了3个并发作业,并以jar文件的形式上传了用Scala编写的使用者代码。在这种情况下,所有3个并发作业都收到相同的消息。为了克服这个问题,尝试通过分区来消耗事件,但在所有3个分区中接收相同的消息。还尝试通过基于分区键发送消息,并尝试在eventhub中创建使用者组,即使在所有组中都接收到相同的消息也是如此。我们不确定在并发作业中处理eventhub消息。

生产者C#代码:

string eventHubName = ConfigurationManager.AppSettings["eventHubname"];
string connectionString = ConfigurationManager.AppSettings["eventHubconnectionstring"];
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

for (var i = 0; i < 100; i++)
    {       
        var sender = "event hub message 1"  + i;
        var data = new EventData(Encoding.UTF8.GetBytes(sender));
        Console.WriteLine($"Sending message: {sender}");
        eventHubClient.SendAsync(data);
    }
eventHubClient.CloseAsync();
Console.WriteLine("Press ENTER to exit.");
Console.ReadLine();

消费者Scala代码:

object ReadEvents {
  val spark = SparkSession.builder()
    .appName("eventhub")
    .getOrCreate()
  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(5))
  def main(args : Array[String]) : Unit = {

    val connectionString = ConnectionStringBuilder("ConnectionString").setEventHubName("eventhub1").build
    val positions = Map(new NameAndPartition("eventhub1", 0) -> EventPosition.fromStartOfStream)
    val position2 = Map(new NameAndPartition("eventhub1", 1) -> EventPosition.fromEnqueuedTime(Instant.now()))
    val position3 = Map(new NameAndPartition("eventhub1", 2) -> EventPosition.fromEnqueuedTime(Instant.now()))

    val ehConf = EventHubsConf(connectionString).setStartingPositions(positions)
    val ehConf2 = EventHubsConf(connectionString).setStartingPositions(position2)
    val ehConf3 = EventHubsConf(connectionString).setStartingPositions(position3)
    val stream = org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc, ehConf)
    println("Before the loop")
    stream.foreachRDD(rdd => {
          rdd.collect().foreach(rec => {
        println(String.format("Message is first stream ===>: %s", new String(rec.getBytes(), Charset.defaultCharset())))
         })
    })
    val stream2 = org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc, ehConf2)
        stream2.foreachRDD(rdd2 => {
          rdd2.collect().foreach(rec2 => {
    println(String.format("Message  second stream is ===>: %s", new String(rec2.getBytes(), Charset.defaultCharset())))

      })
    })
    val stream3 = org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc, ehConf)
     stream3.foreachRDD(rdd3 => {
      println("Inside 3rd stream foreach loop")
      rdd3.collect().foreach(rec3 => {
        println(String.format("Message is third stream ===>: %s", new String(rec3.getBytes(), Charset.defaultCharset())))

      })
    })
    ssc.start()
    ssc.awaitTermination()
  }

}

希望在使用scala程序运行的并发作业上接收到eventhub消息时,对其进行适当的分区。

scala azure-eventhub azure-databricks
1个回答
0
投票

下面的代码帮助迭代所有部分

  eventHubsStream.foreachRDD { rdd =>
    rdd.foreach { message =>
      if (message != null) {
        callYouMethod(new String(message.getBytes()))

      }
    }
  }
© www.soinside.com 2019 - 2024. All rights reserved.