arsenal
arsenal

Reputation: 24174

Multiple Thread modifying the same command attributes

Problem Statement:-

In my below program I am using ThreadPoolExecutor with an ArrayBlockingQueue.

Each thread needs to use an UNIQUE ID every time and it has to run for 60 minutes or more, So in that 60 minutes it is possible that all the ID's will get finished so I need to reuse those ID's again. So I am using ArrayBlockingQueue concept here.

Two Scenario:-

  1. If the command.getDataCriteria() contains Previous then each thread always needs to use UNIQUE ID between 1 and 1000 and release it for reusing again.
  2. Else if the command.getDataCriteria() contains New then each thread always needs to use UNIQUE ID between 2000 and 3000 and release it for reusing again.

What Problem I am facing currently with the Below Program-

One problem that I am facing is

else if(command.getDataCriteria().equals("New")) {

If Multiple Thread is modifying it, then how I can overcome this problem? Whatever problem is happening it's happening in run method. Any suggestions will be of great help as I am stuck on this from a long time. May be we need to Synchronize the threads, so that no other thread should modify the command when another thread is trying to execute it.

public synchronized void runNextCommand() {

LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();
LinkedList<Integer> availableNewIds = new LinkedList<Integer>();

executorService = new ThreadPoolExecutor(noOfThreads, noOfThreads, 500L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(noOfThreads), new ThreadPoolExecutor.CallerRunsPolicy());

    // If there are any free threads in the thread pool
    if (!(((ThreadPoolExecutor) executorService).getActiveCount() < noOfThreads))
    return;

    for (int i = 1; i <= 1000; i++) {
    availableExistingIds.add(i);
    }

    for (int n = 2000; n <= 3000; n++) {
    availableNewIds.add(n);
    }

    BlockingQueue<Integer> existIdPool = new ArrayBlockingQueue<Integer>(1000, false, availableExistingIds);
    BlockingQueue<Integer> newIdPool = new ArrayBlockingQueue<Integer>(1001, false, availableNewIds);

    // Running for particular duration of time
    while(System.currentTimeMillis() <= endTime) {
    Command nextCommand = getNextCommandToExecute();
    Task nextCommandExecutorRunnable = new Task(nextCommand, existIdPool, newIdPool);
    executorService.submit(nextCommandExecutorRunnable);
    }

    executorService.shutdown();
    if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
    }
}

Implementation of runnable (the real unit level command executor)

private static final class Task implements Runnable {
private Command command;
private DPSclient m_DPSclient = null;
private DPSclient psc = null;
private BlockingQueue<Integer> existPool;
private BlockingQueue<Integer> newPool;
private int existId;
private int newId;
private static Object syncObject = new Object();  


public Task(Command command, BlockingQueue<Integer> pool1, BlockingQueue<Integer> pool2) {
    this.command = command;
    this.existPool = pool1;
    this.newPool = pool2;
}

public void run() {

  synchronized(syncObject) {
    if(command.getDataCriteria().equals("Previous")) {
    try {
        // Getting existing id from the existPool
        existId = existPool.take();
        attributeGetSetMethod(existId);
    } catch (Exception e) {
        getLogger().log(LogLevel.ERROR, e.getLocalizedMessage());
    } finally {
        // And releasing that existing ID for re-use
        existPool.offer(existId);       
    }
} else if(command.getDataCriteria().equals("New")) {
     try {
        // Getting new id from the newPool
        newId = newPool.take();
        attributeGetSetMethod(newId);
    } catch (Exception e) {
        getLogger().log(LogLevel.ERROR, e.getLocalizedMessage());
    } finally {
        // And releasing that new ID for re-use
        newPool.offer(newId);   
    }
    }
}
  }
}

I will be appreciating your help into this. Thanks

Update- Code for getNextCommandToExecute method as suggested by Matt

