RxJs 计时器仅调用一次,而不是每秒调用一次

问题描述 投票:0回答:1

下面是我的代码:

this.callActiveListSubscription = timer(0,1000).pipe(switchMap(() => this.realtimeSupervisorHeatMapService.getLiveCallList())).subscribe((data: any) => {
        console.log('inside get live call list subscription',data);

        let fetchRequired = data.fetchRequired;
        this.recent_data = data;

        var listOfStrem = data.stream;
        var list = data.list_Of_contacts.filter(function (el) {
                return el != null;
        });
        if (list.length == 0) {


                // Set SelectedCallKey as None and Active Calls to 0
                this.allCallsData = [];
                this.agentCallList = [];
                this.filteredCallList = [];
                this.activeCallCount = 0;
                this.selectedCallData = null;


        } else {
                // set the amount of active calls
                console.log(this.selectedBusinessUnit)
                if(this.selectedBusinessUnit !== 'All'){

                 this.activeCallCount = list.filter(item=>item.business_unit === this.selectedBusinessUnit).length;
                } else {
                        this.activeCallCount = list.length;
                }

        }

        this.agentCallList = []
        for (let i = 0; i < list.length; i++) {
                if (list[i]) {
                        var agentName = list[i].agent_name;
                        var agtId = list[i].agentId;
                        var streamStartTime = list[i].streamStartTime;
                        var slackDMUrl = list[i].agentSlackDMUrl;
                        slackDMUrl = slackDMUrl === "NA" ? "#" : slackDMUrl;
                        var name = list[i].contact;
                        let phrase_count = list[i].phrases_count;
                        let business_unit = list[i].business_unit
                        let item = { name, agent_name: agentName, agentId: agtId, streamStartTime, slackDMUrl, phrase_count: phrase_count, business_unit }

                        let index = this.agentCallList.findIndex((element: any) => element.name === item.name)
                        if (index === -1) {
                                this.agentCallList.push(item);
                        } else {
                                this.agentCallList[index] = item;
                        }
                        this.agentCallList.sort((a,b)=>b.phrase_count - a.phrase_count);


                        if (i == list.length - 1) {

                                this.changeBusinessUnitDropdown();
                        } else {
                                // console.log("wait")
                        }
                }

        }
})

它位于 ngOnInit() 内部并且仅执行一次。

我希望它按照计时器内提到的那样每秒执行一次。我也使用 mergeMap 代替,但没有运气。任何帮助,将不胜感激。我也使用间隔代替计时器,但它仍然不起作用。

angular rxjs
1个回答
0
投票

不需要

timer
,您可以直接使用
fromEvent
,只需确保在网络套接字关闭时取消订阅即可!我认为您需要两个流,一个用于使用计时器刷新数据,另一个用于监听 websocket 事件。

我找不到

streamArray
的事件,所以我使用
message
事件并分享一个使用 rxjs
fromEvent
监听 Web 套接字的示例。

import axios from 'axios';

import { of, from, fromEvent, timer } from 'rxjs';
import { map, share, switchMap } from 'rxjs/operators';

const socket = new WebSocket('wss://socketsbay.com/wss/v2/1/demo/');
const createObservable = (eventName: string) => {
  return fromEvent(socket, eventName);
};
const getLiveCallList = () => {
  return createObservable('message');
};
getLiveCallList().subscribe((data: any) => {
  console.log(data);
});
socket.addEventListener('open', (event) => {
  console.log('open');
  socket.send('Hello Server!');
});
socket.addEventListener('message', (event) => {
  console.log('Message from server ', event.data);
});

堆栈闪电战

© www.soinside.com 2019 - 2024. All rights reserved.