lochi

Reputation: 880

Iterating over lists using multiple threads

The following code (part shown) creates DescCalculator objects, which calculate descriptors and return them as lists of strings. A molecule and an ArrayList of descriptor objects are passed in to each one.

    private void calcDesc()
    {
        try
        {
        StatusPanel.setStatus("Calculating Molecular Descriptors Using CDK...\n");
        File df = new File(Settings.getCurrentDirectory() + sep + "molDesc.csv");
        FileWriter dfw = new FileWriter(df);
        LoadSDF lsdf1 = new LoadSDF(Settings.getCurrentDirectory() + sep + "marvin3D.sdf");
        List<IAtomContainer> mols3D = lsdf1.getCompounds();
        DescriptorEngine engine = new DescriptorEngine(DescriptorEngine.MOLECULAR);
        List<String> classNames = engine.getDescriptorClassNames();
        List<String> removeList = new ArrayList();
        removeList.add("org.openscience.cdk.qsar.descriptors.molecular.IPMolecularLearningDescriptor");
        classNames.removeAll(removeList);
        List<IDescriptor> instances = engine.instantiateDescriptors(classNames);
        engine.setDescriptorInstances(instances);

        List<String> headerItems = new ArrayList<String>();
        headerItems.add("CID");
        headerItems.add("MobCSA");
        for (IDescriptor descriptor : instances) {
            String[] names = descriptor.getDescriptorNames();
            headerItems.addAll(Arrays.asList(names));
        }
        ArrayList<IMolecularDescriptor> descriptors = new ArrayList();

        for (Object object : instances)
        {
        IMolecularDescriptor descriptor = (IMolecularDescriptor) object;
        String[] comps = descriptor.getSpecification().getSpecificationReference().split("#");
        descriptors.add(descriptor);
        }
        String headerLine = "";
        for (String header : headerItems) {
            headerLine = headerLine + header + ",";
        }

        dfw.append(headerLine+"\n");
        ExecutorService eservice = Executors.newFixedThreadPool(threads);
        CompletionService <List<String>> cservice = new ExecutorCompletionService <List<String>> (eservice);
        int k=0;
        for (IAtomContainer mol : mols3D)
        {
            DescCalculator dc = new DescCalculator(mol,descriptors);
            cservice.submit(dc);
            k=k+1;

        }
        for (int j=1 ; j<=k; j++)
        {
            StatusPanel.setStatus("Calculating Descriptors for Molecule "+j+"/"+compounds.size()+" Using "+threads+" Processors\n");
            List<String> dataItems = cservice.take().get();
            for (int i = 0; i < dataItems.size(); i++) {
                if (dataItems.get(i).equals("NaN")) {
                    dataItems.set(i, "NA");
                }
            }

            try {
                String dataLine = "";
                for (String data : dataItems) {
                    dataLine = dataLine + data + ",";
                }
                dfw.append(dataLine+"\n");
            } catch (Exception e) {
                System.out.println(e.toString());
            }
        }
        dfw.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

Inside the class there is a for loop that goes over the list of descriptors as follows (part shown). This code runs into a ConcurrentModificationException. If I use threads=1, or put the descriptor iteration inside a synchronized block, the code runs fine, but I don't get the parallelization I need. How do I iterate over the list inside the DescCalculator class?

    public class DescCalculator implements Callable<List<String>>{

    private IAtomContainer mol = new Molecule();
    private ArrayList<IMolecularDescriptor> molDesc;

    DescCalculator(IAtomContainer mol_, ArrayList<IMolecularDescriptor> molDesc_)
    {
        this.mol = mol_;
        this.molDesc = molDesc_;
    }

    @Override
    public List<String> call() {
        List<String> dataItems = new ArrayList<String>();
        try
        {
            String title = (String) mol.getProperty("PUBCHEM_COMPOUND_CID");
            dataItems.add(title);
            //String csa = Double.toString(mobcalCSA.get(ind));
            String csa = "NA";
            dataItems.add(csa);
            int ndesc = 0;
            for (IMolecularDescriptor descriptor : molDesc) {
                // Calculate once and keep the result (the value was previously computed twice).
                DescriptorValue value = descriptor.calculate(mol);
                if (value.getException() != null) {
                    for (int i = 0; i < value.getNames().length; i++) {
                        dataItems.add("NA");
                    }
                    continue;
                }

                IDescriptorResult result = value.getValue();
                if (result instanceof DoubleResult) {
                    dataItems.add(String.valueOf(((DoubleResult) result).doubleValue()));
                } else if (result instanceof IntegerResult) {
                    dataItems.add(String.valueOf(((IntegerResult) result).intValue()));
                } else if (result instanceof DoubleArrayResult) {
                    for (int i = 0; i < ((DoubleArrayResult) result).length(); i++) {
                        dataItems.add(String.valueOf(((DoubleArrayResult) result).get(i)));
                    }
                } else if (result instanceof IntegerArrayResult) {
                    for (int i = 0; i < ((IntegerArrayResult) result).length(); i++) {
                        dataItems.add(String.valueOf(((IntegerArrayResult) result).get(i)));
                    }
                }

                ndesc++;
            } 

        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        return dataItems;
    }

}

Stack trace:

java.util.ConcurrentModificationException
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
    at java.util.AbstractList$Itr.next(AbstractList.java:343)
    at org.openscience.cdk.ChemObject.notifyChanged(ChemObject.java:187)
    at org.openscience.cdk.ChemObject.setFlag(ChemObject.java:375)
    at org.openscience.cdk.graph.PathTools.depthFirstTargetSearch(PathTools.java:168)
    at org.openscience.cdk.graph.PathTools.depthFirstTargetSearch(PathTools.java:177)
    at org.openscience.cdk.graph.PathTools.depthFirstTargetSearch(PathTools.java:177)
    at org.openscience.cdk.graph.SpanningTree.getRing(SpanningTree.java:185)
    at org.openscience.cdk.graph.SpanningTree.getCyclicFragmentsContainer(SpanningTree.java:221)
    at org.openscience.cdk.atomtype.CDKAtomTypeMatcher.getRing(CDKAtomTypeMatcher.java:912)
    at org.openscience.cdk.atomtype.CDKAtomTypeMatcher.perceiveNitrogens(CDKAtomTypeMatcher.java:730)
    at org.openscience.cdk.atomtype.CDKAtomTypeMatcher.findMatchingAtomType(CDKAtomTypeMatcher.java:117)
    at org.openscience.cdk.tools.manipulator.AtomContainerManipulator.percieveAtomTypesAndConfigureAtoms(AtomContainerManipulator.java:719)
    at org.openscience.cdk.smiles.smarts.SMARTSQueryTool.initializeMolecule(SMARTSQueryTool.java:435)
    at org.openscience.cdk.smiles.smarts.SMARTSQueryTool.matches(SMARTSQueryTool.java:214)
    at org.openscience.cdk.smiles.smarts.SMARTSQueryTool.matches(SMARTSQueryTool.java:189)
    at org.openscience.cdk.qsar.descriptors.molecular.AcidicGroupCountDescriptor.calculate(AcidicGroupCountDescriptor.java:135)
    at edu.uconn.pharmacy.molfind.DescCalculator.call(DescCalculator.java:48)
    at edu.uconn.pharmacy.molfind.DescCalculator.call(DescCalculator.java:25)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)

Upvotes: 1

Views: 2216

Answers (3)

lochi

Reputation: 880

It turns out the problem is that the descriptor objects are not thread safe. Thanks to Affe for pointing that out! Creating the descriptor objects inside the DescCalculator class, so that each task has its own instances, fixed the problem. Thanks to everyone for their input!
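A minimal sketch of that idea, independent of CDK: `StatefulDescriptor`, `Task` and `runAll` are hypothetical stand-ins made up for illustration, not CDK classes. The only point is that each Callable creates its own instance inside call() instead of receiving one that other threads also use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class PerTaskDescriptors {

    // Hypothetical stand-in for a descriptor that keeps mutable internal
    // state and is therefore not safe to share between threads.
    static class StatefulDescriptor {
        private final List<Integer> scratch = new ArrayList<>();

        int calculate(int input) {
            scratch.clear();
            for (int i = 0; i < input; i++) {
                scratch.add(i);
            }
            int sum = 0;
            for (int v : scratch) {
                sum += v;
            }
            return sum;
        }
    }

    // Each task asks the factory for a fresh descriptor inside call(),
    // so no mutable descriptor state is shared across threads.
    static class Task implements Callable<Integer> {
        private final int input;
        private final Supplier<StatefulDescriptor> factory;

        Task(int input, Supplier<StatefulDescriptor> factory) {
            this.input = input;
            this.factory = factory;
        }

        @Override
        public Integer call() {
            StatefulDescriptor descriptor = factory.get();
            return descriptor.calculate(input);
        }
    }

    // Submits one task per input value and collects results in submission order.
    static List<Integer> runAll(int threads, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(new Task(i, StatefulDescriptor::new)));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(4, 10));
    }
}
```

In the question's code, the equivalent change would presumably be to pass the descriptor class names (or some other factory) into DescCalculator and instantiate the descriptors inside call(), at the cost of creating more objects.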

Upvotes: 0

esroberts

Reputation: 58

Your molDesc field is state shared between two or more threads that run the call() method. As you stated, the only ways to avoid the ConcurrentModificationExceptions are to wrap the iteration in a synchronized block (which will prevent you from parallelizing) or to use a thread-safe structure like CopyOnWriteArrayList, as Victor mentioned.

Upvotes: 0

Gray

Reputation: 116938

You should not get this exception unless a Collection is actually being modified at the same time as you are iterating over it. This can even happen in a single thread, for example by deleting from a Collection inside a for loop that iterates over it.
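The single-threaded case can be reproduced with a small sketch like the following. The class and method names are made up for illustration; only standard java.util calls are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

public class SingleThreadCme {

    // Removes an element from the list inside a for-each loop over that list.
    // The loop's iterator notices the structural change on its next step.
    static boolean removeInForEachThrows() {
        List<String> items = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        try {
            for (String item : items) {
                if (item.equals("a")) {
                    items.remove(item);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Removes through the iterator itself, which keeps the iterator consistent.
    static List<String> removeWithIterator() {
        List<String> items = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        Iterator<String> it = items.iterator();
        while (it.hasNext()) {
            if (it.next().equals("a")) {
                it.remove();
            }
        }
        return items;
    }

    public static void main(String[] args) {
        System.out.println("for-each removal throws: " + removeInForEachThrows());
        System.out.println("after iterator removal: " + removeWithIterator());
    }
}
```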

In your case, though, I guess you are removing from the molDesc list somewhere, although I can't see that in the code sample you provided. If you need to remove entries from the list, you will have to use some other mechanism to do the deletes. You cannot alter the same collection from multiple threads unless it is synchronized in some way.

A couple of other ideas:

  • Not sure if it needs to be the exact same list. You could have each thread work with its own copy of the list.

    DescCalculator dc =
       new DescCalculator(mol, new ArrayList<IMolecularDescriptor>(descriptors));
    
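  • You could just pass in a Collections.synchronizedList wrapper around the descriptor list, although I'm not sure that's what you want. Note that iterating over such a list still requires synchronizing on it manually.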

    List<IMolecularDescriptor> syncedList =
        Collections.synchronizedList(descriptors);
    ...
    DescCalculator dc = new DescCalculator(mol, syncedList);
    
  • Each thread could keep a list of the items that need to be deleted. When the thread's Future is reaped, you could collect that list of deletions and remove those items from the shared list at the end.
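The last idea could look roughly like the sketch below. It is an illustration under assumptions, not the asker's code: `DeferredRemoval` and `pruneOdd` are hypothetical names, and "odd numbers" stands in for whatever makes an item a candidate for deletion. Worker tasks only read the shared list; the calling thread applies all removals after every Future has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DeferredRemoval {

    // Removes odd values from 'shared'. Tasks never modify the list; each one
    // returns the values it wants deleted, and the caller deletes them at the end.
    // Assumes threads > 0.
    static List<Integer> pruneOdd(List<Integer> shared, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            int chunk = (shared.size() + threads - 1) / threads;
            for (int start = 0; start < shared.size(); start += chunk) {
                final int from = start;
                final int to = Math.min(start + chunk, shared.size());
                futures.add(pool.submit(() -> {
                    // Read-only pass over this task's slice of the list.
                    List<Integer> toDelete = new ArrayList<>();
                    for (int i = from; i < to; i++) {
                        Integer value = shared.get(i);
                        if (value % 2 != 0) {
                            toDelete.add(value);
                        }
                    }
                    return toDelete;
                }));
            }

            // Wait for every task before touching the shared list.
            List<Integer> allToDelete = new ArrayList<>();
            for (Future<List<Integer>> future : futures) {
                allToDelete.addAll(future.get());
            }
            shared.removeAll(allToDelete);
            return shared;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            values.add(i);
        }
        System.out.println(pruneOdd(values, 3));
    }
}
```

Collecting every result before calling removeAll matters here: removing after the first Future returns would modify the list while other tasks may still be reading it.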

Upvotes: 2
