Capacytron
Capacytron

Reputation: 3739

Netty BufferSizePredictor truncates UDP datagram

I'm trying to develop custom Flume source which can receive custom UDP packets. Here is my code:

public class XvlrUdpSource extends AbstractSource
        implements EventDrivenSource, Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(XvlrUdpSource.class);


    private int port;
    private String host;
    private Channel nettyChannel;

    private static final Logger logger = LoggerFactory.getLogger(XvlrUdpSource.class);

    private CounterGroup counterGroup = new CounterGroup();

    public class XvlrUpdHander extends SimpleChannelHandler {

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
            try {
                System.out.println("class: "+ mEvent.getMessage().getClass());
                /** ChannelBuffer holds just first 768 bytes of the whole input UDP packet*/
                ChannelBuffer channelBuffer = (ChannelBuffer)mEvent.getMessage();
                   Event xvlrPacketEvent = EventBuilder.withBody( ((ChannelBuffer)mEvent.getMessage()).array());
                System.out.println("Length is:["+xvlrPacketEvent.getBody().length+"]");
                //Event e = syslogUtils.extractEvent((ChannelBuffer)mEvent.getMessage());
                if(xvlrPacketEvent == null){
                    return;
                }
                getChannelProcessor().processEvent(xvlrPacketEvent);
                counterGroup.incrementAndGet("events.success");
            } catch (ChannelException ex) {
                counterGroup.incrementAndGet("events.dropped");
                logger.error("Error writting to channel", ex);
                return;
            }
        }
    }

    @Override
    public void start() {
        ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap
                (new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
        final XvlrUpdHander handler = new XvlrUpdHander();
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(handler);
            }
        });

        if (host == null) {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
        } else {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
        }

        super.start();
    }

    @Override
    public void stop() {
        logger.info("Syslog UDP Source stopping...");
        logger.info("Metrics:{}", counterGroup);
        if (nettyChannel != null) {
            nettyChannel.close();
            try {
                nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.warn("netty server stop interrupted", e);
            } finally {
                nettyChannel = null;
            }
        }

        super.stop();
    }

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(
                context, "port");//SyslogSourceConfigurationConstants.CONFIG_PORT);
        port = context.getInteger("port");//SyslogSourceConfigurationConstants.CONFIG_PORT);
        host = context.getString("host");//SyslogSourceConfigurationConstants.CONFIG_HOST);
        //formaterProp = context.getSubProperties("PROP");//SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
    }

}

I did debug on messageRecieved and see in stacktrace that here:

/**
     * Sends a {@code "messageReceived"} event to the first
     * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
     * the specified {@link Channel} belongs.
     *
     * @param message        the received message
     * @param remoteAddress  the remote address where the received message
     *                       came from
     */
    public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
        channel.getPipeline().sendUpstream(
                new UpstreamMessageEvent(channel, message, remoteAddress));
    }

My Object message is already 768 bytes length.

The root is here org.jboss.netty.channel.socket.oio.OioDatagramWorker:

byte[] buf = new byte[predictor.nextReceiveBufferSize()];
            DatagramPacket packet = new DatagramPacket(buf, buf.length);

Predictor sets buffer size to 768 Then:

fireMessageReceived(
                    channel,
                    channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
                    packet.getSocketAddress());

I do get only first 768 bytes. is there any chance to change predictor behavior?

Upvotes: 1

Views: 896

Answers (2)

Capacytron
Capacytron

Reputation: 3739

I've found this topic: Netty Different Pipeline Per UDP Datagram

it's possible to "inject" predictor with desired behavior using special properties. So full solution is:

public class XvlrUdpSource extends AbstractSource
        implements EventDrivenSource, Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(XvlrUdpSource.class);


    private int port;
    private String host;
    private Channel nettyChannel;

    private CounterGroup counterGroup = new CounterGroup();

    public class XvlrUpdHander extends SimpleChannelHandler {

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
            try {
                ChannelBuffer channelBuffer = (ChannelBuffer)mEvent.getMessage();
                int actualSizeOfUdpPacket = channelBuffer.readableBytes();
                byte[] body = Arrays.copyOf(channelBuffer.array(), actualSizeOfUdpPacket);
                Event xvlrPacketEvent = EventBuilder.withBody(body);
                LOG.debug("Event.body length is: {} ", xvlrPacketEvent.getBody().length);
                if(xvlrPacketEvent == null){
                    return;
                }
                getChannelProcessor().processEvent(xvlrPacketEvent);
                counterGroup.incrementAndGet("events.success");
            } catch (ChannelException ex) {
                counterGroup.incrementAndGet("events.dropped");
                LOG.error("Error writting to channel", ex);
                return;
            }
        }
    }


    @Override
    public void start() {
        OioDatagramChannelFactory oioDatagramChannelFactory =   new OioDatagramChannelFactory(                                                                           Executors.newCachedThreadPool());
        ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(oioDatagramChannelFactory);
        serverBootstrap.setOption("sendBufferSize", 65536);
        serverBootstrap.setOption("receiveBufferSize", 65536);
        serverBootstrap.setOption("receiveBufferSizePredictorFactory",
                                    new AdaptiveReceiveBufferSizePredictorFactory(8192, 8192, 16384));


        final XvlrUpdHander handler = new XvlrUpdHander();

        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(handler);
            }
        });
        if (host == null) {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
        } else {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
        }

    }

        @Override
    public void stop() {
        LOG.info("Syslog UDP Source stopping...");
        LOG.info("Metrics:{}", counterGroup);
        if (nettyChannel != null) {
            nettyChannel.close();
            try {
                nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.warn("netty server stop interrupted", e);
            } finally {
                nettyChannel = null;
            }
        }

        super.stop();
    }

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, "port");
        port = context.getInteger("port");
        host = context.getString("host");
   }
}

Upvotes: 1

user207421
user207421

Reputation: 310980

Either you are sending 768 bytes or the receiving buffer is only 768 bytes long. It certainly has nothing to do with carriage returns, unless there is some buggy handling of them in your code.

Upvotes: 0

Related Questions