Taycir Yahmed
Taycir Yahmed

Reputation: 449

pyspark: train kmeans streaming with data retrieved from kafka

I want to train a streaming kmeans model with data consumed from a kafka topic.

My problem is how to present the data for kmeans streamig model

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])

lines.pprint()

This outputs (these are my features separated by '|'):

1.0|2.0|0.0|21.0|2.0

1.0|2.0|0.0|21.0|2.0

Then I want to do this

model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
model.trainOn(lines)

If I combine the two slices of code I get the error:

TypeError: Cannot convert type <type 'unicode'> into Vector

Upvotes: 1

Views: 425

Answers (1)

Taycir Yahmed
Taycir Yahmed

Reputation: 449

The first issue was formatting the stream extracted from kafka. Here is what was worked for a pipe separated data

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

raw = kvs.flatMap(lambda kafkaS: [kafkaS])
lines = raw.map(lambda xs: xs[1].split("|"))

lines = lines.map(lambda x: DenseVector(x))

The second issue was the dimension of the data: first parameter of setRandomCenters (it should by the same as the number of features)

Upvotes: 1

Related Questions