Reputation: 865
I have a fairly straightforward Java class here which creates 2 thread pools....
It connects to a running URL stream and reads entries line by line, submitting each entry into a back-end MySQL DB.
It also spawns several threads, each of which carries out the same process (below):
1. Get the oldest DB entry from above
2. Parse and process it accordingly
3. Save several sections to another DB table
4. Delete this DB entry from the running table to signify that analysis is complete for it
5. End thread
The reason I need 2 pools is that the read process is MUCH faster than the analysis; if I read & analyse each entry as it comes through, the entries back up too fast and the incoming stream breaks. By putting in this separation, the read can happen as fast as it needs to, and the analysis can proceed as fast as it can, knowing that the records it has to catch up on are safe and available.
The problem I have is that each concurrent thread is getting the same oldest record. I need to know the best way to ensure the separate threads all run concurrently but each accesses a unique oldest DB entry.
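The two-pool separation described above can be sketched with an in-memory queue standing in for the DB table (the class name, pool sizes, and line count here are illustrative, not from the original setup; the real version would read from the URL stream and write to MySQL):

```java
import java.util.List;
import java.util.concurrent.*;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        // In-memory stand-in for the lineProcessing table; in the real
        // setup the reader pool would insert rows into MySQL instead.
        BlockingQueue<String> table = new LinkedBlockingQueue<>();
        List<String> analysed = new CopyOnWriteArrayList<>();

        ExecutorService readerPool = Executors.newSingleThreadExecutor();
        ExecutorService analyserPool = Executors.newFixedThreadPool(4);

        // Reader: fast producer, just stores lines for later analysis.
        readerPool.submit(() -> {
            for (int i = 0; i < 100; i++) {
                table.add("line-" + i);
            }
        });

        // Analysers: each poll() hands a given line to exactly one thread,
        // which is the property the DB-based version is missing.
        for (int i = 0; i < 4; i++) {
            analyserPool.submit(() -> {
                try {
                    while (true) {
                        String line = table.poll(500, TimeUnit.MILLISECONDS);
                        if (line == null) break;          // no more work
                        analysed.add(line.toUpperCase()); // the "analysis"
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        readerPool.shutdown();
        analyserPool.shutdown();
        analyserPool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(analysed.size()); // 100: each line handled once
    }
}
```

The DB adds durability that the in-memory queue lacks, but the key point is the same: the hand-off from reader to analyser must be atomic per entry.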
Thanks in advance.
EDIT=================================
Thanks folks for the replies so far...
To further expand on the setup I was attempting here, perhaps this code segment will be helpful...
String strQuery1 = "SELECT lineID,line FROM lineProcessing ORDER BY lineID ASC LIMIT 1;";
String strQuery2 = "DELETE FROM lineProcessing WHERE lineID = ?";
int lineID = -1;
String line = null;
try
{
    DBConnector dbc = new DBConnector(driver, url, userName, passwd);
    Connection con = dbc.getConnection();
    con.setAutoCommit(false);
    PreparedStatement pstmt = con.prepareStatement(strQuery1);
    ResultSet rs = pstmt.executeQuery();
    // Now extract the line & ID from the returned result set
    while (rs.next()) {
        lineID = rs.getInt(1);
        line = rs.getString(2);
    } // end while
    rs.close();
    pstmt.close();
    // Now delete that entry so that it cannot be analysed again...
    pstmt = con.prepareStatement(strQuery2);
    pstmt.setInt(1, lineID);
    int res = pstmt.executeUpdate();
    pstmt.close();
    con.commit();
    con.setAutoCommit(true);
    con.close();
}
catch (SQLException e) {
    System.out.println(">>>EXCEPTION FOUND IN QUERY = " + strQuery1 + " __or__ " + strQuery2);
    e.printStackTrace();
}
...So as you can see: open a DB connection, set autocommit = false, execute QUERY1, execute QUERY2, commit both statements as one transaction, and finally close the connection. That should be all each individual thread is required to do. The problem is that all X threads in the analysis pool get spawned and execute this batch of code simultaneously (which I would expect), but they do not respect the single-connection access to the DB that I thought I had set up above. They all return the same line for analysis. When the threads loop around for iteration #2, they all then return the new oldest row left after the previous deletion.
Any further suggestions please - including maybe a good example of forcing transactional SQL through Java?
Thanks again folks.
Upvotes: 1
Views: 4695
Reputation: 2923
Another solution is to have the worker threads all wait on a singleton that holds the key of the row. Write the row, place the key in the object, and then notify. The "next" worker thread will pick up the key and operate on it. You will need to make sure a worker is actually waiting, handle the case where none is, and so on.
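The wait/notify hand-off this answer describes might look roughly like the following (class and method names are made up for illustration; the real version would hand the key to a thread that then loads the row from the DB):

```java
public class KeyHandoff {
    private Integer key = null;   // the singleton's one-slot "mailbox"

    // Producer: publish one row key and wake a waiting worker.
    public synchronized void put(int k) throws InterruptedException {
        while (key != null) wait();   // wait until previous key is consumed
        key = k;
        notifyAll();
    }

    // Worker: block until a key is available, then claim it exclusively.
    public synchronized int take() throws InterruptedException {
        while (key == null) wait();
        int k = key;
        key = null;
        notifyAll();
        return k;
    }

    public static void main(String[] args) throws Exception {
        KeyHandoff handoff = new KeyHandoff();
        java.util.Set<Integer> seen =
                java.util.Collections.synchronizedSet(new java.util.TreeSet<>());

        // Three workers, each repeatedly claiming keys.
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        int k = handoff.take();
                        if (k < 0) return;   // negative key = stop signal
                        seen.add(k);         // each key is claimed once
                    }
                } catch (InterruptedException ignored) { }
            });
            workers[i].start();
        }

        for (int k = 0; k < 10; k++) handoff.put(k);    // produce keys
        for (int i = 0; i < workers.length; i++) handoff.put(-1); // stop all
        for (Thread w : workers) w.join();
        System.out.println(seen.size()); // 10: no key picked up twice
    }
}
```

Because `take()` nulls the slot inside the same synchronized block that reads it, only one worker can ever claim a given key.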
Upvotes: 0
Reputation: 51309
First, add a nullable datetime column that records when a row has been "picked up".
Then in your processing thread, claim a row atomically: UPDATE the oldest row whose picked-up time is still null, setting its picked-up time (with LIMIT 1), and then SELECT back the row you just claimed.
Make sure your isolation level is set to at least READ COMMITTED, and no two threads should get the same row. Also, if a processing thread dies and abandons its row, you can find that out by periodically querying for rows with a picked-up time earlier than some cutoff, and reprocess those by setting the picked-up time back to null.
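A minimal SQL sketch of this claim-and-recover pattern, using the table and column names from the question; `pickedUp` is the nullable datetime column proposed here, and `claimedBy` is an assumed extra column (not in the answer) so each thread can find the row it just claimed:

```sql
-- Atomically claim the oldest unclaimed row; only one connection's
-- UPDATE can match a given row, so no two threads get the same line.
UPDATE lineProcessing
   SET pickedUp = NOW(), claimedBy = CONNECTION_ID()
 WHERE pickedUp IS NULL
 ORDER BY lineID ASC
 LIMIT 1;

-- Read back the row this connection claimed.
SELECT lineID, line
  FROM lineProcessing
 WHERE claimedBy = CONNECTION_ID();

-- Periodic recovery: release rows picked up long ago but never deleted
-- (the thread presumably died); the 10-minute cutoff is illustrative.
UPDATE lineProcessing
   SET pickedUp = NULL, claimedBy = NULL
 WHERE pickedUp < NOW() - INTERVAL 10 MINUTE;
```

The UPDATE-then-SELECT ordering is what makes the claim atomic: the database, not the application, decides which thread wins each row.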
Or just switch to a transactional message queue, which does most of this for you automatically.
Upvotes: 1