// Get the next command to execute based on percentages
    private synchronized Command getNextCommandToExecute() {
    int commandWithMaxNegativeOffset = 0; // To initiate, assume the first one has the max negative offset
    if (totalExecuted != 0) {
        // Manipulate that who has max negative offset from its desired execution
        double executedPercentage = ((double)executedFrequency[commandWithMaxNegativeOffset] / (double)totalExecuted) * 100;
        double offsetOfCommandWithMaxNegative = executedPercentage - commands.get(commandWithMaxNegativeOffset).getExecutionPercentage();

        for (int j=1; j < commands.size(); j++) {
        double executedPercentageOfCurrentCommand = ((double)executedFrequency[j] / (double)totalExecuted) * 100;
        double offsetOfCurrentCommand = executedPercentageOfCurrentCommand - commands.get(j).getExecutionPercentage();

        if (offsetOfCurrentCommand < offsetOfCommandWithMaxNegative) {
            offsetOfCommandWithMaxNegative = offsetOfCurrentCommand;
            commandWithMaxNegativeOffset = j;
        }
        }
    }

    // Next command to execute is the one with max negative offset
    executedFrequency[commandWithMaxNegativeOffset] ++;
    totalExecuted ++;

    // This is for User Logging/No User Logging and Data is Previous/New
    LinkedHashMap<String, Double> dataCriteriaMap = (LinkedHashMap<String, Double>) sortByValue(commands.get(commandWithMaxNegativeOffset).getDataUsageCriteria());
    Set<Map.Entry<String, Double>> entriesData = dataCriteriaMap.entrySet();
    Iterator<Map.Entry<String, Double>> itData = entriesData.iterator();
    Map.Entry<String, Double> firstEntryData = itData.next();
    Map.Entry<String, Double> secondEntryData = itData.next();

    LinkedHashMap<Boolean, Double> userCriteriaMap = (LinkedHashMap<Boolean, Double>) sortByValue(commands.get(commandWithMaxNegativeOffset).getUserLoggingCriteria());
    Set<Map.Entry<Boolean, Double>> entriesUser = userCriteriaMap.entrySet();
    Iterator<Map.Entry<Boolean, Double>> itUser = entriesUser.iterator();
    Map.Entry<Boolean, Double> firstEntryUser = itUser.next();
    Map.Entry<Boolean, Double> secondEntryUser = itUser.next();

    double percent = r.nextDouble() * 100;

    if (percent < secondEntryData.getValue().doubleValue()) {
        commands.get(commandWithMaxNegativeOffset).setDataCriteria(secondEntryData.getKey());
    } else {
        commands.get(commandWithMaxNegativeOffset).setDataCriteria(firstEntryData.getKey());
    }

    if (percent < secondEntryUser.getValue().doubleValue()) {
        commands.get(commandWithMaxNegativeOffset).setUserLogging(secondEntryUser.getKey());
    } else { 
        commands.get(commandWithMaxNegativeOffset).setUserLogging(firstEntryUser.getKey());
    }

    return commands.get(commandWithMaxNegativeOffset);
    }

And commands has been declared at the top of the class as-

private static List<Command> commands;

Update One More method:-

