Reputation: 5146
I have a method that takes a Partition enum as a parameter. This method will be called by multiple background threads (15 max) at around the same time, each passing a different partition value. Here dataHoldersByPartition is an ImmutableMap from Partition to ConcurrentLinkedQueue<DataHolder>.
private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;
//... some code to populate entry in `dataHoldersByPartition` map
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length > 255)
continue;
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
clientKeyBytesAndProcessBytesHolder = new HashMap<>();
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
// calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
}
}
And below is my Message class:
public final class Message {
private final byte dataCenter;
private final byte recordVersion;
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
// Output of this method should always be less than 50k always
public byte[] serialize() {
// 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
// header layout
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(recordsPartition).put(replicated);
// data layout
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
byte keyType = 0;
byte[] key = entry.getKey();
byte[] value = entry.getValue();
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(value);
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
.put(value);
}
return byteBuffer.array();
}
private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
int size = 36;
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
// getters and to string method here
}
Basically, what I have to make sure is that whenever the sendToDatabase method is called in validateAndSend, the size of the message.serialize() byte array is always less than 50k, at all costs. My sendToDatabase method sends the byte array coming out of the serialize method. So, for example, if I have 60k records in the dataHolders CLQ, then I will send them in two chunks in validateAndSend:
1. Send the first chunk (sized so that message.serialize() is less than 50k) by calling sendToDatabase on it.
2. Call sendToDatabase again for the remaining part.
To accomplish this, I was using the totalSize variable in validateAndSend to measure the 50k limit, but it looks like my approach might not be right: I may be dropping some records, or sending more than 50k each time.
Since my Message class knows about the clientKeyBytesAndProcessBytesHolder map, can I use this map to compute the size accurately by calling getBufferCapacity, and only call sendToDatabase while that size is below 50k?
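One way to see the gap: the totalSize check in validateAndSend only adds key and value lengths, while getBufferCapacity also counts a 36-byte header per message and 12 bytes of overhead per entry (keyType, keyLength, timestamp, valueLength). The following is a minimal sketch of batching by the exact serialized size, assuming that same layout. SizeAwareBatcher and its methods are hypothetical names, and entries are plain {key, value} byte-array pairs rather than DataHolder objects:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: groups {key, value} pairs into batches whose serialized size
// (36-byte header + 12 bytes per entry + key + value, as in getBufferCapacity())
// stays strictly below the given limit.
final class SizeAwareBatcher {
    static final int HEADER_SIZE = 36;     // counted once per message
    static final int ENTRY_OVERHEAD = 12;  // keyType + keyLength + timestamp + valueLength

    static int entrySize(byte[] key, byte[] value) {
        return ENTRY_OVERHEAD + key.length + value.length;
    }

    static List<List<byte[][]>> batch(List<byte[][]> entries, int limit) {
        List<List<byte[][]>> batches = new ArrayList<>();
        List<byte[][]> current = new ArrayList<>();
        int currentSize = HEADER_SIZE;
        for (byte[][] kv : entries) {
            int size = entrySize(kv[0], kv[1]);
            if (HEADER_SIZE + size >= limit) {
                // this entry could not fit even in an empty message
                throw new IllegalArgumentException("single entry too large: " + size);
            }
            if (currentSize + size >= limit) {
                batches.add(current);  // flush before the limit would be reached
                current = new ArrayList<>();
                currentSize = HEADER_SIZE;
            }
            current.add(kv);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);      // remaining entries
        }
        return batches;
    }
}
```

With a limit of 110 bytes, three entries of 34 bytes each (2-byte key, 20-byte value) end up in batches of two and one, because 36 + 3 × 34 = 138 would exceed the limit.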
Upvotes: 5
Views: 1575
Reputation: 4013
You might get cleaner code by separating responsibilities.
Currently, the Message class is responsible for converting DataHolder items into a serialized form, but it is also expected to ensure that the size restriction is fulfilled. Unfortunately, the calling method checks the size expectations without knowing anything about the size requirements of the Message class.
I suggest putting the responsibility for sending out properly sized chunks of data on the Message class, and thus moving the knowledge about proper chunk formatting into the Message class itself.
You might also have noticed that the current implementation is accounting for a full header size per item, while a header is only added once per serialize() call.
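As a sketch, assuming the field order written in the question's serialize(), the header and per-entry sizes can be derived from the field widths instead of hard-coding 36. HeaderLayout is a hypothetical name. The header is counted once per message, while the 12-byte overhead is counted per entry:

```java
// Hypothetical helper: derives the sizes from the field layout of serialize().
final class HeaderLayout {
    static final int HEADER_SIZE =
            Byte.BYTES        // dataCenter
          + Byte.BYTES        // recordVersion
          + Integer.BYTES     // number of entries
          + Integer.BYTES     // total serialized size
          + 3 * Long.BYTES    // address, addressFrom, addressOrigin
          + Byte.BYTES        // recordsPartition
          + Byte.BYTES;       // replicated  -> 36 in total

    static final int ENTRY_OVERHEAD =
            Byte.BYTES + Byte.BYTES + Long.BYTES + Short.BYTES; // keyType, keyLength, timestamp, valueLength -> 12

    // header once, overhead once per entry, plus the raw key and value bytes
    static int serializedSize(int entries, int totalKeyAndValueBytes) {
        return HEADER_SIZE + entries * ENTRY_OVERHEAD + totalKeyAndValueBytes;
    }
}
```

For example, two entries carrying 100 bytes of keys and values in total serialize to 36 + 2 × 12 + 100 = 160 bytes.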
Please find below a sketch of a suggested improvement. The code would need further polishing; it is primarily intended to illustrate elementary improvements in structure and readability/maintainability.
To isolate the sendToDatabase() functionality from the Message class, I just added a simple interface:
// decouples the sending logic from the formatting;
// if external requirements suggest linking such functionality into the Message class,
// this interface would be unnecessary
public interface DatabaseDelivery {
void sendToDatabase(long address, byte[] messagePayload);
}
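Because DatabaseDelivery has a single abstract method, it can be implemented by a lambda or by a small test double. The following is a minimal sketch of such a double, which records payloads instead of contacting a database so that tests can inspect the message sizes. RecordingDelivery and maxPayloadSize() are hypothetical names, and the interface is redeclared so the block compiles on its own:

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the interface above.
interface DatabaseDelivery {
    void sendToDatabase(long address, byte[] messagePayload);
}

// Hypothetical test double: stores every payload it is asked to send.
final class RecordingDelivery implements DatabaseDelivery {
    final List<byte[]> sent = new ArrayList<>();

    @Override
    public void sendToDatabase(long address, byte[] messagePayload) {
        sent.add(messagePayload);
    }

    // largest payload seen so far, handy for asserting a size limit in tests
    int maxPayloadSize() {
        int max = 0;
        for (byte[] payload : sent) {
            max = Math.max(max, payload.length);
        }
        return max;
    }
}
```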
The Message class now deals with chunking and size restrictions. It is now Closeable, indicating that you should call close() when you are done. (With current versions of Java, you might consider try-with-resources for this.)
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class Message implements Closeable {
// or initialize it from some external source if this might change dynamically
private static final int MAX_SIZE = 50000;
// better determine this in sync with addHeader() method
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
private final DatabaseDelivery delivery;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private int pendingItems = 0;
public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) {
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
this.delivery = databaseDelivery;
}
private void addHeader(final ByteBuffer buffer, final int items, final int totalSize) {
buffer.put(dataCenter)
.put(recordVersion)
.putInt(items)
.putInt(totalSize) // actual serialized size (header + items), not the buffer capacity
.putLong(address)
.putLong(addressFrom)
.putLong(addressOrigin)
.put(recordsPartition)
.put(replicated);
}
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
//Properties: itemBuffer serialized size == 0
return;
}
itemBuffer.flip();
final int totalSize = HEADER_SIZE + itemBuffer.remaining();
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems, totalSize);
buffer.put(itemBuffer);
delivery.sendToDatabase(address, Arrays.copyOf(buffer.array(), buffer.position()));
itemBuffer.clear();
pendingItems = 0;
//Properties: itemBuffer serialized size == 0
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
// keep the lengths as int: casting to byte first would turn lengths of 128..255 negative;
// the value length is written as a 2-byte short, as in the original format
final int keyLength = key.length;
final int dataLength = data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
//Properties: itemBuffer serialized size < MAX
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize >= (MAX_SIZE - HEADER_SIZE)) {
//XXX Use exception that is appropriate for your application
//XXX You might add sizes involved for ease of analysis
throw new IllegalArgumentException("Size of single item exceeds maximum size");
}
//Properties: itemBuffer size (old+new or new) < MAX
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout: keyType, keyLength, key, timestamp, valueLength, value
itemBuffer.put((byte) 0).put((byte) keyLength).put(key).putLong(timestamp).putShort((short) dataLength).put(data);
pendingItems++;
//Properties: itemBuffer size < MAX
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
Finally, your calling code changes to:
private void validateAndSend(final Partition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
// the instance providing sendToDatabase() method
// just for cutting off details external to the discussion
final DatabaseDelivery delivery = this;
final Message message = new Message(partition, delivery);
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
// XXX: why is client key using explicit encoding while process bytes is not?
message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8), dataHolder.getProcessBytes());
}
message.close();
}
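If you follow the try-with-resources suggestion, the explicit message.close() call moves into the try header, and close() runs automatically when the block exits. The following is a minimal sketch of that pattern using a hypothetical stand-in for Message, FlushingBatch. It batches strings by count instead of bytes so that the flush-on-close behaviour is easy to check:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Closeable Message: buffers items and flushes the
// remaining partial batch in close().
final class FlushingBatch implements Closeable {
    final List<List<String>> flushed = new ArrayList<>();
    private final int maxItems;
    private List<String> pending = new ArrayList<>();

    FlushingBatch(int maxItems) {
        this.maxItems = maxItems;
    }

    void add(String item) {
        if (pending.size() == maxItems) {
            flush(); // batch is full, "send" it before adding more
        }
        pending.add(item);
    }

    private void flush() {
        if (!pending.isEmpty()) {
            flushed.add(pending);
            pending = new ArrayList<>();
        }
    }

    @Override
    public void close() { // narrows Closeable.close(): no IOException declared
        flush();
    }

    static List<List<String>> run(List<String> items, int maxItems) {
        FlushingBatch batch = new FlushingBatch(maxItems);
        try (FlushingBatch b = batch) {
            for (String item : items) {
                b.add(item);
            }
        } // close() runs here and flushes the last partial batch
        return batch.flushed;
    }
}
```

Running it over five items with a batch size of two produces the batches [a, b], [c, d] and [e]. The last batch is only sent because close() is called.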
Please note that I added some markers (XXX) at places that might need attention. (These, however, can only be resolved with information beyond what has been provided.)
There are some more details that could be considered. For example, I'm not convinced that ByteBuffer is a proper collection for the given use case (at most places).
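One possible alternative to a fixed-capacity ByteBuffer is a growable ByteArrayOutputStream wrapped in a DataOutputStream, which always writes big-endian. The following is a minimal sketch that writes one entry in the same layout as serialize(). EntryWriter is a hypothetical name. The size limit still has to be checked before writing; this approach only removes the need to pre-allocate:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical alternative: serialize one entry without a pre-sized ByteBuffer.
final class EntryWriter {
    static byte[] writeEntry(byte[] key, long timestamp, byte[] value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(0);             // keyType
        out.writeByte(key.length);    // keyLength (low 8 bits)
        out.write(key);
        out.writeLong(timestamp);     // big-endian
        out.writeShort(value.length); // valueLength (low 16 bits)
        out.write(value);
        out.flush();
        return bytes.toByteArray();   // 12 + key.length + value.length bytes
    }
}
```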
Edit:
With respect to testing, due to the small size of the code, you might consider applying formal verification (at least partially). This is similar to what modern compilers do with static code analysis: you walk through your code (with paper and pencil) and derive properties that hold at each particular place. I added comments to the code above (marked //Properties) to illustrate what you might get by doing so. (Mind: this is a simple illustration; it would definitely need more properties to be derived, and for each statement.) I just did some minimal attributions for the resulting buffer size (using MAX as a placeholder for the maximum acceptable size of the item part of the final buffer, i.e. MAX_SIZE-HEADER_SIZE).
Of course, many people will (correctly) suggest writing tests for the critical cases. Those would be white-box tests in this case, testing for proper functionality of the code at the corner cases of the (known) implementation. You would also need black-box tests that check the behavior of your code against a specification.
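The following is a sketch of such white-box corner cases, assuming MAX_SIZE = 50000, HEADER_SIZE = 36 and the requirement that a serialized message stay strictly below MAX_SIZE. The interesting inputs sit right at the boundary. With 49950 item bytes already buffered, a 13-byte item still fits (a 49999-byte message), while a 14-byte item must trigger a flush. A single 49964-byte item can never fit, even in an empty message, so the single-item check has to reject it. FlushConditionCheck is a hypothetical name, and it isolates only the arithmetic of the flush condition:

```java
// Hypothetical white-box helper mirroring the flush condition in addAndSendJunked().
final class FlushConditionCheck {
    static final int MAX_SIZE = 50000;
    static final int HEADER_SIZE = 36;

    // flush first if adding the item would reach the item-part limit
    static boolean mustFlush(int bufferedItemBytes, int additionalSize) {
        return bufferedItemBytes + additionalSize >= MAX_SIZE - HEADER_SIZE;
    }

    // size of the serialized message if the item were added without flushing
    static int serializedSizeIfAdded(int bufferedItemBytes, int additionalSize) {
        return HEADER_SIZE + bufferedItemBytes + additionalSize;
    }
}
```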
You might also add runtime checks to ensure proper behavior at critical parts. For example, when calling sendToDatabase() you could check the maximum size requirement. However, such checks only demonstrate proper behavior for the inputs that actually occur. Properties derived from the code by static analysis could provide proof of correct behavior, without the lingering doubt that you have not yet found the one test case that would cause a failure.
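Such a runtime check can live in a decorator around the delivery. The following is a minimal sketch of a wrapper that rejects any payload that is not strictly smaller than the limit, so that a sizing bug fails loudly instead of reaching the database. SizeCheckingDelivery is a hypothetical name, and the DatabaseDelivery interface is redeclared so the block compiles on its own:

```java
// Same shape as the DatabaseDelivery interface from the answer.
interface DatabaseDelivery {
    void sendToDatabase(long address, byte[] messagePayload);
}

// Hypothetical runtime guard: checks the size, then delegates to the real delivery.
final class SizeCheckingDelivery implements DatabaseDelivery {
    private final DatabaseDelivery delegate;
    private final int maxSizeExclusive;

    SizeCheckingDelivery(DatabaseDelivery delegate, int maxSizeExclusive) {
        this.delegate = delegate;
        this.maxSizeExclusive = maxSizeExclusive;
    }

    @Override
    public void sendToDatabase(long address, byte[] messagePayload) {
        if (messagePayload.length >= maxSizeExclusive) {
            throw new IllegalStateException("payload of " + messagePayload.length
                    + " bytes violates limit of " + maxSizeExclusive);
        }
        delegate.sendToDatabase(address, messagePayload);
    }
}
```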
Upvotes: 4
Reputation: 1044
So here is my attempt (the question would probably be better addressed to the Code Review community, but anyway). It relies on some design changes to Message so that it becomes more like the Builder pattern. The buffer becomes part of the message. Its occupancy is controlled by reacting to a BufferOverflowException: once it occurs, the buffer is rolled back to the last successfully added entry, a new message is allocated, and the same piece of data is added again. When the buffer is complete, the total number of records and the overall size are written to the header, and the whole buffer is dumped to a byte array (I would probably try to avoid this extra conversion and operate on the buffer directly in sendToDatabase, but that's out of scope for now):
// TODO: structure has been adjusted for testing purposes
enum Partition
{
A(0x1);
private final int _partition;
int getPartition()
{
return _partition;
}
Partition(final int partition)
{
_partition = partition;
}
}
// TODO: structure has been adjusted for testing purposes
final static class DataHolder
{
private final String _clientKey;
private final byte[] _processBytes;
public DataHolder(
final String clientKey,
final String value)
{
_clientKey = clientKey;
byte[] valueBytes = value.getBytes();
// simulate payload including extra bytes for the header
final ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + valueBytes.length)
.order(ByteOrder.BIG_ENDIAN);
buffer.putInt(0).putLong(System.currentTimeMillis()).put(valueBytes);
_processBytes = readToBytes(buffer);
}
String getClientKey()
{
return _clientKey;
}
byte[] getProcessBytes()
{
return _processBytes;
}
}
// API has been changed to something more like the Builder pattern
final static class Message
{
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
private final ByteBuffer buffer;
private final int writeStatsPosition;
private int payloadCount;
Message(Partition recordPartition, int sizeLimit)
{
this.recordsPartition = (byte) recordPartition.getPartition();
this.replicated = 0;
// TODO: temporarily replaced with a hard-coded constant
long packedAddress = 123456789L;
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
buffer = ByteBuffer.allocate(sizeLimit).order(ByteOrder.BIG_ENDIAN);
// TODO: temporarily replaced with a hard-coded constant
byte dataCenter = 0x1;
byte recordVersion = 1;
buffer.put(dataCenter).put(recordVersion);
writeStatsPosition = buffer.position();
buffer.putInt(0).putInt(0); // placeholders for record count and total size, filled in by serialize()
buffer.putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(recordsPartition).put(replicated);
}
/**
* Tries to add another pair of client key and process bytes to
* the current message. Returns true if successfully added, false -
* if the data cannot be accommodated due to message binary size limit.
*/
boolean add(byte[] key, byte[] value)
{
try
{
byte keyType = 0;
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;
ByteBuffer valueAsBuffer = ByteBuffer.wrap(value);
long timestamp = valueAsBuffer.capacity() > 10 ? valueAsBuffer.getLong(2) : System.currentTimeMillis();
payloadCount++;
// remember position in the buffer to roll back to in case of overflow
buffer.mark();
buffer.put(keyType).put(keyLength).put(key);
buffer.putLong(timestamp).putShort(valueLength).put(value);
return true;
}
catch (BufferOverflowException e)
{
payloadCount--;
buffer.reset();
return false;
}
}
byte[] serialize()
{
int finalPosition = buffer.position();
// adjust the message header with the totals
buffer.putInt(writeStatsPosition, payloadCount)
.putInt(writeStatsPosition + 4, finalPosition);
return readToBytes(buffer);
}
}
static void validateAndSend(final Partition partition, final Supplier<Message> messageFactory)
throws InterruptedException
{
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Message message = messageFactory.get();
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null)
{
final byte[] keyBytes = dataHolder.getClientKey()
.getBytes(StandardCharsets.UTF_8);
final int keyLength = keyBytes.length;
if (keyLength > 255)
{
continue;
}
while (!message.add(keyBytes, dataHolder.getProcessBytes()))
{
// TODO: consider proper handling of the case when the buffer size is too small to accept even a single pair
Preconditions.checkState(message.payloadCount > 0,
"buffer size too small to accommodate payload");
final byte[] serializedMessage = message.serialize();
// TODO: makes sense to introduce a message consumer interface and call it here instead of sendToDatabase() - simplifies testing
sendToDatabase(message.address, serializedMessage);
message = messageFactory.get();
}
}
if (message.payloadCount > 0)
{
byte[] serializedMessage = message.serialize();
sendToDatabase(message.address, serializedMessage);
}
}
static void sendToDatabase(long address, byte[] serializedMessage)
{
// TODO: added simulating activity
System.out.printf("Sending %d bytes to %d: %s%n",
serializedMessage.length, address, DatatypeConverter.printHexBinary(serializedMessage));
}
static byte[] readToBytes(ByteBuffer buffer)
{
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
}
public static void main(String[] args)
throws ExecutionException, InterruptedException
{
// TODO: using small value for testing - must be set to 50K in real case
final int maxMessageSize = 80;
final Supplier<Message> messageFactory = new Supplier<Message>()
{
@Override
public Message get()
{
return new Message(Partition.A, maxMessageSize);
}
};
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(Partition.A);
dataHolders.add(new DataHolder("0000000001", "alpha"));
dataHolders.add(new DataHolder("0000000002", "bravo"));
dataHolders.add(new DataHolder("0000000003", "charlie"));
dataHolders.add(new DataHolder("0000000004", "delta"));
dataHolders.add(new DataHolder("0000000005", "echo"));
dataHolders.add(new DataHolder("0000000006", "foxtrot"));
validateAndSend(Partition.A, messageFactory);
}
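The rollback in Message.add() relies on ByteBuffer.mark() and reset(). A relative put that does not fit throws BufferOverflowException, and reset() moves the position back to the mark, so no partially written entry remains. The following is a minimal, isolated sketch of that technique. RollbackDemo and tryAppend are hypothetical names, and the entry layout is simplified to a 1-byte key length, the key, a 2-byte value length and the value:

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Hypothetical illustration of mark()/reset() rollback on overflow.
final class RollbackDemo {
    static boolean tryAppend(ByteBuffer buffer, byte[] key, byte[] value) {
        buffer.mark(); // remember where this entry starts
        try {
            buffer.put((byte) key.length).put(key).putShort((short) value.length).put(value);
            return true;
        } catch (BufferOverflowException e) {
            buffer.reset(); // discard the partially written entry
            return false;
        }
    }
}
```

With a 16-byte buffer, a first 10-byte entry fits (position 10). A second 10-byte entry overflows while writing the value, and the position returns to 10.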
Upvotes: 3