每个刻度线上都有一个喷口进入Postgre数据库并读取另一行。 spout代码如下所示:
class RawDataLevelSpout extends BaseRichSpout implements Serializable {
private int counter;
SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("col1", "col2"));
}
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
}
private Connection initializeDatabaseConnection() {
try {
Class.forName("org.postgresql.Driver");
Connection connection = null;
connection = DriverManager.getConnection(
DATABASE_URI,"root", "root");
return connection;
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
@Override
public void close() {
}
@Override
public void nextTuple() {
List<String> values = new ArrayList<>();
PreparedStatement statement = null;
try {
Connection connection = initializeDatabaseConnection();
statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
statement.setInt(1, counter++);
ResultSet resultSet = statement.executeQuery();
resultSet.next();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int totalColumns = resultSetMetaData.getColumnCount();
for (int i = 1; i <= totalColumns; i++) {
String value = resultSet.getString(i);
values.add(value);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
collector.emit(new Values(values.stream().toArray(String[]::new)));
}
}
在apache风暴中如何在Spouts中处理连接池的标准方法是什么?此外,是否有可能以某种方式在集群拓扑中的多个运行实例中同步coutner变量?
关于连接池,如果需要,可以通过静态变量池连接,但由于不能保证所有的spout实例都在同一个JVM中运行,我认为没有任何意义。
不,没有办法同步计数器。 spout实例可能正在不同的JVM上运行,并且您不希望它们全部阻塞,而spout同意计数器值。我不认为你的喷口实施有意义。如果您想一次只读一行,为什么不只是运行一个spout实例而不是尝试同步多个spout?
您似乎试图将关系数据库用作队列系统,这可能不合适。考虑例如卡夫卡代替。我认为您应该能够使用https://www.confluent.io/product/connectors/或http://debezium.io/中的任何一个将数据从Postgres传输到Kafka。