Reputation: 1015
I have this code that dumps documents into MongoDB once an ArrayBlockingQueue fills it's quota. When I run the code, it seems to only run once and then gives me a stack trace. My guess is that the BulkWriteOperation someone has to 'reset' or start over again.
Also, I create the BulkWriteOperations in the constructor...
bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();
Here's the stacktrace.
10 records inserted
java.lang.IllegalStateException: already executed
at org.bson.util.Assertions.isTrue(Assertions.java:36)
at com.mongodb.BulkWriteOperation.insert(BulkWriteOperation.java:62)
at willkara.monkai.impl.managers.DataManagers.MongoDBManager.dumpQueue(MongoDBManager.java:104)
at willkara.monkai.impl.managers.DataManagers.MongoDBManager.addToQueue(MongoDBManager.java:85)
Here's the code for the Queues:
public void addToQueue(Object item) {
if (item instanceof SakaiEvent) {
if (eventQueue.offer((SakaiEvent) item)) {
} else {
dumpQueue(eventQueue);
}
}
if (item instanceof SakaiSession) {
if (sessionQueue.offer((SakaiSession) item)) {
} else {
dumpQueue(sessionQueue);
}
}
}
And here is the code that reads from the queues and adds them to an BulkWriteOperation (initializeOrderedBulkOperation) to execute it and then dump it to the database. Only 10 documents get written and then it fails.
private void dumpQueue(BlockingQueue q) {
Object item = q.peek();
Iterator itty = q.iterator();
BulkWriteResult result = null;
if (item instanceof SakaiEvent) {
while (itty.hasNext()) {
bulkEvent.insert(((SakaiEvent) itty.next()).convertToDBObject());
//It's failing at that line^^
}
result = bulkEvent.execute();
}
if (item instanceof SakaiSession) {
while (itty.hasNext()) {
bulkSession.insert(((SakaiSession) itty.next()).convertToDBObject());
}
result = bulkSession.execute();
}
System.out.println(result.getInsertedCount() + " records inserted");
}
Upvotes: 1
Views: 2740
Reputation: 151112
The general documentation applies to all driver implementations in this case:
"After execution, you cannot re-execute the Bulk() object without reinitializing."
So the .execute()
method effectively "drains" the current list of operations that have been sent to it and now contains state information about how the commands were actually sent. So you cannot add more entries or call .execute()
again on the same instance without reinitializing .
So after you call execute on each "Bulk" object, you need to call the intialize again:
bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();
Each of those lines placed again repectively after each .execute()
call in your function. Then further calls to those instances can add operations and call execute again continuing the cycle.
Note that "Bulk" operations objects will store as many items as you want to put into them but will break up requests to the server into maximum amounts of 1000 items. After execution the state of the operations list will reflect exactly how this is done should you want to inspect that.
Upvotes: 4