我一直在努力使用Netty进行配置以将字节流传输到ClamAV服务。我正在使用Apache Camel路由。
使用Netty,我无法截获“超出INSTREAM大小限制”消息。
INSTREAM必须在此命令前加上n或z前缀。扫描数据流。在INSTREAM之后,在发送命令的同一套接字上,将数据流分块发送给clamd。这避免了建立新的TCP连接和NAT问题的开销。块的格式为:”其中,以下数据的大小(以字节为单位)以网络字节顺序表示为4字节无符号整数,并且是实际的块。通过发送零长度的块来终止流。注意:不要超过clamd.conf中定义的StreamMaxLength,否则clamd将以超出INSTREAM大小限制的方式进行答复并关闭连接。
使用直接同步套接字连接,我没有问题。谁能为我指出如何使用Netty做到这一点的正确方向?还是我应该坚持使用同步套接字连接。
使用同步套接字的实现。归功于https://github.com/solita/clamav-java“ Antti Virtanen”。
private class UseSocket implements Processor{
@Override
public void process(Exchange exchange) throws Exception{
try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
Socket socket = new Socket("localhost", 3310);
BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())){
byte[] command = "zINSTREAM\0".getBytes();
socketOutput.write(command);
socketOutput.flush();
byte[] chunk = new byte[2048];
int chunkSize;
try(BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())){
for(chunkSize = message.read(chunk);chunkSize > -1;chunkSize = message.read(chunk)){
socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
socketOutput.write(chunk, 0, chunkSize);
socketOutput.flush();
if(processReply(socketInput, exchange)){
return;
}
}
socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
socketOutput.flush();
processReply(socketInput, exchange);
}
}
}
private boolean processReply(BufferedInputStream in, Exchange exchange) throws Exception{
if(in.available() > 0) {
logger.info("processing reply");
byte[] replyBytes = new byte[256];
int replySize = in.read(replyBytes);
if (replySize > 0) {
String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
String avStatus = "infected";
if ("stream: OK\0".equals(reply)) {
avStatus = "clean";
} else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
avStatus = "overflow";
}
exchange.getIn().setHeader("av-status", avStatus);
return true;
}
}
return false;
}
}
使用带有入站和出站通道处理程序的Netty的实现。
private class UseNetty implements Processor{
@Override
public void process(Exchange exchange) throws Exception{
logger.info(CLASS_NAME + ": Creating Netty client");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.remoteAddress(new InetSocketAddress("localhost", 3310));
bootstrap.handler(new ClamAvChannelIntializer(exchange));
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
}catch(Exception ex) {
logger.error(CLASS_NAME + ": ERROR", ex);
}
finally
{
eventLoopGroup.shutdownGracefully();
logger.info(CLASS_NAME + ": Netty client closed");
}
}
}
public class ClamAvChannelIntializer extends ChannelInitializer<SocketChannel> {
private Exchange exchange;
public ClamAvChannelIntializer(Exchange exchange){
this.exchange = exchange;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClamAvClientWriter());
socketChannel.pipeline().addLast(new ClamAvClientHandler(exchange));
}
}
public class ClamAvClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
String CLASS_NAME;
Logger logger;
private Exchange exchange;
public static final int MAX_BUFFER = 2048;
public ClamAvClientHandler(Exchange exchange){
super();
CLASS_NAME = this.getClass().getName();
logger = LoggerFactory.getLogger(CLASS_NAME);
this.exchange = exchange;
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception{
logger.info(CLASS_NAME + ": Entering channelActive");
channelHandlerContext.write(exchange);
logger.info(CLASS_NAME + ": Exiting channelActive");
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause){
cause.printStackTrace();
channelHandlerContext.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
logger.info(CLASS_NAME + ": Entering channelRead0");
String reply = byteBuf.toString(CharsetUtil.UTF_8);
logger.info(CLASS_NAME + ": Reply = " + reply);
String avStatus = "infected";
if ("stream: OK\0".equals(reply)) {
avStatus = "clean";
} else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
avStatus = "overflow";
} else{
logger.warn("Infected or unknown reply = " + reply);
}
exchange.getIn().setHeader("av-status", avStatus);
logger.info(CLASS_NAME + ": Exiting channelRead0");
channelHandlerContext.close();
}
}
public class ClamAvClientWriter extends ChannelOutboundHandlerAdapter {
String CLASS_NAME;
Logger logger;
public static final int MAX_BUFFER = 64000;//2^16
public ClamAvClientWriter(){
CLASS_NAME = this.getClass().getName();
logger = LoggerFactory.getLogger(CLASS_NAME);
}
@Override
public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception{
logger.info(CLASS_NAME + ": Entering write");
Exchange exchange = (Exchange)o;
try(BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class))){
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("zINSTREAM\0".getBytes()));
byte[] chunk = new byte[MAX_BUFFER];
for(int i=message.read(chunk);i>-1;i=message.read(chunk)){
byte[] chunkSize = ByteBuffer.allocate(4).putInt(i).array();
channelHandlerContext.write(Unpooled.copiedBuffer(chunkSize));
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(chunk, 0, i));
}
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(ByteBuffer.allocate(4).putInt(0).array()));
}
logger.info(CLASS_NAME + ": Exiting write");
}
}
我终于放弃了尝试使用Netty进行此操作。我创建了一个新的骆驼处理器,并将套接字流打包在其中。下面的代码,以防有人遇到类似问题。
public class ClamAvInstream implements Processor {
Logger logger;
private final int MAX_BUFFER = 2048;
public ClamAvInstream() {
logger = LoggerFactory.getLogger(this.getClass().getName());
}
@Override
public void process(Exchange exchange) throws Exception {
try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
Socket socket = new Socket("localhost", 3310);
BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())) {
byte[] command = "zINSTREAM\0".getBytes();
socketOutput.write(command);
socketOutput.flush();
byte[] chunk = new byte[MAX_BUFFER];
int chunkSize;
try (BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())) {
for (chunkSize = message.read(chunk); chunkSize > -1; chunkSize = message.read(chunk)) {
socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
socketOutput.write(chunk, 0, chunkSize);
socketOutput.flush();
receivedReply(socketInput, exchange);
}
socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
socketOutput.flush();
receivedReply(socketInput, exchange);
} catch(ClamAvException ex){ //close socketInput
logger.warn(ex.getMessage());
}
}//close message, socket, socketOutput
}
private class ClamAvException extends Exception{
private ClamAvException(String error){
super(error);
}
}
private void receivedReply(BufferedInputStream in, Exchange exchange) throws Exception{
if(in.available() > 0){
byte[] replyBytes = new byte[256];
int replySize = in.read(replyBytes);
if (replySize > 0) {
String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
logger.info("reply="+reply);
if(reply.contains("OK")){
exchange.getIn().setHeader("av-status", "clean");
}else if(reply.contains("ERROR")){
if(reply.equals("INSTREAM size limit exceeded. ERROR\0")){
exchange.getIn().setHeader("av-status", "overflow");
}else {
exchange.getIn().setHeader("av-status", "error");
}
throw new ClamAvException(reply);
}else if(reply.contains("FOUND")){
exchange.getIn().setHeader("av-status", "infected");
}else{
exchange.getIn().setHeader("av-status", "unknown");
}
}
}
}
}