使用 Spring Boot 和异步任务的多线程应用程序中的日志交错问题

问题描述 投票:0回答:0

我正在开发一个 Spring Boot 应用程序,它同时运行一个名为

HeartbeatJob
的作业的多个实例。每个
HeartbeatJob
都有一个生产者和一个消费者,分别生产和消费消息。

我有一个自定义

ThreadPoolTaskExecutor
具有指定的池大小来异步处理这些任务。我的问题是来自不同线程的日志似乎是交错的,因此很难分析日志。

这是一个日志如何混淆的例子:

12:04:22.908 [heartbeatExecutor-3] INFO  com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-SYSTEMS_CLUSTER_1 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO  com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-SYSTEMS_CLUSTER_0 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO  com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-PRODUCTION_CLUSTER_0 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO  com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-PRODUCTION_CLUSTER_1 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO  com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-SYSTEMS_CLUSTER_1 producer heartbeat message 1"}

日志不应该从集群外部接收消息。 所以这些日志消息看起来像:

The event received by Heartbeat-{cluster} consumer is: {"name": "Heartbeat-{cluster} producer heartbeat message {n}"}

这里是相关代码:

HeartbeatJobConfiguration

/**
 * This module is responsible for configuring the HeartbeatJob instances and their execution for different clusters.
 * It creates a HeartbeatJob bean and a ThreadPoolTaskExecutor bean for executing the HeartbeatJobs.
 * The module schedules the execution of HeartbeatJobs for the following clusters:
 * - PRODUCTION_CLUSTER_0
 * - PRODUCTION_CLUSTER_1
 * - SYSTEMS_CLUSTER_0
 * - SYSTEMS_CLUSTER_1
 */

package com.climate.config;

import com.climate.canarytesting.jobs.HeartbeatJob;

import com.climate.eventplatform.client.EventPlatformClientException;
import io.micrometer.core.instrument.MeterRegistry;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.io.IOException;

@Configuration
@EnableAsync
public class HeartbeatJobConfiguration {

    private MeterRegistry meterRegistry;

    /**
     * Constructs a new HeartbeatJobConfiguration instance with the provided MeterRegistry.
     *
     * @param meterRegistry the MeterRegistry instance
     */
    @Autowired
    public HeartbeatJobConfiguration(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Creates a new HeartbeatJob bean with the provided MeterRegistry.
     *
     * @return a new HeartbeatJob instance
     */
    @Bean
    public HeartbeatJob heartbeatJob() {
        return new HeartbeatJob(meterRegistry);
    }

    /**
     * Creates a new ThreadPoolTaskExecutor bean for executing HeartbeatJobs.
     *
     * @return a new ThreadPoolTaskExecutor instance
     */
    @Bean
    public TaskExecutor heartbeatExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setThreadNamePrefix("heartbeatExecutor-");
        return executor;
    }

    // The following scheduled jobs run the HeartbeatJob for different clusters
    // at the specified cron schedule. The HeartbeatJob is responsible for sending
    // and receiving heartbeats for each respective cluster.

    @Scheduled(cron = "0 * * * * ?")
    public void runProductionCluster0HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
        heartbeatJob().execute("PRODUCTION_CLUSTER_0");
    }

    @Scheduled(cron = "0 * * * * ?")
    public void runProductionCluster1HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
        heartbeatJob().execute("PRODUCTION_CLUSTER_1");
    }

    @Scheduled(cron = "0 * * * * ?")
    public void runSystemsCluster0HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
        heartbeatJob().execute("SYSTEMS_CLUSTER_0");
    }

    @Scheduled(cron = "0 * * * * ?")
    public void runSystemsCluster1HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
        heartbeatJob().execute("SYSTEMS_CLUSTER_1");
    }

}

HeartbeatJob

/**
 * The HeartbeatJob class is responsible for running a scheduled job that
 * sends and receives heartbeats for the configured environment. It also tracks and reports metrics
 * for the job runs using Micrometer.
 */

package com.climate.canarytesting.jobs;

import com.climate.canarytesting.events.heartbeat.HeartBeatConsumer;
import com.climate.canarytesting.events.heartbeat.HeartBeatProducer;
import com.climate.eventplatform.client.EventPlatformClientException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.scheduling.annotation.Async;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.text.MessageFormat;


public class HeartbeatJob extends BaseJob {

    private final Logger log = LoggerFactory.getLogger(HeartbeatJob.class);
    private String clusterName;
    private final String jobName = "Heartbeat";
    private final Map<String, Counter> totalJobRunsCounters;
    private final Map<String, Counter> successfulJobRunsCounters;

    /**
     * Constructs a new HeartbeatJob instance with the provided meter registry.
     *
     * @param meterRegistry the MeterRegistry instance for reporting metrics
     */
    public HeartbeatJob(MeterRegistry meterRegistry) {
        super(meterRegistry);
        this.totalJobRunsCounters = new ConcurrentHashMap<>();
        this.successfulJobRunsCounters = new ConcurrentHashMap<>();
    }

