Sparklyr error: Job aborted due to stage failure: Task in stage 22.0 failed 1 time, most recent failure: Lost task: SparkException: Unseen label

Problem description

I want to run machine learning (k-means) on my Spark data with sparklyr. I have a table with 2 columns: a review and a label (positive or negative). Everything seemed fine, but when I run the prediction I get the following error:

SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22, localhost): org.apache.spark.SparkException: Unseen label

Here is the code:

library(sparklyr)
library(dplyr)
library(DBI)

sc <- spark_connect(master = "local", version = "2.0.0")

colnames(dfAllReviews3Cols) = c("ReviewText", "LabelReview")

#db_drop_table(sc, "dfallreviews3cols")
reviewsTbl <- copy_to(sc, dfAllReviews3Cols)

#List tables
src_tbls(sc)

#Select preview
reviews_preview <- dbGetQuery(sc, "SELECT * FROM dfallreviews3cols LIMIT 10")


##KMeans
partitions <- reviewsTbl %>%
  sdf_partition(training = 0.7, test = 0.3, seed = 999)

reviewsTbl_training <- partitions$training
reviewTbl_test <- partitions$test

kmeans_model <- reviewsTbl_training %>%
  ml_kmeans(ReviewText ~ .)

pred <- sdf_predict(reviewTbl_test, kmeans_model) %>% collect

And this is the error I get:

pred <- sdf_predict(reviewTbl_test, kmeans_model) %>% collect
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22, localhost): org.apache.spark.SparkException: Unseen label:  AC wasn t working in my room When the repair man came to fix it he couldn t and then told me that it s winter and people don t need the AC Room was uncomfortably hot Check out was a nightmare My cab driver was waiting to take me to the airport Twice reception told me I had money to be owed however this was untrue after they checked their records I had the same problem at check in Bell boy took over 20 min to bring my bags down from my room Wouldn t recommend this hotel .

    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:169)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:165)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2187)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2163)
    at sparklyr.Utils$.collect(utils.scala:200)
    at sparklyr.Utils.collect(utils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label:  AC wasn t working in my room When the repair man came to fix it he couldn t and then told me that it s winter and people don t need the AC Room was uncomfortably hot Check out was a nightmare My cab driver was waiting to take me to the airport Twice reception told me I had money to be owed however this was untrue after they checked their records I had the same problem at check in Bell boy took over 20 min to bring my bags down from my room Wouldn t recommend this hotel .
    at org.apache

How can I solve this?

Thanks in advance!

r apache-spark apache-spark-ml sparklyr
1 Answer

This is really not the way to go. Based on the error message, ReviewText is clearly a chunk of unstructured text.

If you pass it directly to ml_kmeans, it will be treated as a categorical variable and run through a StringIndexer (this is where the failure occurs; you can check spark.ml StringIndexer throws 'Unseen label' on fit() if you're interested in the details, though in practice they're hardly relevant here). The result would then be assembled into a vector of length one.
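To make that concrete, here is a minimal sketch of the indexing step that the formula interface performs implicitly (ft_string_indexer is sparklyr's wrapper around StringIndexer; the output column name is just illustrative):

# This mimics what ml_kmeans(ReviewText ~ .) does under the hood:
# StringIndexer assigns an integer index to every distinct ReviewText
# value seen during fitting, so any review that appears only in the test
# split is an "unseen label" and raises the exception above.
indexer <- ft_string_indexer(
  sc,
  input_col = "ReviewText",
  output_col = "ReviewText_indexed"  # illustrative name
)
indexer_model <- ml_fit(indexer, reviewsTbl_training)

# Succeeds on the training data, but fails on reviewTbl_test as soon as
# it hits a review string that was not present in reviewsTbl_training:
ml_transform(indexer_model, reviewTbl_test)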

As you can imagine, this is not a very useful model.

In general, you should at a minimum (and this may or may not be sufficient to get good results in practice):

  • Tokenize the raw text.
  • Encode the tokens.

Spark ML provides a small set of basic text transformers, including, but not limited to, Tokenizer, StopWordsRemover, NGram and TF-IDF, as well as other, more advanced tools available from third-party libraries (most notably John Snow Labs' NLP), along with the Pipeline API, which can be used to compose these into reusable modules. I strongly recommend reading the official documentation for each of these tools before you proceed.

Going back to your problem, you could start with something like this

pipeline <- ml_pipeline(
  # Tokenize the input
  ft_tokenizer(sc, input_col = "ReviewText", output_col = "raw_tokens"),

  # Remove stopwords - https://en.wikipedia.org/wiki/Stop_words
  ft_stop_words_remover(sc, input_col = "raw_tokens", output_col = "tokens"),

  # Apply TF-IDF - https://en.wikipedia.org/wiki/Tf-idf
  ft_hashing_tf(sc, input_col = "tokens", output_col = "tfs"),
  ft_idf(sc, input_col = "tfs", output_col = "features"),
  ml_kmeans(sc, features_col = "features", init_mode = "random")
)

model <- ml_fit(pipeline, reviewsTbl_training)

and adjust it to fit your particular scenario.
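Once fitted, the pipeline model can be applied to the held-out split in one call. A minimal sketch, reusing the reviewTbl_test split from the question and assuming k-means' default output column name:

# Run every stage of the fitted pipeline (tokenization, stopword
# removal, TF-IDF, k-means) on the test split and bring the results
# back into R.
pred <- ml_transform(model, reviewTbl_test) %>%
  collect()

# Cluster assignments land in the default "prediction" column.
table(pred$prediction)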
