Reputation: 24174
Problem Statement:-
In my below program I am using ThreadPoolExecutor
with an ArrayBlockingQueue
.
Each thread needs to use an UNIQUE ID
every time and it has to run for 60 minutes or more
, So in that 60 minutes
it is possible that all the ID's will get finished
so I need to reuse those ID's again. So I am using ArrayBlockingQueue
concept here.
Two Scenario:-
command.getDataCriteria()
contains Previous
then each
thread always needs to use UNIQUE ID between 1 and 1000
and
release it for reusing again.command.getDataCriteria()
contains New
then each
thread always needs to use UNIQUE ID between 2000 and 3000
and
release it for reusing again.What Problem I am facing currently with the Below Program-
One problem that I am facing is
run method
if the command.getDataCriteria()
is Previous
then also it gets entered in the else if block(which is for New)
which shouldn't be happening right? and also I am doing an .equals check
? Why this is happening? May be because many threads will launch at same time and have modified the command before?else if(command.getDataCriteria().equals("New")) {
If Multiple Thread is modifying it, then how I can overcome this problem? Whatever problem is happening it's happening in run method
. Any suggestions will be of great help as I am stuck on this from a long time. May be we need to Synchronize the threads, so that no other thread should modify the command when another thread is trying to execute it.
public synchronized void runNextCommand() {
LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();
LinkedList<Integer> availableNewIds = new LinkedList<Integer>();
executorService = new ThreadPoolExecutor(noOfThreads, noOfThreads, 500L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(noOfThreads), new ThreadPoolExecutor.CallerRunsPolicy());
// If there are any free threads in the thread pool
if (!(((ThreadPoolExecutor) executorService).getActiveCount() < noOfThreads))
return;
for (int i = 1; i <= 1000; i++) {
availableExistingIds.add(i);
}
for (int n = 2000; n <= 3000; n++) {
availableNewIds.add(n);
}
BlockingQueue<Integer> existIdPool = new ArrayBlockingQueue<Integer>(1000, false, availableExistingIds);
BlockingQueue<Integer> newIdPool = new ArrayBlockingQueue<Integer>(1001, false, availableNewIds);
// Running for particular duration of time
while(System.currentTimeMillis() <= endTime) {
Command nextCommand = getNextCommandToExecute();
Task nextCommandExecutorRunnable = new Task(nextCommand, existIdPool, newIdPool);
executorService.submit(nextCommandExecutorRunnable);
}
executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
executorService.shutdownNow();
}
}
Implementation of runnable (the real unit level command executor)
private static final class Task implements Runnable {
private Command command;
private DPSclient m_DPSclient = null;
private DPSclient psc = null;
private BlockingQueue<Integer> existPool;
private BlockingQueue<Integer> newPool;
private int existId;
private int newId;
private static Object syncObject = new Object();
public Task(Command command, BlockingQueue<Integer> pool1, BlockingQueue<Integer> pool2) {
this.command = command;
this.existPool = pool1;
this.newPool = pool2;
}
public void run() {
synchronized(syncObject) {
if(command.getDataCriteria().equals("Previous")) {
try {
// Getting existing id from the existPool
existId = existPool.take();
attributeGetSetMethod(existId);
} catch (Exception e) {
getLogger().log(LogLevel.ERROR, e.getLocalizedMessage());
} finally {
// And releasing that existing ID for re-use
existPool.offer(existId);
}
} else if(command.getDataCriteria().equals("New")) {
try {
// Getting new id from the newPool
newId = newPool.take();
attributeGetSetMethod(newId);
} catch (Exception e) {
getLogger().log(LogLevel.ERROR, e.getLocalizedMessage());
} finally {
// And releasing that new ID for re-use
newPool.offer(newId);
}
}
}
}
}
I will be appreciating your help into this. Thanks
Update- Code for getNextCommandToExecute method
as suggested by Matt
// Get the next command to execute based on percentages
private synchronized Command getNextCommandToExecute() {
int commandWithMaxNegativeOffset = 0; // To initiate, assume the first one has the max negative offset
if (totalExecuted != 0) {
// Manipulate that who has max negative offset from its desired execution
double executedPercentage = ((double)executedFrequency[commandWithMaxNegativeOffset] / (double)totalExecuted) * 100;
double offsetOfCommandWithMaxNegative = executedPercentage - commands.get(commandWithMaxNegativeOffset).getExecutionPercentage();
for (int j=1; j < commands.size(); j++) {
double executedPercentageOfCurrentCommand = ((double)executedFrequency[j] / (double)totalExecuted) * 100;
double offsetOfCurrentCommand = executedPercentageOfCurrentCommand - commands.get(j).getExecutionPercentage();
if (offsetOfCurrentCommand < offsetOfCommandWithMaxNegative) {
offsetOfCommandWithMaxNegative = offsetOfCurrentCommand;
commandWithMaxNegativeOffset = j;
}
}
}
// Next command to execute is the one with max negative offset
executedFrequency[commandWithMaxNegativeOffset] ++;
totalExecuted ++;
// This is for User Logging/No User Logging and Data is Previous/New
LinkedHashMap<String, Double> dataCriteriaMap = (LinkedHashMap<String, Double>) sortByValue(commands.get(commandWithMaxNegativeOffset).getDataUsageCriteria());
Set<Map.Entry<String, Double>> entriesData = dataCriteriaMap.entrySet();
Iterator<Map.Entry<String, Double>> itData = entriesData.iterator();
Map.Entry<String, Double> firstEntryData = itData.next();
Map.Entry<String, Double> secondEntryData = itData.next();
LinkedHashMap<Boolean, Double> userCriteriaMap = (LinkedHashMap<Boolean, Double>) sortByValue(commands.get(commandWithMaxNegativeOffset).getUserLoggingCriteria());
Set<Map.Entry<Boolean, Double>> entriesUser = userCriteriaMap.entrySet();
Iterator<Map.Entry<Boolean, Double>> itUser = entriesUser.iterator();
Map.Entry<Boolean, Double> firstEntryUser = itUser.next();
Map.Entry<Boolean, Double> secondEntryUser = itUser.next();
double percent = r.nextDouble() * 100;
if (percent < secondEntryData.getValue().doubleValue()) {
commands.get(commandWithMaxNegativeOffset).setDataCriteria(secondEntryData.getKey());
} else {
commands.get(commandWithMaxNegativeOffset).setDataCriteria(firstEntryData.getKey());
}
if (percent < secondEntryUser.getValue().doubleValue()) {
commands.get(commandWithMaxNegativeOffset).setUserLogging(secondEntryUser.getKey());
} else {
commands.get(commandWithMaxNegativeOffset).setUserLogging(firstEntryUser.getKey());
}
return commands.get(commandWithMaxNegativeOffset);
}
And commands has been declared at the top of the class as-
private static List<Command> commands;
Update One More method:-
private synchronized void attributeGetSetMethod(int id_range) {
requestlTransaction requestlTransaction = null;
try {
GUID_VALUES = new LinkedHashMap<Integer, String>();
// I am not sure how CAL logging has to be done, it has to be at each attribute level or something else? So that is the reason I left this thing.
if(!(command.getAttributeIDSet().isEmpty())) {
requestlTransaction = requestlTransactionFactory.create("DPSLnPTest");
m_DPSclient = setupDPS(command.getName(), getDPSAttributeKeys(command.getDataCriteria(), command.getUserLogging() , id_range));
for(String attr: command.getAttributeIDSet()) {
requestlTransaction.setName("DPSAttributeSet");
requestlTransaction.setStatus("0");
//requestlTransaction.addData("IpAddress", ipAddress);
if(attr.contains("/")) {
lengthOfString = Integer.parseInt(attr.split("/")[1]);
attr = attr.split("/")[0];
}
DPSAttribute attr1 = new DPSAttribute();
attr1.setRequestAttributeId(new DPSAttributeId(Integer.parseInt(attr)));
DPSMetadataMgr mgr = DPSMetadataMgr.getInstance();
DPSRequestAttributeMetadata metadata = mgr.getRequestAttributeMetadataById(Integer.parseInt(attr));
int maxOccurs = metadata.getMaxOccurs();
String dataType = metadata.getAttributeTypeAlias();
DPSAttributeValue attrValue1 = getRequestAttribute(dataType, lengthOfString);
if(maxOccurs>1) {
DPSListAttributeValue listAttrValue = new DPSListAttributeValue();
List<DPSAttributeValue> list = new ArrayList<DPSAttributeValue>();
list.add(attrValue1);
listAttrValue.setList(list);
attr1.setRequestAttributeValue(listAttrValue);
m_DPSclient.setDPSAttribute(attr1);
} else {
attr1.setRequestAttributeValue(attrValue1);
m_DPSclient.setDPSAttribute(attr1);
}
}
List<DPSAttribute> idKeys = m_DPSclient.release(PersistenceEnum.COMMIT, false);
// Iterating through the keys and storing into HashMap
Iterator<DPSAttribute> i = idKeys.iterator();
while (i.hasNext()) {
DPSAttribute DPSAttribute = (DPSAttribute)(i.next());
DPSAttributeId id = DPSAttribute.getAttributeId();
DPSAttributeValue value = DPSAttribute.getRequestAttribute();
if(id.getId() == DPSLnPConstants.CGUID_ID && (value)!= null) {
DPSLnPConstants.CGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.CGUID_ID, DPSLnPConstants.CGUID_VALUE);
} else if(id.getId() == DPSLnPConstants.SGUID_ID && (value)!= null) {
DPSLnPConstants.SGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.SGUID_ID, DPSLnPConstants.SGUID_VALUE);
} else if(id.getId() == DPSLnPConstants.PGUID_ID && (value)!= null) {
DPSLnPConstants.PGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.PGUID_ID, DPSLnPConstants.PGUID_VALUE);
} else if(id.getId() == DPSLnPConstants.UID_ID && (value)!= null) {
DPSLnPConstants.UID_VALUE = String.valueOf(((DPSLongAttributeValue)value).getValue());
GUID_VALUES.put(DPSLnPConstants.UID_ID, DPSLnPConstants.UID_VALUE);
} else if(id.getId() == DPSLnPConstants.SITE_ID && (value)!= null) {
DPSLnPConstants.SITEID_VALUE = String.valueOf(((DPSIntAttributeValue)value).getValue());
GUID_VALUES.put(DPSLnPConstants.SITE_ID, DPSLnPConstants.SITEID_VALUE);
} else if(id.getId() == DPSLnPConstants.ALOC_ID && (value)!= null) {
DPSLnPConstants.ALOC_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.ALOC_ID, DPSLnPConstants.ALOC_VALUE);
} else if(id.getId() == DPSLnPConstants.ULOC_ID && (value)!= null) {
DPSLnPConstants.ULOC_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.ULOC_ID, DPSLnPConstants.ULOC_VALUE);
} else if(id.getId() == DPSLnPConstants.SLOC_ID && (value)!= null) {
DPSLnPConstants.SLOC_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.SLOC_ID, DPSLnPConstants.SLOC_VALUE);
} else if(id.getId() == DPSLnPConstants.PLOC_ID && (value)!= null) {
DPSLnPConstants.PLOC_VALUE = ((DPSStringAttributeValue)value).getValue();
GUID_VALUES.put(DPSLnPConstants.PLOC_ID, DPSLnPConstants.PLOC_VALUE);
}
}
// Storing all the locators, guid in a map corresponding to an ID, then later on insert everything directly into db
GUID_ID_MAPPING.put(id_range, GUID_VALUES);
// Sleeping the command for particular milliseconds
// One thing not sure, I should be sleeping the command here or I should put it above this comment line '// Iterating through the keys'
Thread.sleep(command.getSleepTime());
}
// for get attributes
// And also how CAL logging has to be done here too. And we can use same DPS Smart Client that got created above to get the attributes value?
if(!(command.getAttributeIDGet().isEmpty())) {
requestlTransaction.setName("DPSAttributeGet");
requestlTransaction.setStatus("1");
psc = setupDPS(command.getName(), getDPSAttributeKeys(command.getDataCriteria(), command.getUserLogging() , id_range));
for(String attr: command.getAttributeIDGet()) {
DPSAttribute attribute = new DPSAttribute();
attribute = psc.getDPSAttribute(new DPSAttributeId(Integer.parseInt(attr)));
//System.out.println(attribute.getAttributeId()+ " " +attribute.getRequestAttribute());
}
}
} catch(Exception e) {
getLogger().log(LogLevel.ERROR, e);
} finally {
requestlTransaction.completed();
}
}
Upvotes: 3
Views: 296
Reputation: 15879
From what I can see if command.getDataCriteria()
is supposed to be accessed by only one Thread, then its' value shouldn't change due to threading synchronization issues. Assuming this we want to ensure that no two threads obtain a handle to an object instance.
You have synchronized getNextCommandToExecute()
but there is no evidence of synchronizing your commands
Collection based on your definition private static List<Command> commands;
. If the assumption above is to hold we want to guranteee that no two threads can get()
the same object instance from the Collection.
Where else in the code is the commands
Collection being accessed from?
If commands.get()
can be synchronized properly then there shouldn't be any thread contention on the instance returned. Which ever thread get's an object owns it.
...feel free to tell me I'm barking up the wrong tree
[Edit] following your comments
It is difficult for me to say for any certainty what's going wrong because I don't have all the code and there are assumptions to make. It looks to me like you're adding the synchronized
keyword on many methods and expecting it to solve your problems. I think the best approach is to minimize the lines of code really need synchronizing and this will require a clearer understanding of what really needs to be synchronized.
You don't want to share references to a Command
object across threads so make sure your get() and add() operations on List<Command> commands
are synchronized. You could employ the use of a synchronizedList or perhaps a ConcurrentLinkedQueue
Can you make the Command objects immutable? (i.e. no setX() methods)
Upvotes: 1