Robert Lewis

Reputation: 1907

How to assemble packets from an Observable into a new Observable?

My application subscribes to an Observable<Timestamped<byte[]>> of data packets arriving in sequence, and assembles them into larger frames. It must examine each packet to find the "Start of Frame" header and do some minor processing to assemble the packets into a valid frame.

How can I create a new Observable<Frame> that will emit these completed frames to a Subscriber?

Update: the suggested answer doesn't work for me. Some details:

  1. My source Observable emits Timestamped<byte[]> packets.
  2. Desired Output is an Observable of DataFrame objects, each including the data from several packets along with some other fields.
  3. I have a class FrameAssembler with a method DataFrame receivePacket( Timestamped<byte[]> packet ). It returns null until it has assembled a frame; it then returns that frame and gets ready for the next one.

I can't create the output Observable. I'm trying this:

Observable<DataFrame> source = Observable
    .just( new Timestamped<byte[]>(100, new byte[10]) ) // sample packet
    .scan( new FrameAssembler(), (acc, packet) -> acc.receivePacket( packet ))  
    .filter( frame -> frame != null );

but the lambda is underlined, with the message "Bad return type in lambda expression: DataFrame cannot be converted to TestScan.FrameAssembler".

I'm thoroughly stumped by this. What is acc and what's it doing there? Why does it want to convert the DataFrame returned by receivePacket into FrameAssembler? And why is new FrameAssembler() used as the first argument to scan()?

Upvotes: 0

Views: 156

Answers (2)

Robert Lewis

Reputation: 1907

I couldn't get the proposed solution using the scan() operator to work. I believe the problem was the null being returned until a complete set of packets was received. Observable operator chains don't seem to like nulls.

How I solved it: In the onNext() handler of the data packet Observable subscription:

Thread.currentThread().setPriority( DATA_RX_PRIORITY );
packetArrayList = DataOps.addPacket( packetArrayList, dataPacket );
if( packetArrayList != null ) {  // we have a new complete packet buffer
    DataOps.DataFrame frameReturned = DataOps.pBuf2dFrame( packetArrayList );
    frameRelayer.onNext( frameReturned ); // send the new frame to the BehaviorSubject
}

The addPacket() routine adds each received packet to an ArrayList. It returns null until a complete frame's packets have been accumulated, at which point it returns the filled ArrayList.

When a non-null ArrayList is received, the pBuf2dFrame() method parses the packets and assembles them into a new DataFrame object. Then comes the trick that converts the Observable of packets into an Observable of DataFrames: frameRelayer is a BehaviorSubject (an RxJava object that can function as both an Observable and a Subscriber). All you have to do is call its onNext() method with the new DataFrame to have it passed on to any Subscribers to frameRelayer.
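To make the relay idea concrete without pulling in RxJava, here is a minimal plain-Java sketch. FrameRelay is a hypothetical, single-threaded stand-in for a BehaviorSubject (it is not the RxJava class), and the "two packets make one frame" rule is an assumption purely for illustration; my real code uses addPacket() and pBuf2dFrame() as shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded stand-in for a BehaviorSubject: it remembers the
// latest value and pushes every value to all current subscribers.
class FrameRelay<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;
    private boolean hasValue;

    // New subscribers immediately receive the most recent value, if there is one.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (hasValue) {
            subscriber.accept(latest);
        }
    }

    // Called from the packet handler whenever a frame is complete.
    void onNext(T value) {
        latest = value;
        hasValue = true;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }
}

class RelayDemo {
    // Feeds packets through a handler that relays one frame per two packets.
    static List<String> run(List<String> packets) {
        FrameRelay<String> frameRelayer = new FrameRelay<>();
        List<String> received = new ArrayList<>();
        frameRelayer.subscribe(received::add);

        List<String> buffer = new ArrayList<>();
        for (String packet : packets) {   // plays the role of onNext() on the packet stream
            buffer.add(packet);
            if (buffer.size() == 2) {     // assumed rule: two packets make one frame
                frameRelayer.onNext(String.join("+", buffer));
                buffer.clear();
            }
        }
        return received;
    }
}
```

The point of the sketch is only the shape of the pattern: the packet handler owns the assembly state, and completed frames are pushed into a separate object that downstream code subscribes to.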

Upvotes: 0

Tassos Bassoukos

Reputation: 16142

You probably want to use the 2-parameter scan operator:

  class ByteAccumulator {
     private byte[] buffer = ...
     public byte[] receivePacket(byte[] receivedPacket) {
        // add the received packet to the buffer
        if(containsFullFrame(buffer)) {
            return extractFrameAndTrimBuffer();
        } else {
            return null;
        }
     }
  }

  Observable<byte[]> source = ...
  source.scan(new ByteAccumulator(), ByteAccumulator::receivePacket)
        .filter(frame -> frame != null)
        ...

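Edit: scan expects the accumulator function to return the same type as its first argument (the seed), which is why your lambda is asked to return a FrameAssembler. You need an intermediate class to adapt your FrameAssembler to what scan expects: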

// Pairs the stateful assembler with the most recent result, so the value
// scan accumulates and the value it emits have the same type.
public class FrameScanner {
  private final FrameAssembler assembler;
  private final DataFrame frame;  // null until a frame has been completed
  public FrameScanner() {this(new FrameAssembler(), null);}
  public FrameScanner(FrameAssembler assembler, DataFrame frame) {
    this.frame=frame; this.assembler=assembler;
  }
  public DataFrame getFrame() {return frame;}
  public FrameScanner scan(Timestamped<byte[]> nextBytes) {
    return new FrameScanner(assembler, assembler.receivePacket(nextBytes));
  }
}

Now you should be able to use it like this:

.scan(new FrameScanner(), FrameScanner::scan)
.map(FrameScanner::getFrame)
.filter(Objects::nonNull)

Hmm... now that I think about it, instead of the above, this might also work:

FrameAssembler assembler=new FrameAssembler();
...
.scan((DataFrame)null, (ignore, packet) -> assembler.receivePacket( packet))
.filter(Objects::nonNull)
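To illustrate why seeding with a null DataFrame type-checks where seeding with a FrameAssembler did not, here is a rough plain-Java imitation of the seeded scan contract over a List. Nothing here is RxJava: the scan helper, the Assembler class, and the rule that two packets make a frame are all hypothetical stand-ins chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

class ScanSketch {
    // Hypothetical stand-in for FrameAssembler: a "frame" is two packets joined.
    static class Assembler {
        private final List<String> pending = new ArrayList<>();

        // Returns null until two packets have arrived, then the assembled frame.
        String receivePacket(String packet) {
            pending.add(packet);
            if (pending.size() < 2) {
                return null;
            }
            String frame = String.join("+", pending);
            pending.clear();
            return frame;
        }
    }

    // Imitates a seeded scan: the seed and every accumulator result share type R.
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc); // this sketch emits the seed first
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    static List<String> assemble(List<String> packets) {
        Assembler assembler = new Assembler();
        // The seed is a (null) frame, so the lambda may return a frame. The previous
        // value is ignored because the assembler itself keeps the real state.
        return scan(packets, (String) null, (ignored, packet) -> assembler.receivePacket(packet))
                .stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
```

In this sketch the "accumulator" slot carries frames rather than the assembler, which is the same trick as the snippet above: the seed's type decides what the lambda must return.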

Upvotes: 1
