SSE client example for Java 11+ (no dependencies)

Question (votes: 0, answers: 3)

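I'm looking for an example of reading server-sent events with the plain JDK 11+ HTTP client, without any additional dependencies. I can't find anything about SSE in the documentation either.

Any hints?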

server-sent-events java-http-client
3 Answers
3 votes

A Java 11-based implementation of an SSE (Server-Sent Events) client:
SSEClient

It offers a very simple way to handle SSE messages.

Usage example:

EventHandler eventHandler = eventText -> { process(eventText); };
SSEClient sseClient = SSEClient.builder().url(url).eventHandler(eventHandler)
    .build();
sseClient.start();

Note: I am the author of this SSE client.


3 votes

Edit 1: See here and here for information about the format of the incoming data.

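Edit 2: Updated the code example to handle the data: part of the protocol. There are also event:, id:, and retry: parts (see the links above), but I don't plan to add handling for those.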
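For readers who do need the other fields, here is a minimal sketch of parsing one message block (the text between two blank lines) into its event, id, retry, and data parts. The class name SseMessage and its layout are illustrative assumptions, not part of the answer's code, and the sketch covers only the common cases of the event-stream format.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for one parsed SSE message; names are illustrative only. */
final class SseMessage
{
    final String event; // value of "event:", or "message" when absent
    final String id;    // value of "id:", or null when absent
    final Long retry;   // value of "retry:" in milliseconds, or null when absent or not numeric
    final String data;  // all "data:" values joined with "\n"

    SseMessage( String event, String id, Long retry, String data )
    {
        this.event = event;
        this.id = id;
        this.retry = retry;
        this.data = data;
    }

    /** Parses a single message block, i.e. the lines between two blank lines. */
    static SseMessage parse( String block )
    {
        String event = "message";
        String id = null;
        Long retry = null;
        List<String> data = new ArrayList<>( );

        for ( String line : block.split( "\r\n|\r|\n" ) )
        {
            if ( line.isEmpty( ) || line.startsWith( ":" ) )
            {
                continue; // blank line or comment line
            }
            int colon = line.indexOf( ':' );
            String field = colon < 0 ? line : line.substring( 0, colon );
            String value = colon < 0 ? "" : line.substring( colon + 1 );
            if ( value.startsWith( " " ) )
            {
                value = value.substring( 1 ); // only one leading space is stripped
            }
            switch ( field )
            {
                case "event":
                    event = value;
                    break;
                case "id":
                    id = value;
                    break;
                case "data":
                    data.add( value );
                    break;
                case "retry":
                    // Only plain digit strings are accepted; anything else is ignored
                    if ( value.matches( "[0-9]{1,18}" ) )
                    {
                        retry = Long.parseLong( value );
                    }
                    break;
                default:
                    break; // unknown fields are ignored
            }
        }
        return new SseMessage( event, id, retry, String.join( "\n", data ) );
    }
}
```

Calling SseMessage.parse( "event: tick\ndata: a\ndata: b" ) would give an event of "tick" and data of "a" and "b" separated by a newline.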

I couldn't find an official BodySubscriber for SSE, but writing one isn't that hard. Here is a rough implementation (note the TODOs):

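import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class SseSubscriber implements BodySubscriber<Void>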
{
    protected static final Pattern dataLinePattern = Pattern.compile( "^data: ?(.*)$" );

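    protected static String extractMessageData( String[] messageLines )
    {
        // In the event-stream format, multiple "data:" lines in one message
        // are joined with a newline between them
        var s = new StringBuilder( );
        var first = true;
        for ( var line : messageLines )
        {
            var m = dataLinePattern.matcher( line );
            if ( m.matches( ) )
            {
                if ( !first )
                {
                    s.append( '\n' );
                }
                s.append( m.group( 1 ) );
                first = false;
            }
        }
        return s.toString( );
    }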

    protected final Consumer<? super String> messageDataConsumer;
    protected final CompletableFuture<Void> future;
    protected volatile Subscription subscription;
    protected volatile String deferredText;

    public SseSubscriber( Consumer<? super String> messageDataConsumer )
    {
        this.messageDataConsumer = messageDataConsumer;
        this.future = new CompletableFuture<>( );
        this.subscription = null;
        this.deferredText = null;
    }

    @Override
    public void onSubscribe( Subscription subscription )
    {
        this.subscription = subscription;
        try
        {
            this.deferredText = "";
            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onNext( List<ByteBuffer> buffers )
    {
        try
        {
            // Volatile read
            var deferredText = this.deferredText;

            for ( var buffer : buffers )
            {
                // TODO: Safe to assume multi-byte chars don't get split across buffers?
                var s = deferredText + UTF_8.decode( buffer );

                // -1 means don't discard trailing empty tokens ... so the final token will
                // be whatever is left after the last \n\n (possibly the empty string, but
                // not necessarily), which is the part we need to defer until the next loop
                // iteration
                var tokens = s.split( "\n\n", -1 );

                // Final token gets deferred, not processed here
                for ( var i = 0; i < tokens.length - 1; i++ )
                {
                    var message = tokens[ i ];
                    var lines = message.split( "\n" );
                    var data = extractMessageData( lines );
                    this.messageDataConsumer.accept( data );
                    // TODO: Handle lines that start with "event:", "id:", "retry:"
                }

                // Defer the final token
                deferredText = tokens[ tokens.length - 1 ];
            }

            // Volatile write
            this.deferredText = deferredText;

            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onError( Throwable e )
    {
        this.future.completeExceptionally( e );
    }

    @Override
    public void onComplete( )
    {
        try
        {
            this.future.complete( null );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
        }
    }

    @Override
    public CompletionStage<Void> getBody( )
    {
        return this.future;
    }
}

Then use it:

var req = HttpRequest.newBuilder( )
                     .GET( )
                     .uri( new URI( "http://service/path/to/events" ) )
                     .setHeader( "Accept", "text/event-stream" )
                     .build( );

this.client.sendAsync( req, respInfo ->
{
    if ( respInfo.statusCode( ) == 200 )
    {
        return new SseSubscriber( messageData ->
        {
            // TODO: Handle messageData
        } );
    }
    else
    {
        throw new RuntimeException( "Request failed" );
    }
} );
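On the TODO in SseSubscriber about multi-byte characters: as far as I can tell it is not safe to assume they never straddle two buffers, since the network can split the byte stream at any point, and a one-shot UTF_8.decode( buffer ) would then substitute replacement characters for the partial sequence. Below is a minimal sketch of one way around that, using a stateful CharsetDecoder that carries incomplete bytes over to the next chunk. The class name StreamingUtf8Decoder is an assumption made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes UTF-8 chunk by chunk, keeping split characters intact. */
final class StreamingUtf8Decoder
{
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder( )
        .onMalformedInput( CodingErrorAction.REPLACE )
        .onUnmappableCharacter( CodingErrorAction.REPLACE );

    // Bytes of an incomplete character left over from the previous chunk
    private ByteBuffer leftover = ByteBuffer.allocate( 0 );

    String decode( ByteBuffer chunk )
    {
        // Prepend the leftover bytes to the new chunk
        ByteBuffer in = ByteBuffer.allocate( leftover.remaining( ) + chunk.remaining( ) );
        in.put( leftover ).put( chunk ).flip( );

        // UTF-8 never yields more chars than input bytes, so this capacity is enough
        CharBuffer out = CharBuffer.allocate( in.remaining( ) );

        // endOfInput = false: a trailing partial sequence is left unread in "in"
        decoder.decode( in, out, false );

        // Keep whatever the decoder did not consume for the next call
        leftover = ByteBuffer.allocate( in.remaining( ) );
        leftover.put( in ).flip( );

        out.flip( );
        return out.toString( );
    }
}
```

In onNext, the subscriber could then hold one such decoder as a field and call decoder.decode( buffer ) in place of UTF_8.decode( buffer ).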

0 votes

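We just built a lightweight SSE handler, which you can find here: https://github.com/prefab-cloud/java-simple-sse-client. It leaves managing the Java HttpClient to you and relies on HttpClient's LineSubscriber to turn bytes into lines, so there isn't much code.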
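To illustrate the line-based idea with the JDK alone, here is a rough sketch that lets HttpResponse.BodyHandlers.ofLines( ) split the body into lines and then groups the data: lines into messages. It is not the linked library's code; the class name LineBasedSseReader and both method names are assumptions, and the event:, id:, and retry: fields are ignored.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Hypothetical helper: reads SSE messages from a stream of already-split lines. */
final class LineBasedSseReader
{
    /** Passes each message's data to onData; an empty line ends a message. */
    static void readEvents( Stream<String> lines, Consumer<String> onData )
    {
        StringBuilder data = new StringBuilder( );
        boolean hasData = false;
        for ( Iterator<String> it = lines.iterator( ); it.hasNext( ); )
        {
            String line = it.next( );
            if ( line.isEmpty( ) )
            {
                // Blank line: dispatch the message if it carried any data
                if ( hasData )
                {
                    onData.accept( data.toString( ) );
                }
                data.setLength( 0 );
                hasData = false;
            }
            else if ( line.startsWith( "data:" ) )
            {
                String value = line.substring( 5 );
                if ( value.startsWith( " " ) )
                {
                    value = value.substring( 1 );
                }
                if ( hasData )
                {
                    data.append( '\n' ); // multiple data lines are joined with "\n"
                }
                data.append( value );
                hasData = true;
            }
            // "event:", "id:", "retry:" and ":" comment lines are ignored in this sketch
        }
    }

    /** Blocks until the server closes the stream; url is supplied by the caller. */
    static void listen( String url, Consumer<String> onData ) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient( );
        HttpRequest request = HttpRequest.newBuilder( URI.create( url ) )
            .header( "Accept", "text/event-stream" )
            .GET( )
            .build( );
        // ofLines() exposes the body as a lazily populated Stream<String>
        HttpResponse<Stream<String>> response =
            client.send( request, HttpResponse.BodyHandlers.ofLines( ) );
        try ( Stream<String> lines = response.body( ) )
        {
            readEvents( lines, onData );
        }
    }
}
```

A call such as LineBasedSseReader.listen( "http://service/path/to/events", System.out::println ) would print each message's data as it arrives. Because the JDK does the line splitting and decoding, the buffer-boundary handling that a hand-written BodySubscriber needs does not come up here; reconnecting after a dropped connection is still left to the caller.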
