引起:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:期望的是(JSON 字符串、数字、数组、对象或标记“null”、“true”或“false”)

问题描述 投票:0回答:1

我正在使用 Spring Boot Apache Kafka 示例并出现以下错误。

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"�contentType
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3557)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2652)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:857)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
    at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1704)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1282)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:438)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

这是代码-

application.properties

# Register the three functional beans (supplier, processor, sink) with the binder.
# NOTE(review): spring.cloud.stream.function.definition is deprecated in recent
# Spring Cloud Stream versions in favor of spring.cloud.function.definition —
# confirm against the version in use.
spring.cloud.stream.function.definition=pageViewEventSupplier;process;pageCount

# Each Kafka Streams function runs as its own Streams application and therefore
# needs a distinct application id.
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=analytics-pvin
spring.cloud.stream.kafka.streams.binder.functions.pageCount.applicationId=analytics-pcin

spring.kafka.consumer.bootstrap-servers=localhost:9092
#
# defaults
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
#
# page views out
spring.cloud.stream.bindings.pageViewEventSupplier-out-0.destination=pvs
#
# page views in
spring.cloud.stream.bindings.process-in-0.destination=pvs
#
# page counts out
spring.cloud.stream.bindings.process-out-0.destination=pcs
#
# page counts in
spring.cloud.stream.bindings.pageCount-in-0.destination=pcs

AnalyticsApplication.java

package com.example.analytics;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
public class AnalyticsApplication {

    /** Name of the Kafka Streams state store that materializes per-page view counts. */
    private static final String PAGE_COUNT_MV = "pcmv";

    public static void main(String[] args) {
        SpringApplication.run(AnalyticsApplication.class, args);
    }

    /**
     * Source of synthetic traffic: emits one random {@link PageViewEvent} each time
     * the binder polls the {@code pageViewEventSupplier} binding.
     */
    @Component
    public static class PageViewEventSource {

        @Bean
        public Supplier<PageViewEvent> pageViewEventSupplier() {
            List<String> names = Arrays.asList("mfisher", "dyser", "schacko", "abilan", "ozhurakousky", "grussell");
            List<String> pages = Arrays.asList("blog", "sitemap", "initializr", "news", "colophon", "about");
            // One Random for the lifetime of the supplier; java.util.Random is
            // thread-safe, and this avoids allocating two instances per event.
            Random random = new Random();
            return () -> {
                String rPage = pages.get(random.nextInt(pages.size()));
                // Bug fix: the original read the user name from the *pages* list
                // (pages.get(new Random().nextInt(names.size()))) — events never
                // carried a real user id, and the mismatched bound would throw
                // IndexOutOfBoundsException if the two lists differed in size.
                String rName = names.get(random.nextInt(names.size()));
                return new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
            };
        }
    }

    /**
     * Streams processor: drops short page views (duration <= 10), re-keys each
     * record by page name, and maintains a running count per page in the
     * {@code pcmv} state store.
     *
     * <p>NOTE(review): the inbound topic must contain JSON-serialized
     * {@code PageViewEvent} records. Records written with any other serializer
     * make the configured JsonDeserializer fail with "Unrecognized token" —
     * exactly the exception reported in the question.
     */
    @Component
    public static class PageViewEventProcessor {

        @Bean
        public Function<KStream<String, PageViewEvent>, KStream<String, Long>> process() {
            return e -> e.filter((key, value) -> value.getDuration() > 10)
                    // The downstream count only needs the page as key; the value
                    // is a throwaway placeholder.
                    .map((key, value) -> new KeyValue<>(value.getPage(), "0"))
                    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                    .count(Materialized.as(AnalyticsApplication.PAGE_COUNT_MV))
                    .toStream();
        }
    }

    /**
     * Sink: logs every page-count update received on the {@code pageCount} binding.
     */
    @Component
    public static class PageCountSink {

        private final Log log = LogFactory.getLog(getClass());

        @Bean
        public Consumer<KTable<String, Long>> pageCount() {
            return counts -> counts
                    .toStream()
                    .foreach((key, value) -> log.info(key + "=" + value));
        }
    }

    /**
     * REST endpoint exposing the materialized page counts via Kafka Streams
     * interactive queries.
     */
    @RestController
    public static class CountRestController {

        private final InteractiveQueryService interactiveQueryService;

        public CountRestController(InteractiveQueryService registry) {
            this.interactiveQueryService = registry;
        }

        /**
         * @return a snapshot of page name to view count read from the local
         *         {@code pcmv} store
         */
        @GetMapping("/counts")
        Map<String, Long> counts() {
            Map<String, Long> counts = new HashMap<>();
            ReadOnlyKeyValueStore<String, Long> store =
                    this.interactiveQueryService.getQueryableStore(AnalyticsApplication.PAGE_COUNT_MV, QueryableStoreTypes.keyValueStore());
            // Bug fix: KeyValueIterator extends Closeable; the original never
            // closed it, leaking underlying store resources on every request.
            try (KeyValueIterator<String, Long> all = store.all()) {
                while (all.hasNext()) {
                    KeyValue<String, Long> entry = all.next();
                    counts.put(entry.key, entry.value);
                }
            }
            return counts;
        }
    }
}

// Event payload: which user viewed which page and for how long (units not
// stated in SOURCE — presumably milliseconds or seconds; confirm with producer).
// Lombok generates getters/setters/equals/hashCode/toString (@Data) plus the
// all-args and no-args constructors; the no-args constructor is what allows
// Jackson to deserialize this type from JSON.
@Data
@AllArgsConstructor
@NoArgsConstructor
class PageViewEvent {
    private String userId, page;
    private long duration;
}
apache-kafka-streams spring-kafka
1个回答
0
投票

简单来说,该主题包含的数据不是有效的 JSON 格式。

请在调试器中检查消息内容,或使用 kafka-console-consumer 查看主题中实际存储的数据。

© www.soinside.com 2019 - 2024. All rights reserved.