How to save a PySpark dataframe from Hive into an Ignite cache?


I want to save my PySpark dataframe into an Ignite cache following the approach in this blog, but an error occurs. The error message is:

Traceback (most recent call last):
  File "test_ignite.py", line 19, in <module>
    .option("config", configFile) \
  File "/home/hdoop/.local/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 738, in save
    self._jwrite.save()
  File "/home/hdoop/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/hdoop/.local/lib/python3.6/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hdoop/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o42.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: ignite. Please find packages at
http://spark.apache.org/third-party-projects.html
       
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
        at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: ignite.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
        ... 16 more

These are my steps:

  1. Ran the Python script:
    python3 test_ignite.py
  2. Created a SparkSession and connected to Hive.
  3. Fetched a PySpark dataframe from Hive.
  4. Saved it into the Ignite cache. (This is where the error occurs.)

My test_ignite.py code is:

import os
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyignite import Client

spark = SparkSession \
.builder \
.appName("consumer_hive") \
.config("hive.metastore.uris", "thrift://localhost:9083") \
.config("spark.driver.extraClassPath", "/usr/local/ignite/libs/*jar:/usr/local/ignite/libs/optional/ignite-spark/*jar:/usr/local/ignite/libs/ignite-spring/*jar:/usr/local/ignite/libs/ignite-indexing/*jar") \
.enableHiveSupport() \
.getOrCreate()

df = spark.sql("select * from hive_table")

configFile = os.path.join(os.environ['IGNITE_HOME'], "config/default-config.xml")

df.write.format("ignite") \
            .option("table", "TickData") \
            .option("config", configFile) \
            .save()

I found the same problem here, but the error persists after adding spark.driver.extraClassPath. Is there any other configuration I should set? I have never modified /ignite/config/default-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
  
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
        Alter configuration below as needed.
    -->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"/>
</beans>

My Spark configuration:

spark-defaults.conf

spark.master spark://pc1-vm1:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://pc1-vm1:9000/user/hdoop/log
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 4g
spark.executor.memory 6g
spark.executor.cores 2
spark.dynamicAllocation.maxExecutors 2
spark.ui.enabled true


spark.driver.extraClassPath /usr/local/ignite/libs/*jar:/usr/local/ignite/libs/optional/ignite-spark/*jar:/usr/local/ignite/libs/ignite-spring/*jar:/usr/local/ignite/libs/ignite-indexing/*jar

spark-env.sh

export PYSPARK_PYTHON=python3
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_HOST=pc1-vm1
export SPARK_WORKER_MEMORY=32G
export SPARK_DAEMON_MEMORY=4G
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
export HADOOP_CLASSPATH='hadoop classpath'
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
export HIVE_HOME=/home/hadoop/hive 
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-8.0.22.jar:$SPARK_CLASSPATH

These are the versions I am using:

- Hadoop v2.10.1
- Spark v2.3.4
- Hive v2.1.1
- Python v3.7.5
- openjdk-8-jre
- Ignite v2.7

Can anyone help me?

python apache-spark pyspark ignite
1 Answer

You need the Ignite JARs on the executor classpath as well, so spark.driver.extraClassPath should probably be spark.executor.extraClassPath. Better still, use the spark-submit --packages flag to add ignite-spark, which resolves the JARs and their dependencies for both the driver and the executors.
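For example, a minimal sketch of the --packages route, set through the SparkSession builder via spark.jars.packages (the in-code equivalent of the flag). The Maven coordinates org.apache.ignite:ignite-spark:2.7.0 are an assumption here; verify the artifact and version for your Ignite 2.7 installation on Maven Central:

import findspark
findspark.init()
from pyspark.sql import SparkSession

# Let Spark resolve the Ignite integration from Maven instead of
# hand-maintained extraClassPath entries. The coordinates below are an
# assumption -- check the exact artifact/version for Ignite 2.7.
spark = SparkSession \
    .builder \
    .appName("consumer_hive") \
    .config("hive.metastore.uris", "thrift://localhost:9083") \
    .config("spark.jars.packages", "org.apache.ignite:ignite-spark:2.7.0") \
    .enableHiveSupport() \
    .getOrCreate()

The command-line equivalent would be spark-submit --packages org.apache.ignite:ignite-spark:2.7.0 test_ignite.py. Note that spark.jars.packages only takes effect when the JVM starts, so it must be set before getOrCreate() creates the session.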

Alternatively, the documentation mentions that you can modify spark-env.sh to add the Ignite libraries to SPARK_CLASSPATH (which you have not done):

https://ignite.apache.org/docs/latest/extensions-and-integrations/ignite-for-spark/installation
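Whichever route you take, a quick way to check that the JARs actually made it onto the driver classpath, before calling save(), is to look up Ignite's relation provider class from the driver JVM. This is only a debugging aid, and it assumes org.apache.ignite.spark.impl.IgniteRelationProvider is the class that backs format("ignite"):

from py4j.protocol import Py4JJavaError

# Assumes the `spark` session from the question. If the lookup raises,
# the ignite-spark JARs are not visible to the driver JVM.
provider = "org.apache.ignite.spark.impl.IgniteRelationProvider"
try:
    spark._jvm.java.lang.Class.forName(provider)
    print("ignite-spark is on the driver classpath")
except Py4JJavaError as err:
    print("ignite-spark not found:", err)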
