I am using Cloudera Enterprise 6.1.0 and I hit this issue with the Apache Beam 2.11 SDK whenever I read or write any file on HDFS using the SparkRunner. However, with plain Spark it is working.
This issue appeared after upgrading Cloudera from version 5.14.0 to 6.1.0. On the older version, the following code was working fine.
import java.io.File;
import java.io.IOException;
import java.sql.ResultSet;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;

public class Test {

    public static void main(String[] args) {

        //HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);

        SparkConf sparkConf = new SparkConf()
                .setMaster("yarn")
                .set("spark.submit.deployMode", "client")
                .set("spark.driver.memory", "4g")
                .set("spark.executor.cores", "5")
                .set("spark.executor.instances", "30")
                .set("spark.executor.memory", "8g");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("ERROR");

        SparkContextOptions options = PipelineOptionsFactory.create().as(SparkContextOptions.class);
        options.setRunner(SparkRunner.class);
        options.setUsesProvidedSparkContext(true);
        options.setProvidedSparkContext(jsc);

        Pipeline p = Pipeline.create(options);

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://host1:8020");
        UserGroupInformation.setConfiguration(conf);

        try {
            UserGroupInformation.loginUserFromKeytab("[email protected]", "/opt/app/kerbfiles/test.keytab");
            if (UserGroupInformation.isLoginKeytabBased()) {
                UserGroupInformation.getLoginUser().reloginFromKeytab();
            } else if (UserGroupInformation.isLoginTicketBased()) {
                UserGroupInformation.getLoginUser().reloginFromTicketCache();
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        System.out.println("*******************************8");

        p.apply("ReadLines", TextIO.read().from("hdfs://host1:8020/hdfsdata/input/Reg_Employee.txt"))
         .apply("WriteCounts", TextIO.write().to("hdfs://host1:8020/tmp/test"));

        p.run().waitUntilFinish();
    }
}
The exception is below:
Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:219)
    at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:700)
    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1027)
    at Test.main(Test.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Need some help on this issue.
Thanks,
Nikhil
There could be several reasons:
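One common cause worth checking (this is an assumption on my part, not something confirmed from your logs): the pipeline options are built with PipelineOptionsFactory.create(), so Beam's HadoopFileSystemOptions.hdfsConfiguration is never populated and the HDFS filesystem registrar has nothing to register for the "hdfs" scheme; the beam-sdks-java-io-hadoop-file-system module also has to be on the driver classpath. Below is a minimal sketch of that approach, reusing the host and paths from your code; the class name HdfsSchemeSketch is illustrative, and the provided SparkContext / Kerberos login from your original code are omitted for brevity.

import java.util.Collections;

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class HdfsSchemeSketch {

    public static void main(String[] args) {

        // Hadoop configuration pointing at the NameNode used in the question.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://host1:8020");

        SparkContextOptions options = PipelineOptionsFactory.create().as(SparkContextOptions.class);
        options.setRunner(SparkRunner.class);
        // Provided SparkContext and Kerberos login would stay as in the original code.

        // Hand the Hadoop configuration to Beam's HDFS filesystem registrar so the
        // "hdfs" scheme can be resolved when TextIO builds its sources and sinks.
        options.as(HadoopFileSystemOptions.class)
               .setHdfsConfiguration(Collections.singletonList(conf));

        // Re-initialize the registered filesystems on the driver with these options.
        FileSystems.setDefaultPipelineOptions(options);

        Pipeline p = Pipeline.create(options);
        p.apply("ReadLines", TextIO.read().from("hdfs://host1:8020/hdfsdata/input/Reg_Employee.txt"))
         .apply("WriteCounts", TextIO.write().to("hdfs://host1:8020/tmp/test"));
        p.run().waitUntilFinish();
    }
}

If that changes nothing, it is also worth verifying that beam-sdks-java-io-hadoop-file-system (the module containing HadoopFileSystemOptions and the HDFS registrar) actually ends up in your application jar after the CDH 6 upgrade, since "No filesystem found for scheme hdfs" is also what you see when the registrar is simply missing from the classpath.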