我有一个代码,我使用subscribe调用一个可观察的函数。我期待它并行运行,但它按顺序运行。
makedownloadData()是将“showData”列表变量的所有变量存储在不同变量中的函数,例如“downloadData”。函数需要花费大量时间将所有变量复制到downloadData并进一步处理。所以我想使用observable / subscribe技术来调用这个函数,这样它就可以并行运行而不会导致当前序列的延迟,但它只是按顺序运行。没有增强。
第一种方法(没有观察到)
调用函数
this.downloadData=this.makeDownloadData() //This step is taking lot's of time as function respond late.
console.log("print after function call")
要调用的函数
public makeDownloadData() {
var main_data = this.showData;
var down_data:any = [];
for (var i=0; i<main_data.length; i++){
var element: String = "";
var newDate = new Date(main_data[i]["@timestamp"]);
element = element.concat(this.convertDateToLocalFormat(newDate)+" ");
element = element.concat(String(main_data[i]["cu_hostname"])+" ");
element = element.concat(String(main_data[i]["log_agent"])+".");
element = element.concat(String(main_data[i]["log_level"])+" ");
element = element.concat(String(main_data[i]["app_name"])+": ");
element = element.concat(String(main_data[i]["log_message"])+" ");
down_data.push(element.concat("\n"));
}
return down_data;
}
输出:
//Execution of function
"print after function call"
第二种方法(具有可观察性)
导入要求
import { Observable } from 'rxjs';
import 'rxjs/add/observable/of'
调用可观察的函数。
this.makeDownloadData().subscribe(response => {
console.log("Expected to print after") //This should be run in parallel and must printed after upcoming statement as this function is taking time to respond.
console.log(response); //Expected to print after
},
error => {
console.log("Did not got response")
});
console.log("Expected to print before")
要调用的函数
public makeDownloadData(): Observable<any> {
var main_data = this.showData;
var down_data:any = [];
for (var i=0; i<main_data.length; i++){
var element: String = "";
var newDate = new Date(main_data[i]["@timestamp"]);
element = element.concat(this.convertDateToLocalFormat(newDate)+" ");
element = element.concat(String(main_data[i]["cu_hostname"])+" ");
element = element.concat(String(main_data[i]["log_agent"])+".");
element = element.concat(String(main_data[i]["log_level"])+" ");
element = element.concat(String(main_data[i]["app_name"])+": ");
element = element.concat(String(main_data[i]["log_message"])+" ");
down_data.push(element.concat("\n"));
}
return Observable.of(down_data)
}
输出:
"Expected to print after"
Printing response
"Expected to print before"
预期产出:
"Expected to print before"
"Expected to print after"
Printing response
我想使用observable进行并行执行。请帮助。如果有什么不清楚,我会修改问题,使其更清楚。谢谢
与承诺不同,代码就像
Promise.resolve("one").then(console.log);
console.log("two");
导致“二”在“一”之前打印,可观察性本质上不是异步,就像回调不是。所以of("one").subscribe(console.log)
将立即打印,而不是在事件循环结束后打勾。
这是一个较长的讲座:https://www.syntaxsuccess.com/viewarticle/rxjs-subjects-emit-synchronous-values
首先你需要在订阅获取数据之后调用你之前打印的订阅方法,然后调用普通方法,即你的打印方法,所以现在你可以得到正确的答案我会帮助你任何澄清请依赖这个答案
有两个问题:
makeDownloadData()
不是一个可观察的函数,它只返回一个observable(占用时间的代码在return语句之前)下面的代码有一个完全异步的observable(感谢asyncScheduler
函数的of
参数 - 没有它of
是一个返回一个observable的同步函数):
const source = of('Hello, subscriber', asyncScheduler).pipe(
tap(() => console.log('Inside observable')),
map(x => {
for(let i = 0; i < 1000000000; i++)
for(let j = 0; j < 1; j++);
return x;
})
);
console.log('After observable')
source.subscribe(x => console.log(x));
通过说这是异步的,我的意思是After observable
将立即在控制台中。过了一段时间,你会看到'Inside observable'
,然后是'Hello, subscriber'
。
如果你删除了observable的asyncScheduler
,你将等待一段时间并看到相同的序列('After observable', 'Inside observable', 'Hello, subscriber'
),但是所有的代码都会被阻塞,直到observable中的循环结束,之后,你会看到三个字符串几乎同时打印在控制台上。看看这个演示:https://stackblitz.com/edit/rxjs-stackoverflow-556297976433166?file=index.ts
makeDownloadData(): any { // <== doesn't need to return an observable
var main_data = this.showData;
var down_data:any = [];
for (var i=0; i<main_data.length; i++){
var element: String = "";
var newDate = new Date(main_data[i]["@timestamp"]);
element = element.concat(this.convertDateToLocalFormat(newDate)+" ");
element = element.concat(String(main_data[i]["cu_hostname"])+" ");
element = element.concat(String(main_data[i]["log_agent"])+".");
element = element.concat(String(main_data[i]["log_level"])+" ");
element = element.concat(String(main_data[i]["app_name"])+": ");
element = element.concat(String(main_data[i]["log_message"])+" ");
down_data.push(element.concat("\n"));
}
return down_data // <== this doesn't need to return an observable
}
更新:我用of(this.makeDownloadData())
替换了of(1)
,否则它将花费相同的时间来创建在原始问题中运行该函数所需的运算符。然后,您可以将其映射到所需的功能。
import { of, asyncScheduler } from 'rxjs';
import { map } from 'rxjs/operators';
...
this.downloadData=of(1, asyncScheduler)
.pipe(map(() => this.makeDownloadData());
在这里使用of
并不是创建可观察对象的最佳方法,但到目前为止,它是最清晰的,因为我需要使用asyncScheduler
来使其异步。作为第一个参数的'1'是一个哑数据。你可以使用任何东西。 of
函数需要一个参数。