rdd 相关问题

弹性分布式数据集(RDD)是一种分布式内存抽象,允许程序员在大型集群上执行内存计算,同时保留MapReduce等数据流模型的容错能力。

处理每一行以获取日期

我有一个使用year和mon01,mon02提取月份的文件,它们分别使用columname中的最后两个字符(即MON01-01)在各个月中的文本值长度(MON01,MON02 ..)与...的数目相同。

回答 2 投票 -2

比较两个键值对之间的值时,Spark Scala匹配错误

我的目标是读取2个文件,过滤掉停用词,找到常用词并选择在这两个文件之间较小的词数。之后,我应该按降序对键值对进行排序...

回答 1 投票 0

Spark RDD-CountByValue-地图类型-按键顺序

从spark RDD-countByValue返回Map数据类型,并想按键的升/降排序。 val s = flightObjectRDD.map(_。dep_delay / 60 toInt).countByValue()// RDD类型为action,并且...

回答 1 投票 0

如何在Pyspark中设置KryoSerializer?

我是Pyspark的新手,请为我提供帮助:spark = SparkSession.builder.appName(“ FlightDelayRDD”)。master(“ local [*]”)。getOrCreate()sc = spark.sparkContext sc.setSystemProperty(“火花......

回答 1 投票 0

在Spark Scala中读取大文件时数据丢失

val数据= spark.read .text(filepath).toDF(“ val”).withColumn(“ id”,monotonically_increasing_id())val计数= data.count()val标头= data.where(“ id == 1“)。collect()。map(s => s ....

回答 1 投票 -1

为此任务编写自定义AccumulatorParam的正确方法是什么?

上下文:在Azure Databricks,Python编程语言,Spark环境中工作。我有一个rdd,并创建了一个地图操作。 rdd = sc.parallelize(my_collection)映射器= rdd.map(lambda val:...

回答 1 投票 0

如果我使用reduceByKey或groupByKey一个只有两个键的大型数据集会发生什么

我正在使用spark处理我的数据。我已将分布在多个执行器上的RDD配对。数据大小为10tb,分区数为4000。执行程序的总数为100,并且...

回答 1 投票 2

如何对随机的RDD元素对执行操作?

我想对RDD元素的随机对执行操作,以便从较大的数中取1并将其加到较小的数上。例如,让它成为我们的随机对:(23,-52),(3,2),(5,-2),(29,0)。之后...

回答 1 投票 1

如何计算pyspark中每行的单词数

我尝试过:rdd1 = sc.parallelize([“让我们玩得开心。”,“要获得乐趣,您不需要任何计划。”])output = rdd1.map(lambda t:t.split(“ “))。map(lambda list:(lists,len(lists)))输出....

回答 1 投票 1

将MapPartitionsRDD转换为DataFrame并通过2个键将数据分组

我有一个看起来像这样的数据框:国家|用户|数----------------------德国|莎拉| 2中国|保罗| 1德国|艾伦| 3德国|保罗| 1 ...我...]]]

回答 1 投票 1

在Spark中强制立即缓存的最有效方法是什么?

出于计时目的,我需要在执行功能之前强制执行缓存。我最初的方法是使用count()操作,因为这将跨所有分区缓存RDD,这与take()不同,但是...

回答 1 投票 0

如何使用pyspark将DataFrame的每一行另存为HDFS文件

我有一个以ctrl A分隔的文件,并且具有以下标头:filename','file_metadata','data_content','status','errortype','error_message',我需要将每个文件分别转储至hdfs。 ..

回答 1 投票 0

Spark-Straming中的DStream到Rdd

我有一个DStream [String,String],我需要将其转换为RDD [String,String]。有什么办法吗?我需要使用Scala语言。在此先感谢!

回答 1 投票 0

仅从Spark Scala RDD格式的Cassandra表中检索特定属性数据

我仅在使用Spark Scala RDD格式进行查询时遇到问题,这意味着您如何检索特定列的特定数据或如何仅以Scala RDD格式检索特定列的数据。

回答 1 投票 -1

在Spark Scala中将行转换为多行

我有一个问题,我需要将一行转换为多行。这是基于我具有的另一个映射。我试图在下面提供一个例子。假设我有一个实木复合地板文件,其中有...

回答 1 投票 -1

在决策树分类器上拟合RDD数据时出错

#load数据集df = spark.sql(“ select * from ws_var_dataset2”)def labelData(data):#label:row [end],功能:row [0:end-1]返回data.map(lambda行: LabeledPoint(row [-1],row [:-1]))...

回答 1 投票 0

如何从Spark-shell的Array [Array [String]]的RDD中提取字符串值?

我有一个如下数组:Array [Array [String]] = Array(Array(1,1,1,300,item1),Array(2,1,2,300,item2),Array(3,1,2,300,item3 ),Array(4,2,3,100,item4),Array(5,1,3,300,item5))我要提取((...

回答 1 投票 -2

Scala:用于在元组中找到最大值的reduceByKey

我的简单rdd看起来像scala> gp_by_cls.collect res82:Array [[Int,(Int,Int))] = Array((3,(1,26)),(1,(0,37)), (3,(0,77)),(1,(2,914)),(2,(1,13)),(1,(1,25)),(2,(2,893)),(3,( 2,1661)),(2,...

回答 2 投票 0

Scala,将列表映射到稀疏向量

我的RDD为Array [(String,Iterable [(Int,Double)])],它的元素看起来像(000267537-01,List((25,0.01),(35,120.0),(26,2.0), (38,130.0),(21,45.0),(54,180.0),(39,10.0)))现在带有...

回答 1 投票 -1

pyspark rdd split issue

我正在尝试从rdd进行过滤,其值为“ 01-10-2019” print(“ \ n ###删除合并的RDD中的重复项:”)insuredata = insuredatamerged_cache.distinct()print(“ insuredata:” ,type(...

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.