Running the Python code neither connects to Spark nor creates the database in Cassandra. I have confirmed that the services are up in Docker and reachable from my PC. I put the .jar files in the pyspark jars folder inside .env. Google and ChatGPT have let me down.
Error:
ERROR:root:Couldn't create the spark session due to exception An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
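Note that the except block in create_spark_connection flattens the Py4J error down to its first line; logging the full traceback usually reveals the real JVM-side cause. A minimal sketch of that change (same builder as in the script below, shortened here):

import logging
from pyspark.sql import SparkSession

try:
    s_conn = SparkSession.builder.appName('SparkDataStreaming').getOrCreate()
except Exception:
    # logging.exception records the full stack trace, including the Java
    # exception hidden behind "An error occurred while calling ...".
    logging.exception("Couldn't create the spark session")
    raise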
spark_stream.py:
import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

logging.basicConfig(level=logging.INFO)


def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    print("Keyspace created successfully!")


def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            id UUID PRIMARY KEY,
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    print("Table created successfully!")


def insert_data(session, **kwargs):
    print("inserting data...")
    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address,
                post_code, email, username, dob, registered_date, phone, picture)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")
    except Exception as e:
        logging.error(f'could not insert data due to {e}')


def create_spark_connection():
    s_conn = None

    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()

        s_conn.sparkContext.setLogLevel("INFO")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn


def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df


def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])
        return cluster.connect()
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None


def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)

    return sel


if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()
docker-compose.yml:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ['CMD', 'bash', '-c', "echo 'ruok' | nc localhost 2181"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - confluent

  broker:
    image: confluentinc/cp-server:7.4.0
    container_name: broker
    hostname: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://spark-master:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    container_name: schema-registry
    hostname: schema-registry
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8081/" ]
      interval: 30s
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      broker:
        condition: service_healthy
      schema-registry:
        condition: service_healthy
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      PORT: 9021
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021/health" ]
      interval: 30s
      timeout: 10s
      retries: 5

  webserver:
    image: apache/airflow:2.6.0-python3.9
    container_name: af-webserver
    hostname: af-webserver
    command: webserver
    entrypoint: ['/opt/airflow/script/entrypoint.sh']
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Sequential
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW_WEBSERVER_SECRET_KEY=this_is_a_very_secured_key
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
      - ./requirements.txt:/opt/airflow/requirements.txt
    ports:
      - "8080:8080"
    healthcheck:
      test: ['CMD-SHELL', "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
    networks:
      - confluent

  scheduler:
    image: apache/airflow:2.6.0-python3.9
    container_name: af-scheduler
    hostname: af-scheduler
    depends_on:
      webserver:
        condition: service_healthy
    volumes:
      - ./dags:/opt/airflow/dags
      - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
      - ./requirements.txt:/opt/airflow/requirements.txt
    env_file:
      - airflow.env
    environment:
      - LOAD_EX=n
      - EXECUTOR=Sequential
    command: bash -c "pip install -r ./requirements.txt && airflow db upgrade && airflow scheduler"
    networks:
      - confluent

  postgres:
    image: postgres:14.0
    container_name: af-postgres
    hostname: af-postgres
    env_file:
      - postgres.env
    environment:
      - POSTGRES_DB=airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    networks:
      - confluent

  spark-master:
    image: bitnami/spark:latest
    container_name: spark-master
    hostname: spark-master
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/"]
      interval: 30s
      timeout: 10s
      retries: 5
    networks:
      - confluent

  spark-worker:
    image: bitnami/spark:latest
    container_name: spark-worker
    hostname: spark-worker
    ports:
      - "8082:8081"
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077
    healthcheck:
      test: ["CMD", "curl", "-f", "http://172.27.0.7:8082"]
      interval: 10s
      timeout: 3s
      retries: 3
    networks:
      - confluent

  cassandra_db:
    image: cassandra:latest
    container_name: cassandra
    hostname: cassandra
    ports:
      - "9042:9042"
    env_file:
      - cassandra.env
    environment:
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=100M
    volumes:
      - ./:/home
    networks:
      - confluent

networks:
  confluent:
You cannot set spark.cassandra.connection.host to localhost, because from the Spark application's point of view localhost is its own container, while Cassandra runs in a separate one. Use the Cassandra container's name instead.
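In the compose file above the Cassandra service sets hostname: cassandra, so on the shared confluent network that name resolves to the Cassandra container. A minimal sketch of the corrected builder, assuming the Spark application runs on that network:

s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                   "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
    .config('spark.cassandra.connection.host', 'cassandra') \
    .getOrCreate()

The same reasoning would apply to Cluster(['localhost']) and kafka.bootstrap.servers if the script itself runs inside a container: use cassandra and broker:29092 there. localhost only works from the host machine, through the published 9042 and 9092 ports.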