我使用弗林克v1.4.0
,我已经设置了两个不同的作业。第一个是管道,从卡夫卡主题消费数据并将其存储到可查询的状态(QS)。数据按日期键。第二提交查询到QS的工作和处理返回的数据。
两个作业用弗林克v.1.3.2
工作的罚款。但随着新的更新,一切都打破了。下面是第一份工作的部分代码:
private void runPipeline() throws Exception {
StreamExecutionEnvironment env = configurationEnvironment();
QueryableStateStream<String, DataBucket> dataByDate = env.addSource(sourceDataFromKafka())
.map(NewDataClass::new)
.keyBy(data.date)
.asQueryableState("QSName", reduceIntoSingleDataBucket());
}
这里是在客户端的代码:
QueryableStateClient client = new QueryableStateClient("localhost", 6123);
// the state descriptor of the state to be fetched.
ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
"QSName",
TypeInformation.of(new TypeHint<DataBucket>() {}));
jobId = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
String key = "2017-01-06";
CompletableFuture<ValueState<DataBucket> resultFuture = client.getKvState(
jobId,
"QSName",
key,
BasicTypeInfo.STRING_TYPE_INFO,
descriptor);
try {
ValueState<DataBucket> valueState = resultFuture.get();
DataBucket bucket = valueState.value();
System.out.println(bucket.getLabel());
} catch (IOException | InterruptionException | ExecutionException e) {
throw new RunTimeException("Unable to query bucket key: " + key , e);
}
我按照说明按下面的链接:https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
确保通过包括从弗林克分发到flink-queryable-state-runtime_2.11-1.4.0.jar
文件夹中的文件夹opt/
的lib/
使我弗林克群集上可查询的状态,并检查它运行在任务管理器。
我不断收到以下错误:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:84)
at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:210)
at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:174)
at com.company.dept.query.QuerySubmitter.main(QuerySubmitter.java:37)
任何想法发生了什么呢?我认为我的要求没有达到QS在所有...真的,我应该如何改变什么不知道是否和。谢谢。
所以,事实证明,这是两件事情是被导致此错误。首先是创建客户端上的一个descriptor
使用错误的构造。而不是使用一个只需要输入一个名称为QS和TypeHint,我不得不使用另外一个地方有一个默认值沿着keySerialiser
提供按如下:
ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
"QSName",
TypeInformation.of(new TypeHint<DataBucket>() {}).createSerializer(new ExecutionConfig()),
DataBucket.emptyBucket()); // or anything that can be used as a default value
第二个是相关的主机和端口值。该港口是从v1.3.2
现在设置为9069和本地主机也是在我的情况不同有所不同。您可以通过检查线路上的任何任务管理器的日志,同时验证:Started the Queryable State Proxy Server @ ...
。
最后,如果你在这里是因为你正在寻找允许端口范围可查询状态的客户端代理,我建议你按照各自的问题(弗林克-7788)在这里:https://issues.apache.org/jira/browse/FLINK-7788。