前端未收到服务器发送的事件

问题描述 投票:0回答:1

我正在尝试通过应用程序前端的 EventSource 订阅 BE 上的事件。问题似乎反映了事件侦听器,因为我正在到达 BE 端点并注册订阅者。

我在 FE 上使用 React Typescript,在 BE 上使用 Java 17 和 Spring Boot 以及 RedisCache 来保存和恢复订阅的用户,因此如果 pod 关闭或我们需要部署某个功能,我们可以恢复订阅者。

这是我订阅 BE 事件的 FE 函数,在组件的第一个 useEffect 上调用

/**
 * This function registers and receives BE events to update data
 * similar to a webSocket, but only unidirectional from BE to FE
 * For now, just UNANSWERED_BUCKET is used, but in the future
 * can be used for other event types
 * @returns
 */
function registerUnansweredSSeEvent() {
  const sourceUnanswered = SseEventService.subscribeToEventsEventSource(
    context?.myExternalId,
    SseSubscriptionEvents.UNANSWERED_BUCKET,
  );
  sourceUnanswered.addEventListener(
    SseSubscriptionEvents.UNANSWERED_BUCKET,
    (event) => {
      fillUnansweredAfterCategorization(event);
    },
  );
  sourceUnanswered.onopen = (event: any) => logger.info("Connection opened");
  sourceUnanswered.onerror = (event: any) => logger.info("Connection error");
  return sourceUnanswered;
}

这是函数 susbscribeToEventsEventSurce:

subscribeToEventsEventSource(
  externalId: string | undefined,
  eventToSubscribe: string,
) {
  return new EventSource(
    `${this.baseUrl}/subscribe?personalExternalId=${externalId}&eventToSubscribe=${eventToSubscribe}`,
  );
}

我的控制器:

@GetMapping(value = "/subscribe", produces = "text/event-stream")
    public SseEmitter subscribeSseEmitter(
            @RequestParam("personalExternalId") String personalExternalId,
            @RequestParam("eventToSubscribe") String eventToSubscribe){
        return emitterService.registerClient(personalExternalId, eventToSubscribe);
    }

我的完整服务

@Service
public class SSEService {
    private final NewRelicLogger logger = NewRelicLogger.getLogger(SSEService.class);
    private static final AtomicInteger ID_COUNTER = new AtomicInteger(1);
    //2hs maximum time Timeout. Should we increase this?
    public static final long DEFAULT_TIMEOUT = 7200000L;
    private final SlackNotificationService slackNotificationService;
    private final CacheService cacheService;
    public SSEService(SlackNotificationService slackNotificationService, CacheService cacheService) {
        this.slackNotificationService = slackNotificationService;
        this.cacheService = cacheService;
    }
    
    /**
     * @param subscriberExternalId: Email of the registered user to subscribe to notifications
     * @return This method subscribes the user to the events sent from the BE
     */
    @AutomaticLogging
    public SseEmitter registerClient(String subscriberExternalId, String eventToSubscribe) {
        var emitter = new SseEmitter(DEFAULT_TIMEOUT);
        var sseClient = new SseClient(subscriberExternalId != null ? subscriberExternalId : UUID.randomUUID().toString(), emitter);
        addOrUpdateSseClient(sseClient, SseEventType.valueOf(eventToSubscribe));
        emitter.onCompletion(() -> removeFromCache(subscriberExternalId));
        emitter.onError(err -> removeAndLogError(sseClient, err.getMessage()));
        emitter.onTimeout(() -> removeAndLogError(sseClient, "TIMEOUT"));
        
        logger.info("New client registered {}", sseClient.getExternalId());
        slackNotificationService.logInfo("New client registered " + sseClient.getExternalId());
        return emitter;
    }
    
    private void removeFromCache(String subscriberExternalId){
        try {
            cacheService.removeCacheKey(SSE_SUBSCRIBER, subscriberExternalId);
        } catch (Exception e) {
            logger.warn("Couldn't remove cache for subscriber: " + subscriberExternalId);
        }
    }
   
    @AutomaticLogging
    public void unregisterClient(String subscriberExternalId) {
        removeFromCache(subscriberExternalId);
    }
    
    /**
     * @param newClient Here we add a new client removing the previous one
     *                  to not keep open connections if the user reloads the page
     */
    public void addOrUpdateSseClient(SseClient newClient, SseEventType newEventType) {
        String key = newClient.getExternalId();
        SseClient registeredClientsObject = cacheService.getSSEClientFromCache(SSE_SUBSCRIBER, key);
        newClient.getSubscribedEvents().add(newEventType);
        if (registeredClientsObject == null ){
            cacheService.putValueInCache(SSE_SUBSCRIBER, key, newClient);
        } else {
            if (!registeredClientsObject.getSubscribedEvents().contains(newEventType)){
                List<SseEventType> events = registeredClientsObject.getSubscribedEvents();
                events.add(newEventType);
                newClient.setSubscribedEvents(events);
                cacheService.putValueInCache(SSE_SUBSCRIBER, key, newClient);
            }
        }
    }
    
    
    /**
     * @param eventType The type of the event sent to the subscribers
     *                  So only subscribed to this eventType receives the notifications
     *                  Method used to Broadcast the messages to all subscribers
     *          Use this for generic updates
     */
    public void broadcastSseEmitterMessages(SseEventType eventType) {
        List<SseClient> clients = cacheService.getAllSSEClientFromCache()
                .stream()
                .filter(client -> client.getSubscribedEvents().contains(eventType))
                .toList();
        
        for (SseClient client : clients) {
            sendBroadcasterEmitterMessage(client, eventType);
        }
    }
    
