Reputation: 1907
My application subscribes to an Observable<Timestamped<byte[]>>
of data packets arriving in sequence, and assembles them into larger frames. It must examine each packet to find the "Start of Frame" header and do some minor processing to assemble the packets into a valid frame.
How can I create a new Observable<Frame>
that will emit these completed frames to a Subscriber
?
Update: the suggested answer doesn't want to work for me. Some details:
Observable
emits Timestamped<byte[]>
packets. Observable
of DataFrame
objects, each including the data from several packets along with some other fields. FrameAssembler
with a method DataFrame receivePacket( Timestamped<byte[]> packet )
. It returns null
until it has assembled a frame, which it then returns and gets ready for the next one. I can't create the output Observable. I'm trying this
Observable<DataFrame> source = Observable
.just( new Timestamped<byte[]>(100, new byte[10]) ) // sample packet
.scan( new FrameAssembler(), (acc, packet) -> acc.receivePacket( packet ))
.filter( frame -> frame != null )
but the lambda is underlined, with the message "Bad return type in lambda expression: DataFrame cannot be converted to TestScan.FrameAssembler".
I'm thoroughly stumped by this. What is acc
and what's it doing there? Why does it want to convert the DataFrame
returned by receivePacket
into FrameAssembler
? And why is new FrameAssembler()
used as the first argument to scan()
?
Upvotes: 0
Views: 156
Reputation: 1907
I couldn't get the proposed solution using the scan()
operator to work. I believe the problem was the null
being returned until a complete set of packets was received. Observable operator chains don't seem to like nulls.
How I solved it:
In the onNext()
handler of the data packet Observable subscription:
Thread.currentThread().setPriority( DATA_RX_PRIORITY );
packetArrayList = DataOps.addPacket( packetArrayList, dataPacket );
if( packetArrayList != null ) { // we have a new complete packet buffer
DataOps.DataFrame frameReturned = DataOps.pBuf2dFrame( packetArrayList );
frameRelayer.onNext( frameReturned ); // send the new frame to the BehaviorSubject
}
The addPacket()
routine adds each received packet to an ArrayList
, but returns null
except when a complete Frame's packets have been accumulated, when it returns the filled ArrayList
.
When a non-null ArrayList
is received, the pBuf2dFrame()
method parses the packets and assembles them into a new DataFrame
object.
Then comes the trick that converts the Observable of packets into an Observable of DataFrames: frameRelayer
is a BehaviorSubject
(an RxJava object that can function as both an Observable and a Subscriber). All you have to do is call its onNext()
method with the new DataFrame
to have it passed on to any Subscribers to frameRelayer
.
Upvotes: 0
Reputation: 16142
You probably want to use the 2-parameter scan
operator:
class ByteAccumulator {
private byte[] buffer = ...
public byte[] receivePacket(byte[] receivedPacket) {
// add the received packet to the buffer
if(containsFullFrame(buffer)) {
return extractFrameAndTrimBuffer();
} else {
return null;
}
}
}
Observable<byte[]> source = ...
source.scan(new ByteAccumulator(), ByteAccumulator::receivePacket)
.filter(frame -> frame != null)
...
Edit: You need an intermediate class to adapt your FrameAssembler
to what scan
expects:
public FrameScanner {
private final FrameAssembler assembler;
private final DataFrame frame;
public FrameScanner() {this(new FrameAssembler(), null);}
public FrameScanner(FrameAssembler assembler,DataFrame frame) {
this.frame=frame; this.assembler=assembler;
}
public getFrame() {return frame;}
public FrameScanner scan(Timestamped<byte[]> nextBytes) {
return new FrameScanner(assembler, assembler.receivePacker(nextBytes));
}
}
Now you should be able to use it like this:
.scan(new FrameScanner(), FrameScanner::scan)
.map(FrameScanner::getFrame)
.filter(Objects::nonNull)
Hmm... now that I think about it, instead of the abofethis might also work:
FrameAssembler assembler=new FrameAssembler();
...
.scan((DataFrame)null, (ignore, packet) -> assembler.receivePacket( packet))
.filter(Objects::nonNull)
Upvotes: 1