Ibrahim Ahmed-Nour

Reputation: 29

K-means clustering with MapReduce in Spark

Hello, can someone help me do MapReduce with K-means using Spark? I can do K-means with Spark, but I don't know how to map and reduce it. Thanks.

Upvotes: 2

Views: 983

Answers (2)

el_pazzu

Reputation: 414

Below is proposed pseudo-code for your exercise:

centroids = k random sampled points from the dataset

Map:

  • Given a point and the set of centroids

  • Calculate the distance between the point and each centroid

  • Emit the point and the closest centroid

Reduce:

  • Given the centroid and the points belonging to its cluster

  • Calculate the new centroid as the arithmetic mean position of the points

  • Emit the new centroid

prev_centroids = centroids

centroids = new_centroids

while prev_centroids - centroids > threshold

The mapper class calculates the distance between the data point and each centroid, then emits the index of the closest centroid together with the data point:

class MAPPER
method MAP(file_offset, point)
    min_distance = POSITIVE_INFINITY
    closest_centroid = -1
    for all centroid in list_of_centroids
        distance = distance(centroid, point)
        if (distance < min_distance)
            closest_centroid = index_of(centroid)
            min_distance = distance
    EMIT(closest_centroid, point) 

The reducer calculates the new approximation of the centroid and emits it.

class REDUCER
method REDUCE(centroid_index, list_of_partial_sums)
    point_sum = 0
    point_sum.number_of_points = 0
    for all partial_sum in list_of_partial_sums:
        point_sum += partial_sum
        point_sum.number_of_points += partial_sum.number_of_points
    centroid_value = point_sum / point_sum.number_of_points
    EMIT(centroid_index, centroid_value)

The actual K-Means Spark implementation:

First, you read the file with the points and generate the initial centroids with random sampling, using takeSample(False, k): this function takes k random samples, without replacement, from the RDD, so the application generates the initial centroids in a distributed manner and avoids moving all the data to the driver. Since the RDD is reused in an iterative algorithm, cache it in memory with cache() to avoid re-evaluating it every time an action is triggered:

points = sc.textFile(INPUT_PATH).map(Point).cache()
initial_centroids = init_centroids(points, k=parameters["k"])

def init_centroids(dataset, k):
    start_time = time.time()
    initial_centroids = dataset.takeSample(False, k)
    print("init centroid execution:", len(initial_centroids), "in",
          (time.time() - start_time), "s")
    return initial_centroids
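
The snippets above and below also rely on some driver-side state that is not shown in the answer: a parameters dictionary, an iteration counter n, and the broadcast variables centroids_broadcast and distance_broadcast. A minimal sketch of that setup, with illustrative values and assuming a Euclidean distance (norm order 2), could look like this:

# Illustrative setup (not part of the original answer): the values for k,
# threshold and maxiteration are placeholders, and parameters must of course
# exist before its first use above.
parameters = {"k": 4, "threshold": 0.0001, "maxiteration": 50}

n = 0  # iteration counter used by the main loop

# Broadcast the initial centroids and the distance order (2 = Euclidean norm)
# so every executor can read them without reshipping the data in each task.
centroids_broadcast = sc.broadcast(initial_centroids)
distance_broadcast = sc.broadcast(2)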

After that, you iterate the mapper and the reducer stages until the stopping criterion is verified or the maximum number of iterations is reached.

while True:
    print("--Iteration n. {itr:d}".format(itr=n+1), end="\r", flush=True)
    cluster_assignment_rdd = points.map(assign_centroids)
    sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))
    centroids_rdd = sum_rdd.mapValues(lambda x: x.get_average_point()).sortByKey(ascending=True)

    new_centroids = [item[1] for item in centroids_rdd.collect()]
    stop = stopping_criterion(new_centroids, parameters["threshold"])

    n += 1
    if stop == False and n < parameters["maxiteration"]:
        centroids_broadcast = sc.broadcast(new_centroids)
    else:
        break

The stopping condition is computed this way:

def stopping_criterion(new_centroids, threshold):
    old_centroids = centroids_broadcast.value
    for i in range(len(old_centroids)):
        check = old_centroids[i].distance(new_centroids[i], distance_broadcast.value) <= threshold
        if check == False:
            return False
    return True

In order to represent the points, a class Point has been defined. It's characterized by the following fields:

  • a numpy array of components
  • number of points: a point can be seen as the aggregation of many points, so this field tracks how many points the object represents

It includes the following operations:

  • distance (the type of distance to use can be passed as a parameter)

  • sum

  • get_average_point: this method returns a point whose components are the current components divided by the number of points represented by the object

import numpy as np
from numpy import linalg

class Point:
    def __init__(self, line):
        values = line.split(",")
        self.components = np.array([round(float(k), 5) for k in values])
        self.number_of_points = 1

    def sum(self, p):
        self.components = np.add(self.components, p.components)
        self.number_of_points += p.number_of_points
        return self

    def distance(self, p, h):
        if h < 0:
            h = 2
        return linalg.norm(self.components - p.components, h)

    def get_average_point(self):
        self.components = np.around(np.divide(self.components, self.number_of_points), 5)
        return self

The mapper method is invoked, at each iteration, on the input file, which contains the points from the dataset:

cluster_assignment_rdd = points.map(assign_centroids)

The assign_centroids function, for each point on which it is invoked, assigns the closest centroid to that point. The centroids are taken from the broadcast variable. The function returns the result as a tuple (id of the centroid, point):

def assign_centroids(p):
    min_dist = float("inf")
    centroids = centroids_broadcast.value
    nearest_centroid = 0
    for i in range(len(centroids)):
        distance = p.distance(centroids[i], distance_broadcast.value)
        if distance < min_dist:
            min_dist = distance
            nearest_centroid = i
    return (nearest_centroid, p)

The reduce stage is done using two Spark transformations:

  • reduceByKey: for each cluster, compute the sum of the points belonging to it. It is mandatory to pass an associative function as a parameter. The function (which accepts two arguments and returns a single element) must be commutative and associative in the mathematical sense

    sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))

  • mapValues: it is used to calculate the average point for each cluster at the end of each stage. The points are already divided by key. This transformation works only on the value of each key. The results are sorted to make comparisons easier.

    centroids_rdd = sum_rdd.mapValues(lambda x: x.get_average_point()).sortBy(lambda x: x[1].components[0])

The get_average_point() function returns the new computed centroid.

def get_average_point(self):
    self.components = np.around(np.divide(self.components, self.number_of_points), 5)
    return self

Upvotes: 2

Rahul Kumar

Reputation: 2344

You don't need to write MapReduce yourself. You can use the Spark DataFrame API and the Spark ML library.

You can read more about it here.

https://spark.apache.org/docs/latest/ml-clustering.html
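
For reference, a minimal sketch of what that looks like, assuming toy feature vectors and k=2 (illustrative values, not taken from the question):

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("kmeans-example").getOrCreate()

# Toy data: each row holds a feature vector (illustrative values only).
data = [(Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),),
        (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ["features"])

# Fit K-means; Spark ML handles the iterative map/reduce-style work internally.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(df)

print(model.clusterCenters())   # learned cluster centers
model.transform(df).show()      # adds a "prediction" column with the cluster id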

Upvotes: 0
