Amazon Kinesis Data Analytics for Java Applications: problem deserializing Avro in incoming messages

Question (votes: 1, answers: 1)

I am trying to deploy a Flink application to AWS Kinesis Data Analytics. The application uses Apache Avro to serialize/deserialize incoming messages. It works fine on my local machine, but when I deploy it to AWS I get an exception (in CloudWatch Logs): Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795

Log details:

{
  "locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:913)",
  "logger": "org.apache.flink.runtime.taskmanager.Task",
  "message": "Source: Custom Source -> Sink: Unnamed (1/1) (a72ff69f9dc0f9e56d1104ce21456a5d) switched from RUNNING to FAILED.",
  "throwableInformation": [
    "org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.",
    "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:160)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:380)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)",
    "\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)",
    "\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:275)",
    "\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)",
    "\tat java.lang.Thread.run(Thread.java:748)",
    "Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795",
    "\tat java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)",
    "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
    "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
    "\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)",
    "\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)",
    "\tat java.io.ObjectInputStream.readClass(ObjectInputStream.java:1716)",
    "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1556)",
    "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
    "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readCurrentLayout(AvroSerializer.java:465)",
    "\tat org.apache.flink.formats.avro.typeutils.AvroSerializer.readObject(AvroSerializer.java:432)",
    "\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
    "\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
    "\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
    "\tat java.lang.reflect.Method.invoke(Method.java:498)",
    "\tat java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)",
    "\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)",
    "\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)",
    "\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)",
    "\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)",
    "\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)",
    "\tat org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)",
    "\tat org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:158)",
    "\t... 6 more"
  ],
  "threadName": "Source: Custom Source -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:us-east-1:829044228870:application/poc-kda",
  "applicationVersionId": "8",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}

Library versions I use:

  • Apache Avro-1.9.1
  • Apache Flink-1.9.1
  • Kinesis Producer Library-0.13.1
  • AWS Flink-1.8

Note that the same problem occurs with Apache Flink 1.8 and 1.6.

KDA Flink code:

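import com.naya.avro.EventAttributes;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisExampleKDA {
   private static final String REGION = "us-east-1";

   public static void main(String[] args) throws Exception {
       // Consumer settings: region and start position of the input stream.
       Properties consumerConfig = new Properties();
       consumerConfig.put(AWSConfigConstants.AWS_REGION, REGION);
       consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.enableCheckpointing(50000);

       // Read Avro-encoded EventAttributes records from "dev-events".
       DataStream<EventAttributes> consumerStream = env.addSource(new FlinkKinesisConsumer<>(
               "dev-events", new KinesisSerializer(), consumerConfig));

       // Write them unchanged to "dev-result".
       consumerStream
               .addSink(getProducer());
       env.execute("kinesis-example");
   }

   private static FlinkKinesisProducer<EventAttributes> getProducer(){
       Properties outputProperties = new Properties();
       outputProperties.setProperty(AWSConfigConstants.AWS_REGION, REGION);
       outputProperties.setProperty("AggregationEnabled", "false");

       FlinkKinesisProducer<EventAttributes> sink = new FlinkKinesisProducer<>(new KinesisSerializer(), outputProperties);
       sink.setDefaultStream("dev-result");
       sink.setDefaultPartition("0");
       return sink;
   }
}

// Used both as the consumer's DeserializationSchema and as the producer's SerializationSchema.
class KinesisSerializer implements DeserializationSchema<EventAttributes>, SerializationSchema<EventAttributes> {
   @Override
   public EventAttributes deserialize(byte[] bytes) throws IOException {
       return EventAttributes.fromByteBuffer(ByteBuffer.wrap(bytes));
   }

   @Override
   public boolean isEndOfStream(EventAttributes eventAttributes) {
       return false;
   }

   @Override
   public byte[] serialize(EventAttributes eventAttributes) {
       try {
           // Copy only the encoded bytes instead of returning the whole backing array.
           ByteBuffer buffer = eventAttributes.toByteBuffer();
           byte[] bytes = new byte[buffer.remaining()];
           buffer.get(bytes);
           return bytes;
       } catch (IOException e) {
           // Fail loudly instead of emitting a meaningless 1-byte record.
           throw new RuntimeException("Could not serialize EventAttributes", e);
       }
   }

   @Override
   public TypeInformation<EventAttributes> getProducedType() {
       // Flink derives an Avro-based serializer from this (AvroSerializer in the stack trace).
       return TypeInformation.of(EventAttributes.class);
   }
}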

Kinesis producer code:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.naya.avro.EventAttributes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class KinesisProducer {

   private static String streamName = "dev-events";

   public static void main(String[] args) {

       AmazonKinesis kinesisClient = getAmazonKinesisClient("us-east-1");

       try {
           sendData(kinesisClient, streamName);
       } catch (IOException e) {
           e.printStackTrace();
       }
   }

   private static AmazonKinesis getAmazonKinesisClient(String regionName) {

       AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
       // Endpoint and signing region are both derived from regionName.
       clientBuilder.setEndpointConfiguration(
               new AwsClientBuilder.EndpointConfiguration("kinesis." + regionName + ".amazonaws.com",
                       regionName));
       clientBuilder.withCredentials(DefaultAWSCredentialsProviderChain.getInstance());
       clientBuilder.setClientConfiguration(new ClientConfiguration());

       return clientBuilder.build();
   }

   // Sends 50 Avro-encoded EventAttributes records in a single PutRecords call.
   private static void sendData(AmazonKinesis kinesisClient, String streamName) throws IOException {

       PutRecordsRequest putRecordsRequest = new PutRecordsRequest();

       putRecordsRequest.setStreamName(streamName);
       List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
       for (int i = 0; i < 50; i++) {
           PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
           EventAttributes eventAttributes = EventAttributes.newBuilder().setName("Jon.Doe").build();
           putRecordsRequestEntry.setData(eventAttributes.toByteBuffer());
           putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
           putRecordsRequestEntryList.add(putRecordsRequestEntry);
       }

       putRecordsRequest.setRecords(putRecordsRequestEntryList);
       PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
       System.out.println("Put result: " + putRecordsResult);
   }
}

Avro schema, as .avdl:

@version("0.1.0")
@namespace("com.naya.avro")
protocol UBXEventProtocol{

 record EventAttributes{
               union{null, string} name=null;
 }
}

Entity class auto-generated by Avro:

@org.apache.avro.specific.AvroGenerated
public class EventAttributes extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 2780976157169751219L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventAttributes\",\"namespace\":\"com.naya.avro\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<EventAttributes> ENCODER =
      new BinaryMessageEncoder<EventAttributes>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<EventAttributes> DECODER =
      new BinaryMessageDecoder<EventAttributes>(MODEL$, SCHEMA$);
…

GitHub link:

Can anyone shed more light on this? Why does it not work on AWS?

Thanks in advance

java amazon-web-services avro amazon-kinesis amazon-kinesis-analytics
1 Answer

Votes: 1

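Looking at the stack trace, the failure does not seem to happen while reading a message, but during the initialization phase of the operator itself.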

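This is how Flink works: it serializes (using Java serialization) every operator that needs to be executed, and then distributes those operators across the cluster in serialized form. This means that KinesisSerializer is itself serialized to be sent over the wire.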
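To make the shipping step concrete, here is a minimal sketch using plain JDK serialization (ObjectOutputStream / ObjectInputStream), which is where the trace ends up (InstantiationUtil.deserializeObject -> ObjectInputStream.readObject). It is not Flink's own code path; OperatorShippingSketch, DemoSchema, ship and receive are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OperatorShippingSketch {

    /** Hypothetical stand-in for a user schema such as KinesisSerializer. */
    static class DemoSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        final String streamName;

        DemoSchema(String streamName) {
            this.streamName = streamName;
        }
    }

    /** Turns the object into bytes with plain Java serialization (the "client side"). */
    static byte[] ship(Serializable operator) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(operator);
        }
        return bytes.toByteArray();
    }

    /** Reads the object back from bytes (the "task side", before any record is processed). */
    static Object receive(byte[] payload) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = ship(new DemoSchema("dev-events"));
        DemoSchema copy = (DemoSchema) receive(payload);
        System.out.println(payload.length + " bytes shipped, stream = " + copy.streamName);
    }
}
```

The receiving side can only rebuild the object if the classes it resolves locally are compatible with the ones that wrote the bytes, which is exactly what breaks in the question.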

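The problem is that KinesisSerializer references the EventAttributes model, which means a reference to EventAttributes (the class itself, not a particular instance) gets serialized as well. As part of the serialization metadata, the descriptors of the classes it extends are written too. In your case that is SpecificRecordBase, which is not part of your distributable but part of the Avro library.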
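A quick way to see that serializing a class reference also drags in its superclass is to Java-serialize a Class object and look for the superclass name in the bytes. In this sketch LibraryRecordBase and MyEvent are hypothetical stand-ins for SpecificRecordBase and the generated EventAttributes; the real classes are not used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class ClassReferenceSketch {

    /** Hypothetical stand-in for a library base class such as SpecificRecordBase. */
    static abstract class LibraryRecordBase implements Serializable {
    }

    /** Hypothetical stand-in for a generated record class such as EventAttributes. */
    static class MyEvent extends LibraryRecordBase {
        private static final long serialVersionUID = 1L;
    }

    /** Java-serializes a Class object (not an instance) and returns the raw bytes. */
    static byte[] serializeClassReference(Class<?> type) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(type);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = serializeClassReference(MyEvent.class);
        // Class names are stored as (modified) UTF-8; ISO-8859-1 keeps ASCII names searchable.
        String raw = new String(payload, StandardCharsets.ISO_8859_1);
        System.out.println("contains MyEvent:           " + raw.contains(MyEvent.class.getName()));
        System.out.println("contains LibraryRecordBase: " + raw.contains(LibraryRecordBase.class.getName()));
    }
}
```

Both checks print true: the stream carries a descriptor for the base class, including its serialVersionUID, even though only the subclass was written.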

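So the full serialization chain of the operator is KinesisConsumer -> KinesisSerializer -> EventAttributes -> SpecificRecordBase (part of the Avro lib). In the stack trace, the class reference is read back inside Flink's AvroSerializer (AvroSerializer.readObject -> readCurrentLayout), the type serializer created for EventAttributes, which StreamConfig.getTypeSerializerIn1 deserializes while the operator chain is being built.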
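One way to see such a chain for your own objects is to subclass ObjectOutputStream and override its protected annotateClass hook, which the JDK calls once for each class whose descriptor is written. The sketch below uses hypothetical stand-ins (MySource, MySchema, MyEvent, LibraryRecordBase) for the consumer, the schema, the generated record and SpecificRecordBase; it is a debugging aid, not how Flink itself inspects operators.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializationChainSketch {

    // Hypothetical stand-ins for the chain described above.
    static abstract class LibraryRecordBase implements Serializable { }   // ~ SpecificRecordBase

    static class MyEvent extends LibraryRecordBase {                      // ~ EventAttributes
        private static final long serialVersionUID = 1L;
    }

    static class MySchema implements Serializable {                       // ~ KinesisSerializer
        private static final long serialVersionUID = 1L;
        final Class<?> producedType = MyEvent.class;
    }

    static class MySource implements Serializable {                       // ~ FlinkKinesisConsumer
        private static final long serialVersionUID = 1L;
        final MySchema schema = new MySchema();
    }

    /** ObjectOutputStream that records every class whose descriptor goes into the stream. */
    static class RecordingOutputStream extends ObjectOutputStream {
        final List<String> classDescriptors = new ArrayList<>();

        RecordingOutputStream(OutputStream out) throws IOException {
            super(out);
        }

        @Override
        protected void annotateClass(Class<?> cl) throws IOException {
            classDescriptors.add(cl.getSimpleName());
            super.annotateClass(cl);
        }
    }

    /** Serializes root and returns the simple names of all class descriptors, in write order. */
    static List<String> describeChain(Serializable root) throws IOException {
        try (RecordingOutputStream out = new RecordingOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(root);
            return out.classDescriptors;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(describeChain(new MySource()));
    }
}
```

This prints [MySource, MySchema, MyEvent, LibraryRecordBase]: the library base class ends up in the stream only because the schema holds a reference to the record class.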

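However, AWS uses Flink 1.8, which uses Avro 1.8.2, and all the base Avro classes on the cluster also come from 1.8.2. You compile your application and link it against the Avro 1.9 binaries. So when Flink serializes your operator and sends it to the cluster, it serializes a reference to the 1.9 version of SpecificRecordBase. But when Flink actually tries to deserialize it, it finds that this version does not match the class it actually has available (1.8.2), and linking fails. That is what the InvalidClassException reports: the serialVersionUID recorded in the stream differs from that of the local SpecificRecordBase class.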
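The exception itself can be reproduced without Avro or Flink. The sketch below serializes a small object, then overwrites the 8-byte serialVersionUID that Java serialization writes right after the class name in the class descriptor, simulating a sender that was built against a different version of the class. Reading the bytes back fails with the same "local class incompatible" message as in the question. SerialVersionMismatchSketch, Payload and the helper methods are hypothetical; patching the stream like this is only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerialVersionMismatchSketch {

    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        int value = 7;
    }

    /** Serializes obj, then replaces the UID that follows Payload's class name in the descriptor. */
    static byte[] serializeWithForeignUid(Payload obj, long foreignUid) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        byte[] data = bytes.toByteArray();
        byte[] name = Payload.class.getName().getBytes(StandardCharsets.UTF_8);
        int pos = indexOf(data, name);
        if (pos < 0) {
            throw new IllegalStateException("class name not found in stream");
        }
        // The descriptor stores the UID as a big-endian long directly after the name.
        ByteBuffer.wrap(data, pos + name.length, 8).putLong(foreignUid);
        return data;
    }

    /** Reads the stream back; returns the InvalidClassException message, or null on success. */
    static String readAndReport(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return null;
        } catch (InvalidClassException e) {
            return e.getMessage();
        }
    }

    /** First index of needle in haystack, or -1. */
    static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        byte[] tampered = serializeWithForeignUid(new Payload(), 42L);
        System.out.println(readAndReport(tampered));
    }
}
```

In the real job the two UIDs come from two different builds of SpecificRecordBase rather than from patched bytes, but the check that fails on the receiving side is the same one.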

You have 2 options here:

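  1. Don't use KDA. Instead, move to EMR (which ships Flink 1.9.1 as of January 2020) or to standalone Flink (which you would have to deploy manually, either on EMR or on bare machines).
  2. Write your application entirely against Flink 1.8. You mentioned that "the application does not compile with version 1.8.2"; try to solve that.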
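For option 2, it can help to check which Avro classes your build actually puts on the classpath. The hypothetical SerialUidInspector below loads a class by name (SpecificRecordBase by default), prints the jar it was loaded from and the serialVersionUID that Java serialization uses for it (via ObjectStreamClass.lookup). Running it with the same classpath as the job jar and comparing the number with the two values in the InvalidClassException is one way to sanity-check a rebuild; it is a diagnostic sketch, not part of either option.

```java
import java.io.ObjectStreamClass;
import java.net.URL;
import java.security.CodeSource;

public class SerialUidInspector {

    /** serialVersionUID used by Java serialization for the class, or null if it is not Serializable. */
    static Long serialVersionUidOf(Class<?> type) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(type);
        return desc == null ? null : desc.getSerialVersionUID();
    }

    /** Jar or directory the class was loaded from, if the JVM reports one. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        URL location = source == null ? null : source.getLocation();
        return location == null ? "unknown" : location.toString();
    }

    public static void main(String[] args) {
        String className = args.length > 0 ? args[0] : "org.apache.avro.specific.SpecificRecordBase";
        try {
            // initialize = false: only inspect the class, do not run its static initializers.
            Class<?> type = Class.forName(className, false, SerialUidInspector.class.getClassLoader());
            System.out.println(className);
            System.out.println("  loaded from:      " + locationOf(type));
            System.out.println("  serialVersionUID: " + serialVersionUidOf(type));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```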