Siddhi 5.1,Java应用程序中的Kafka

问题描述 投票:0回答:1

我正在尝试使用Java制作一个小型应用程序,在其中我从Java应用程序中读取并将数据发送到Kafka主题。

https://siddhi-io.github.io/siddhi-io-kafka/

[我看到了一些示例,但没有看到完整的代码,带有pom的版本,等等。

我已经完成了我一直在阅读的内容,但是我遇到了一件该死的事情,我不知道该怎么做。

我将完成的工作和错误发送给您

public class SourceKafka {

    public static void main(String[] args) throws InterruptedException {

        // Create Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();

        // Siddhi Application
        String siddhiApp = 
                "@App:name('TestExecutionPlan') \n" + 
                "define stream FooStream (symbol string, price float, volume long); \n" + 
                "@info(name = 'query1') \n" + 
                "@sink(\n" + 
                "type='kafka',\n" + 
                "topic='topic_with_partitions',\n" + 
                "partition.no='0',\n" + 
                "bootstrap.servers='localhost:9092',\n" + 
                "@map(type='xml'))\n" + 
                "Define stream BarStream (symbol string, price float, volume long);\n" + 
                "from FooStream select symbol, price, volume insert into BarStream;";

        // Generate runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        // Get InputHandler to push events into Siddhi
        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("FooStream");

        // Start processing
        siddhiAppRuntime.start();

        // Sending events to Siddhi
        inputHandler.send(new Object[] { "IBM", 700f, 100L });
        inputHandler.send(new Object[] { "WSO2", 60.5f, 200L });
        inputHandler.send(new Object[] { "GOOG", 50f, 30L });
        inputHandler.send(new Object[] { "IBM", 76.6f, 400L });
        inputHandler.send(new Object[] { "WSO2", 45.6f, 50L });
        inputHandler.send(new Object[] { "WSO2", 76.6f, 400L });
        inputHandler.send(new Object[] { "WSO2", 45.6f, 50L });
        Thread.sleep(500);

        // Shutdown runtime
        siddhiAppRuntime.shutdown();

        // Shutdown Siddhi Manager
        siddhiManager.shutdown();

    }
}

POM

    <groupId>io.siddhi</groupId>
    <artifactId>siddhi-sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-core</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-api</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-compiler</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-annotations</artifactId>
            <version>${siddhi.version}</version>
        </dependency>

        <dependency>
            <groupId>io.siddhi.extension.io.kafka</groupId>
            <artifactId>siddhi-io-kafka</artifactId>
            <version>5.0.7</version>
        </dependency>

        <dependency>
            <groupId>io.siddhi.extension.map.xml</groupId>
            <artifactId>siddhi-map-xml</artifactId>
            <version>5.0.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.siddhi.extension.map.json</groupId>
            <artifactId>siddhi-map-json</artifactId>
            <version>5.0.6</version>
            <scope>test</scope>
        </dependency>


        <!--to run the test -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>${testng.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <properties>
        <!--Make sure to update to the correct Siddhi version -->
        <siddhi.version>5.1.11</siddhi.version>
        <testng.version>6.14.3</testng.version>
    </properties>

错误:


Exception in thread "main" io.siddhi.core.exception.SiddhiAppCreationException: Error on 'TestExecutionPlan' @ Line: 9. Position: 17, near '@sink(
type='kafka',
topic='topic_with_partitions',
partition.no='0',
bootstrap.servers='localhost:9092',
@map(type='xml'))'. No extension exist for sinkMapper:xml
    at io.siddhi.core.util.SiddhiClassLoader.loadExtensionImplementation(SiddhiClassLoader.java:45)
    at io.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventSink(DefinitionParserHelper.java:501)
    at io.siddhi.core.util.SiddhiAppRuntimeBuilder.defineStream(SiddhiAppRuntimeBuilder.java:119)
    at io.siddhi.core.util.parser.SiddhiAppParser.defineStreamDefinitions(SiddhiAppParser.java:374)
    at io.siddhi.core.util.parser.SiddhiAppParser.parse(SiddhiAppParser.java:230)
    at io.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:85)
    at io.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:95)
    at io.siddhi.sample.SourceKafka.main(SourceKafka.java:54)

任何人都可以提供我的代码帮助,或者给我一些使用Siddhi(5.1)并在Kafka主题中编写的完整示例吗?

siddhi
1个回答
0
投票

这意味着您的类路径中没有XML映射器实现jar。确保您有

© www.soinside.com 2019 - 2024. All rights reserved.