    /**
     * @param client Subscriber who will receive the event
     * @param event event to notify
     */
    @AutomaticLogging
    private void sendBroadcasterEmitterMessage(SseClient client, SseEventType event) {
        var sseEmitter = client.getSseEmitter();
        try {
            logger.info("Notify client {}", client.getExternalId());
            var eventId = ID_COUNTER.incrementAndGet();
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(event.name())
                    .id(String.valueOf(eventId))
                    .data(event, MediaType.APPLICATION_JSON);
            sseEmitter.send(eventBuilder);
        } catch (IOException e) {
            sseEmitter.completeWithError(e);
        }
    }
    
    private void removeAndLogError(SseClient client, String error) {
        logger.error("Error during communication. Unregister client {}, error {}", client.getExternalId(), error);
        slackNotificationService.logError("Error during communication. Unregister client " + client.getExternalId() + " with error: " + error);
        removeFromCache(client.getExternalId());
    }
    
    public void broadcastSseEmitterMessagesChatBuckets(List<SmallChannelDTO> channels, SseEventType eventType) {
        List<SseClient> clients = cacheService.getAllSSEClientFromCache();
        for (SseClient client: clients) {
            sendBroadcastSseEmitterMessagesChatBuckets(client, channels, eventType);
        }
    }
    
    /**
     * @param client Subscriber who will receive the event for unanswered bucket on chat
     * @param channels the channels to update
     * @param eventType event to notify
     */
    @AutomaticLogging
    private void sendBroadcastSseEmitterMessagesChatBuckets(SseClient client, List<SmallChannelDTO> channels, SseEventType eventType) {
        var sseEmitter = client.getSseEmitter();
        try {
            logger.info("Notify client {} - sseEmitter {} -  the event {}", client.getExternalId(), sseEmitter, eventType.name());
            var eventId = ID_COUNTER.incrementAndGet();
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(eventType.name())
                    .id(String.valueOf(eventId))
                    .data(channels, MediaType.APPLICATION_JSON);
            sseEmitter.send(eventBuilder);
        } catch (IOException e) {
            sseEmitter.completeWithError(e);
        }
    }
    
    
}

以及保存并返回缓存信息的代码:

public SseClient getSSEClientFromCache(String cacheName, String key) {
        SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(cacheName);
        if (cache != null) {
            Cache.ValueWrapper valueWrapper = cache.get(key);
            if (valueWrapper != null) {
                return (SseClient) valueWrapper.get();
            }
        }
        return null;
    }
    
    public List<SseClient> getAllSSEClientFromCache() {
        SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(SSE_SUBSCRIBER);
        if (cache != null) {
            List<String> allKeys = cache.getAllKeys().stream()
                    .filter(c -> c.startsWith(SSE_SUBSCRIBER))
                    .map(c -> c.substring(SSE_SUBSCRIBER.length() + 1)) //we cut the key so only get the externalID
                    .toList();
            List<SseClient> clients = new ArrayList<>();
            for (String key : allKeys) {
                SseClient client = this.getSSEClientFromCache(SSE_SUBSCRIBER, key);
                if (client != null) {
                    clients.add(client);
                }
            }
            return clients;
        }
        return Collections.emptyList();
    }
    
    public void putValueInCache(String cacheName, Object key, Object value) {
        SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(cacheName);
        if (cache != null) {
            cache.put(key, value);
        }
    }

当我需要触发事件时,它可以工作,但 FE 没有收到它。

有什么帮助吗?

reactjs typescript spring-boot server-sent-events
1个回答
0
投票

我找到了解决方案并设法使其发挥作用!

我已经将我的控制器更改为如下所示并且它有效,但在我添加此之前还没有完全发挥作用

cacheControl(CacheControl.noCache())

@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     public ResponseEntity<SseEmitter> subscribeSseEmitter(
             @RequestParam("personalExternalId") String personalExternalId,
             @RequestParam("eventToSubscribe") String eventToSubscribe) {
         return ResponseEntity
                 .ok()
                 .header("Connection", "keep-alive")
                 .cacheControl(CacheControl.noCache())
                 .contentType(MediaType.TEXT_EVENT_STREAM)
                 .body(emitterService.registerClientToSet(personalExternalId, eventToSubscribe));
     }

但现在我面临另一个问题 如果我想重新连接客户端,超时会发生什么? 似乎 Fe 一直在尝试重新连接,而服务器表示显示超时。我认为设置默认超时而不是仅仅放置 Long.maxValue 或类似的东西是一个很好的做法。

© www.soinside.com 2019 - 2024. All rights reserved.