balias

Reputation: 540

Scope to optimize the performance of this parallel CSV stream processing

I am writing a CSV stream processor that builds a list of IdMappings, as shown below. After switching from a sequential stream to a parallel one and introducing a concurrent map, performance increased significantly. The CSV stream currently has around 500k rows; here is the header followed by a sample row:

Country,StudioNetwork,MediaId,Title,OriginalReleaseDate,MediaType,SeriesMediaId,SeasonMediaId,SeriesTitle,SeasonNumber,EpisodeNumber,LicenseType,ActualRetailPrice,OfferStartDate,OfferEndDate,Actors,Director,XboxLiveURL
UNITED STATES,,531b964f-0cb9-4968-9b77-e547f2435225,Zoey 101: Peeps at PCA,1/23/2012 12:00:00 AM,TvSeason,c3eddf64-372f-4dfc-a763-2681cce65ac2,,Zoey 101: Peeps at PCA,1,,EST HD,12.99,1/30/2012 8:01:00 AM,1/1/3000 8:01:00 AM,,,https://video.xbox.com/tv-season/531b964f-0cb9-4968-9b77-e547f2435225

@Override
public List<Matcher.IdMapping> match(Matcher.CsvStream externalDb, MatchingContext matchingContext) {

    // Map to store the ID mappings between internal and external databases
    final Map<Integer, Matcher.IdMapping> idMappingMap = new ConcurrentHashMap<>();

    // Extract column indices from the header row of the external database
    final Map<String, Integer> columnIndices = ParsingUtils.generateHeaderMap(externalDb.getHeaderRow());
    int titleIdx = columnIndices.get("Title");
    int dateIdx = columnIndices.get("OriginalReleaseDate");
    int directorIdx = columnIndices.get("Director");
    int actorIdx = columnIndices.get("Actors");
    int mediaIdIdx = columnIndices.get("MediaId");

    // Retrieve the internal movie index from the matching context
    Map<String, Movie> movieIndex = matchingContext.getMovieIndex();

    //AtomicInteger count = new AtomicInteger();
    externalDb.getDataRows().parallel().forEach(row -> {
        //count.getAndIncrement();
        try (CSVReader csvReader = new CSVReader(new StringReader(row))) {
            String[] columns = csvReader.readNext();

            if (columns != null) {
                String externalTitle = columns[titleIdx].trim().toLowerCase();
                String externalDate = columns[dateIdx].trim();
                String externalDirector = columns[directorIdx].trim().toLowerCase();
                List<String> externalActors = Arrays.stream(columns[actorIdx].split(","))
                        .map(String::trim)
                        .map(String::toLowerCase)
                        .collect(Collectors.toList());

                // Parse year from OriginalReleaseDate if needed
                int externalYear = ParsingUtils.parseYear(extractYear(externalDate));

                String key = externalTitle + "_" + externalYear;

                // Find matching movie in internal DB
                Optional<Movie> matchedMovie = Optional.ofNullable(movieIndex.get(key))
                        .filter(movie -> isMatch(movie, externalTitle, externalYear, externalActors, externalDirector, matchingContext));

                matchedMovie.ifPresent(movie -> {
                    // putIfAbsent keeps the first mapping recorded per movie id,
                    // so concurrent threads cannot overwrite each other's entry
                    idMappingMap.putIfAbsent(movie.getId(), new Matcher.IdMapping(movie.getId(), columns[mediaIdIdx].trim()));
                });
            }
        } catch (Exception e) {
            LOGGER.error("Error processing row: {}", row, e);
        }
    });
    //LOGGER.info("processed {} xbox rows", count);

    return new ArrayList<>(idMappingMap.values());
}
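
One variant I have been toying with is dropping the shared map and letting the stream reduce straight into a concurrent map via Collectors.toConcurrentMap, so the pipeline has no side effects. This is only a sketch: toMapping is a hypothetical helper that would wrap the per-row parsing and isMatch(...) call from match(...) above, returning Optional<Map.Entry<Integer, Matcher.IdMapping>> (empty for rows that fail to parse or do not match); the keep-first merge function mirrors the putIfAbsent semantics.

Map<Integer, Matcher.IdMapping> idMappingMap = externalDb.getDataRows()
        .parallel()
        // toMapping: hypothetical helper wrapping the per-row parsing/matching above
        .map(row -> toMapping(row, columnIndices, movieIndex, matchingContext))
        .flatMap(Optional::stream)               // Java 9+; drops non-matching rows
        .collect(Collectors.toConcurrentMap(
                Map.Entry::getKey,               // movie.getId()
                Map.Entry::getValue,             // the IdMapping for that movie
                (first, second) -> first));      // keep-first, like putIfAbsent

return new ArrayList<>(idMappingMap.values());

Is this meaningfully better than the forEach version, or just cosmetic?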

Since parallel streams make no guarantees about how the rows are split across threads or the order in which they are processed, will this approach still hold up when the number of rows grows to, say, 5 million? Is there any way to optimize this further?
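
For the 5-million-row case I have also been considering running the stream in its own pool instead of the common ForkJoinPool, so a long CSV run does not starve other parallel work in the JVM. A rough sketch (the pool size is just availableProcessors(), not tuned):

ForkJoinPool csvPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
try {
    // Relies on the long-standing (though undocumented) behavior that a parallel
    // stream whose terminal operation starts inside a ForkJoinPool task runs in that pool
    csvPool.submit(() ->
            externalDb.getDataRows().parallel().forEach(row -> {
                // per-row parsing and matching exactly as in match(...) above
            })
    ).join(); // block until all rows are processed
} finally {
    csvPool.shutdown();
}

Would that help here, or is the common pool fine for this kind of CPU-bound parsing?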

Upvotes: 0

Views: 53

Answers (0)
