We have two HDP cluster setups; let's call them A and B.
CLUSTER A NODES:
CLUSTER B NODES:
Our application has three major components that perform ETL (Extract, Transform, Load) operations on incoming files. I will refer to these components as E, T and L respectively.
COMPONENT E CHARACTERISTICS:
COMPONENT T CHARACTERISTICS:
COMPONENT L CHARACTERISTICS:
Component L is the gem among the three components; we have not faced any failures with it. There are minor unexplained glitches in component E, but component T is the most troublesome one.
Components E and T both use the DFS client to communicate with the namenode.
Following is an excerpt of the exception that we observe intermittently while running component T:
clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
at org.apache.hadoop.ipc.Client.call(Client.java:1459)
at org.apache.hadoop.ipc.Client.call(Client.java:1392)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy15.complete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy16.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)
As mentioned above, we face this exception very intermittently, and when it does occur our application gets stuck, forcing us to restart it.
SOLUTIONS THAT WE TRIED:
MY QUESTIONS:
Our next approach to this problem is to identify which machine or switch (no routers are involved in between) is setting the RST flag, by analyzing the packets with tcpdump or Wireshark (e.g. filtering on `tcp[tcpflags] & tcp-rst != 0`). We will also increase the sizes of all the queues mentioned above to 4096 so as to handle bursty traffic more effectively.
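The RST-tracing step described above could begin with a capture along the following lines, run on one of the cluster B nodes. This is only a sketch: the interface name `eth0` is an assumption, and you would repeat the capture on (or mirror the traffic of) each hop along the path until you see which side originates the RST.

```shell
# Capture only TCP segments with the RST flag set that involve the
# NameNode RPC port (8020), and save them for inspection in Wireshark.
# NOTE: "eth0" is an assumed interface name; adjust it for your hosts.
tcpdump -i eth0 -nn 'tcp[tcpflags] & tcp-rst != 0 and port 8020' \
  -w rst_on_8020.pcap
```

Comparing captures taken simultaneously on the cluster B node and on the cluster A namenode should reveal whether the RST is generated by one of the endpoints or injected by an intermediate switch or firewall.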
The namenode logs show no sign of any anomaly, except that the namenode connection load as seen in Ambari peaks at certain points in time, and not necessarily when the Connection Reset By Peer exception occurs.
To conclude, I would like to know whether we are on the right track toward solving this problem, or are we just headed for a dead end?
P.S. I apologize for the length of this question. I wanted to present the entire context to readers before asking for any help or suggestions. Thank you for your patience.
First of all, there may indeed be something unusual going on in your network, and perhaps you will manage to track it down with the steps you mention.
That being said, looking at your steps, there is something I find counterintuitive.
You currently have step T doing both the transformation and the most fragile part, the inter-cluster transfer. Perhaps you are seeing worse reliability than people typically do, but I would seriously consider separating the complex part from the fragile part.
If you do this (or simply split the work into smaller chunks), it should be fairly straightforward to design a solution whose fragile step may fail every now and then, but which simply retries when that happens. And of course the retry comes at minimal cost, because only a small portion of the work needs to be redone.
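The retry idea above can be sketched as a small wrapper around whatever command performs the fragile transfer; the `hdfs dfs -cp` line in the comment is a placeholder, not your actual pipeline:

```shell
#!/bin/sh
# Minimal retry wrapper: run a command, retrying with linear backoff
# when it fails. Only the failed chunk is re-run, not the whole job.
retry() {
  max=$1; shift
  attempt=1
  until "$@"; do
    [ "$attempt" -ge "$max" ] && return 1
    sleep "$attempt"          # linear backoff: 1s, 2s, 3s, ...
    attempt=$((attempt + 1))
  done
}

# Hypothetical usage -- command and paths are placeholders:
# retry 3 hdfs dfs -cp hdfs://clusterA/stage/part-0001 hdfs://clusterB/in/
```

Wrapping each small chunk of the transfer like this means an intermittent "Connection reset by peer" costs you one short retry instead of a stuck application and a manual restart.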
In summary: it may help to trace down your connectivity problem, but if at all possible you may want to design for intermittent failure instead.