Spark如何读取分布式文件

问题描述 投票:0回答:2

Fasta 文件是一个多行文件。它的结构就像

>ID_1
ACGTAGCATGC
>ID_2
AGCTAGTACATC

因此,为了获取序列,我必须读取 2 行中的 1 行。

我有多个大 fasta 文件(每个文件 120Go)需要读取。我使用spark来读取这些文件。我目前使用它来获取数据帧上的所有序列:

val sequences = sc.textFile("path/to/directory").sliding(2, 2).map{case Array(id, seq) => seq}

此命令是否允许获取所有序列并知道文件分布在 Spark 集群上?

apache-spark fasta
2个回答
2
投票

您可以尝试使用 FASTdoop (https://github.com/umbfer/fastdoop),其中实现了 FASTA 和 FASTQ 文件的读取器。

这是代码示例:

SparkSession spark = SparkSession.builder().master("local[*]").appName("FASTdoop Test Short").getOrCreate();    
SparkContext sc = spark.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);

Configuration inputConf = jsc.hadoopConfiguration();
inputConf.setInt("look_ahead_buffer_size", 4096);
String inputPath = "data/short.fasta";

JavaPairRDD<Text, Record> dSequences2 = jsc.newAPIHadoopFile(inputPath, 
        FASTAshortInputFileFormat.class, Text.class, Record.class, inputConf);

/* We drop the keys of the new RDD since they are not used */
JavaRDD<Record> dSequences = dSequences2.values();

for (Record sequence : dSequences.collect()) {
    System.out.println("ID: " + sequence.getKey());
    System.out.println("Sequence: " + sequence.getValue());
}

您可以在“spark_support”分支的自述文件中找到更多详细信息。


1
投票

由于每条记录都以 '>' 开头,您可以尝试将行分隔符更改为 '>' (' ' 默认情况下)使用这行代码:

sc.hadoopConfiguration.set("textinputformat.record.delimiter",">")

那么这应该可以帮助你:

sc.textFile("...")
    .filter(_ != "")
    .map(_.split("\n")(1))

请注意,过滤器仅用于删除生成的第一个空记录,因为文件以“>”开头。

© www.soinside.com 2019 - 2024. All rights reserved.