我正在开发一个 Spring Boot 应用程序,它同时运行一个名为
HeartbeatJob
的作业的多个实例。每个 HeartbeatJob
都有一个生产者和一个消费者,分别生产和消费消息。
我有一个自定义
ThreadPoolTaskExecutor
具有指定的池大小来异步处理这些任务。我的问题是来自不同线程的日志似乎是交错的,因此很难分析日志。
这是一个日志如何混淆的例子:
12:04:22.908 [heartbeatExecutor-3] INFO com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-SYSTEMS_CLUSTER_1 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-SYSTEMS_CLUSTER_0 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-PRODUCTION_CLUSTER_0 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-PRODUCTION_CLUSTER_1 producer heartbeat message 0"}
12:04:22.908 [heartbeatExecutor-3] INFO com.climate.canarytesting.events.heartbeat.HeartBeatConsumer - The event received by Heartbeat-SYSTEMS_CLUSTER_0 consumer is: {"name": "Heartbeat-SYSTEMS_CLUSTER_1 producer heartbeat message 1"}
日志不应该从集群外部接收消息。 所以这些日志消息看起来像:
The event received by Heartbeat-{cluster} consumer is: {"name": "Heartbeat-{cluster} producer heartbeat message {n}"}
这里是相关代码:
HeartbeatJobConfiguration
/**
* This module is responsible for configuring the HeartbeatJob instances and their execution for different clusters.
* It creates a HeartbeatJob bean and a ThreadPoolTaskExecutor bean for executing the HeartbeatJobs.
* The module schedules the execution of HeartbeatJobs for the following clusters:
* - PRODUCTION_CLUSTER_0
* - PRODUCTION_CLUSTER_1
* - SYSTEMS_CLUSTER_0
* - SYSTEMS_CLUSTER_1
*/
package com.climate.config;
import com.climate.canarytesting.jobs.HeartbeatJob;
import com.climate.eventplatform.client.EventPlatformClientException;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.io.IOException;
@Configuration
@EnableAsync
public class HeartbeatJobConfiguration {
private MeterRegistry meterRegistry;
/**
* Constructs a new HeartbeatJobConfiguration instance with the provided MeterRegistry.
*
* @param meterRegistry the MeterRegistry instance
*/
@Autowired
public HeartbeatJobConfiguration(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
/**
* Creates a new HeartbeatJob bean with the provided MeterRegistry.
*
* @return a new HeartbeatJob instance
*/
@Bean
public HeartbeatJob heartbeatJob() {
return new HeartbeatJob(meterRegistry);
}
/**
* Creates a new ThreadPoolTaskExecutor bean for executing HeartbeatJobs.
*
* @return a new ThreadPoolTaskExecutor instance
*/
@Bean
public TaskExecutor heartbeatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(4);
executor.setThreadNamePrefix("heartbeatExecutor-");
return executor;
}
// The following scheduled jobs run the HeartbeatJob for different clusters
// at the specified cron schedule. The HeartbeatJob is responsible for sending
// and receiving heartbeats for each respective cluster.
@Scheduled(cron = "0 * * * * ?")
public void runProductionCluster0HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
heartbeatJob().execute("PRODUCTION_CLUSTER_0");
}
@Scheduled(cron = "0 * * * * ?")
public void runProductionCluster1HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
heartbeatJob().execute("PRODUCTION_CLUSTER_1");
}
@Scheduled(cron = "0 * * * * ?")
public void runSystemsCluster0HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
heartbeatJob().execute("SYSTEMS_CLUSTER_0");
}
@Scheduled(cron = "0 * * * * ?")
public void runSystemsCluster1HeartbeatJob() throws EventPlatformClientException, IOException, InterruptedException {
heartbeatJob().execute("SYSTEMS_CLUSTER_1");
}
}
HeartbeatJob
/**
* The HeartbeatJob class is responsible for running a scheduled job that
* sends and receives heartbeats for the configured environment. It also tracks and reports metrics
* for the job runs using Micrometer.
*/
package com.climate.canarytesting.jobs;
import com.climate.canarytesting.events.heartbeat.HeartBeatConsumer;
import com.climate.canarytesting.events.heartbeat.HeartBeatProducer;
import com.climate.eventplatform.client.EventPlatformClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.text.MessageFormat;
public class HeartbeatJob extends BaseJob {
private final Logger log = LoggerFactory.getLogger(HeartbeatJob.class);
private String clusterName;
private final String jobName = "Heartbeat";
private final Map<String, Counter> totalJobRunsCounters;
private final Map<String, Counter> successfulJobRunsCounters;
/**
* Constructs a new HeartbeatJob instance with the provided meter registry.
*
* @param meterRegistry the MeterRegistry instance for reporting metrics
*/
public HeartbeatJob(MeterRegistry meterRegistry) {
super(meterRegistry);
this.totalJobRunsCounters = new ConcurrentHashMap<>();
this.successfulJobRunsCounters = new ConcurrentHashMap<>();
}
/**
* Executes the heartbeat job on a schedule, sending and receiving heartbeats for the configured environment.
* It also increments the counters for total job runs and successful job runs.
*
* @param clusterName the name of the cluster for which the heartbeat job is executed
* @throws EventPlatformClientException if there is an error with the event platform client
* @throws InterruptedException if the thread is interrupted
* @throws IOException if there is an I/O error
*/
@Async("heartbeatExecutor")
public void execute(String clusterName) throws EventPlatformClientException, InterruptedException, IOException {
this.clusterName = clusterName;
clusterUrl = setClusterUrlByClusterName(clusterName);
String counterName;
counterName = MessageFormat.format("ep.heartbeat.{0}.runs.total", clusterName);
Counter totalJobRunsCounter = getOrCreateCounter(this.totalJobRunsCounters, counterName);
counterName = MessageFormat.format("ep.heartbeat.{0}.runs.success", clusterName);
Counter successfulJobRunsCounter = getOrCreateCounter(this.successfulJobRunsCounters, counterName);
log.info("{}-{}: incrementing totalJobsRunsCounter", jobName, clusterName);
totalJobRunsCounter.increment();
HeartBeatProducer producer = null;
HeartBeatConsumer consumer = null;
try {
log.info("{}-{}: Producer initializing...", jobName, clusterName);
producer = new HeartBeatProducer(clusterUrl);
String producerName = MessageFormat.format("{0}-{1} producer", jobName, clusterName);
producer.produceAll(producerName);
log.info("{}-{}: Producer terminated...", jobName, clusterName);
Thread.sleep(5000);
log.info("{}-{}: Consumer initializing...", jobName, clusterName);
consumer = new HeartBeatConsumer(clusterUrl);
String consumerName = MessageFormat.format("{0}-{1} consumer", jobName, clusterName);
boolean consumed = consumer.consumeUntilTerminated(consumerName);
log.info("{}-{}: Consumer terminated...", jobName, clusterName);
if (consumed) {
log.info("{}-{}: incrementing successfulJobRunsCounter", jobName, clusterName);
successfulJobRunsCounter.increment();
}
} catch (EventPlatformClientException | InterruptedException | IOException e) {
log.error("{}-{}: Error during job execution. Message: {} {}", jobName, clusterName, e.getMessage(), e);
} finally {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
}
}
}
HeartbeatProducer
package com.climate.canarytesting.events.heartbeat;
import channels.climate.heartbeat.heartbeat.generated.HeartbeatSchema;
import com.climate.eventplatform.client.EventPlatformClient;
import com.climate.eventplatform.client.EventPlatformClientException;
import com.climate.eventplatform.client.EventPlatformMessage;
import com.climate.eventplatform.client.EventPlatformProducer;
import com.climate.eventplatform.client.EventPlatformOAuth2CredsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class HeartBeatProducer {
// Constants
private static final String TOPIC_NAME = "climate/heartbeat/heartbeat";
private static final String EVENT_AVRO_SCHEMA = HeartbeatSchema.getClassSchema().toString();
private static final int NUM_MESSAGES = 5;
private static final String TERMINATING_STRING = "##TERMINATE##";
private static final String SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS = "production.global.event-platform.oauth2creds.sample-applications";
private final Logger log = LoggerFactory.getLogger(HeartBeatProducer.class);
// Instance variables
private static String serviceUrl;
private EventPlatformClient epClient;
private EventPlatformProducer<HeartbeatSchema> producer;
/**
* Constructs a new instance of the HeartBeatProducer class.
* @param serviceUrl The URL of the Event Platform service to produce messages to.
*/
public HeartBeatProducer(String serviceUrl) throws EventPlatformClientException, IOException {
this.serviceUrl = serviceUrl;
this.epClient = new EventPlatformClient(
serviceUrl,
EventPlatformOAuth2CredsType.SSM,
SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS);
this.producer = new EventPlatformProducer<>(epClient, TOPIC_NAME, EVENT_AVRO_SCHEMA);
}
/**
* Sends a series of heartbeat messages to the Event Platform.
* @param producerName The name of the producer.
* @throws EventPlatformClientException if an error occurs while sending the messages.
* @throws InterruptedException if the thread is interrupted while sleeping.
* @throws IOException if an I/O error occurs.
*/
public void produceAll(String producerName) throws EventPlatformClientException, InterruptedException, IOException {
HeartbeatSchema[] heartbeatMessages = new HeartbeatSchema[NUM_MESSAGES + 1]; // add additional spot for TERMINATING_STRING
heartbeatMessages[0] = new HeartbeatSchema(producerName + " heartbeat message 0");
heartbeatMessages[1] = new HeartbeatSchema(producerName + " heartbeat message 1");
heartbeatMessages[2] = new HeartbeatSchema(producerName + " heartbeat message 2");
heartbeatMessages[3] = new HeartbeatSchema(producerName + " heartbeat message 3");
heartbeatMessages[4] = new HeartbeatSchema(producerName + " heartbeat message 4");
heartbeatMessages[5] = new HeartbeatSchema(TERMINATING_STRING);
for (int i = 0; i < heartbeatMessages.length; i++) {
EventPlatformMessage<HeartbeatSchema> currentMessage = new EventPlatformMessage<>(heartbeatMessages[i], producerName);
producer.send(currentMessage);
log.info("{} sent a message: {}", producerName, currentMessage.getMessageValue());
Thread.sleep(1000);
}
}
public void close() {
if (producer != null) {
try {
producer.close();
} catch ( IOException ex) {
log.error("Failed to close producer.", ex);
}
}
if (epClient != null) {
try {
epClient.close();
} catch (IOException ex) {
log.error("Failed to close EventPlatformClient.", ex);
}
}
}
}
HeartbeatConsumer
package com.climate.canarytesting.events.heartbeat;
import channels.climate.heartbeat.heartbeat.generated.HeartbeatSchema;
import com.climate.eventplatform.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* This class defines a HeartBeatConsumer that is used to consume events from the Event Platform.
*/
public class HeartBeatConsumer {
// Constants
private static final String TOPIC_NAME = "climate/heartbeat/heartbeat";
private static final String SUBSCRIPTION_NAME = "heartbeat_subscription";
private static final String EVENT_AVRO_SCHEMA = HeartbeatSchema.getClassSchema().toString();
private static final String SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS = "production.global.event-platform.oauth2creds.sample-applications";
private static final int ACK_TIMEOUT = 1;
private static final String terminatingString = "##TERMINATE##";
private final Logger log = LoggerFactory.getLogger(HeartBeatConsumer.class);
// Instance variables
private static String serviceUrl;
private EventPlatformClient epClient;
private EventPlatformConsumer<HeartbeatSchema> consumer;
/**
* Constructor that initializes a HeartBeatConsumer with a specific service URL.
*
* @param serviceUrl the service URL to use
*/
public HeartBeatConsumer(String serviceUrl) throws EventPlatformClientException, IOException {
this.serviceUrl = serviceUrl;
this.epClient = new EventPlatformClient(
serviceUrl,
EventPlatformOAuth2CredsType.SSM,
SSM_PARAM_TEST_SERVICE_OAUTH2_CREDS);
this.consumer = new EventPlatformConsumer<>(
epClient,
TOPIC_NAME,
SUBSCRIPTION_NAME,
EVENT_AVRO_SCHEMA,
ACK_TIMEOUT,
EventPlatformSubscriptionInitialPosition.LATEST);
}
/**
* This method consumes events from the Event Platform until a terminating string is received.
*
* @param consumerName the name of the consumer
* @throws IOException if there is an I/O error
*/
public boolean consumeUntilTerminated(final String consumerName) throws IOException {
boolean consumed = false;
try {
consumed = receiveAndProcessUntilTerminated(consumer, consumerName, terminatingString);
} catch (EventPlatformClientException ex) {
log.error("Failed to construct consumer.", ex);
}
return consumed;
}
/**
* This method receives events from an EventPlatformConsumer and processes them until a terminating string is received.
*
* @param consumer the EventPlatformConsumer to receive events from
* @param consumerName the name of the consumer
* @param terminatingString the terminating string to look for in the received events
* @return true if a message was successfully consumed, false otherwise
* @throws EventPlatformClientException if there is an error receiving or acknowledging an event
*/
public boolean receiveAndProcessUntilTerminated(
EventPlatformConsumer<HeartbeatSchema> consumer,
String consumerName,
String terminatingString) throws EventPlatformClientException {
int messageCount = 0;
boolean consumed = false;
// The loop exits only on encountering an exception e.g. connection severed, event schema deleted.
while (true) {
EventPlatformMessage<HeartbeatSchema> event = consumer.receive();
log.info("The event received by {} is: {}", consumerName, event.getMessageValue());
try {
consumer.acknowledge(event);
messageCount++;
// Check if the received event contains the terminating string
if (event.getMessageValue().toString().contains(terminatingString)) {
log.info("{} messages processed.", messageCount);
log.info("Terminating string received, exiting.");
consumed = true;
break;
}
} catch (EventPlatformClientException ex) {
log.error("Failed to acknowledge the event", ex);
break;
}
}
return consumed;
}
public void close() {
if (consumer != null) {
try {
consumer.close();
} catch (IOException ex) {
log.error("Failed to close consumer.", ex);
}
}
if (epClient != null) {
try {
epClient.close();
} catch (IOException ex) {
log.error("Failed to close event platform client.", ex);
}
}
}
}
Logger
变量更新为从static
到非静态:private final Logger log = LoggerFactory.getLogger(HeartBeatProducer.class);