AsynchronousSocketChannel Read/WritePendingException - can it be synchronized?


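I am working on a TCP server and am curious whether the read and write methods of AsynchronousSocketChannel can be synchronized. I wrapped the channel in another class because I need some extra functionality on top of it. My question is whether this is really the right way to synchronize.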

/**
 * writes bytes from a <b>ByteBuffer</b> into an
 * <b>AsynchronousSocketChannel</b>
 * 
 * @param buffer    the ByteBuffer to write from
 * @param onFailure specifies the method that should be called on failure of the
 *                  write operation
 */
public void write(ByteBuffer buffer, final C onFailure) {

    CompletionHandler<Integer, ByteBuffer> handler = new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            if (buf.hasRemaining())
                channel.write(buf, buf, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            attachment.call(onFailure, exc);
        }

    };

    synchronized (writeLock) {
        this.channel.write(buffer, buffer, handler);
    }
}

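In this case writeLock is a static final object that gets locked whenever any instance of my wrapper class starts a write operation. Does this actually help, or does the write operation run on outside the synchronized block?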

java sockets asynchronous io channel
1 Answer

This is how I solved it, using a write queue and a writePending flag.

/**
 * writes bytes from a <b>ByteBuffer</b> into an
 * <b>AsynchronousSocketChannel</b>
 * 
 * @param buffer    the ByteBuffer to write from
 * @param onFailure specifies the method that should be called on failure of the
 *                  write operation
 */
public void write(ByteBuffer buffer, final C onFailure) {

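    // Assumes writeQueue is a java.util.Deque<ByteBuffer> (e.g. ArrayDeque) and that
    // writePending is only read or written while holding writeLock.
    CompletionHandler<Integer, ByteBuffer> handler = new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer buf) {
            // Partial write: keep writing the same buffer before touching the queue.
            if (buf.hasRemaining()) {
                channel.write(buf, buf, this);
                return;
            }

            ByteBuffer next;
            synchronized (writeLock) {
                next = writeQueue.pollFirst(); // FIFO: oldest queued buffer first
                if (next == null) {
                    writePending = false; // queue drained; the next write() starts a new chain
                    return;
                }
            }
            // Note: queued buffers reuse this handler, so their failures are reported
            // to the onFailure of the call that started the chain.
            channel.write(next, next, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buf) {
            synchronized (writeLock) {
                writePending = false;
            }
            attachment.call(onFailure, exc);
        }

    };

    synchronized (writeLock) {
        // Never busy-wait here: spinning while holding writeLock would block the
        // completion handler, which needs the same lock to clear writePending.
        if (this.writePending) {
            this.writeQueue.addLast(buffer);
            return;
        }
        this.writePending = true;
    }
    this.channel.write(buffer, buffer, handler);
}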
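
For reference, here is a minimal self-contained sketch of the same queueing idea. It is not the wrapper class from the question: `QueuedWriter`, `Pending`, and the `Consumer<Throwable>` callback are hypothetical names that stand in for the original `C onFailure` / `attachment.call(...)` mechanism. The sketch assumes one lock per channel is enough (rather than a static lock), and that after a failed write it is acceptable to report the failure to every buffer still waiting in the queue.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical wrapper that allows at most one outstanding write per channel. */
final class QueuedWriter {

    /** A buffer together with the failure callback of the call that submitted it. */
    private static final class Pending {
        final ByteBuffer buffer;
        final Consumer<Throwable> onFailure;

        Pending(ByteBuffer buffer, Consumer<Throwable> onFailure) {
            this.buffer = buffer;
            this.onFailure = onFailure;
        }
    }

    private final AsynchronousSocketChannel channel;
    private final Object lock = new Object();               // per instance, not static
    private final Queue<Pending> queue = new ArrayDeque<>(); // guarded by lock
    private boolean writing;                                 // guarded by lock
    private final WriteHandler handler = new WriteHandler();

    QueuedWriter(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    /** Starts the write if the channel is idle, otherwise queues it. Never blocks on I/O. */
    void write(ByteBuffer buffer, Consumer<Throwable> onFailure) {
        Pending p = new Pending(buffer, onFailure);
        synchronized (lock) {
            if (writing) {
                queue.add(p);
                return;
            }
            writing = true;
        }
        channel.write(p.buffer, p, handler);
    }

    private final class WriteHandler implements CompletionHandler<Integer, Pending> {
        @Override
        public void completed(Integer written, Pending p) {
            if (p.buffer.hasRemaining()) {
                // Partial write: finish this buffer before taking the next one.
                channel.write(p.buffer, p, this);
                return;
            }
            Pending next;
            synchronized (lock) {
                next = queue.poll();
                if (next == null) {
                    writing = false; // idle again; a later write() starts directly
                    return;
                }
            }
            channel.write(next.buffer, next, this);
        }

        @Override
        public void failed(Throwable exc, Pending p) {
            // Assumption: after a failure nothing queued can be written either,
            // so every waiting buffer is reported as failed too.
            List<Pending> dropped = new ArrayList<>();
            dropped.add(p);
            synchronized (lock) {
                dropped.addAll(queue);
                queue.clear();
                writing = false;
            }
            for (Pending d : dropped) {
                d.onFailure.accept(exc);
            }
        }
    }
}
```

The lock is only held for the bookkeeping (flag and queue), never across a wait, so the completion handler can always make progress. A call such as `writer.write(ByteBuffer.wrap(bytes), exc -> exc.printStackTrace())` can then be issued from any thread without risking a WritePendingException, as long as all writes to the channel go through the same `QueuedWriter`.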