I am trying to build a pipeline that reads from a third-party REST API endpoint as the data source.
I am using the HTTP plugin (version 1.2.0) from the Hub.
The request URL is: https://api.example.io/v2/somedata?return_count=false
Sample of the response body:
{
  "paging": {
    "token": "12456789",
    "next": "https://api.example.io/v2/somedata?return_count=false&__paging_token=123456789"
  },
  "data": [
    {
      "cID": "aerrfaerrf",
      "first": true,
      "_id": "aerfaerrfaerrf",
      "action": "aerrfaerrf",
      "time": "1970-10-09T14:48:29+0000",
      "email": "[email protected]"
    },
    {...}
  ]
}
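For context, the source is configured roughly as follows (the property labels are taken from the plugin UI as I remember them; treat the exact names as assumptions for version 1.2.0):

URL:                            https://api.example.io/v2/somedata?return_count=false
Format:                         JSON
Pagination Type:                Link in Response Body
Next Page JSON/XML Field Path:  $.paging.next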
The main error in the logs is:
java.lang.NullPointerException: null
at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.getNextPage(BaseHttpPaginationIterator.java:118) ~[1580429892615-0/:na]
at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.ensurePageIterable(BaseHttpPaginationIterator.java:161) ~[1580429892615-0/:na]
at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.hasNext(BaseHttpPaginationIterator.java:203) ~[1580429892615-0/:na]
at io.cdap.plugin.http.source.batch.HttpRecordReader.nextKeyValue(HttpRecordReader.java:60) ~[1580429892615-0/:na]
at io.cdap.cdap.etl.batch.preview.LimitingRecordReader.nextKeyValue(LimitingRecordReader.java:51) ~[cdap-etl-core-6.1.1.jar:na]
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:214) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
After trying to resolve this for a while, and given that the NullPointerException is thrown in BaseHttpPaginationIterator.getNextPage, I think the problem is in the pagination settings. With Pagination Type set to Link in Response Body, I have tried both $.paging.next and paging/next as the next-page field path; neither works.
The link at paging/next works fine when opened in Chrome (see the path check below). Has anyone been able to create a pipeline in Google Cloud Data Fusion that uses a REST API as the data source?
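Outside Data Fusion, the field path expression itself can be sanity-checked against the sample body. The sketch below uses the Jayway json-path library (com.jayway.jsonpath:json-path) — an assumption on my part; the $.paging.next syntax suggests the plugin resolves JsonPath-style expressions, but I have not verified which parser version 1.2.0 uses:

import com.jayway.jsonpath.JsonPath;

public class PagingPathCheck {
    public static void main(String[] args) {
        // The paging block from the sample response body above, with data trimmed.
        String body = "{\"paging\":{\"token\":\"12456789\","
                + "\"next\":\"https://api.example.io/v2/somedata?return_count=false&__paging_token=123456789\"},"
                + "\"data\":[]}";
        // Resolves to the next-page URL if the expression matches the body.
        String next = JsonPath.read(body, "$.paging.next");
        System.out.println(next);
    }
}

The expression resolves correctly here, so if the plugin does use JsonPath, the NPE is more likely in how it consumes the value than in the path itself.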
This is not the best way to achieve this. The better approach is to ingest the data (see the Service APIs Overview) into Pub/Sub, and then use Pub/Sub as the source of your pipeline. This gives you a simple and reliable staging location for your data on its way to processing, storage, and analysis; see the Pub/Sub API documentation. To use it with Dataflow, follow the steps in the official Using Pub/Sub with Dataflow documentation.
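As a rough sketch of the ingestion side, and assuming the Google Cloud Pub/Sub Java client (google-cloud-pubsub) plus hypothetical project and topic names, a small client could fetch the endpoint and publish each response body to a topic:

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class ApiToPubSub {
    public static void main(String[] args) throws Exception {
        // Hypothetical project and topic names.
        TopicName topic = TopicName.of("my-project", "somedata-raw");
        Publisher publisher = Publisher.newBuilder(topic).build();
        try {
            String body = fetch("https://api.example.io/v2/somedata?return_count=false");
            PubsubMessage message = PubsubMessage.newBuilder()
                    .setData(ByteString.copyFromUtf8(body))
                    .build();
            publisher.publish(message).get(); // block until the publish is acknowledged
        } finally {
            publisher.shutdown();
        }
    }

    // Minimal HTTP GET using only the JDK (works on Java 8).
    private static String fetch(String url) throws Exception {
        java.net.HttpURLConnection conn =
                (java.net.HttpURLConnection) new java.net.URL(url).openConnection();
        try (java.util.Scanner scanner =
                new java.util.Scanner(conn.getInputStream(), "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        }
    }
}

A Dataflow or Data Fusion pipeline can then read from a subscription on that topic without having to deal with the API's pagination at all.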