What is the correct way to process data from a generator with PySpark and write it to Parquet?


I have a data generator that returns an iterable. It fetches data for a specified date range, and the total amount of data fetched is close to a billion objects. My goal is to fetch all of this data, write it to a folder on the local filesystem, and then (this part is already set up and running) read those files with a PySpark readStream and write them to my database (Cassandra). The scope of my question is limited to fetching the data and writing it to the local filesystem.

I am trying to:

  1. Fetch data using the generator.
  2. Accumulate a batch of data.
  3. When the batch reaches batch_size, create a Spark DataFrame, and
  4. Write that DataFrame out in .parquet format.

However, I am running into a segmentation fault (core dumped) and a Java connection-reset error. I am very new to PySpark and am trying to teach myself how to set this up correctly and implement the workflow I want. Specifically, I would appreciate help and feedback on the Spark configuration, as well as on the main error I keep hitting:

Failed to write to data/data/polygon/trades/batch_99 on attempt 1: An error occurred while calling o1955.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 99.0 failed 1 times, most recent failure: Lost task 8.0 in stage 99.0 (TID 3176) (furkan-desktop executor driver): java.net.SocketException: Connection reset

Here is a screenshot of the Spark UI:

[Spark UI screenshot]

Current implementation:

from datetime import datetime
import logging
import time
from dotenv import load_dotenv
import pandas as pd
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    LongType,
    DoubleType,
    ArrayType,
)
import uuid
from polygon import RESTClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="spark_logs/logfile.log",
    filemode="w",
)
from_date = datetime(2021, 3, 10)
to_date = datetime(2021, 3, 31)
load_dotenv()
client = RESTClient(os.getenv("POLYGON_API_KEY"))

# Create Spark session
spark = (
    SparkSession.builder.appName("TradeDataProcessing")
    .master("local[*]")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.instances", "8")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.memoryOverhead", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "512m")
    .config("spark.network.timeout", "800s")
    .config("spark.executor.heartbeatInterval", "20000ms")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "8")
    .config("spark.dynamicAllocation.initialExecutors", "4")
    .getOrCreate()
)

# Define the schema corresponding to the JSON structure
schema = StructType(
    [
        StructField("exchange", IntegerType(), False),
        StructField("id", StringType(), False),
        StructField("participant_timestamp", LongType(), False),
        StructField("price", DoubleType(), False),
        StructField("size", DoubleType(), False),
        StructField("conditions", ArrayType(IntegerType()), True),
    ]
)


def ensure_directory_exists(path):
    """Ensure directory exists, create if it doesn't"""
    if not os.path.exists(path):
        os.makedirs(path)


# Convert dates to timestamps or use them directly based on your API requirements
from_timestamp = from_date.timestamp() * 1e9  # Adjusting for nanoseconds
to_timestamp = to_date.timestamp() * 1e9

# Initialize the trades iterator with the specified parameters
trades_iterator = client.list_trades(
    "X:BTC-USD",
    timestamp_gte=int(from_timestamp),
    timestamp_lte=int(to_timestamp),
    limit=1_000,
    sort="asc",
    order="asc",
)

trades = []
file_index = 0
output_dir = "data/data/polygon/trades"  # Output directory
ensure_directory_exists(output_dir)  # Make sure the output directory exists


def robust_write(df, path, max_retries=3, retry_delay=5):
    """Attempts to write a DataFrame to a path with retries on failure."""
    for attempt in range(max_retries):
        try:
            df.write.partitionBy("exchange").mode("append").parquet(path)
            print(f"Successfully written to {path}")
            return
        except Exception as e:
            logging.error(f"Failed to write to {path} on attempt {attempt+1}: {e}")
            time.sleep(retry_delay)  # Wait before retrying
    logging.critical(f"Failed to write to {path} after {max_retries} attempts.")


# Accumulate trades from the generator and flush a batch to Parquet every 10,000 rows
for trade in trades_iterator:
    trade_data = {
        "exchange": int(trade.exchange),
        "id": str(uuid.uuid4()),
        "participant_timestamp": trade.participant_timestamp,
        "price": float(trade.price),
        "size": float(trade.size),
        "conditions": trade.conditions if trade.conditions else [],
    }
    trades.append(trade_data)

    if len(trades) == 10000:
        df = spark.createDataFrame(trades, schema=schema)
        file_name = f"{output_dir}/batch_{file_index}"
        robust_write(df, file_name)

        trades = []
        file_index += 1

# Write any remaining trades that did not fill a complete batch
if trades:
    df = spark.createDataFrame(trades, schema=schema)
    file_name = f"{output_dir}/batch_{file_index}"
    robust_write(df, file_name)

Tags: python, apache-spark, pyspark

1 Answer

This is not a perfect solution, but since a streaming approach is a better fit here, I am offering it as an option.

Adapted from the socket examples below:

https://github.com/abulbasar/pyspark-examples/blob/master/structed-streaming-socket.py

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html (search for "socket" on that page)

To determine whether processing has finished, just check the logs for this line:

WARN TextSocketMicroBatchStream: Stream closed by localhost:9979

Note that the number of rows per file may not exactly match num_rows_per_batch. Alternatively, you can set a trigger timer, for example by measuring how long the iterator takes to produce 10,000 rows:

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.trigger.html
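
For illustration, a processing-time trigger could be added to the query defined in the full example below (this is a sketch on my part; the 30-second interval is just a placeholder, not something tuned for the asker's workload):

# Hypothetical variant of the `query = df.writeStream ...` block in the full
# example below; only the .trigger(...) line is new. Each micro-batch then
# covers a fixed time window rather than a fixed row count.
query = df.writeStream \
    .format("csv") \
    .option("path", output_hello) \
    .option("checkpointLocation", checkpoint_hello) \
    .option("maxRecordsPerFile", num_rows_per_batch) \
    .trigger(processingTime="30 seconds") \
    .start()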

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
import json
import threading
import socket

spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

schema = StructType([
    StructField("column1", StringType()),
    StructField("column2", StringType()),
])


def data_iterator():
    for i in range(100):
        yield {"column1": f"value1_{i}", "column2": f"value2_{i}"}


host_given, port_given = "localhost", 9979


def socket_server():
    host = host_given
    port = port_given
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, port))
        s.listen(1)
        conn, addr = s.accept()
        with conn:
            for row in data_iterator():
                data = json.dumps(row) + "\n"
                conn.sendall(data.encode())


server_thread = threading.Thread(target=socket_server)
server_thread.start()

df = spark.readStream \
    .format("socket") \
    .option("host", host_given) \
    .option("port", port_given) \
    .load() \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

output_hello = "/path/to/data_output/parquet_so/"
checkpoint_hello = "/path/to/data_output/parquet_checkpoint/"

num_rows_per_batch = 20

query = df.writeStream \
    .format("csv") \
    .option("path", output_hello) \
    .option("checkpointLocation", checkpoint_hello) \
    .option("maxRowsPerFile", num_rows_per_batch) \
    .start()



query.awaitTermination()
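
Since the socket feed in this example is finite, one alternative ending (my own assumption, not part of the original example) is to wait for the server thread to finish, drain whatever the source still has buffered, and stop the query explicitly instead of blocking on awaitTermination(). The same sink options also work with format("parquet"), which is what the question is ultimately after:

# Hypothetical alternative ending for the example above: wait until the finite
# socket feed has been sent completely, flush what remains, then stop cleanly.
server_thread.join()           # data_iterator() has been fully sent
query.processAllAvailable()    # block until all received rows are written out
query.stop()                   # shut the streaming query down

# For the Parquet output the question asks about, the same options apply:
# df.writeStream.format("parquet").option("path", output_hello) \
#     .option("checkpointLocation", checkpoint_hello).start()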