使用批处理数据设计独立于数据源的应用程序

问题描述 投票:0回答:1

[我们有一个旧版应用程序,它从每个用户的mongo读取数据(根据用户请求,查询结果从小到大),我们的应用程序为每个用户创建一个文件,然后将其放置到ftp服务器/ s3。我们以mongo的形式读取数据光标并将每个批次写入文件时,它会立即获得批次数据,因此文件写入性能不错。此应用程序很好用,但绑定到mongo和mongo光标。

现在我们必须重新设计此应用程序,因为我们必须支持不同的数据源,例如mongodb,postgresDB,Kinesis,S3等。到目前为止,我们已经考虑了以下想法-

1。为每个源构建数据API并公开分页的REST响应。这是可行的解决方案,但是与当前游标响应相比,大查询数据可能会比较慢。

2。通过在kafka中馈入批处理数据并在我们的文件生成器中读取批处理数据流来构建数据抽象层。但是大多数情况下,用户要求排序的数据,因此我们需要按顺序读取消息。很大的吞吐量,还需要大量工作才能在写入文件之前组合这些数据消息。

我们正在寻找一种解决方案来替换当前的mongo游标,并使我们的文件生成器独立于数据源。

java mongodb postgresql design-patterns spring-kafka
1个回答
0
投票

因此,听起来好像您本质上是想创建一个API,在其中您可以尽可能地保持流传输的效率,就像在读取用户数据时编写文件一样。

在这种情况下,您可能想为ReadSource定义一个推入解析器API,它将把数据流式传输到WriteTarget,将数据写入您要实现的任何内容。

基本的实现可能看起来像这样:

public class UserDataRecord {
    private String data1;
    private String data2;

    public String getRecordAsString() {
        return data1 + "," + data2;
    }
}

public interface WriteTarget<Record> {
    /** Write a record to the target */
    public void writeRecord(Record record);

    /** Finish writing to the target and save everything */
    public void commit();

    /** Undo whatever was written */
    public void rollback();
}

public abstract class ReadSource<Record> {
    protected final WriteTarget<Record> writeTarget;

    public ReadSource(WriteTarget<Record> writeTarget) { this.writeTarget = writeTarget; }

    public abstract void read();
}

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class RelationalDatabaseReadSource extends ReadSource<UserDataRecord> {
    private Connection dbConnection;

    public RelationalDatabaseReadSource (WriteTarget<UserDataRecord> writeTarget, Connection dbConnection) {
        super(writeTarget);
        this.dbConnection = dbConnection;
    }

    @Override public void read() {
        // read user data from DB and encapsulate it in a record
        try (Statement statement = dbConnection.createStatement();
                ResultSet resultSet = statement.executeQuery("Select * From TABLE Order By COLUMNS");) {
            while (resultSet.next()) {
                UserDataRecord record = new UserDataRecord();
                // stream the records to the write target
                writeTarget.writeRecord(record);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
public class FileWriteTarget implements WriteTarget<UserDataRecord> {
    private File fileToWrite;
    private PrintWriter writer;

    public FileWriteTarget(File fileToWrite) throws IOException {
        this.fileToWrite = fileToWrite;
        this.writer = new PrintWriter(new FileWriter(fileToWrite));
    }

    @Override public void writeRecord(UserDataRecord record) {
        writer.println(record.getRecordAsString().getBytes(StandardCharsets.UTF_8));
    }

    @Override public void commit() {
        // write trailing records
        writer.close();
    }

    @Override
    public void rollback() {
        try { writer.close(); } catch (Exception e) { }
        fileToWrite.delete();
    }
}

这只是总体思路,需要认真改进。任何人都可以随时更新此API。

© www.soinside.com 2019 - 2024. All rights reserved.