将Netty与ClamAV插播一起使用

问题描述 投票:0回答:1

我一直在努力使用Netty进行配置以将字节流传输到ClamAV服务。我正在使用Apache Camel路由。

使用Netty,我无法截获“超出INSTREAM大小限制”消息。

INSTREAM必须在此命令前加上n或z前缀。扫描数据流。在INSTREAM之后,在发送命令的同一套接字上,将数据流分块发送给clamd。这避免了建立新的TCP连接和NAT问题的开销。块的格式为:”其中,以下数据的大小(以字节为单位)以网络字节顺序表示为4字节无符号整数,并且是实际的块。通过发送零长度的块来终止流。注意:不要超过clamd.conf中定义的StreamMaxLength,否则clamd将以超出INSTREAM大小限制的方式进行答复并关闭连接。

使用直接同步套接字连接,我没有问题。谁能为我指出如何使用Netty做到这一点的正确方向?还是我应该坚持使用同步套接字连接。

使用同步套接字的实现。归功于https://github.com/solita/clamav-java“ Antti Virtanen”。

    private class UseSocket implements Processor{
        @Override
        public void process(Exchange exchange) throws Exception{
            try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
                 Socket socket = new Socket("localhost", 3310);
                 BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())){
                byte[] command = "zINSTREAM\0".getBytes();
                socketOutput.write(command);
                socketOutput.flush();
                byte[] chunk = new byte[2048];
                int chunkSize;
                try(BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())){
                    for(chunkSize = message.read(chunk);chunkSize > -1;chunkSize = message.read(chunk)){
                        socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
                        socketOutput.write(chunk, 0, chunkSize);
                        socketOutput.flush();

                        if(processReply(socketInput, exchange)){
                            return;
                        }
                    }
                    socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
                    socketOutput.flush();
                    processReply(socketInput, exchange);
                }
            }
        }

        private boolean processReply(BufferedInputStream in, Exchange exchange) throws Exception{
            if(in.available() > 0) {
                logger.info("processing reply");
                byte[] replyBytes = new byte[256];
                int replySize = in.read(replyBytes);
                if (replySize > 0) {
                    String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
                    String avStatus = "infected";
                    if ("stream: OK\0".equals(reply)) {
                        avStatus = "clean";
                    } else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
                        avStatus = "overflow";
                    }
                    exchange.getIn().setHeader("av-status", avStatus);
                    return true;
                }
            }
            return false;
        }
    }   

使用带有入站和出站通道处理程序的Netty的实现。

    private class UseNetty implements Processor{

        @Override
        public void process(Exchange exchange) throws Exception{
            logger.info(CLASS_NAME + ": Creating Netty client");
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.remoteAddress(new InetSocketAddress("localhost", 3310));
                bootstrap.handler(new ClamAvChannelIntializer(exchange));
                ChannelFuture channelFuture = bootstrap.connect().sync();
                channelFuture.channel().closeFuture().sync();
            }catch(Exception ex) {
                logger.error(CLASS_NAME + ": ERROR", ex);
            }
            finally
            {
                eventLoopGroup.shutdownGracefully();
                logger.info(CLASS_NAME + ": Netty client closed");
            }
        }
    }

public class ClamAvChannelIntializer extends ChannelInitializer<SocketChannel> {
    private Exchange exchange;
    public ClamAvChannelIntializer(Exchange exchange){
        this.exchange = exchange;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new ClamAvClientWriter());
        socketChannel.pipeline().addLast(new ClamAvClientHandler(exchange));
    }
}

