Our code ran fine on Databricks Runtime 10:
request_url = "https://.com/?fct=Get"
response_task = requests.get(url=request_url, headers=headers)
db1 = spark.sparkContext.parallelize([response_task.text])
df2 = spark.read.json(db1)
Because of the restrictions on Runtime 13.3, we now get the error below. Is there another way to serialize the large [response_task.text]?
An error occurred while calling o398.createRDDFromTrustedPath. Trace: py4j.security.Py4JSecurityException: Method public org.apache.spark.api.java.JavaRDD org.apache.spark.sql.SparkSession.createRDDFromTrustedPath(java.lang.String,int) is not whitelisted on class class org.apache.spark.sql.SparkSession at py4j.security.WhitelistingPy4JSecurityManager.checkCall(WhitelistingPy4JSecurityManager.java:473) at py4j.Gateway.invoke(Gateway.java:305) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195) at py4j.ClientServerConnection.run(ClientServerConnection.java:115) at java.lang.Thread.run(Thread.java:750)
We then tried passing the text straight to spark.read.json:

response_task = requests.get(url=request_url_gettasks, headers=headers)
response_text = response_task.text
# spark.read.json interprets plain strings as file paths, so this fails
db1 = spark.read.json([response_text])
com.databricks.rpc.UnexpectedHttpException: Got an invalid response: 502
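One workaround on clusters where the RDD APIs are blocked is to land the response on DBFS first and let spark.read.json read it back as a file. A minimal sketch, assuming a hypothetical /dbfs/tmp location (adjust the path to your workspace):

```python
def write_payload(path: str, text: str) -> None:
    """Persist the raw JSON response so Spark can read it back as a file."""
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)

# On Databricks, dbfs:/ paths are mounted at /dbfs/ for local file I/O, so:
#   write_payload("/dbfs/tmp/tasks_response.json", response_text)
#   df2 = spark.read.json("dbfs:/tmp/tasks_response.json")
```

This keeps the parallelization on the Spark side (spark.read.json on a path is allowed), at the cost of a temporary file.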
This code seems to work (it still needs a lot of testing):
import json
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

response_task = requests.get(url=request_url_gettasks, headers=headers)
response_text = response_task.text
task_data = json.loads(response_text)["results"]
schema = StructType([
    StructField("PROJECT_ID", IntegerType()),
    StructField("PROJECT_NAME", StringType()),
    StructField("PROJECT_NUMBER", StringType()),
    StructField("PROJECT_TYPE_NAME", StringType()),
    # ...
])
df2 = spark.createDataFrame(task_data, schema)
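Since spark.createDataFrame validates each row against the schema, it can help to normalize the parsed records on the driver first, coercing types and filling missing keys with None. A sketch with a hypothetical sample payload (field names taken from the schema above):

```python
import json

# Hypothetical payload in the shape the API is assumed to return.
response_text = json.dumps({
    "results": [
        {"PROJECT_ID": "101", "PROJECT_NAME": "Alpha",
         "PROJECT_NUMBER": "P-101", "PROJECT_TYPE_NAME": "Internal"},
        {"PROJECT_ID": 102, "PROJECT_NAME": "Beta"},  # keys missing
    ]
})

FIELDS = ["PROJECT_ID", "PROJECT_NAME", "PROJECT_NUMBER", "PROJECT_TYPE_NAME"]

def normalize(record: dict) -> dict:
    # Keep only the schema fields; absent keys become None (null in Spark).
    row = {k: record.get(k) for k in FIELDS}
    # Coerce PROJECT_ID to int so it matches IntegerType in the schema.
    if row["PROJECT_ID"] is not None:
        row["PROJECT_ID"] = int(row["PROJECT_ID"])
    return row

task_data = [normalize(r) for r in json.loads(response_text)["results"]]
```

The normalized task_data can then be passed to spark.createDataFrame(task_data, schema) as above.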