我正在尝试使用 java Kafka 消费者应用程序将数据插入 azure sql 服务器。我已确认与数据库的连接。
这里是方法。
private void poll(KafkaConsumer<String, JsonObject> consumer) {
vertx.setPeriodic(TIME_OUT_MS,
timerId -> consumer.poll(Duration.ofMillis(POLL_MS)).onSuccess(records -> {
for (int i = 0; i < records.size(); i++) {
KafkaConsumerRecord<String, JsonObject> record = records.recordAt(i);
System.out.println(
"key=" + record.key() + ",value=" + record.value() + ",partition=" +
record.partition() + ",timestamp=" + record.timestamp() + ",offset=" + record.offset());
Weather weather = toParams(record);
JsonObject datasourceConfig = PropertiesHelper.getDatasourceProperties();
JDBCPool pool = JDBCPool.pool(vertx, datasourceConfig);
String query = "INSERT INTO fowudatabase.dbo.weather (captureTime, waveHeight, wavePeriod, waveDirection, windSpeed, windDirection) values (?, ?, ?, ?, ?, ?)";
pool
.getConnection()
.onFailure(e -> {
System.out.println("failed to get a connection: " + e.toString());
})
.onSuccess(conn -> {
conn
.preparedQuery(query)
.execute(Tuple.of(weather.getCaptureTime(), weather.getWaveHeight(), weather.getWavePeriod(),
weather.getWaveDirection(), weather.getWindSpeed(), weather.getWindDirection()))
.onFailure(e -> {
System.out.println("failed to execute query: " + e.toString());
conn.close();
})
.onSuccess(rows -> {
System.out.println("successfully added row " + weather.getCaptureTime());
conn.close();
});
});
}
}).onFailure(cause -> {
System.out.println(
"Something went wrong when polling " + cause.toString());
cause.printStackTrace();
vertx.cancelTimer(timerId);
}));
}
我认为问题在于
String query = "INSERT INTO fowudatabase.dbo.weather (captureTime, waveHeight, wavePeriod, waveDirection, windSpeed, windDirection) values (?, ?, ?, ?, ?, ?)";
查询行,但我不确定接下来要尝试什么。
我的问题是,无论我如何表达我的 sql 查询字符串,我都会遇到两个错误之一。如果我在查询中使用完整的 databasename.dbo.tablename,我会得到“SQL Server 版本不支持对数据库和/或服务器名称的引用”。但是,如果我使用表名和/或 dbo 的任何变体,我就会收到“无效对象”错误。我想我已经尝试了大多数名字的变体,但到目前为止都没有成功。
如有任何帮助,我们将不胜感激。