java.lang.reflect.InaccessibleObjectException: module java.base does not "opens java.util.concurrent.atomic" to unnamed module

Problem description

I am writing an Apache Flink program that runs locally and interacts with Google Pub/Sub.

Dependencies

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.gcp.pubsub.connector.version>3.0.2-1.18</flink.gcp.pubsub.connector.version>
        <flink.version>1.17.0</flink.version>
        <scala.binary.version>2.10</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-gcp-pubsub</artifactId>
            <version>${flink.gcp.pubsub.connector.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Program

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubSubExample {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        if (parameterTool.getNumberOfParameters() < 3) {
            System.out.println(
                    "Missing parameters!\n"
                            + "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> "
                            + "--google-project <google project name> ");
            return;
        }

        String projectName = parameterTool.getRequired("google-project");
        String inputTopicName = parameterTool.getRequired("input-topicName");
        String subscriptionName = parameterTool.getRequired("input-subscription");
        String outputTopicName = parameterTool.getRequired("output-topicName");

        PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
        pubSubPublisher.publish(2);

        runFlinkJob(projectName, subscriptionName, outputTopicName);
    }

    private static void runFlinkJob(
            String projectName, String subscriptionName, String outputTopicName) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);

        env.addSource(
                        PubSubSource.newBuilder()
                                .withDeserializationSchema(new IntegerSerializer())
                                .withProjectName(projectName)
                                .withSubscriptionName(subscriptionName)
                                .withMessageRateLimit(1)
                                .build())
                .map(PubSubExample::printAndReturn)
                .disableChaining()
                .addSink(
                        PubSubSink.newBuilder()
                                .withSerializationSchema(new IntegerSerializer())
                                .withProjectName(projectName)
                                .withTopicName(outputTopicName)
                                .build());

        env.execute("Flink Streaming PubSubReader");
    }

    private static Integer printAndReturn(Integer i) {
        LOG.info("Processed message with payload: " + i);
        return i;
    }
}


import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import java.math.BigInteger;
class PubSubPublisher {
    private final String projectName;
    private final String topicName;

    PubSubPublisher(String projectName, String topicName) {
        this.projectName = projectName;
        this.topicName = topicName;
    }

    /**
     * Publish messages whose payload is a single integer. The integers inside the messages start
     * at 0 and increase by one for each message sent.
     *
     * @param amountOfMessages number of messages to send
     */
    void publish(int amountOfMessages) {
        Publisher publisher = null;
        try {
            publisher = Publisher.newBuilder(TopicName.of(projectName, topicName)).build();
            for (int i = 0; i < amountOfMessages; i++) {
                ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
                PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
                publisher.publish(message).get();

                System.out.println("Published message: " + i);
                Thread.sleep(100L);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (publisher != null) {
                    publisher.shutdown();
                }
            } catch (Exception e) {
                // ignore failures while shutting down the publisher
            }
        }
    }
}


import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

import com.google.pubsub.v1.PubsubMessage;

import java.io.IOException;
import java.math.BigInteger;

/**
 * Deserialization schema to deserialize messages produced by {@link PubSubPublisher}. The byte[]
 * received by this schema must contain a single Integer.
 */
class IntegerSerializer
        implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {

    @Override
    public Integer deserialize(PubsubMessage message) throws IOException {
        return new BigInteger(message.getData().toByteArray()).intValue();
    }

    @Override
    public boolean isEndOfStream(Integer integer) {
        return false;
    }

    @Override
    public TypeInformation<Integer> getProducedType() {
        return TypeInformation.of(Integer.class);
    }

    @Override
    public byte[] serialize(Integer integer) {
        return BigInteger.valueOf(integer).toByteArray();
    }
}

Error

However, when running the program locally I get the following error:

Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.concurrent.atomic.AtomicReference.serialVersionUID accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @4b5a5ed1
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
    at example.data.PubSubExample.runFlinkJob(PubSubExample.java:50)
    at example.data.PubSubExample.main(PubSubExample.java:33)

Attempt 1

I added the following options to the VM options, but I still get the error.

--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED

Attempt 2

I also tried downgrading the Java version to 11, but that did not help.

Can anyone help?

Thanks in advance.

java apache-flink flink-streaming java-17
1 Answer

After trying to reproduce the issue locally, you can consider solving it by adding the following arguments to the JVM. Note that --add-opens applies to a single package and is not recursive, so opening java.base/java.util.concurrent does not also open java.base/java.util.concurrent.atomic, the package named in your error, which therefore needs its own flag:

--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED
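
These flags only take effect in the JVM that actually executes PubSubExample. If you start the job from your IDE, paste them into the run configuration's VM options field; if you start it through Maven, they have to reach the forked java process. Below is a minimal sketch assuming you launch via the exec-maven-plugin's exec:exec goal (the plugin, its version, and the argument layout are only an illustration, not part of your original build); the main class example.data.PubSubExample is taken from your stack trace.

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <executable>java</executable>
                    <arguments>
                        <!-- open the JDK packages that Flink's ClosureCleaner reflects into -->
                        <argument>--add-opens=java.base/java.util=ALL-UNNAMED</argument>
                        <argument>--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED</argument>
                        <argument>--add-opens=java.base/java.lang=ALL-UNNAMED</argument>
                        <!-- add the remaining add-opens flags from the list above in the same way -->
                        <argument>-classpath</argument>
                        <!-- expanded by the plugin to the project's runtime classpath -->
                        <classpath/>
                        <argument>example.data.PubSubExample</argument>
                        <!-- program arguments (google-project, input-topicName, ...) follow here -->
                    </arguments>
                </configuration>
            </plugin>
        </plugins>
    </build>

With this in place, mvn exec:exec starts the program in a separate java process that has the flags applied; the same flag list also works verbatim in an IDE run configuration's VM options.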