    /**
     * Executes the heartbeat job on a schedule, sending and receiving heartbeats for the configured environment.
     * It also increments the counters for total job runs and successful job runs.
     *
     * @param clusterName the name of the cluster for which the heartbeat job is executed
     * @throws EventPlatformClientException if there is an error with the event platform client
     * @throws InterruptedException         if the thread is interrupted
     * @throws IOException                  if there is an I/O error
     */
    @Async("heartbeatExecutor")
    public void execute(String clusterName) throws EventPlatformClientException, InterruptedException, IOException {
        this.clusterName = clusterName;
        clusterUrl = setClusterUrlByClusterName(clusterName);

        String counterName;
        counterName = MessageFormat.format("ep.heartbeat.{0}.runs.total", clusterName);
        Counter totalJobRunsCounter = getOrCreateCounter(this.totalJobRunsCounters, counterName);

        counterName = MessageFormat.format("ep.heartbeat.{0}.runs.success", clusterName);
        Counter successfulJobRunsCounter = getOrCreateCounter(this.successfulJobRunsCounters, counterName);

        log.info("{}-{}: incrementing totalJobsRunsCounter", jobName, clusterName);
        totalJobRunsCounter.increment();

        HeartBeatProducer producer = null;
        HeartBeatConsumer consumer = null;

        try {
            log.info("{}-{}: Producer initializing...", jobName, clusterName);
            producer = new HeartBeatProducer(clusterUrl);

            String producerName = MessageFormat.format("{0}-{1} producer", jobName, clusterName);
            producer.produceAll(producerName);
            log.info("{}-{}: Producer terminated...", jobName, clusterName);

            Thread.sleep(5000);

            log.info("{}-{}: Consumer initializing...", jobName, clusterName);
            consumer = new HeartBeatConsumer(clusterUrl);

            String consumerName = MessageFormat.format("{0}-{1} consumer", jobName, clusterName);
            boolean consumed = consumer.consumeUntilTerminated(consumerName);
            log.info("{}-{}: Consumer terminated...", jobName, clusterName);

            if (consumed) {
                log.info("{}-{}: incrementing successfulJobRunsCounter", jobName, clusterName);
                successfulJobRunsCounter.increment();
            }

        } catch (EventPlatformClientException | InterruptedException | IOException e) {
            log.error("{}-{}: Error during job execution. Message: {} {}", jobName, clusterName, e.getMessage(), e);
        } finally {
            if (producer != null) {
                producer.close();
            }
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}

HeartbeatProducer

package com.climate.canarytesting.events.heartbeat;

import channels.climate.heartbeat.heartbeat.generated.HeartbeatSchema;
import com.climate.eventplatform.client.EventPlatformClient;
import com.climate.eventplatform.client.EventPlatformClientException;
import com.climate.eventplatform.client.EventPlatformMessage;
import com.climate.eventplatform.client.EventPlatformProducer;
import com.climate.eventplatform.client.EventPlatformOAuth2CredsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class HeartBeatProducer {

    // Constants
    private static final String TOPIC_NAME = "climate/heartbeat/heartbeat";
    private static final String EVENT_AVRO_SCHEMA = HeartbeatSchema.getClassSchema().toString();
    private static final int NUM_MESSAGES = 5;
    private static final String TERMINATING_STRING = "##TERMINATE##";
    private static final String SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS = "production.global.event-platform.oauth2creds.sample-applications";
    private final Logger log = LoggerFactory.getLogger(HeartBeatProducer.class);

    // Instance variables
    private static String serviceUrl;
    private EventPlatformClient epClient;
    private EventPlatformProducer<HeartbeatSchema> producer;


    /**
     * Constructs a new instance of the HeartBeatProducer class.
     * @param serviceUrl The URL of the Event Platform service to produce messages to.
     */
    public HeartBeatProducer(String serviceUrl) throws EventPlatformClientException, IOException {
        this.serviceUrl = serviceUrl;
        this.epClient = new EventPlatformClient(
                serviceUrl,
                EventPlatformOAuth2CredsType.SSM,
                SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS);
        this.producer = new EventPlatformProducer<>(epClient, TOPIC_NAME, EVENT_AVRO_SCHEMA);
    }

    /**
     * Sends a series of heartbeat messages to the Event Platform.
     * @param producerName The name of the producer.
     * @throws EventPlatformClientException if an error occurs while sending the messages.
     * @throws InterruptedException if the thread is interrupted while sleeping.
     * @throws IOException if an I/O error occurs.
     */
    public void produceAll(String producerName) throws EventPlatformClientException, InterruptedException, IOException {
        HeartbeatSchema[] heartbeatMessages = new HeartbeatSchema[NUM_MESSAGES + 1]; // add additional spot for TERMINATING_STRING
        heartbeatMessages[0] = new HeartbeatSchema(producerName + " heartbeat message 0");
        heartbeatMessages[1] = new HeartbeatSchema(producerName + " heartbeat message 1");
        heartbeatMessages[2] = new HeartbeatSchema(producerName + " heartbeat message 2");
        heartbeatMessages[3] = new HeartbeatSchema(producerName + " heartbeat message 3");
        heartbeatMessages[4] = new HeartbeatSchema(producerName + " heartbeat message 4");
        heartbeatMessages[5] = new HeartbeatSchema(TERMINATING_STRING);

        for (int i = 0; i < heartbeatMessages.length; i++) {
            EventPlatformMessage<HeartbeatSchema> currentMessage = new EventPlatformMessage<>(heartbeatMessages[i], producerName);
            producer.send(currentMessage);
            log.info("{} sent a message: {}", producerName, currentMessage.getMessageValue());
            Thread.sleep(1000);
        }
    }

    public void close() {
        if (producer != null) {
            try {
                producer.close();
            } catch ( IOException ex) {
                log.error("Failed to close producer.", ex);
            }
        }
        if (epClient != null) {
            try {
                epClient.close();
            } catch (IOException ex) {
                log.error("Failed to close EventPlatformClient.", ex);
            }
        }
    }
}

HeartbeatConsumer

package com.climate.canarytesting.events.heartbeat;

import channels.climate.heartbeat.heartbeat.generated.HeartbeatSchema;
import com.climate.eventplatform.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * This class defines a HeartBeatConsumer that is used to consume events from the Event Platform.
 */
public class HeartBeatConsumer {

    // Constants
    private static final String TOPIC_NAME = "climate/heartbeat/heartbeat";
    private static final String SUBSCRIPTION_NAME = "heartbeat_subscription";
    private static final String EVENT_AVRO_SCHEMA = HeartbeatSchema.getClassSchema().toString();
    private static final String SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS = "production.global.event-platform.oauth2creds.sample-applications";
    private static final int ACK_TIMEOUT = 1;
    private static final String terminatingString = "##TERMINATE##";
    private final Logger log = LoggerFactory.getLogger(HeartBeatConsumer.class);

    // Instance variables
    private static String serviceUrl;
    private EventPlatformClient epClient;
    private EventPlatformConsumer<HeartbeatSchema> consumer;

    /**
     * Constructor that initializes a HeartBeatConsumer with a specific service URL.
     *
     * @param serviceUrl the service URL to use
     */
    public HeartBeatConsumer(String serviceUrl) throws EventPlatformClientException, IOException {
        this.serviceUrl = serviceUrl;
        this.epClient = new EventPlatformClient(
                serviceUrl,
                EventPlatformOAuth2CredsType.SSM,
                SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS);
        this.consumer = new EventPlatformConsumer<>(
                epClient,
                TOPIC_NAME,
                SUBSCRIPTION_NAME,
                EVENT_AVRO_SCHEMA,
                ACK_TIMEOUT,
                EventPlatformSubscriptionInitialPosition.LATEST);
    }

    /**
     * This method consumes events from the Event Platform until a terminating string is received.
     *
     * @param consumerName the name of the consumer
     * @throws IOException if there is an I/O error
     */
    public boolean consumeUntilTerminated(final String consumerName) throws IOException {
        boolean consumed = false;

        try {
            consumed = receiveAndProcessUntilTerminated(consumer, consumerName, terminatingString);
        } catch (EventPlatformClientException ex) {
            log.error("Failed to construct consumer.", ex);
        }

        return consumed;
    }

    /**
     * This method receives events from an EventPlatformConsumer and processes them until a terminating string is received.
     *
     * @param consumer          the EventPlatformConsumer to receive events from
     * @param consumerName      the name of the consumer
     * @param terminatingString the terminating string to look for in the received events
     * @return true if a message was successfully consumed, false otherwise
     * @throws EventPlatformClientException if there is an error receiving or acknowledging an event
     */
    public boolean receiveAndProcessUntilTerminated(
            EventPlatformConsumer<HeartbeatSchema> consumer,
            String consumerName,
            String terminatingString) throws EventPlatformClientException {
        int messageCount = 0;
        boolean consumed = false;

        // The loop exits only on encountering an exception e.g. connection severed, event schema deleted.
        while (true) {
            EventPlatformMessage<HeartbeatSchema> event = consumer.receive();

            log.info("The event received by {} is: {}", consumerName, event.getMessageValue());

            try {
                consumer.acknowledge(event);
                messageCount++;

                // Check if the received event contains the terminating string
                if (event.getMessageValue().toString().contains(terminatingString)) {
                    log.info("{} messages processed.", messageCount);
                    log.info("Terminating string received, exiting.");
                    consumed = true;
                    break;
                }
            } catch (EventPlatformClientException ex) {
                log.error("Failed to acknowledge the event", ex);
                break;
            }
        }
        return consumed;
    }

    public void close() {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (IOException ex) {
                log.error("Failed to close consumer.", ex);
            }
        }
        if (epClient != null) {
            try {
                epClient.close();
            } catch (IOException ex) {
                log.error("Failed to close event platform client.", ex);
            }
        }
    }
}

我尝试过的事情

  • 我发现间隔计划的工作似乎有效。大约 30 秒的时差和日志似乎表现正确。
  • Logger
    变量更新为从
    static
    到非静态:
    private final Logger log = LoggerFactory.getLogger(HeartBeatProducer.class);
java spring-boot multithreading slf4j
© www.soinside.com 2019 - 2024. All rights reserved.