private synchronized void attributeGetSetMethod(int id_range) {

        requestlTransaction requestlTransaction = null;
        try {
        GUID_VALUES = new LinkedHashMap<Integer, String>();

        // I am not sure how CAL logging has to be done, it has to be at each attribute level or something else? So that is the reason I left this thing.

        if(!(command.getAttributeIDSet().isEmpty())) {

            requestlTransaction = requestlTransactionFactory.create("DPSLnPTest");
            m_DPSclient = setupDPS(command.getName(), getDPSAttributeKeys(command.getDataCriteria(), command.getUserLogging() , id_range));


            for(String attr: command.getAttributeIDSet()) {

            requestlTransaction.setName("DPSAttributeSet");
            requestlTransaction.setStatus("0");
            //requestlTransaction.addData("IpAddress", ipAddress);

            if(attr.contains("/")) {
                lengthOfString = Integer.parseInt(attr.split("/")[1]);
                attr = attr.split("/")[0];
            }
            DPSAttribute attr1 = new DPSAttribute();
            attr1.setRequestAttributeId(new DPSAttributeId(Integer.parseInt(attr)));
            DPSMetadataMgr mgr = DPSMetadataMgr.getInstance();
            DPSRequestAttributeMetadata metadata = mgr.getRequestAttributeMetadataById(Integer.parseInt(attr));
            int maxOccurs = metadata.getMaxOccurs();
            String dataType = metadata.getAttributeTypeAlias();

            DPSAttributeValue attrValue1 = getRequestAttribute(dataType, lengthOfString);

            if(maxOccurs>1) {
                DPSListAttributeValue listAttrValue = new DPSListAttributeValue();
                List<DPSAttributeValue> list = new ArrayList<DPSAttributeValue>();
                list.add(attrValue1);
                listAttrValue.setList(list);
                attr1.setRequestAttributeValue(listAttrValue);
                m_DPSclient.setDPSAttribute(attr1);
            } else {
                attr1.setRequestAttributeValue(attrValue1);         
                m_DPSclient.setDPSAttribute(attr1);
            }
            }

            List<DPSAttribute> idKeys = m_DPSclient.release(PersistenceEnum.COMMIT, false);

            // Iterating through the keys and storing into HashMap
            Iterator<DPSAttribute> i = idKeys.iterator();
            while (i.hasNext()) {
            DPSAttribute DPSAttribute = (DPSAttribute)(i.next());
            DPSAttributeId id = DPSAttribute.getAttributeId();
            DPSAttributeValue value = DPSAttribute.getRequestAttribute();

            if(id.getId() == DPSLnPConstants.CGUID_ID && (value)!= null) {
                DPSLnPConstants.CGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.CGUID_ID, DPSLnPConstants.CGUID_VALUE);
            } else if(id.getId() == DPSLnPConstants.SGUID_ID && (value)!= null) {
                DPSLnPConstants.SGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.SGUID_ID, DPSLnPConstants.SGUID_VALUE);
            } else if(id.getId() == DPSLnPConstants.PGUID_ID && (value)!= null) {
                DPSLnPConstants.PGUID_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.PGUID_ID, DPSLnPConstants.PGUID_VALUE);
            } else if(id.getId() == DPSLnPConstants.UID_ID && (value)!= null) {
                DPSLnPConstants.UID_VALUE = String.valueOf(((DPSLongAttributeValue)value).getValue());
                GUID_VALUES.put(DPSLnPConstants.UID_ID, DPSLnPConstants.UID_VALUE);
            } else if(id.getId() == DPSLnPConstants.SITE_ID && (value)!= null) {
                DPSLnPConstants.SITEID_VALUE = String.valueOf(((DPSIntAttributeValue)value).getValue());
                GUID_VALUES.put(DPSLnPConstants.SITE_ID, DPSLnPConstants.SITEID_VALUE);
            } else if(id.getId() == DPSLnPConstants.ALOC_ID && (value)!= null) {
                DPSLnPConstants.ALOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.ALOC_ID, DPSLnPConstants.ALOC_VALUE);
            } else if(id.getId() == DPSLnPConstants.ULOC_ID && (value)!= null) {
                DPSLnPConstants.ULOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.ULOC_ID, DPSLnPConstants.ULOC_VALUE);
            } else if(id.getId() == DPSLnPConstants.SLOC_ID && (value)!= null) {
                DPSLnPConstants.SLOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.SLOC_ID, DPSLnPConstants.SLOC_VALUE);
            } else if(id.getId() == DPSLnPConstants.PLOC_ID && (value)!= null) {
                DPSLnPConstants.PLOC_VALUE = ((DPSStringAttributeValue)value).getValue();
                GUID_VALUES.put(DPSLnPConstants.PLOC_ID, DPSLnPConstants.PLOC_VALUE);
            }
            }

            // Storing all the locators, guid in a map corresponding to an ID, then later on insert everything directly into db
            GUID_ID_MAPPING.put(id_range, GUID_VALUES);

            // Sleeping the command for particular milliseconds
            // One thing not sure, I should be sleeping the command here or I should put it above this comment line '// Iterating through the keys'
            Thread.sleep(command.getSleepTime());
        } 

        // for get attributes   
        // And also how CAL logging has to be done here too. And we can use same DPS Smart Client that got created above to get the attributes value?

        if(!(command.getAttributeIDGet().isEmpty())) {

            requestlTransaction.setName("DPSAttributeGet");
            requestlTransaction.setStatus("1");
            psc = setupDPS(command.getName(), getDPSAttributeKeys(command.getDataCriteria(), command.getUserLogging() , id_range));

            for(String attr: command.getAttributeIDGet()) {
            DPSAttribute attribute = new DPSAttribute();
            attribute = psc.getDPSAttribute(new DPSAttributeId(Integer.parseInt(attr)));
            //System.out.println(attribute.getAttributeId()+ " " +attribute.getRequestAttribute());
            }
        }
        } catch(Exception e) {
        getLogger().log(LogLevel.ERROR, e);
        } finally {
        requestlTransaction.completed();
        }

    }

Upvotes: 3

Views: 296

Answers (1)

Brad
Brad

Reputation: 15879

From what I can see if command.getDataCriteria() is supposed to be accessed by only one Thread, then its' value shouldn't change due to threading synchronization issues. Assuming this we want to ensure that no two threads obtain a handle to an object instance.

You have synchronized getNextCommandToExecute() but there is no evidence of synchronizing your commands Collection based on your definition private static List<Command> commands;. If the assumption above is to hold we want to guranteee that no two threads can get() the same object instance from the Collection.

Where else in the code is the commands Collection being accessed from?

If commands.get() can be synchronized properly then there shouldn't be any thread contention on the instance returned. Which ever thread get's an object owns it.

...feel free to tell me I'm barking up the wrong tree

[Edit] following your comments

It is difficult for me to say for any certainty what's going wrong because I don't have all the code and there are assumptions to make. It looks to me like you're adding the synchronized keyword on many methods and expecting it to solve your problems. I think the best approach is to minimize the lines of code really need synchronizing and this will require a clearer understanding of what really needs to be synchronized.

  1. You don't want to share references to a Command object across threads so make sure your get() and add() operations on List<Command> commands are synchronized. You could employ the use of a synchronizedList or perhaps a ConcurrentLinkedQueue

  2. Can you make the Command objects immutable? (i.e. no setX() methods)

Upvotes: 1

Related Questions