Is it possible to use Spark Streaming to stream database table data?

Problem description

I am trying to stream SQL Server table data, so I created a simple Java program with a main class. I created a SparkConf and used it to initialize a JavaStreamingContext, then obtained the SparkContext from it. I fetched the data from the database with the JdbcRDD and JavaRDD Spark APIs, placed it in an input queue, and prepared a JavaInputDStream from that queue. With these prerequisites done, I started the JavaStreamingContext. I do receive the first set of data, which was already fetched while preparing the inputQueue, but no further data is streamed after that.

package com.ApacheSparkConnection.ApacheSparkConnection;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

public class MainSparkConnector {

    public static void main(String[] args) throws Exception {

        String dbtableQuery = "SELECT TOP 10 AGENT_CODE,AGENT_NAME,WORKING_AREA,COMMISSION,PHONE_NO,COUNTRY FROM dbo.AGENTS where AGENT_CODE >= ? and AGENT_CODE <= ?";

        String host = "XXXXXXXXX";
        String databaseName = "YYYY";
        String user = "sa";
        String password = "XXXXXX@123";

        long previewSize = 0; 

        Instant start = Instant.now();

        SparkConf sparkConf = new SparkConf().setAppName("SparkJdbcDs")
                .setMaster("local[4]")
                .set("spark.driver.allowMultipleContexts", "true");

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        JavaSparkContext javaSparkContext  =  javaStreamingContext.sparkContext();
        SparkContext sparkContext = javaSparkContext.sc(); 

        String url = "jdbc:sqlserver://" + host + ":1433;databaseName=" + databaseName;
        String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; 

        DbConnection dbConnection = new DbConnection(driver, url, user, password);

        // JdbcRDD runs the bounded query in parallel: the lower/upper bounds (0, 100000)
        // are substituted into the two '?' placeholders and split across 10 partitions.
        JdbcRDD<Object[]> jdbcRDD =
                new JdbcRDD<Object[]>(sparkContext, dbConnection, dbtableQuery, 0,
                              100000, 10, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));

        List<String> employeeFullNameList = javaRDD.map(new Function<Object[], String>() {
            @Override
            public String call(final Object[] record) throws Exception {
                String rec = "";
                for(Object ob : record) {
                    rec = rec + " " + ob;
                }
                return rec;
            }
        }).collect();

        // Parallelize the already-collected rows and feed them into a queue-backed DStream.
        // Note: queueStream only emits the RDDs that are placed in the queue; the queue is
        // filled exactly once here, so later batches will be empty.
        JavaRDD<String> javaRDD1 = javaStreamingContext.sparkContext().parallelize(employeeFullNameList);
        Queue<JavaRDD<String>> inputQueue = new LinkedList<JavaRDD<String>>();

        inputQueue.add(javaRDD1);

        JavaInputDStream<String> javaDStream = javaStreamingContext.queueStream(inputQueue, true);
        System.out.println("javaDStream.print()");
        javaDStream.print();
        javaDStream.foreachRDD( rdd-> {
            System.out.println("rdd.count() : "+ rdd.count());
            rdd.collect().stream().forEach(n-> System.out.println("item of list: "+n));
        });
        javaStreamingContext.start();

        System.out.println("employeeFullNameList.size() : "+employeeFullNameList.size());

        javaStreamingContext.awaitTermination();
    }

    static class DbConnection extends AbstractFunction0<Connection> implements Serializable {

        private String driverClassName;
        private String connectionUrl;
        private String userName;
        private String password;

        public DbConnection(String driverClassName, String connectionUrl, String userName, String password) {
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        }

        public Connection apply() {
            try {
                Class.forName(driverClassName);
            } catch (ClassNotFoundException e) {
                System.out.println("Failed to load driver class" +e);
            }

            Properties properties = new Properties();
            properties.setProperty("user", userName);
            properties.setProperty("password", password);

            Connection connection = null;
            try {
                connection = DriverManager.getConnection(connectionUrl, properties);
            } catch (SQLException e) {
                System.out.println("Connection failed"+ e);
            }

            return connection;
        }
    }

    static class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

        public Object[] apply(ResultSet row) {
            return JdbcRDD.resultSetToObjectArray(row);
        }
    }
}
Please let me know if I am heading in the wrong direction.
java apache-spark apache-spark-sql spark-streaming
1 Answer

Streaming an initial snapshot of an RDBMS through Spark Streaming is easy, but there is no direct way to capture the tail of changes that happen in the database afterwards.

A better solution is to use a Debezium SQL Server connector.

Debezium's SQL Server connector can monitor and record the row-level changes in the schemas of a SQL Server database.

  • You need to set up a Kafka cluster.
  • Enable CDC for the SQL Server database and for each table to be captured (see the JDBC sketch after this list).
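
Below is a minimal sketch of what enabling CDC could look like from JDBC, assuming the host, database, credentials, and AGENTS table from the question. sys.sp_cdc_enable_db and sys.sp_cdc_enable_table are the standard SQL Server CDC procedures; SQL Server Agent must also be running for the capture job to work.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class EnableCdcForAgents {

    public static void main(String[] args) throws Exception {
        // Placeholders reused from the question; replace with real connection details.
        String url = "jdbc:sqlserver://XXXXXXXXX:1433;databaseName=YYYY";

        try (Connection connection = DriverManager.getConnection(url, "sa", "XXXXXX@123");
             Statement statement = connection.createStatement()) {

            // Enable CDC at the database level (requires sysadmin).
            statement.execute("EXEC sys.sp_cdc_enable_db");

            // Enable CDC for the table that Debezium should capture.
            statement.execute(
                    "EXEC sys.sp_cdc_enable_table "
                  + "@source_schema = N'dbo', "
                  + "@source_name = N'AGENTS', "
                  + "@role_name = NULL");
        }
    }
}
```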

SQL Server CDC is not designed to store a complete history of database changes. It is therefore necessary for Debezium to establish a baseline of the current database content and stream it to Kafka. This is achieved through a process called snapshotting.

By default (snapshot mode initial), the connector performs an initial consistent snapshot of the database on its first startup (meaning the structure and data of any table captured according to the connector's filter configuration).
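
As an illustration only, such a connector could be registered through the Kafka Connect REST API roughly as follows. The connector name, the localhost addresses, the history topic, and the logical server name fulfillment are assumptions for this sketch; the property keys follow the Debezium 1.x SQL Server connector documentation, and the database details reuse the placeholders from the question.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterSqlServerConnector {

    public static void main(String[] args) throws Exception {
        // Hypothetical connector configuration; every address and name is a placeholder.
        String connectorJson = "{\n"
                + "  \"name\": \"agents-sqlserver-connector\",\n"
                + "  \"config\": {\n"
                + "    \"connector.class\": \"io.debezium.connector.sqlserver.SqlServerConnector\",\n"
                + "    \"database.hostname\": \"XXXXXXXXX\",\n"
                + "    \"database.port\": \"1433\",\n"
                + "    \"database.user\": \"sa\",\n"
                + "    \"database.password\": \"XXXXXX@123\",\n"
                + "    \"database.dbname\": \"YYYY\",\n"
                + "    \"database.server.name\": \"fulfillment\",\n"
                + "    \"table.whitelist\": \"dbo.AGENTS\",\n"
                + "    \"snapshot.mode\": \"initial\",\n"
                + "    \"database.history.kafka.bootstrap.servers\": \"localhost:9092\",\n"
                + "    \"database.history.kafka.topic\": \"dbhistory.agents\"\n"
                + "  }\n"
                + "}";

        // POST the configuration to the Kafka Connect worker (default REST port 8083).
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```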

Each snapshot consists of the following steps:

1. Determine the tables to be captured.

2. Obtain a lock on each of the monitored tables to ensure that no structural changes can occur to any of the tables. The level of the lock is determined by the snapshot.isolation.mode configuration option.

3. Read the maximum LSN ("log sequence number") position in the server's transaction log.

4. Capture the structure of all relevant tables.

5. Optionally release the locks obtained in step 2, i.e. the locks are usually held only for a short period of time.

6. Scan all of the relevant database tables and schemas as valid at the LSN position read in step 3, generate a READ event for each row, and write that event to the appropriate table-specific Kafka topic.

7. Record the successful completion of the snapshot in the connector offsets.

Reading the change data tables

On its first startup, the connector takes a structural snapshot of the captured tables and persists this information in its internal database history topic. The connector then identifies a change table for each source table and executes the main loop:

1. For each change table, read all changes that were created between the last stored maximum LSN and the current maximum LSN.

2. Order the read changes incrementally according to commit LSN and change LSN. This ensures that the changes are replayed by Debezium in the same order as they were made to the database.

3. Pass commit and change LSNs as offsets to Kafka Connect.

4. Store the maximum LSN and repeat the loop.

After a restart, the connector will resume from the offsets (commit and change LSNs) where it previously left off.

The connector is able to detect at runtime whether CDC is enabled or disabled for the whitelisted source tables and adjust its behavior accordingly.

The SQL Server connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. The name of the Kafka topic always takes the form serverName.schemaName.tableName, where serverName is the logical name of the connector as specified with the database.server.name configuration property, schemaName is the name of the schema in which the operation occurred, and tableName is the name of the database table on which the operation occurred.

For example, consider a SQL Server installation with an inventory database that contains four tables in the dbo schema: products, products_on_hand, customers, and orders. If the connector monitoring this database were given the logical server name fulfillment, the connector would produce events on these four Kafka topics:

    fulfillment.dbo.products
    fulfillment.dbo.products_on_hand
    fulfillment.dbo.customers
    fulfillment.dbo.orders
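
To connect this back to the question: once Debezium is publishing change events, Spark Streaming can subscribe to the table topic with the Kafka direct stream API (spark-streaming-kafka-0-10). The following is only a sketch under stated assumptions: the broker address, consumer group, and the fulfillment.dbo.AGENTS topic name (derived from the naming convention above and the AGENTS table in the question) are placeholders.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class DebeziumTopicStreamer {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("DebeziumTopicStreamer").setMaster("local[4]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        // Kafka consumer settings; the broker address and group id are assumptions.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "agents-stream-consumer");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);

        // Topic name follows the serverName.schemaName.tableName convention described above.
        Collection<String> topics = Arrays.asList("fulfillment.dbo.AGENTS");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Each record value is a Debezium change event (JSON) for one row-level operation.
        stream.foreachRDD(rdd -> {
            System.out.println("batch size: " + rdd.count());
            rdd.foreach(record -> System.out.println(record.value()));
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
```

Unlike the queueStream approach in the question, this stream keeps producing batches as long as new change events arrive on the topic.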