无法使用 vertx 和 JDBC 连接器从小型 kafka 消费者应用程序中执行 Azure SQL Server 查询

问题描述 投票:0回答:0

我正在尝试使用 java Kafka 消费者应用程序将数据插入 azure sql 服务器。我已确认与数据库的连接。

这里是方法。

private void poll(KafkaConsumer<String, JsonObject> consumer) {
    vertx.setPeriodic(TIME_OUT_MS,
        timerId -> consumer.poll(Duration.ofMillis(POLL_MS)).onSuccess(records -> {
          for (int i = 0; i < records.size(); i++) {
            KafkaConsumerRecord<String, JsonObject> record = records.recordAt(i);
            System.out.println(
                "key=" + record.key() + ",value=" + record.value() + ",partition=" +
                    record.partition() + ",timestamp=" + record.timestamp() + ",offset=" + record.offset());

            Weather weather = toParams(record);
            JsonObject datasourceConfig = PropertiesHelper.getDatasourceProperties();
            JDBCPool pool = JDBCPool.pool(vertx, datasourceConfig);
            String query = "INSERT INTO fowudatabase.dbo.weather (captureTime, waveHeight, wavePeriod, waveDirection, windSpeed, windDirection) values (?, ?, ?, ?, ?, ?)";
            pool
                .getConnection()
                .onFailure(e -> {
                  System.out.println("failed to get a connection: " + e.toString());
                })
                .onSuccess(conn -> {
                  conn
                      .preparedQuery(query)
                      .execute(Tuple.of(weather.getCaptureTime(), weather.getWaveHeight(), weather.getWavePeriod(),
                          weather.getWaveDirection(), weather.getWindSpeed(), weather.getWindDirection()))
                      .onFailure(e -> {
                        System.out.println("failed to execute query: " + e.toString());
                        conn.close();
                      })
                      .onSuccess(rows -> {
                        System.out.println("successfully added row " + weather.getCaptureTime());
                        conn.close();
                      });
                });
          }
        }).onFailure(cause -> {
          System.out.println(
              "Something went wrong when polling " + cause.toString());
          cause.printStackTrace();

          vertx.cancelTimer(timerId);
        }));
  }

我认为问题在于

String query = "INSERT INTO fowudatabase.dbo.weather (captureTime, waveHeight, wavePeriod, waveDirection, windSpeed, windDirection) values (?, ?, ?, ?, ?, ?)";

查询行,但我不确定接下来要尝试什么。

我的问题是,无论我如何表达我的 sql 查询字符串,我都会遇到两个错误之一。如果我在查询中使用完整的 databasename.dbo.tablename,我会得到“SQL Server 版本不支持对数据库和/或服务器名称的引用”。但是,如果我使用表名和/或 dbo 的任何变体,我就会收到“无效对象”错误。我想我已经尝试了大多数名字的变体,但到目前为止都没有成功。

如有任何帮助,我们将不胜感激。

apache-kafka azure-sql-database vert.x
© www.soinside.com 2019 - 2024. All rights reserved.