Reputation: 409
I am processing messages from a vendor as a stream of data and want to store the msgSeqNum in a local file. The reason:
The vendor sends a msgSeqNum to uniquely identify each message and provides a 'sync-and-stream' function that replays messages from a given sequence number on reconnect. Say msgSeqNum starts at 1, my connection goes down at msgSeqNum 50, and I miss the next 100 messages (the vendor server's current msgSeqNum is now 150). When I reconnect, I need to call 'sync-and-stream' with msgSeqNum=50 to get the 100 missed messages.
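To make the intended flow concrete, here is a minimal in-memory sketch of the bookkeeping described above: skip duplicates, record the sequence number after a message is processed, and report where to resume. The class `SequenceTracker` and its method names are hypothetical and are not part of the vendor's API. It assumes sequence numbers start at 1 and only increase.

```java
/** Hypothetical helper that tracks the last processed msgSeqNum in memory. */
final class SequenceTracker {
    // 0 means "nothing processed yet", assuming the vendor's sequence starts at 1.
    private long lastProcessed;

    SequenceTracker(long lastProcessed) {
        this.lastProcessed = lastProcessed;
    }

    /** True if this message was already processed (duplicate or replayed). */
    boolean isDuplicate(long msgSeqNum) {
        return msgSeqNum <= lastProcessed;
    }

    /** Call after a message has been fully processed. */
    void markProcessed(long msgSeqNum) {
        lastProcessed = msgSeqNum;
    }

    /** The value to pass to 'sync-and-stream' on reconnect. */
    long resumeFrom() {
        return lastProcessed;
    }
}
```

With the numbers from the example, after processing messages 1 to 50, `resumeFrom()` returns 50, and a replayed message 50 is reported as a duplicate while message 51 is not.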
So I want to understand how I can persist the msgSeqNum locally for fast access. My assumptions and questions:
1) Since reads and writes happen while processing every message (a read to ignore duplicates, a write to update msgSeqNum after processing a message), I think it is best to use Java NIO's 'MappedByteBuffer'. Is that right?
2) Could someone confirm whether the code below is a good approach? I expose the MappedByteBuffer object to be reused for reads and writes and leave the FileChannel open for the lifetime of the process. Sample JUnit code is below.
I know this could be done with ordinary Java file read and write operations, but I need something fast, as close to in-memory speed as possible, because I am using a single-writer pattern and want to process these messages quickly in a non-blocking manner.
private FileChannel fileChannel = null;
private MappedByteBuffer mappedByteBuffer = null;
private Charset utf8Charset = null;
private CharBuffer charBuffer = null;
@Before
public void setup() {
try {
charBuffer = CharBuffer.allocate( 24 ); // Long.MIN_VALUE needs 20 characters as decimal text, so 24 is enough
System.out.println( "charBuffer length: " + charBuffer.length() );
Path pathToWrite = getFileURIFromResources();
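// Assign the field; a local declaration here would shadow it and leave the field null in destroy().
// TRUNCATE_EXISTING resets the file for each test; a real store would omit it so the value survives a restart.
fileChannel = FileChannel.open( pathToWrite,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING );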
mappedByteBuffer = fileChannel
.map( FileChannel.MapMode.READ_WRITE, 0, charBuffer.length() );
utf8Charset = StandardCharsets.UTF_8; // java.nio.charset.StandardCharsets; no lookup by name needed
//charBuffer = CharBuffer.allocate( 8 );
} catch ( Exception e ) {
// handle it
}
}
@After
public void destroy() {
try {
if ( fileChannel != null ) fileChannel.close(); // guard against a failed setup()
} catch ( IOException e ) {
// handle it
}
}
@Test
public void testWriteAndReadUsingSharedMappedByteBuffer() {
if ( mappedByteBuffer != null ) {
mappedByteBuffer.put( utf8Charset.encode( CharBuffer.wrap( "101" ) )); // wrap() is static and creates a new buffer per call; TODO reuse one buffer instead
} else {
System.out.println( "mappedByteBuffer null" );
fail();
}
mappedByteBuffer.flip();
assertEquals( "101", utf8Charset.decode(mappedByteBuffer).toString() );
}
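For comparison, a minimal sketch of the same idea that stores the sequence number as an 8-byte binary long at a fixed offset instead of UTF-8 text. The absolute `getLong(0)` and `putLong(0, ...)` calls do not move the buffer position, so no `flip()` or encoding step is needed. The class name `MappedSeqNumStore` and its methods are hypothetical. The sketch assumes a single writer thread, and it opens the file without TRUNCATE_EXISTING so the stored value survives a restart.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical store: keeps one long in a memory-mapped file. Single writer assumed. */
final class MappedSeqNumStore implements AutoCloseable {
    private final FileChannel channel;
    private final MappedByteBuffer buffer;

    MappedSeqNumStore(Path file) throws IOException {
        // CREATE without TRUNCATE_EXISTING: an existing value is kept across restarts.
        channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE);
        // For a new or short file, write explicit zeros so the first read returns 0.
        if (channel.size() < Long.BYTES) {
            ByteBuffer zeros = ByteBuffer.allocate(Long.BYTES);
            while (zeros.hasRemaining()) {
                channel.write(zeros, zeros.position());
            }
        }
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, Long.BYTES);
    }

    /** Absolute read at offset 0; does not change the buffer position. */
    long read() {
        return buffer.getLong(0);
    }

    /** Absolute write at offset 0; does not change the buffer position. */
    void write(long msgSeqNum) {
        buffer.putLong(0, msgSeqNum);
    }

    /** Asks the OS to write the mapped page to the storage device. */
    void flush() {
        buffer.force();
    }

    @Override
    public void close() throws IOException {
        buffer.force();
        channel.close();
    }
}
```

Writes to the mapping go to the page cache, so they normally survive a process crash but are not guaranteed to survive an OS crash or power loss unless `force()` has been called. Calling `force()` on every message would add back the blocking I/O the mapping is meant to avoid, so how often to call it is a trade-off.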
Upvotes: 0
Views: 524