public class ClamAvClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    String CLASS_NAME;
    Logger logger;
    private Exchange exchange;
    public static final int MAX_BUFFER = 2048;
    public ClamAvClientHandler(Exchange exchange){
        super();
        CLASS_NAME = this.getClass().getName();
        logger = LoggerFactory.getLogger(CLASS_NAME);
        this.exchange = exchange;
    }

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception{
        logger.info(CLASS_NAME + ": Entering channelActive");
        channelHandlerContext.write(exchange);
        logger.info(CLASS_NAME + ": Exiting channelActive");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause){
        cause.printStackTrace();
        channelHandlerContext.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        logger.info(CLASS_NAME + ": Entering channelRead0");
        String reply = byteBuf.toString(CharsetUtil.UTF_8);
        logger.info(CLASS_NAME + ": Reply = " + reply);
        String avStatus = "infected";
        if ("stream: OK\0".equals(reply)) {
            avStatus = "clean";
        } else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
            avStatus = "overflow";
        } else{
            logger.warn("Infected or unknown reply = " + reply);
        }
        exchange.getIn().setHeader("av-status", avStatus);
        logger.info(CLASS_NAME + ": Exiting channelRead0");
        channelHandlerContext.close();
    }
}

public class ClamAvClientWriter extends ChannelOutboundHandlerAdapter {
    String CLASS_NAME;
    Logger logger;
    public static final int MAX_BUFFER = 64000;//2^16
    public ClamAvClientWriter(){
        CLASS_NAME = this.getClass().getName();
        logger = LoggerFactory.getLogger(CLASS_NAME);
    }
    @Override
    public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception{
        logger.info(CLASS_NAME + ": Entering write");
        Exchange exchange = (Exchange)o;
        try(BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class))){
            channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("zINSTREAM\0".getBytes()));
            byte[] chunk = new byte[MAX_BUFFER];
            for(int i=message.read(chunk);i>-1;i=message.read(chunk)){
                byte[] chunkSize = ByteBuffer.allocate(4).putInt(i).array();
                channelHandlerContext.write(Unpooled.copiedBuffer(chunkSize));
                channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(chunk, 0, i));
            }
            channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(ByteBuffer.allocate(4).putInt(0).array()));
        }
        logger.info(CLASS_NAME + ": Exiting write");
    }
}
java apache-camel netty clam
1个回答
0
投票

我终于放弃了尝试使用Netty进行此操作。我创建了一个新的骆驼处理器,并将套接字流打包在其中。下面的代码,以防有人遇到类似问题。

public class ClamAvInstream implements Processor {
Logger logger;
private final int MAX_BUFFER = 2048;

public ClamAvInstream() {
    logger = LoggerFactory.getLogger(this.getClass().getName());
}

@Override
public void process(Exchange exchange) throws Exception {
    try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
         Socket socket = new Socket("localhost", 3310);
         BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())) {
        byte[] command = "zINSTREAM\0".getBytes();
        socketOutput.write(command);
        socketOutput.flush();
        byte[] chunk = new byte[MAX_BUFFER];
        int chunkSize;
        try (BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())) {
            for (chunkSize = message.read(chunk); chunkSize > -1; chunkSize = message.read(chunk)) {
                socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
                socketOutput.write(chunk, 0, chunkSize);
                socketOutput.flush();

                receivedReply(socketInput, exchange);
            }
            socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
            socketOutput.flush();
            receivedReply(socketInput, exchange);
        } catch(ClamAvException ex){ //close socketInput
            logger.warn(ex.getMessage());
        }
    }//close message, socket, socketOutput
}

private class ClamAvException extends Exception{
    private ClamAvException(String error){
        super(error);
    }
}

private void receivedReply(BufferedInputStream in, Exchange exchange) throws Exception{
    if(in.available() > 0){
        byte[] replyBytes = new byte[256];
        int replySize = in.read(replyBytes);
        if (replySize > 0) {
            String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
            logger.info("reply="+reply);
            if(reply.contains("OK")){
                exchange.getIn().setHeader("av-status", "clean");
            }else if(reply.contains("ERROR")){
                if(reply.equals("INSTREAM size limit exceeded. ERROR\0")){
                    exchange.getIn().setHeader("av-status", "overflow");
                }else {
                    exchange.getIn().setHeader("av-status", "error");
                }
                throw new ClamAvException(reply);
            }else if(reply.contains("FOUND")){
                exchange.getIn().setHeader("av-status", "infected");
            }else{
                exchange.getIn().setHeader("av-status", "unknown");
            }
        }
    }
}

}

© www.soinside.com 2019 - 2024. All rights reserved.