Angular HttpClient GET fails to parse application/x-ndjson

Question (votes: 0, answers: 2)

I am trying to consume a Spring Boot WebFlux reactive API from my Angular app, but I get the following error in the browser developer console.

{error: SyntaxError: Unexpected token { in JSON at position 231 at JSON.parse

The API produces application/x-ndjson, and I am not sure whether HttpClient is simply unable to parse the response.

My service class:

export class UserInfoService {
  private baseUrl = "http://localhost:9095/api/userinfo";

  private headers= new HttpHeaders()
    .set('content-type', 'application/json')
    .set('Access-Control-Allow-Origin', '*')

  constructor(private http: HttpClient) {}

  getUsers(): Observable<UserInfo[]> {
    return this.http.get<UserInfo[]>(
      this.baseUrl + '/users', {'headers':this.headers});
  }
}

My component class:

export class DashboardComponent implements OnInit {
  count: any = 0;
  service: HttpServiceService;
  usersList: Array<UserInfo> | undefined;

  constructor(service: HttpServiceService) {
    this.service = service;
  }

  ngOnInit(): void {
    console.log("---ngOnInit()---");
    this.service.getUsers().subscribe({
      next: (result: any) => {
        console.log("||||Response successful");
        this.usersList?.push(result);
        console.log(result);        
      },
      error: (err: any) => {
        console.log(err);
      },
      complete: () => {
        console.log('complete');
      }
    });
  }
}

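I want to display the data in a table in the template in a reactive way, but the browser console shows the error quoted above. The controller endpoint that produces the response: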

@GetMapping(path="/users", produces = "application/x-ndjson")
public Flux<UserInfo> getAll() {
    return userInfoService.getAll();
}

angular reactive-programming spring-webflux angular2-observables ndjson
2 answers
0 votes

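The ndjson format is not valid application/json: the body is a sequence of JSON documents separated by newlines, not a single JSON value. That is why you get this error when HttpClient tries to parse the response.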
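To make the failure concrete, here is a minimal sketch using a made-up two-record payload. The field names `id` and `name` are assumptions for illustration, not the real `UserInfo` shape. Parsing the whole body fails when the parser reaches the second object, while parsing each line on its own works:

```typescript
// Hypothetical NDJSON body: one JSON object per line (field names are illustrative only).
const body = '{"id":1,"name":"alice"}\n{"id":2,"name":"bob"}\n';

// JSON.parse expects exactly one JSON value, so the second object is a syntax error.
function failsAsJson(text: string): boolean {
  try {
    JSON.parse(text);
    return false;
  } catch (e) {
    return e instanceof SyntaxError;
  }
}

// Each non-empty line, taken on its own, is a valid JSON document.
function parseNdjson(text: string): unknown[] {
  return text
    .split(/\r?\n/)
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line));
}

console.log(failsAsJson(body));
console.log(parseNdjson(body));
```

The "position 231" in the reported error is presumably the offset where the second record starts in the real response.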

The solution is to request the content as plain text and then parse it yourself. You can do this by passing responseType: 'text' when making the request:

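import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { Observable, map } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class UserInfoService {
  private baseUrl = "http://localhost:9095/api/userinfo";

  // Access-Control-Allow-Origin is a response header set by the server,
  // so it is not sent from the client; Accept states what we expect back.
  private headers = new HttpHeaders().set('Accept', 'application/x-ndjson');

  constructor(private http: HttpClient) {}

  // A simple ndjson parser: one JSON document per non-empty line.
  parse(data: string): UserInfo[] {
    if (typeof data !== 'string')
      throw new Error(`Unexpected type ${typeof data}`);
    const rows = data.split(/\r?\n/).filter(Boolean);
    return rows.map((row) => JSON.parse(row) as UserInfo);
  }

  getUsers(): Observable<UserInfo[]> {
    // No type argument here: with responseType 'text' the call returns Observable<string>.
    return this.http.get(this.baseUrl + '/users', {
      headers: this.headers,
      responseType: 'text'
    }).pipe(map((text) => this.parse(text)));
  }
}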

0 votes

I wrote an interceptor for this specific task and switched to fetch, because Angular's HttpClient still cannot properly handle streamed data.

import { Injectable } from '@angular/core';
import { HttpContextToken, HttpHandler, HttpInterceptor, HttpRequest, HttpResponse } from '@angular/common/http';
import { catchError, defer, of, filter, from, map, mergeMap, Observable, repeat, scan, tap, timer } from 'rxjs';

export const STREAMING = new HttpContextToken<StreamObserver | null>(() => null);

export type StreamObserver = {
    connected: () => void;
    disconnected: () => void;
};

@Injectable()
export class HttpStreamingInterceptor implements HttpInterceptor {
    constructor() {}

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    intercept(request: HttpRequest<any>, next: HttpHandler): Observable<any> {
        const observer = request.context.get(STREAMING);
        if (!observer) {
            // only act if we should
            return next.handle(request);
        }
        // defer so we can retry on a connection reset
        return defer(() => {
            // notify that we're disconnected - I recommend a debounce() somewhere in the pipe
            // when acting on this
            observer.disconnected();
            return from(
                // urlWithParams keeps any query parameters set on the request
                fetch(request.urlWithParams, {
                    credentials: 'include',
                    cache: 'no-store',
                    headers: request.headers.keys().map((k): [string, string] => [k, request.headers.get(k) ?? '']),
                })
            );
        }).pipe(
            filter(r => r.ok),
            filter((r): r is typeof r & { body: ReadableStream } => !!r.body),
            // Now we have an Observable<ReadableStream<Uint8Array>>
            // Make that an Observable<Uint8Array>
            mergeMap(r => from(r.body)),
            // Notify that we're connected
            tap(_ => observer.connected()),
            // Don't throw, just complete on errors
            catchError(_ => of()),
            // Once we're done (either by completion or error), go again – this is a stream after all!
            repeat({ delay: () => timer(5000) }),
            map(arr => new TextDecoder('utf-8').decode(arr)),
            // Buffer strings that haven't seen a newline yet, but forget
            // everything we've already emitted!
            scan(
                (acc, curr) => {
                    const newBuf = acc.buffer + curr;
                    const emit = newBuf.split('\n');
                    const buffer = emit.pop() ?? '';
                    return { buffer, emit };
                },
                { buffer: '', emit: [] as string[] }
            ),
            mergeMap(x => x.emit),
            // Skip blank lines, which JSON.parse would reject
            filter(x => x.trim() !== ''),
            // Produce nice little HttpResponse events with our separate JSON objects
            map(x => new HttpResponse({ body: JSON.parse(x) }))
        );
    }
}

The interceptor can then be used with the provided context token, for example:

http.get('/api/json-stream', {
    observe: 'response',
    context: new HttpContext().set(STREAMING, {
        connected: () => this.store.dispatch(ConnectionEvent()),
        disconnected: () => this.store.dispatch(DisconnectionEvent()),
    }),
});

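Note that because the interceptor essentially creates an Observable that retries forever, the only way to stop it is to unsubscribe. Most effects already do this implicitly through a switchMap somewhere in the pipe.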
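The buffering step of the interceptor is the part that is easiest to get subtly wrong, so here is a framework-free sketch of it. `NdjsonChunkParser` is a hypothetical helper name, not part of Angular or RxJS. Unlike the per-chunk `TextDecoder` in the pipe above, this sketch keeps one decoder per stream and decodes with `{ stream: true }`, on the assumption that a chunk boundary may fall inside a multi-byte UTF-8 character:

```typescript
// Hypothetical helper (not an Angular or RxJS API): turns raw byte chunks into parsed NDJSON records.
class NdjsonChunkParser {
  // One decoder per stream, so a character split across chunks is reassembled.
  private decoder = new TextDecoder('utf-8');
  // Text received so far that has not yet been terminated by a newline.
  private buffer = '';

  // Feed one chunk; returns every record that this chunk completed.
  push(chunk: Uint8Array): unknown[] {
    this.buffer += this.decoder.decode(chunk, { stream: true });
    const lines = this.buffer.split('\n');
    // The last element is an unfinished line (or '' if the chunk ended on a newline).
    this.buffer = lines.pop() ?? '';
    return lines
      .filter((line) => line.trim() !== '')
      .map((line) => JSON.parse(line));
  }
}

// Illustrative payload; \u00eb is a two-byte character in UTF-8.
const bytes = new TextEncoder().encode('{"name":"zo\u00eb"}\n{"name":"bob"}\n');

const parser = new NdjsonChunkParser();
// Cut the payload in the middle of the two-byte character.
const first = parser.push(bytes.slice(0, 12));
const second = parser.push(bytes.slice(12));

console.log(first);
console.log(second);
```

A design note: the parser holds state, so one instance belongs to one connection. If the connection is retried, creating a fresh instance avoids gluing a half-received line from the old connection onto the first line of the new one.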
