Guforu
Guforu

Reputation: 4023

Spark scalability

I use currently one master (local machine) and two workers (2*32 cores, Memory 2*61.9 GB) for standard ALS algorithm of Spark and produce the next code for the time evaluation:

import numpy as np
from scipy.sparse.linalg import spsolve
import random
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import hashlib

#Spark configuration settings
conf = SparkConf().setAppName("Temp").setMaster("spark://<myip>:7077").set("spark.cores.max","64").set("spark.executor.memory", "61g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

#first time
t1 = time.time()

#load the DataFrame and transform it into RDD<Rating>
rddob = sqlContext.read.json("file.json").rdd
rdd1 = rddob.map(lambda line:(line.ColOne, line.ColTwo))
rdd2 = rdd1.map(lambda line: (line, 1))
rdd3 = rdd2.reduceByKey(lambda a,b: a+b)
ratings = rdd3.map(lambda (line, rating): Rating(int(hash(line[0]) % (10 ** 8)), int(line[1]), float(rating)))
ratings.cache()

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 5
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

#second time
t2 = time.time()

#print results
print "Time of ALS",t2-t1

In this code I hold all parameters constant excepted parameter set("spark.cores.max","x") for which I use the next values for x: 1,2,4,8,16,32,64. I got the next time evaluation:

#cores  time [s]
1       20722
2       11803
4       5596
8       3131
16      2125
32      2000
64      2051

The results of evaluation are a little bit strange for me. I see a good linear scalability by the small number of cores. But in the range of 16, 32 and 64 possible cores I don't see either any scalability, or improvement of time performance anymore. How is it possible? My input file is approximately 70 GB big and has 200 000 000 lines.

Upvotes: 1

Views: 2095

Answers (1)

user6022341
user6022341

Reputation:

Linear scalability in distributed system like Spark is only in a small part a result of increasing number of cores. The most important part is opportunity to distribute disk / network IO. If you have constant number of workers and don't scale storage at the same time you'll quickly get to the point where throughput is limited by IO.

Upvotes: 3

Related Questions