Reputation: 158
I am new to netty. I have to use static pipelines (because the project manager prefers it). And it is a little bit difficult, because I have to handle RTP and RTSP protocols on the same line.
Althought it almost works, but there is memory leak. I guess the fault in my splitter class. Moreover I think the error might be near bypass method (because the developers of netty - in order the avoide infinitive loop - do not allow to leave ByteBuf unchanged that is why I had to create the bypass method.)
If you have any idea, please help me! (Thanks in advance!)
Here is my code:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.ArrayList;
import java.util.List;
public class Splitter extends ByteToMessageDecoder {
private ByteBuf bb = Unpooled.buffer();
final RtspClientHandler rtspClientHandler;
final RtpClientHandler rtpClientHandler;
public Splitter(RtspClientHandler rtspClientHandler, RtpClientHandler rtpClientHandler) {
this.rtspClientHandler = rtspClientHandler;
this.rtpClientHandler = rtpClientHandler;
}
protected void bypass(ByteBuf in, MessageList<Object> out) {
bb.writeBytes(in);
in.discardReadBytes();
bb.retain();
out.add(bb);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
if (rtspClientHandler.getRTSPstate() == RtspClientHandler.RTSP_CLIENT_STATE.READY) {
if (in.getByte(0) == 0x24 && in.readableBytes() > 4) {
int lengthToRead = in.getUnsignedShort(2);
if (in.readableBytes() >= (lengthToRead + 4)) {
in.skipBytes(4);
if (in.getByte(16) == 0x67 || in.getByte(16) == 0x68) {
final byte bytes[] = new byte[lengthToRead];
in.readBytes(bytes);
in.discardReadBytes();
SPSPPSbuffer spspps = new SPSPPSbuffer();
spspps.setSPSPPS(bytes);
out.add(spspps);
} else {
final byte packetArray[] = new byte[lengthToRead];// copy packet.
in.readBytes(packetArray);
in.discardReadBytes();
out.add(packetArray);
}
}
} else {
bypass(in, out);
}
} else {
bypass(in, out);
}
}
}
Upvotes: 0
Views: 1153
Reputation: 158
It seems I could manage to solve it.
The main thing is: I had to use a collector ByteBuf in which I collects all bytes coming from the net (I had to clear the input ByteBuf), because there are 4 cases possible:
The num. of bytes (in the collector) are less than the RTP chunk size
The num. of bytes (in the collector) are equals the RTP chunk size
The num. of bytes (in the collector) are bigger than the RTP chunk size
More than one chunks are in the collector ByteBuf.
Here is the code:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
public class Splitter extends ByteToMessageDecoder {
private ByteBuf collector = Unpooled.buffer();
final RtspClientHandler rtspClientHandler;
final RtpClientHandler rtpClientHandler;
public Splitter(RtspClientHandler rtspClientHandler, RtpClientHandler rtpClientHandler) {
this.rtspClientHandler = rtspClientHandler;
this.rtpClientHandler = rtpClientHandler;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
collector.writeBytes(in);
in.discardReadBytes();
in.clear();
if (rtspClientHandler.getRTSPstate() != RtspClientHandler.RTSP_CLIENT_STATE.READY) {
System.out.println("RTSP communication in progress");
collector.retain();
out.add(collector);
return;
}
if (collector.readableBytes() > 0 && collector.getByte(0) != 0x24) {
System.out.println("Clearing the Unpooled.buffer() (because it does not start with 0x24)");
collector.readerIndex(collector.writerIndex());
collector.discardReadBytes();
}
System.out.println("*****New bytes arrived");
while (collector.readableBytes() > 0 && collector.getByte(0) == 0x24) {
System.out.println("Length: " + collector.readableBytes());
if (collector.readableBytes() > 4) {
int lengthToRead = collector.getUnsignedShort(2);
if (collector.readableBytes() >= (lengthToRead + 4)) {
collector.skipBytes(4);
if (collector.getByte(16) == 0x67 || collector.getByte(16) == 0x68) {
final byte bytes[] = new byte[lengthToRead];
collector.readBytes(bytes);
collector.discardReadBytes();
SPSPPSbuffer spspps = new SPSPPSbuffer();
spspps.setSPSPPS(bytes);
out.add(spspps);
} else {
final byte packetArray[] = new byte[lengthToRead];// copy packet.
collector.readBytes(packetArray);
collector.discardReadBytes();
out.add(packetArray);
}
} else {
System.out.println("Not enough length, " + (lengthToRead + 4) + " byte should be required (together with 4 bytes header)");
return;
}
} else {
System.out.println("Less than 5 bytes");
return;
}
}
}
}
Upvotes: 1