无法解组组件的发现数据:CONTINUOUS_PROC,在Ignite中具有多个ContinuousQuery侦听器

问题描述 投票:0回答:1

我正在尝试在本地计算机上使用Ignite(v2.8.0)和ContinuousQuery-es。我的设置是一个服务器节点和两个客户端。

问题是服务器节点启动且第一个客户端已连接,并使用ContinuousQuery侦听更新,而其他任何后续客户端均失败,但以下异常:

[ERROR] 2020-04-18 20:00:18.436 [tcp-client-disco-msg-worker-#4] TcpDiscoverySpi - Failed to unmarshal discovery data for component: CONTINUOUS_PROC

我也发现它一直失败,直到在第一个客户端中关闭查询游标(一旦关闭任何其他single客户端都可以连接)。

服务器节点代码:

public class IgniteServerCacheBootstrap {

    final static Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class);

    public static void main(String[] args) throws IgniteCheckedException, InterruptedException {

        IgniteConfiguration serverConfig = new IgniteConfiguration()
                .setGridLogger(new Log4J2Logger("log4j2.xml"));

        Ignite server = Ignition.start(serverConfig);
        Thread.currentThread().join();
    }

}

客户端节点代码(主要来自Ignite示例)我正在尝试并行运行两个这样的节点:

public class IgniteCacheClient implements Serializable {

    Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class);

    private IgniteCache igniteCache;

    public IgniteCacheClient() throws IgniteCheckedException {
        IgniteConfiguration clientConfig = new IgniteConfiguration()
                .setGridLogger(new Log4J2Logger("log4j2.xml"))
                .setClientMode(true);

        Ignite client = Ignition.getOrStart(clientConfig);
        igniteCache = client.getOrCreateCache("MY_CACHE");
    }

    public void run() throws InterruptedException {

        // Create new continuous query.
        ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

        qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
            @Override
            public boolean apply(Integer key, String val) {
                return key > 10;
            }
        }));

        // Callback that is called locally when update notifications are received.
        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
                    logger.info("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
            }
        });

        // This filter will be evaluated remotely on all nodes.
        // Entry that pass this filter will be sent to the caller.
        qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
            @Override
            public CacheEntryEventFilter<Integer, String> create() {
                return new CacheEntryEventFilter<Integer, String>() {
                    @Override
                    public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                        return e.getKey() > 10;
                    }
                };
            }
        });

        // Execute query.
        QueryCursor<Cache.Entry<Integer, String>> cur = igniteCache.query(qry);

        // Iterate through existing data.
        for (Cache.Entry<Integer, String> e : cur)
            logger.info("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');

        Thread.currentThread().join();
    }
}

完全例外

[ERROR] 2020-04-18 20:00:18.436 [tcp-client-disco-msg-worker-#4] TcpDiscoverySpi - Failed to unmarshal discovery data for component: CONTINUOUS_PROC
org.apache.ignite.IgniteCheckedException: Failed to deserialize object with given class loader: sun.misc.Launcher$AppClassLoader@18b4aac2
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:132) ~[ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:93) ~[ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.util.IgniteUtils.unmarshalZip(IgniteUtils.java:10248) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket.unmarshalData(DiscoveryDataPacket.java:340) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket.unmarshalGridData(DiscoveryDataPacket.java:155) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.onExchange(TcpDiscoverySpi.java:2069) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.processNodeAddFinishedMessage(ClientImpl.java:2219) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.processDiscoveryMessage(ClientImpl.java:2088) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.body(ClientImpl.java:1930) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.discovery.tcp.ClientImpl$1.body(ClientImpl.java:302) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:61) [ignite-core-2.8.0.jar:2.8.0]
Caused by: java.io.InvalidObjectException: Failed to find cache for name: MY_CACHE
    at org.apache.ignite.internal.processors.cache.GridCacheContext.readResolve(GridCacheContext.java:2376) ~[ignite-core-2.8.0.jar:2.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.readExternal(IgniteCacheProxyImpl.java:2192) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.readExternal(GatewayProtectedCacheProxy.java:1706) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2.readExternal(CacheContinuousQueryHandlerV2.java:179) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at java.util.HashMap.readObject(HashMap.java:1409) ~[?:1.8.0_172]
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.util.IgniteUtils.readMap(IgniteUtils.java:5409) [ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData.readExternal(GridContinuousProcessor.java:2446) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124) ~[ignite-core-2.8.0.jar:2.8.0]
    ... 11 more
Caused by: java.lang.IllegalStateException: Failed to find cache for name: MY_CACHE
    at org.apache.ignite.internal.processors.cache.GridCacheContext.readResolve(GridCacheContext.java:2371) ~[ignite-core-2.8.0.jar:2.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.readExternal(IgniteCacheProxyImpl.java:2192) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.readExternal(GatewayProtectedCacheProxy.java:1706) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2.readExternal(CacheContinuousQueryHandlerV2.java:179) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at java.util.HashMap.readObject(HashMap.java:1409) ~[?:1.8.0_172]
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.internal.util.IgniteUtils.readMap(IgniteUtils.java:5409) ~[ignite-core-2.8.0.jar:2.8.0]
    at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData.readExternal(GridContinuousProcessor.java:2446) ~[ignite-core-2.8.0.jar:2.8.0]
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124) ~[ignite-core-2.8.0.jar:2.8.0]
    ... 11 more
java caching ignite
1个回答
0
投票

“找不到名称为MY_CACHE的缓存”

[我认为您的private IgniteCache igniteCache正在与您的Lambda一起转让。请尝试将所有匿名内部类转换为已命名的静态内部类。我说的是谓词,监听器和工厂。

© www.soinside.com 2019 - 2024. All rights reserved.