在 Confluence Kafka 的 Flex 模板映像中获取信任库文件

问题描述 投票:0回答:2

我们尝试将 truststore.jks 文件存储在 Flex Template Docker 中,但在管道中使用它时我们无法找到它。

我们尝试拉取映像,我们可以看到该文件存在于 mp rust.jks 的 docker 中,但在使用相同的文件时,我们收到以下错误。

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:67)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:134)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 42 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:154)
... 46 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:155)
... 49 more
Caused by: java.nio.file.NoSuchFileException: /tmp/trust.jks
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
java.base/java.nio.file.Files.newByteChannel(Files.java:371)
java.base/java.nio.file.Files.newByteChannel(Files.java:422)
java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
java.base/java.nio.file.Files.newInputStream(Files.java:156)
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
... 50 more

这是我们使用的 docker 镜像

FROM gcr.io/dataflow-templates-base/python38-template-launcher-base
RUN apt-get update && \
    apt-get install -y openjdk-11-jdk

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64


ARG WORKDIR=/dataflow/python/using_flex_template_adv3
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY . .
COPY trust.jks /tmp/trust.jks

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
ENV PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
RUN apt-get update \
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $PYTHON_REQUIREMENTS_FILE \
    # Download the requirements to speed up launching the Dataflow job.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $PYTHON_REQUIREMENTS_FILE

RUN pip install --no-cache-dir apache-beam[gcp]==2.43.0
ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

使用的代码

read = p_input | "Read Json from Kafka" >> ReadFromKafka(
    consumer_config=self.consumer_config,
    topics=[self.topic],
    commit_offset_in_finalize=False,
    with_metadata=False
)

consumer_config内容:

bootstrap.servers: bs:9095
group.id: "Group1"
ssl.truststore.location: '/tmp/trust.jks'
sasl.mechanism: SCRAM-SHA-512
security.protocol: SASL_SSL
sasl.jaas.config: ""

我们正在尝试使用 apache beam python sdk 在 GCP Dataflow 上运行此管道。

编辑

事实证明,Worker VM 和 Launcher VM 是不同的,Worker VM 不会与 Launcher 具有相同的环境。

为了解决这个问题,我必须使用 sdk_container_image,它用于为我们的工作虚拟机配置自定义容器映像 您可以在此处

找到步骤
docker google-cloud-dataflow apache-beam apache-beam-kafkaio
2个回答
0
投票

您可以分享您用来启动数据流作业的命令吗?

从 Dockerfile 来看,您似乎正在将文件复制到模板启动器而不是工作线程中。

模板启动器负责启动弹性模板作业,而工作人员负责执行工作项。看起来您希望文件出现在工作程序中而不是模板启动器中。

从错误中也可以清楚地看出该文件不存在于worker

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

请按照此处指定的流程在工作线程上使用自定义容器 - https://cloud.google.com/dataflow/docs/guides/using-custom-containers)

注意:仅使用 Dataflow Runner v2 的管道支持自定义容器。


0
投票

您能否详细说明一下如何配置 Worker VM Dockerfile 并将其包含在 Python 代码中?目前面临同样的问题,并尝试使用“使用自定义容器”指南来实现它,但现在没有任何反应:没有 SSL 密钥库错误,但一般也没有 SSL 错误,即使我故意加载错误的凭据也是如此。

© www.soinside.com 2019 - 2024. All rights reserved.