Reputation: 125
I'm using Java's Concurrent Executors to read in several files at once and write the info to a hashmap. The files are for daily logs that look something like this:
+-----------+-------+-----------+-------+
| Member ID | Time | Weight | Fat % |
+-----------+-------+-----------+-------+
| 1123141 | 1:03 | 162 | 21.2 |
| 5321430 | 1:10 | 131 | 25.3 |
| ... | ... | ... | ... |
+-----------+-------+-----------+-------+
The hashmap has member IDs as keys, and for the values I've created a class, MemberProps, that holds lists for the time and the other properties. I'm trying to use multiple threads to read in multiple days at once, write the data to the hashmap, and then do data analysis on a per-member basis. Here's how the code looks:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.ArrayList;
import java.util.Date;
import java.io.IOException;
class MemberProps {
ArrayList<Date> time;
ArrayList<Integer> weight;
ArrayList<Float> fat;
public MemberProps() {
this.time = new ArrayList<>();
this.weight = new ArrayList<>();
this.fat = new ArrayList<>();
}
}
public class MemberFileReader {
public ConcurrentHashMap<Integer, MemberProps> MemberHash;
public ExecutorService executor = Executors.newFixedThreadPool(2);
public void readFiles() {
//Get this week's files
File folder = new File("C:\\User\\Temp");
File[] files = folder.listFiles();
for (File fl : files) {
executor.execute(new FileReader(fl.toString()));
}
executor.shutdown();
}
private final class FileReader implements Runnable {
private String filepath;
public FileReader(String filepath) {
this.filepath = filepath;
}
@Override
public synchronized void run() {
try {
String line = null;
BufferedReader in = new BufferedReader(new FileReader(filepath));
while ((line = in.readLine()) != null) {
MemberProps member = new MemberProps();
String[] split_line = line.split("\\t");
int member_id = Integer.parseInt(split_line[0]);
member.time.add(Date.parse(split_line[1]));
member.weight.add(Integer.parseInt(split_line[2]));
member.fat.add(Float.parseFloat(split_line[3]));
// If not absent, will return member's values
member = MemberHash.putIfAbsent(member_id, member);
if (member != null){
member.time.add(Date.parse(split_line[1]));
member.weight.add(Integer.parseInt(split_line[2]));
member.fat.add(Float.parseFloat(split_line[3]));
MemberHash.put(member_id, member);
}
}
} catch (IOException E){
// caught exception
}
}
}
}
My issue is that for some reason, values come up missing in the lists. For example, for a member with 10 logs, I might have 10 times but only 9 weights/fats, or similarly 9 times but 10 weights, etc. From debugging, it seems like the issue might happen when different threads try to get the same member and put at the same time, but I'm not sure. Any ideas? To fix this I'm thinking about having a separate hashmap for each thread and then joining the maps afterwards, but I don't know if there's a simpler solution.
Upvotes: 2
Views: 1243
Reputation: 13570
ArrayList.add is not thread-safe. You should synchronize the additions to those lists inside MemberProps, e.g.:
class MemberProps {
    private ArrayList<Date> time;
    private ArrayList<Integer> weight;
    private ArrayList<Float> fat;

    public MemberProps() {
        this.time = new ArrayList<>();
        this.weight = new ArrayList<>();
        this.fat = new ArrayList<>();
    }

    // One lock guards all three lists, so each log entry is added as a unit.
    public synchronized void add(Date d, Integer w, Float f) {
        this.time.add(d);
        this.weight.add(w);
        this.fat.add(f);
    }
}
Then use those synchronized methods like this:
if (member != null) {
    // Date.parse returns a long (and is deprecated), so wrap the result in a Date.
    Date d = new Date(Date.parse(split_line[1]));
    Integer w = Integer.parseInt(split_line[2]);
    Float f = Float.parseFloat(split_line[3]);
    member.add(d, w, f);
}
Note that you should not put the member into the map a second time, since it is already there; you just need to add to the existing instance.
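To see the idea end to end, here is a minimal sketch of the synchronized approach. It is an illustration under assumptions, not the asker's actual program: the class names SyncMemberProps and SynchronizedAddDemo, the run helper, the pool size, and the hard-coded log values are all made up for the demo, and times are stored as plain strings to sidestep date parsing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MemberProps; times are plain strings for simplicity.
class SyncMemberProps {
    private final List<String> time = new ArrayList<>();
    private final List<Integer> weight = new ArrayList<>();
    private final List<Float> fat = new ArrayList<>();

    // One lock covers all three lists, so a log entry is added as a unit.
    public synchronized void add(String t, int w, float f) {
        time.add(t);
        weight.add(w);
        fat.add(f);
    }

    // Returns the three list sizes, read under the same lock.
    public synchronized int[] sizes() {
        return new int[] { time.size(), weight.size(), fat.size() };
    }
}

class SynchronizedAddDemo {
    // Hypothetical driver: `tasks` workers each log `perTask` entries for one member.
    static int[] run(int tasks, int perTask) {
        ConcurrentHashMap<Integer, SyncMemberProps> map = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int t = 0; t < tasks; t++) {
            executor.execute(() -> {
                for (int i = 0; i < perTask; i++) {
                    SyncMemberProps fresh = new SyncMemberProps();
                    // putIfAbsent returns the existing value, or null if ours was stored.
                    SyncMemberProps existing = map.putIfAbsent(1123141, fresh);
                    SyncMemberProps member = (existing != null) ? existing : fresh;
                    member.add("1:03", 162, 21.2f);
                }
            });
        }
        executor.shutdown();
        try {
            // shutdown() does not block, so wait for the workers before reading.
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return map.get(1123141).sizes();
    }

    public static void main(String[] args) {
        int[] sizes = run(8, 1000);
        System.out.println(sizes[0] + " " + sizes[1] + " " + sizes[2]);
    }
}
```

With 8 tasks of 1000 entries each, all three lists should end up with 8000 elements. Note that the sketch waits with awaitTermination before reading the map, because shutdown() on its own returns immediately.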
Upvotes: 2
Reputation: 140319
You're not adding to the ArrayLists safely. You need to externally synchronize structural modifications, as stated in the Javadoc.
Specifically, your approach of "put if absent, then put again if there was already something there" doesn't work because multiple threads can be executing that code at the same time.
Use compute:
MemberHash.compute(member_id, (k, member) -> {
if (member == null) {
member = new MemberProps();
}
// Add things to member.
return member;
});
compute executes atomically for a given key, so you won't get two threads trying to add to the same member at the same time.
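As a rough, self-contained sketch of the compute approach: the names PlainMemberProps and ComputeDemo, the run helper, the pool size, and the hard-coded values below are assumptions made for the demo, and times are kept as strings rather than parsed dates. The lists are unsynchronized ArrayLists, and all additions happen inside the compute callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MemberProps with no synchronization of its own.
class PlainMemberProps {
    final List<String> time = new ArrayList<>();
    final List<Integer> weight = new ArrayList<>();
    final List<Float> fat = new ArrayList<>();
}

class ComputeDemo {
    // Hypothetical driver: `tasks` workers each log `perTask` entries for one member.
    static int[] run(int tasks, int perTask) {
        ConcurrentHashMap<Integer, PlainMemberProps> map = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int t = 0; t < tasks; t++) {
            executor.execute(() -> {
                for (int i = 0; i < perTask; i++) {
                    // The whole callback runs atomically for this key.
                    map.compute(1123141, (id, member) -> {
                        if (member == null) {
                            member = new PlainMemberProps();
                        }
                        member.time.add("1:03");
                        member.weight.add(162);
                        member.fat.add(21.2f);
                        return member;
                    });
                }
            });
        }
        executor.shutdown();
        try {
            // Wait for all workers so their writes are visible before reading.
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        PlainMemberProps m = map.get(1123141);
        return new int[] { m.time.size(), m.weight.size(), m.fat.size() };
    }

    public static void main(String[] args) {
        int[] sizes = run(8, 1000);
        System.out.println(sizes[0] + " " + sizes[1] + " " + sizes[2]);
    }
}
```

This only stays safe as long as every modification of a member's lists goes through compute; code that mutates a MemberProps obtained from get() while the workers are still running would bypass that protection.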
Upvotes: 1