Apache Storm - 从SPOUT访问数据库 - 连接池

问题描述 投票:1回答:1

每个刻度线上都有一个喷口进入Postgre数据库并读取另一行。 spout代码如下所示:

class RawDataLevelSpout extends BaseRichSpout implements Serializable {


private int counter;

SpoutOutputCollector collector;


@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("col1", "col2"));
}

@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
    collector = spoutOutputCollector;
}

private Connection initializeDatabaseConnection() {

    try {
        Class.forName("org.postgresql.Driver");
        Connection connection = null;
        connection = DriverManager.getConnection(
                DATABASE_URI,"root", "root");
        return connection;
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

@Override
public void close() {

}

@Override
public void nextTuple() {
    List<String> values = new ArrayList<>();

    PreparedStatement statement = null;
    try {
        Connection connection = initializeDatabaseConnection();
        statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
        statement.setInt(1, counter++);
        ResultSet resultSet = statement.executeQuery();
        resultSet.next();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        int totalColumns = resultSetMetaData.getColumnCount();
        for (int i = 1; i <= totalColumns; i++) {
            String value = resultSet.getString(i);
            values.add(value);
        }


        connection.close();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    collector.emit(new Values(values.stream().toArray(String[]::new)));
}

}

在apache风暴中如何在Spouts中处理连接池的标准方法是什么?此外,是否有可能以某种方式在集群拓扑中的多个运行实例中同步coutner变量?

database connection-pooling apache-storm
1个回答
1
投票

关于连接池,如果需要,可以通过静态变量池连接,但由于不能保证所有的spout实例都在同一个JVM中运行,我认为没有任何意义。

不,没有办法同步计数器。 spout实例可能正在不同的JVM上运行,并且您不希望它们全部阻塞,而spout同意计数器值。我不认为你的喷口实施有意义。如果您想一次只读一行,为什么不只是运行一个spout实例而不是尝试同步多个spout?

您似乎试图将关系数据库用作队列系统,这可能不合适。考虑例如卡夫卡代替。我认为您应该能够使用https://www.confluent.io/product/connectors/http://debezium.io/中的任何一个将数据从Postgres传输到Kafka。

© www.soinside.com 2019 - 2024. All rights reserved.