I'm trying to consume a Spring Boot WebFlux reactive API from my Angular app, but I get the following error in the browser developer console.
{error: SyntaxError: Unexpected token { in JSON at position 231 at JSON.parse
The API produces application/x-ndjson, and I'm not sure whether HttpClient is simply unable to parse that response.
My service class:
export class UserInfoService {
  private baseUrl = "http://localhost:9095/api/userinfo";
  private headers = new HttpHeaders()
    .set('content-type', 'application/json')
    .set('Access-Control-Allow-Origin', '*');

  constructor(private http: HttpClient) {}

  getUsers(): Observable<UserInfo[]> {
    return this.http.get<UserInfo[]>(
      this.baseUrl + '/users', {'headers': this.headers});
  }
}
My component class:
export class DashboardComponent implements OnInit {
  count: any = 0;
  service: HttpServiceService;
  usersList: Array<UserInfo> | undefined;

  constructor(service: HttpServiceService) {
    this.service = service;
  }

  ngOnInit(): void {
    console.log("---ngOnInit()---");
    this.service.getUsers().subscribe({
      next: (result: any) => {
        console.log("||||Response successful");
        this.usersList?.push(result);
        console.log(result);
      },
      error: (err: any) => {
        console.log(err);
      },
      complete: () => {
        console.log('complete');
      }
    });
  }
}
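I want to display the data reactively in a table in the template, but the browser console shows the error above. The controller endpoint is: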
@GetMapping(path = "/users", produces = "application/x-ndjson")
public Flux<UserInfo> getAll() {
    return userInfoService.getAll();
}
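The ndjson format is not valid application/json: the body is a sequence of JSON documents separated by newlines, not a single document. That is why you get this error when HttpClient tries to parse the response.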
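To make the failure concrete, here is a minimal sketch that needs neither Angular nor a server. The sample body, the UserRow shape, and the function names are made up for illustration; only the behaviour of JSON.parse is the point.

```typescript
// Hypothetical sample body: two JSON documents separated by newlines.
const ndjsonBody = '{"id":1,"name":"Ada"}\n{"id":2,"name":"Linus"}\n';

interface UserRow {
  id: number;
  name: string;
}

// Parsing the whole body as one JSON document fails as soon as the
// parser reaches the second object.
function tryParseAsJson(body: string): string {
  try {
    JSON.parse(body);
    return 'ok';
  } catch (e) {
    return e instanceof SyntaxError ? 'SyntaxError' : 'other error';
  }
}

// Parsing line by line works because every line is a complete JSON document.
function parseNdjson<T>(body: string): T[] {
  return body
    .split(/\r?\n/)
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line) as T);
}

console.log(tryParseAsJson(ndjsonBody)); // SyntaxError
console.log(parseNdjson<UserRow>(ndjsonBody).length); // 2
```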
The solution is to request the content as plain text and then parse it yourself. You can do this by passing responseType: 'text' when making the request:
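import { HttpClient, HttpHeaders } from '@angular/common/http';
import { Observable, map } from 'rxjs';

export class UserInfoService {
  private baseUrl = "http://localhost:9095/api/userinfo";
  // Access-Control-Allow-Origin is a response header set by the server,
  // so the client only states which format it accepts.
  private headers = new HttpHeaders()
    .set('Accept', 'application/x-ndjson');

  constructor(private http: HttpClient) {}

  // here is a simple ndjson parser implementation I found:
  parse(data: string): UserInfo[] {
    if (typeof data !== 'string')
      throw new Error(`Unexpected type ${typeof data}`);
    // split on LF or CRLF and drop empty lines
    const rows = data.split(/\r?\n/).filter(Boolean);
    return rows.map((row) => JSON.parse(row));
  }

  getUsers(): Observable<UserInfo[]> {
    // no type argument here: with responseType 'text' the call returns Observable<string>
    return this.http.get(this.baseUrl + '/users', {
      headers: this.headers,
      responseType: 'text'
    }).pipe(map((text) => this.parse(text)));
  }
}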
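One thing to watch when consuming this from the component in the question: usersList is declared but never initialized, so this.usersList?.push(result) silently does nothing, and with the text approach result is already the whole array rather than a single user. The sketch below is framework-free and only illustrates that difference; the class names and the UserInfo shape are assumptions, not code from the question.

```typescript
// Hypothetical shape, for illustration only.
interface UserInfo {
  id: number;
  name: string;
}

// Mirrors the question: the list is never initialized, so the optional
// chaining short-circuits and nothing is ever stored.
class QuestionStyleList {
  usersList: Array<UserInfo> | undefined;

  onNext(result: UserInfo): void {
    this.usersList?.push(result);
  }
}

// Alternative: start with an empty array and replace it with the parsed rows.
class InitializedList {
  usersList: UserInfo[] = [];

  onNext(rows: UserInfo[]): void {
    this.usersList = rows;
  }
}

const questionStyle = new QuestionStyleList();
questionStyle.onNext({ id: 1, name: 'Ada' });
console.log(questionStyle.usersList); // undefined

const initialized = new InitializedList();
initialized.onNext([
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
]);
console.log(initialized.usersList.length); // 2
```

In the real component the second onNext body would go into the next callback of subscribe.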
I wrote an interceptor for this specific task and switched to fetch, because Angular's HttpClient still cannot properly handle streamed data.
import { Injectable } from '@angular/core';
import { HttpContextToken, HttpHandler, HttpInterceptor, HttpRequest, HttpResponse } from '@angular/common/http';
import { catchError, defer, of, filter, from, map, mergeMap, Observable, repeat, scan, tap, timer } from 'rxjs';

export const STREAMING = new HttpContextToken<StreamObserver | null>(() => null);

export type StreamObserver = {
  connected: () => void;
  disconnected: () => void;
};

@Injectable()
export class HttpStreamingInterceptor implements HttpInterceptor {
  constructor() {}

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<any> {
    const observer = request.context.get(STREAMING);
    if (!observer) {
      // only act if we should
      return next.handle(request);
    }
    // One decoder in streaming mode, so a multi-byte character that is
    // split across two chunks still decodes correctly
    const decoder = new TextDecoder('utf-8');
    // defer so we can retry on a connection reset
    return defer(() => {
      // notify that we're disconnected - I recommend a debounce() somewhere in the pipe
      // when acting on this
      observer.disconnected();
      return from(
        fetch(request.url, {
          credentials: 'include',
          cache: 'no-store',
          headers: request.headers.keys().map((k): [string, string] => [k, request.headers.get(k) ?? '']),
        })
      );
    }).pipe(
      filter(r => r.ok),
      filter((r): r is typeof r & { body: ReadableStream } => !!r.body),
      // Now we have an Observable<ReadableStream<Uint8Array>>
      // Make that an Observable<Uint8Array>
      mergeMap(r => from(r.body)),
      // Notify that we're connected
      tap(_ => observer.connected()),
      // Don't throw, just complete on errors
      catchError(_ => of()),
      // Once we're done (either by completion or error), go again; this is a stream after all!
      repeat({ delay: () => timer(5000) }),
      map(arr => decoder.decode(arr, { stream: true })),
      // Buffer strings that haven't seen a newline yet, but forget
      // everything we've already emitted!
      scan(
        (acc, curr) => {
          const newBuf = acc.buffer + curr;
          const emit = newBuf.split('\n');
          const buffer = emit.pop() ?? '';
          return { buffer, emit };
        },
        { buffer: '', emit: [] as string[] }
      ),
      mergeMap(x => x.emit),
      // Skip blank lines so JSON.parse never sees an empty string
      filter(x => x.trim().length > 0),
      // Produce nice little HttpResponse events with our separate JSON objects
      map(x => new HttpResponse({ body: JSON.parse(x) }))
    );
  }
}
It can then be used with the provided context token, for example:
http.get('/api/json-stream', {
  observe: 'response',
  context: new HttpContext().set(STREAMING, {
    connected: () => this.store.dispatch(ConnectionEvent()),
    disconnected: () => this.store.dispatch(DisconnectionEvent()),
  }),
});
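Note that because the interceptor essentially creates an Observable that retries indefinitely, the only way to stop it is to unsubscribe. Most effects already do this implicitly through a switchMap somewhere in the pipeline.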
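The buffering step of the interceptor is the part that is easiest to get wrong, so here it is in isolation as a plain class without RxJS. This is only a sketch of the same idea (keep the unfinished tail of each chunk, emit only complete lines); NdjsonLineBuffer and the sample chunks are hypothetical names and data, not part of the interceptor above.

```typescript
// Accumulates text chunks and returns only complete, non-empty lines.
class NdjsonLineBuffer {
  private buffer = '';

  push(chunk: string): string[] {
    const parts = (this.buffer + chunk).split('\n');
    // The last element is either '' (the chunk ended on a newline)
    // or an incomplete line that must wait for the next chunk.
    this.buffer = parts.pop() ?? '';
    // trim() also removes a trailing '\r' from CRLF line endings.
    return parts.map((line) => line.trim()).filter((line) => line.length > 0);
  }
}

// Hypothetical chunks: the second object is split across two chunks,
// and the third chunk starts with a blank line.
const chunks = ['{"id":1}\n{"i', 'd":2}\n', '\n{"id":3}\n'];

const lineBuffer = new NdjsonLineBuffer();
const parsed: Array<{ id: number }> = [];
for (const chunk of chunks) {
  for (const line of lineBuffer.push(chunk)) {
    parsed.push(JSON.parse(line) as { id: number });
  }
}

console.log(parsed.map((row) => row.id)); // [ 1, 2, 3 ]
```

Chunk boundaries on the network are arbitrary, so a JSON object can arrive in two pieces; parsing each chunk directly would fail in exactly that case.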