Abdulaziz Alshehri

Reputation: 11

Hadoop stuck on reduce 67% (only with large data)

I'm a beginner at Hadoop and Linux.

The Problem

When reduce hits 67%, only one CPU keeps running at 100% while the rest of them are sleeping, and the job becomes extremely slow on large inputs.

What ran successfully

Mapper (Python)

#!/usr/bin/env python3

import sys
from itertools import islice
from operator import itemgetter

def read_input(file):
    # read file except first line
    for line in islice(file, 1, None):
        # split the line into words
        yield line.split(',')

def main(separator='\t'):
    
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    
    for words in data:
        
        # for each row we take only the needed columns
        data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
        data_row[7] = data_row[7].replace('\n', '')

        # take the year and month number from the first column to create
        # the key that will be sent to the reducer
        date = data_row[0].split(' ')[0].split('-')
        key = str(date[0]) + '_' + str(date[1])
        
        # value that will be sent to the reducer
        value = ','.join(data_row)
        
        # print emits the (key, value) pair on standard output
        print('%s%s%s' % (key, separator, value))
        
if __name__ == "__main__":
    main()

Reducer (Python)

#!/usr/bin/env python3

from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time

def read_mapper_output(file):
    for line in file:
        yield line

def main(separator='\t'):
    
    all_rows_2015 = []
    all_rows_2016 = []
    
    start_time = time.time()
    
    names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 
             'pickup_longitude',     'pickup_latitude',       'dropoff_longitude', 
             'dropoff_latitude',     'total_amount']
    df = pd.DataFrame(columns=names)
    
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin)
    for words in data:

        # get key & value from Mapper
        key, value = words.split(separator)
        row = value.split(',')

        # separate data belonging to 2015 from data belonging to 2016
        if key in '2015_01 2015_02 2015_03': 
            all_rows_2015.append(row)
            if len(all_rows_2015) >= 10:
                df=df.append(pd.DataFrame(all_rows_2015, columns=names))
                all_rows_2015 = []
        elif key in '2016_01 2016_02 2016_03':
            all_rows_2016.append(row)
            if len(all_rows_2016) >= 10:
                df=df.append(pd.DataFrame(all_rows_2016, columns=names))
                all_rows_2016 = []
    
    print(df.to_string())
    print("--- %s seconds ---" % (time.time() - start_time))

if __name__ == "__main__":
    main()

More Info

I'm using Hadoop v3.2.1 on Linux, installed in a VMware VM, to run MapReduce jobs written in Python.

Reduce Job in Numbers:

Input Data Size   Number of rows   Reduce job time        Result
~98 KB            600              ~0.1 sec               good
~953 KB           6,000            ~1 sec                 good
~9.5 MB           60,000           ~52 sec                good
~94 MB            600,000          ~5,647 sec (~94 min)   very slow
~11 GB            76,000,000       ??                     impossible

The goal is to run this on the ~76M-row input, which is impossible while this issue remains.

Upvotes: 1

Views: 506

Answers (2)

Iñigo González

Reputation: 3955

I see some problems here.

  • In the reduce phase you don't do any summarization, you just filter Q1 of 2015 and Q1 of 2016 - reduce is meant for summarization, like grouping by key or doing calculations over each key's values.

    If you just need to filter data, do it in the map phase to save cycles (assume you're billed for all the data you process).

  • You store a lot of data in RAM inside a dataframe. Since you don't know how much data each key carries, you end up thrashing: combined with heavy keys, after a while your process takes a page fault on every DataFrame.append (a quick sketch of this cost is below).
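
    This is easy to reproduce outside Hadoop. Below is a minimal sketch of my own (not your code; DataFrame.append is deprecated and was removed in pandas 2.0, so pd.concat stands in for it here - it has the same copy-the-whole-frame-on-every-call cost):

    import time
    import pandas as pd

    names = ['a', 'b', 'c']
    rows = [['1', '2', '3']] * 60_000   # stand-in for parsed CSV rows

    # pattern used in the reducer: grow the frame a small batch at a
    # time, copying the whole frame on every call (quadratic overall)
    start = time.time()
    df = pd.DataFrame(columns=names)
    for i in range(0, len(rows), 10):
        df = pd.concat([df, pd.DataFrame(rows[i:i + 10], columns=names)])
    print('grow-in-a-loop: %.1f s' % (time.time() - start))

    # alternative: keep plain Python lists and build the frame once
    start = time.time()
    df2 = pd.DataFrame(rows, columns=names)
    print('build-once:     %.1f s' % (time.time() - start))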

There are some fixes:

  • Do you really need a reduce phase? Since you are just filtering the first three months of 2015 and 2016, you can do this in the map phase. Even if you still need a reduce later, this makes the process a bit faster, since less data reaches the reduce phase.

    def main(separator='\t'):
    
        # input comes from STDIN (standard input)
        data = read_input(sys.stdin)
    
        for words in data:
    
            # for each row we take only the needed columns
            data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
    
            # Find out first if you are filtering this data
    
            # take the year and month number from the first column; they
            # become the key that will be sent to the reducer
            date = data_row[0].split(' ')[0].split('-')

            # Filter out everything except Jan-Mar of 2015 and 2016
            # (the split parts are strings, so compare against strings)
            if date[1] in ('01', '02', '03') and date[0] in ('2015', '2016'):

               # We keep this data. Calculate the key and clean up data_row[7]
               key = str(date[0]) + '_' + str(date[1])
               data_row[7] = data_row[7].replace('\n', '')

               # value that will be sent to the reducer
               value = ','.join(data_row)

               # print emits the (key, value) output pair
               print('%s%s%s' % (key, separator, value))
    
  • Try not to store data in memory during the reduce. Since you are only filtering, print() each result as soon as you have it. If your source data is not sorted, the reduce still serves a purpose: it brings all data from the same month together.

  • You've got a bug in your reduce phase: you lose number_of_records_per_key modulo 10 rows, because any leftover batch smaller than 10 is never appended to the dataframe. Don't append to the dataframe at all; print each result as soon as possible (see the sketch below).
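
    A minimal sketch of what that streaming reducer could look like (my own code, assuming the tab-separated key/value format your mapper already emits and that the month filter stays in the reducer):

    #!/usr/bin/env python3

    import sys

    # the six month keys we want to keep (Jan-Mar of 2015 and 2016)
    WANTED_KEYS = {'2015_01', '2015_02', '2015_03',
                   '2016_01', '2016_02', '2016_03'}

    def main(separator='\t'):
        for line in sys.stdin:
            # split into key and value exactly once
            key, _, value = line.rstrip('\n').partition(separator)
            if key in WANTED_KEYS:
                # emit immediately - nothing is buffered in memory
                print('%s%s%s' % (key, separator, value))

    if __name__ == '__main__':
        main()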

Upvotes: 0

Ben Watson

Reputation: 5541

"when reduce hit the 67% only one CPU keep running at the time at 100% and the rest of them are sleeping" - you have skew. One key has far more values than any other key.

Upvotes: 1
