I'm a beginner at Hadoop and Linux.
```
2021-08-08 22:53:12,350 INFO mapreduce.Job: map 100% reduce 67%
```

```
2021-08-08 19:44:13,350 INFO mapreduce.Job: map 100% reduce 100%
```
```python
#!/usr/bin/env python3
import sys
from itertools import islice
from operator import itemgetter


def read_input(file):
    # read file except first line
    for line in islice(file, 1, None):
        # split the line into words
        yield line.split(',')


def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # for each row we take only the needed columns
        data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
        data_row[7] = data_row[7].replace('\n', '')
        # taking year and month No. from first column to create the
        # key that will send to reducer
        date = data_row[0].split(' ')[0].split('-')
        key = str(date[0]) + '_' + str(date[1])
        # value that will send to reducer
        value = ','.join(data_row)
        # print here will send the output pair (key, value)
        print('%s%s%s' % (key, separator, value))


if __name__ == "__main__":
    main()
```
```python
#!/usr/bin/env python3
from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time


def read_mapper_output(file):
    for line in file:
        yield line


def main(separator='\t'):
    all_rows_2015 = []
    all_rows_2016 = []
    start_time = time.time()
    names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance',
             'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
             'dropoff_latitude', 'total_amount']
    df = pd.DataFrame(columns=names)
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin)
    for words in data:
        # get key & value from Mapper
        key, value = words.split(separator)
        row = value.split(',')
        # split data with belong to 2015 from data belong to 2016
        if key in '2015_01 2015_02 2015_03':
            all_rows_2015.append(row)
            if len(all_rows_2015) >= 10:
                df = df.append(pd.DataFrame(all_rows_2015, columns=names))
                all_rows_2015 = []
        elif key in '2016_01 2016_02 2016_03':
            all_rows_2016.append(row)
            if len(all_rows_2016) >= 10:
                df = df.append(pd.DataFrame(all_rows_2016, columns=names))
                all_rows_2016 = []
    print(df.to_string())
    print("--- %s seconds ---" % (time.time() - start_time))


if __name__ == "__main__":
    main()
```
I'm using Hadoop v3.2.1 on Linux, installed in VMware, to run a MapReduce job written in Python.
Reduce Job in Numbers:
Input data size | Number of rows | Reduce job time | Assessment |
---|---|---|---|
~98 KB | 600 rows | ~0.1 sec | good |
~953 KB | 6,000 rows | ~1 sec | good |
~9.5 MB | 60,000 rows | ~52 sec | good |
~94 MB | 600,000 rows | ~5647 sec (~94 min) | very slow |
~11 GB | 76,000,000 rows | unknown | impossible |
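As a rough back-of-the-envelope reading of the table (an assumption, not a measurement): between 60,000 and 600,000 rows the time grows about 109× for 10× the rows, i.e. close to quadratically. If that trend held, the full input would take years:

$$
\frac{5647\,\text{s}}{52\,\text{s}} \approx 109
\quad\text{for}\quad
\frac{600{,}000\ \text{rows}}{60{,}000\ \text{rows}} = 10
$$

$$
t(76\,\text{M rows}) \approx 5647\,\text{s} \times \left(\frac{76{,}000{,}000}{600{,}000}\right)^{2} \approx 5647\,\text{s} \times 1.6 \times 10^{4} \approx 9 \times 10^{7}\,\text{s} \approx 2.9\ \text{years}
$$

Even with purely linear growth from the 600,000-row point, $5647\,\text{s} \times 126.7 \approx 7.2 \times 10^{5}\,\text{s}$, roughly 8 days.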
The goal is to run this on ~76M rows of input data, which is impossible while this issue remains.
I see some problems here.
In the reduce phase you don't do any summarization; you just filter the first quarter (January to March) of 2015 and 2016. Reduce is meant for summarization, such as grouping by key or computing aggregates based on the keys.
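For contrast, here is a minimal sketch (not from the question) of a reducer that does summarize: it counts trips and sums `total_amount` per `year_month` key. It assumes the mapper's `key<TAB>value` output with `total_amount` as the last comma-separated field, and relies on Hadoop Streaming handing the reducer its input sorted by key, so `itertools.groupby` sees each key as one contiguous run. `parse` and `summarize` are hypothetical helper names.

```python
import sys
from itertools import groupby


def parse(line, separator='\t'):
    # split "key<TAB>value"; total_amount is assumed to be the last field of value
    key, value = line.rstrip('\n').split(separator, 1)
    return key, float(value.split(',')[-1])


def summarize(lines, separator='\t'):
    # input must be sorted by key (Hadoop Streaming sorts before the reducer);
    # only the running totals for the current key are kept in memory
    pairs = (parse(line, separator) for line in lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        trips = 0
        total = 0.0
        for _, amount in group:
            trips += 1
            total += amount
        yield key, trips, total


def main(separator='\t'):
    for key, trips, total in summarize(sys.stdin, separator):
        print('%s%s%d%s%.2f' % (key, separator, trips, separator, total))

# in the actual reducer script: if __name__ == "__main__": main()
```

Memory use stays constant per key no matter how many rows arrive, which is the property the DataFrame-based reducer lacks.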
If you just need to filter data, do it in the map phase to save cycles (assuming you're billed for all the data you process).
You store a lot of data in RAM inside a DataFrame. Since you don't know how many values each key has, the DataFrame can outgrow the available memory and you end up thrashing. Combined with heavy keys, this will make your process page-fault on every DataFrame.append after a while.
There are some fixes:
Do you really need a reduce phase? Since you are just filtering the first three months of 2015 and 2016, you can do this in the map phase. If you do need a reduce later, this will also make the job a bit faster, because less data has to reach the reduce phase.
```python
#!/usr/bin/env python3
import sys
from itertools import islice
from operator import itemgetter


def read_input(file):
    # skip the CSV header line and split each row into fields
    for line in islice(file, 1, None):
        yield line.split(',')


def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # for each row we take only the needed columns
        data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
        # Find out first if you are filtering this data:
        # take year and month No. from the first column
        date = data_row[0].split(' ')[0].split('-')
        # Filter out: the date parts are strings, so compare against strings
        if date[1] in ('01', '02', '03') and date[0] in ('2015', '2016'):
            # We keep this data. Calculate key and clean up data_row[7]
            key = date[0] + '_' + date[1]
            data_row[7] = data_row[7].replace('\n', '')
            # value that will be sent to the reducer
            value = ','.join(data_row)
            # print here will send the output pair (key, value)
            print('%s%s%s' % (key, separator, value))


if __name__ == "__main__":
    main()
```
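A variant of the same filter can parse the pickup timestamp with `datetime.strptime` and compare integers, so there is no risk of comparing the string `'01'` against the integer `1`. This sketch assumes the pickup column is formatted like `YYYY-MM-DD HH:MM:SS`; `keep_row`, `make_key`, and the `WANTED_*` sets are hypothetical names.

```python
from datetime import datetime

WANTED_YEARS = {2015, 2016}
WANTED_MONTHS = {1, 2, 3}


def keep_row(pickup_datetime):
    # parse "YYYY-MM-DD HH:MM:SS" (assumed format) and compare integer fields
    ts = datetime.strptime(pickup_datetime.strip(), '%Y-%m-%d %H:%M:%S')
    return ts.year in WANTED_YEARS and ts.month in WANTED_MONTHS


def make_key(pickup_datetime):
    # build the same "YYYY_MM" key the mapper emits, with a zero-padded month
    ts = datetime.strptime(pickup_datetime.strip(), '%Y-%m-%d %H:%M:%S')
    return '%d_%02d' % (ts.year, ts.month)
```

`strptime` is likely slower per row than splitting the string, so on ~76M rows the string comparison in the mapper may still be the better trade-off.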
Try not to store data in memory during the reduce. Since you are filtering, print() each result as soon as you have it. If your source data is not sorted, the reduce phase still serves to bring all records from the same month together, because Hadoop sorts and groups the mapper output by key before it reaches the reducer.
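A minimal sketch of a reducer that keeps nothing in memory: each matching line is written out as soon as it is read. It assumes the mapper's `key<TAB>value` format; `WANTED_KEYS` and `stream_filter` are hypothetical names. It uses a set of exact keys rather than the question's substring test on a string, so a fragment such as `5_01` cannot match by accident.

```python
import sys

WANTED_KEYS = {'2015_01', '2015_02', '2015_03', '2016_01', '2016_02', '2016_03'}


def stream_filter(lines, wanted_keys=WANTED_KEYS, separator='\t'):
    # yield each matching line immediately; no rows are buffered
    for line in lines:
        key = line.split(separator, 1)[0]
        if key in wanted_keys:  # exact set membership, not substring search
            yield line


def main():
    for line in stream_filter(sys.stdin):
        sys.stdout.write(line)

# in the actual reducer script: if __name__ == "__main__": main()
```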
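You've got a bug in your reduce phase: you're losing rows, because the buffers are only appended to the DataFrame once they reach 10 rows. Whatever is left in all_rows_2015 and all_rows_2016 when the loop ends (the row count modulo 10) is never added. Don't append to the DataFrame at all; print each result as soon as possible.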
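If some batching is still wanted, the fix for the lost rows is to flush whatever remains in the buffer after the loop ends. A minimal sketch; `batched_rows` is a hypothetical helper, and the batch size of 10 only mirrors the question's code.

```python
def batched_rows(rows, size=10):
    # collect rows into lists of `size`, like the question's buffers
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= size:
            yield batch
            batch = []
    # the step the original reducer is missing: emit the partial last batch
    if batch:
        yield batch
```

For 23 input rows this yields batches of 10, 10 and 3; without the final `if batch:` the last 3 rows would be silently dropped.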
"when reduce hit the 67% only one CPU keep running at the time at 100% and the rest of them are sleeping": you have data skew. One key has far more values than any other key, so the single reducer that receives it does most of the work while the others sit idle.
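One way to confirm the skew is to count how many records each key receives, for example by running a sample of the input through the mapper locally and feeding its output to a counter like the sketch below. `count_keys` and `report` are hypothetical helpers; they assume the mapper's `key<TAB>value` output format.

```python
from collections import Counter


def count_keys(lines, separator='\t'):
    # tally how many mapper output lines carry each key
    return Counter(line.split(separator, 1)[0] for line in lines)


def report(counts):
    total = sum(counts.values())
    for key, n in counts.most_common():
        # one key holding most of the records means one reducer does most of the work
        print('%s\t%d\t%.1f%%' % (key, n, 100.0 * n / total))
```

Every value for a given key goes to the same reducer, so with keys at year-month granularity there are only a handful of distinct keys, and at most that many reduce tasks can receive any work at all.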