I am doing a churn analysis for the telecom industry on a sample dataset. I have written the code below, which uses the decision tree algorithm in Spark through Python. The dataset has multiple columns, and I select only the ones I need for my feature set.
```python
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

# sc is the SparkContext (predefined in the pyspark shell)
inputPath = 'file1.csv'

# Drop the header row: pair each line with its index and keep index > 0
data = (sc.textFile(inputPath)
        .zipWithIndex()
        .filter(lambda pair: pair[1] > 0)
        .map(lambda pair: pair[0]))

# Label: column 5 ('True' = churned); features: columns 6 and 7
final_data = (data
              .map(lambda line: line.split(","))
              .filter(lambda line: len(line) > 1)
              .map(lambda line: LabeledPoint(1 if line[5] == 'True' else 0,
                                             [float(line[6]), float(line[7])])))

(trainingdata, testdata) = final_data.randomSplit([0.7, 0.3])
model = DecisionTree.trainRegressor(trainingdata, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=5, maxBins=32)

predictions = model.predict(testdata.map(lambda x: x.features))
labelsAndPredictions = testdata.map(lambda lp: lp.label).zip(predictions)
```
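The row parsing can be sanity-checked locally without a Spark cluster. The sketch below is a hypothetical pure-Python mirror of the `zipWithIndex`/`filter`/`map` chain: it drops the header row, splits on commas, skips rows with at most one field, and builds `(label, features)` pairs with the features cast to `float`. The sample rows and their column layout are invented for illustration; only the column positions 5, 6 and 7 come from the code above.

```python
def parse_rows(lines):
    """Mirror of the RDD chain: skip the header, split, keep multi-field rows,
    and turn each row into (label, [column 6, column 7])."""
    parsed = []
    for rownum, line in enumerate(lines):
        if rownum == 0:          # like filter(rownum > 0): drop the CSV header
            continue
        fields = line.split(",")
        if len(fields) <= 1:     # like filter(len(line) > 1): skip empty lines
            continue
        label = 1 if fields[5] == 'True' else 0
        features = [float(fields[6]), float(fields[7])]
        parsed.append((label, features))
    return parsed


# Hypothetical sample with 8 columns; column 4 holds a customer id
sample = [
    "a,b,c,d,customerid,churn,f6,f7",
    "x,x,x,x,C001,True,1.5,20",
    "",
    "x,x,x,x,C002,False,0.5,3",
]
print(parse_rows(sample))
# [(1, [1.5, 20.0]), (0, [0.5, 3.0])]
```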
This code works fine and makes predictions, but what I am missing is the identifier of each customer in the predictions or in testdata. My dataset has a customerid column (column number 4), which I am not selecting at the moment because it is not a feature for the model. I am having difficulty associating this customerid column with the customers whose details are in testdata. If I add this column to the feature vector I build in the LabeledPoint, it leads to an error, since it is not a feature value.
How can I add this column to my analysis so that I can get, say, the top 50 customers with the highest churn value?
You can do it exactly the same way as you add the label after prediction.
Small helper:
```python
customerIndex = ... # Put index of the column

def extract(line):
    """Given a line create a tuple (customerId, labeledPoint)"""
    label = 1 if line[5] == 'True' else 0
    point = LabeledPoint(label, [line[6], line[7]])
    customerId = line[customerIndex]
    return (customerId, point)
```
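To see the shape of the records `extract` produces without starting Spark, here is a hypothetical pure-Python stand-in. `Point` is a plain `namedtuple` substituting for `LabeledPoint` (it is not the MLlib class), and `CUSTOMER_INDEX = 4` is an assumed column position; adjust both to the real data. Each record is a `(customerId, point)` pair, so `x[0]` is the id and `x[1]` the point, which is why the later steps index into tuples.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.mllib.regression.LabeledPoint
Point = namedtuple("Point", ["label", "features"])

CUSTOMER_INDEX = 4  # assumed position of the customer id column


def extract_local(fields):
    """Return (customerId, Point) for one already-split CSV row."""
    label = 1 if fields[5] == 'True' else 0
    point = Point(label, [float(fields[6]), float(fields[7])])
    return (fields[CUSTOMER_INDEX], point)


record = extract_local("x,x,x,x,C001,True,1.5,20".split(","))
customer_id, point = record
print(customer_id, point.label, point.features)
# C001 1 [1.5, 20.0]
```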
Prepare the data using the extract function:
```python
final_data = (data
    .map(lambda line: line.split(","))
    .filter(lambda line: len(line) > 1)
    .map(extract)) # Map to tuples
```
Train:
```python
# As before
(trainingdata, testdata) = final_data.randomSplit([0.7, 0.3])

# Use only points, put the rest of the arguments in place of ...
model = DecisionTree.trainRegressor(trainingdata.map(lambda x: x[1]), ...)
```
Predict:
```python
# Make predictions using points
predictions = model.predict(testdata.map(lambda x: x[1].features))

# Add customer id and label
labelsIdsAndPredictions = (testdata
    .map(lambda x: (x[0], x[1].label))
    .zip(predictions))
```
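Predicting on a separate RDD and zipping afterwards is needed because, in PySpark, a tree model's `predict` cannot be called inside an RDD transformation such as `map`; it has to be applied to a whole RDD of feature vectors. `zip` then pairs elements by position, which works here because both sides are derived from `testdata` by element-wise `map`s and so keep the same partitions and per-partition counts. The pure-Python sketch below mirrors that pairing with lists; `toy_predict` is a hypothetical placeholder for the trained model, not part of MLlib, and the records are made up.

```python
# Test records in a (customerId, (label, features)) shape, like extract's output
testdata = [
    ("C001", (1, [1.5, 20.0])),
    ("C002", (0, [0.5, 3.0])),
    ("C003", (1, [2.0, 15.0])),
]


def toy_predict(features):
    """Hypothetical stand-in for model.predict: a simple threshold score."""
    return 1.0 if features[1] > 10 else 0.0


# The same two derived views as in the Spark code: features only, and (id, label)
predictions = [toy_predict(x[1][1]) for x in testdata]
ids_and_labels = [(x[0], x[1][0]) for x in testdata]

# Positional pairing, like RDD.zip
labels_ids_and_predictions = list(zip(ids_and_labels, predictions))
print(labels_ids_and_predictions)
# [(('C001', 1), 1.0), (('C002', 0), 0.0), (('C003', 1), 1.0)]
```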
Extract top 50:
```python
top50 = labelsIdsAndPredictions.top(50, key=lambda x: x[1])
```
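`RDD.top(num, key=...)` returns the `num` largest elements in descending order of the key; with `key=lambda x: x[1]` the key is the predicted churn score, so the result holds the customers the model scores highest. Locally, `heapq.nlargest` behaves the same way, as the sketch below shows with made-up scores. Note that with `trainRegressor` on 0/1 labels and `impurity='variance'`, each leaf predicts the mean label of its training rows (the fraction of churners in that leaf), so many customers can share a score and their relative order within a tie is not meaningful.

```python
import heapq

# Made-up ((customerId, label), predictedScore) records
labels_ids_and_predictions = [
    (("C001", 1), 0.80),
    (("C002", 0), 0.10),
    (("C003", 1), 0.65),
    (("C004", 0), 0.35),
]

# Local analogue of labelsIdsAndPredictions.top(2, key=lambda x: x[1])
top2 = heapq.nlargest(2, labels_ids_and_predictions, key=lambda x: x[1])
top2_ids = [customer_id for (customer_id, _label), _score in top2]
print(top2_ids)
# ['C001', 'C003']
```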