Reputation: 540
I am writing a CSV stream processor that creates an IdMapping as shown below. After switching from a sequential stream to a parallel one and introducing a concurrent map, performance increased significantly. The stream has around 500k rows; here is the header plus a sample row:
Country,StudioNetwork,MediaId,Title,OriginalReleaseDate,MediaType,SeriesMediaId,SeasonMediaId,SeriesTitle,SeasonNumber,EpisodeNumber,LicenseType,ActualRetailPrice,OfferStartDate,OfferEndDate,Actors,Director,XboxLiveURL
UNITED STATES,,531b964f-0cb9-4968-9b77-e547f2435225,Zoey 101: Peeps at PCA,1/23/2012 12:00:00 AM,TvSeason,c3eddf64-372f-4dfc-a763-2681cce65ac2,,Zoey 101: Peeps at PCA,1,,EST HD,12.99,1/30/2012 8:01:00 AM,1/1/3000 8:01:00 AM,,,https://video.xbox.com/tv-season/531b964f-0cb9-4968-9b77-e547f2435225
@Override
public List<Matcher.IdMapping> match(Matcher.CsvStream externalDb, MatchingContext matchingContext) {
    // Map to store the ID mappings between internal and external databases
    final Map<Integer, Matcher.IdMapping> idMappingMap = new ConcurrentHashMap<>();
    // Extract column indices from the header row of the external database
    final Map<String, Integer> columnIndices = ParsingUtils.generateHeaderMap(externalDb.getHeaderRow());
    int titleIdx = columnIndices.get("Title");
    int dateIdx = columnIndices.get("OriginalReleaseDate");
    int directorIdx = columnIndices.get("Director");
    int actorIdx = columnIndices.get("Actors");
    int mediaIdIdx = columnIndices.get("MediaId");
    // Retrieve the internal movie index from the matching context
    Map<String, Movie> movieIndex = matchingContext.getMovieIndex();
    //AtomicInteger count = new AtomicInteger();
    externalDb.getDataRows().parallel().forEach(row -> {
        //count.getAndIncrement();
        try (CSVReader csvReader = new CSVReader(new StringReader(row))) {
            String[] columns = csvReader.readNext();
            if (columns != null) {
                String externalTitle = columns[titleIdx].trim().toLowerCase();
                String externalDate = columns[dateIdx].trim();
                String externalDirector = columns[directorIdx].trim().toLowerCase();
                List<String> externalActors = Arrays.stream(columns[actorIdx].split(","))
                        .map(String::trim)
                        .map(String::toLowerCase)
                        .collect(Collectors.toList());
                // Parse year from OriginalReleaseDate if needed
                int externalYear = ParsingUtils.parseYear(extractYear(externalDate));
                String key = externalTitle + "_" + externalYear;
                // Find matching movie in internal DB
                Optional<Movie> matchedMovie = Optional.ofNullable(movieIndex.get(key))
                        .filter(movie -> isMatch(movie, externalTitle, externalYear, externalActors, externalDirector, matchingContext));
                // putIfAbsent keeps the first mapping written for a given movie id
                matchedMovie.ifPresent(movie ->
                        idMappingMap.putIfAbsent(movie.getId(), new Matcher.IdMapping(movie.getId(), columns[mediaIdIdx].trim())));
            }
        } catch (Exception e) {
            LOGGER.error("Error processing row: {}", row, e);
        }
    });
    //LOGGER.info("processed {} xbox rows", count);
    return new ArrayList<>(idMappingMap.values());
}
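For reference, the side-effecting forEach-into-a-shared-map pattern above can also be expressed with a collector, which lets the stream library own the accumulation. This is only a minimal sketch: the parsing and matching steps are replaced by a simplified stand-in `IdMapping` record and pre-built values, and `toConcurrentMap`'s merge function plays the role of `putIfAbsent` (with duplicates, which copy survives is nondeterministic under parallelism, same as with `putIfAbsent`):

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectorSketch {
    // Simplified stand-in for Matcher.IdMapping (hypothetical, for illustration)
    record IdMapping(int internalId, String mediaId) {}

    public static void main(String[] args) {
        // Hypothetical already-matched results; in the real pipeline each row
        // would be parsed and matched against movieIndex before this step
        Stream<IdMapping> matched = Stream.of(
                new IdMapping(1, "a"),
                new IdMapping(2, "b"),
                new IdMapping(1, "c")); // duplicate internal id

        // toConcurrentMap replaces the manual ConcurrentHashMap + putIfAbsent;
        // the merge function keeps one of the duplicate mappings
        Map<Integer, IdMapping> idMappingMap = matched.parallel()
                .collect(Collectors.toConcurrentMap(
                        IdMapping::internalId,
                        Function.identity(),
                        (first, second) -> first));

        // Two distinct internal ids survive the merge
        System.out.println(idMappingMap.size());
    }
}
```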
Since parallel streams don't give any guarantees about processing order or chunk size, will this still work when the number of rows increases to, say, 5 million? Is there any way to optimize this further?
Upvotes: 0
Views: 53