I have a data generator that returns an iterable. It fetches data for a specified date range, and the total amount of data it will fetch is close to a billion objects. My goal is to fetch all of this data, write it to a folder on the local file system, and then (this part is already set up and running) use a PySpark readStream to read those files and write them to my database (Cassandra). The scope of my question is limited to fetching the data and writing it to the local file system.
What I am trying to do is create a Spark DataFrame whenever the accumulated batch reaches batch_size and write it out, but I keep running into a segmentation fault (core dumped) and a Java connection-reset error. I am very new to PySpark and am trying to teach myself how to set it up properly and implement the workflow I want. Specifically, I would really appreciate help and feedback on the Spark configuration and on the main error I keep hitting:
Failed to write to data/data/polygon/trades/batch_99 on attempt 1: An error occurred while calling o1955.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 99.0 failed 1 times, most recent failure: Lost task 8.0 in stage 99.0 (TID 3176) (furkan-desktop executor driver): java.net.SocketException: Connection reset
from datetime import datetime
import logging
import time
from dotenv import load_dotenv
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    LongType,
    DoubleType,
    ArrayType,
)
import uuid
from polygon import RESTClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="spark_logs/logfile.log",
    filemode="w",
)
from_date = datetime(2021, 3, 10)
to_date = datetime(2021, 3, 31)
load_dotenv()
client = RESTClient(os.getenv("POLYGON_API_KEY"))
# Create Spark session
spark = (
SparkSession.builder.appName("TradeDataProcessing")
.master("local[*]")
.config("spark.driver.memory", "16g")
.config("spark.executor.instances", "8")
.config("spark.executor.memory", "16g")
.config("spark.executor.memoryOverhead", "4g")
.config("spark.executor.cores", "4")
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "4g")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer.max", "512m")
.config("spark.network.timeout", "800s")
.config("spark.executor.heartbeatInterval", "20000ms")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.minExecutors", "1")
.config("spark.dynamicAllocation.maxExecutors", "8")
.config("spark.dynamicAllocation.initialExecutors", "4")
.getOrCreate()
)
# Define the schema corresponding to the JSON structure
schema = StructType(
    [
        StructField("exchange", IntegerType(), False),
        StructField("id", StringType(), False),
        StructField("participant_timestamp", LongType(), False),
        StructField("price", DoubleType(), False),
        StructField("size", DoubleType(), False),
        StructField("conditions", ArrayType(IntegerType()), True),
    ]
)
def ensure_directory_exists(path):
"""Ensure directory exists, create if it doesn't"""
if not os.path.exists(path):
os.makedirs(path)
# Convert dates to timestamps or use them directly based on your API requirements
from_timestamp = from_date.timestamp() * 1e9 # Adjusting for nanoseconds
to_timestamp = to_date.timestamp() * 1e9
# Initialize the trades iterator with the specified parameters
trades_iterator = client.list_trades(
"X:BTC-USD",
timestamp_gte=int(from_timestamp),
timestamp_lte=int(to_timestamp),
limit=1_000,
sort="asc",
order="asc",
)
trades = []
file_index = 0
output_dir = "data/data/polygon/trades" # Output directory
ensure_directory_exists(output_dir) # Make sure the output directory exists
def robust_write(df, path, max_retries=3, retry_delay=5):
"""Attempts to write a DataFrame to a path with retries on failure."""
for attempt in range(max_retries):
try:
df.write.partitionBy("exchange").mode("append").parquet(path)
print(f"Successfully written to {path}")
return
except Exception as e:
logging.error(f"Failed to write to {path} on attempt {attempt+1}: {e}")
time.sleep(retry_delay) # Wait before retrying
logging.critical(f"Failed to write to {path} after {max_retries} attempts.")
for trade in trades_iterator:
    trade_data = {
        "exchange": int(trade.exchange),
        "id": str(uuid.uuid4()),
        "participant_timestamp": trade.participant_timestamp,
        "price": float(trade.price),
        "size": float(trade.size),
        "conditions": trade.conditions if trade.conditions else [],
    }
    trades.append(trade_data)
    if len(trades) == 10000:
        df = spark.createDataFrame(trades, schema=schema)
        file_name = f"{output_dir}/batch_{file_index}"
        robust_write(df, file_name)
        trades = []
        file_index += 1
if trades:
    df = spark.createDataFrame(trades, schema=schema)
    file_name = f"{output_dir}/batch_{file_index}"
    robust_write(df, file_name)
This is not a perfect solution, but since a streaming approach fits this problem better, I'm offering it as an option.
Adapted from the socket examples below:
https://github.com/abulbasar/pyspark-examples/blob/master/structed-streaming-socket.py
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html (search for "socket" on that page)
To tell whether processing has finished, just look for this line in the logs:
WARN TextSocketMicroBatchStream: Stream closed by localhost:9979
Note that the number of rows per file may not exactly match num_rows_per_batch; you can instead set a trigger timer by measuring how long the iterator takes to produce 10,000 rows (a minimal trigger sketch follows the example below).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
import json
import threading
import socket
spark = SparkSession.builder \
.appName("Example") \
.getOrCreate()
schema = StructType([
StructField("column1", StringType()),
StructField("column2", StringType()),
])
def data_iterator():
    for i in range(100):
        yield {"column1": f"value1_{i}", "column2": f"value2_{i}"}
host_given, port_given = "localhost", 9979
def socket_server():
    # Serve the generated rows to the first client that connects,
    # as newline-delimited JSON strings.
    host = host_given
    port = port_given
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, port))
        s.listen(1)
        conn, addr = s.accept()
        with conn:
            for row in data_iterator():
                data = json.dumps(row) + "\n"
                conn.sendall(data.encode())
server_thread = threading.Thread(target=socket_server)
server_thread.start()
# Read newline-delimited JSON lines from the socket and parse them into columns
df = spark.readStream \
    .format("socket") \
    .option("host", host_given) \
    .option("port", port_given) \
    .load() \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")
output_hello = "/path/to/data_output/parquet_so/"
checkpoint_hello = "/path/to/data_output/parquet_checkpoint/"
num_rows_per_batch = 20
query = df.writeStream \
.format("csv") \
.option("path", output_hello) \
.option("checkpointLocation", checkpoint_hello) \
.option("maxRowsPerFile", num_rows_per_batch) \
.start()
query.awaitTermination()
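Regarding the trigger timer mentioned above, here is a minimal sketch of the same sink driven by a fixed processing-time trigger instead. The 30-second interval is an assumption; replace it with however long you measured your iterator to need for roughly num_rows_per_batch rows.
# Minimal sketch: same sink as above, but micro-batches fire on a fixed interval.
# The 30-second interval is an assumption -- tune it to how long your iterator
# actually takes to produce roughly num_rows_per_batch rows.
query = df.writeStream \
    .format("csv") \
    .option("path", output_hello) \
    .option("checkpointLocation", checkpoint_hello) \
    .trigger(processingTime="30 seconds") \
    .start()

# Once the server thread has sent everything and the socket has closed
# (the "Stream closed by localhost:9979" warning), the query can be stopped
# instead of blocking forever; in practice, wait until the final micro-batch
# has been committed before stopping.
server_thread.join()
query.stop()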