start reading data from kafka
Using Any for unsupported type: typing.Sequence[~T]
Traceback (most recent call last):
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 104, in <module>
read_from_kafka()
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\examples\python\datastream\main_sample.py", line 83, in read_from_kafka
env.execute("Kafka Streaming Job")
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\datastream\stream_execution_environment.py", line 813, in execute
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\pyflink.zip\pyflink\util\exceptions.py", line 146, in deco
File "C:\flink-1.19.0-bin-scala_2.12\flink-1.19.0\opt\python\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o9.execute.
: java.net.MalformedURLException: no protocol: ['file:/C:/flink-1.19.0-bin-scala_2.12/flink-1.19.0/opt/flink-python-1.19.0.jar']
at java.base/java.net.URL.<init>(URL.java:645)
at java.base/java.net.URL.<init>(URL.java:541)
at java.base/java.net.URL.<init>(URL.java:488)
at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 14 more
I am trying to run a Kafka consumer job on Flink using PyFlink; my code is written with the `pyflink` Python package.
How do I get this running on the Flink dashboard on Windows? I have finished all of the configuration: a simple Python word-count job executes successfully, but this Kafka consumer Python code does not execute on the Flink dashboard.
Please help me resolve this.
Short answer: all you need is the Kafka connector that matches your Flink version.
Where do you get it? Simple: from Maven Central (https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka).
But wait. You are on Flink 1.19, and as of March 23, 2024 no Kafka connector has been released for that version yet (https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/).
So you need to build it from source. And how do you do that? Fortunately, the dependencies are already available.
Now you can build it with the following steps:
git clone https://github.com/apache/flink-connector-kafka.git
cd flink-connector-kafka
mvn clean package -Dflink.version=1.19.0 -DskipTests
So what do you do with this JAR? First install PyFlink:
pip install apache-flink==1.19.0
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
s_env = TableEnvironment.create(env_settings)

# Flink 1.19.0: point at the locally built connector JAR and its kafka-clients dependency
DEPS_DIR = "/Users/sourav/Documents/workspace/entropy-backend/infra-stream/lib-flink-1.19"
JARS = (
    f"file://{DEPS_DIR}/flink-connector-kafka-3.1-SNAPSHOT.jar;"
    f"file://{DEPS_DIR}/kafka-clients-3.7.0.jar"
)
s_env.get_config().set("pipeline.jars", JARS)
s_env.get_config().set("pipeline.classpaths", JARS)

# sanity check: inspect the effective configuration
s_env.get_config().get_configuration().to_dict()
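Note the exact value format here: the `MalformedURLException` in your traceback ("no protocol: ['file:/…") is what you get when a Python list (or its `str()`) is passed where `pipeline.jars` expects a single semicolon-separated string of `file://` URLs. A stdlib-only sketch of building that string (the helper name and paths are illustrative, not part of any Flink API):

```python
def jars_config_value(paths):
    """Join jar paths into the single semicolon-separated string of
    file:// URLs that pipeline.jars expects; passing a Python list
    here instead is what produces "no protocol: ['file:/..." errors."""
    urls = []
    for p in paths:
        p = str(p).replace("\\", "/")   # normalize Windows separators
        if not p.startswith("/"):       # Windows drive path like C:/...
            p = "/" + p
        urls.append("file://" + p)
    return ";".join(urls)

print(jars_config_value([r"C:\libs\flink-connector-kafka-3.1-SNAPSHOT.jar",
                         "/opt/libs/kafka-clients-3.7.0.jar"]))
# -> file:///C:/libs/flink-connector-kafka-3.1-SNAPSHOT.jar;file:///opt/libs/kafka-clients-3.7.0.jar
```

The same string works for both `pipeline.jars` and `pipeline.classpaths`.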
Test whether it works:
s_env.execute_sql("DROP TABLE IF EXISTS t1")
s_env.execute_sql("""
CREATE TABLE t1 (
    symbol STRING,
    price FLOAT,
    ltt TIMESTAMP(3),
    WATERMARK FOR ltt AS ltt - INTERVAL '1' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'kafka-host:port',
    'properties.group.id' = 'MY_TEST_GROUP',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
""")