Reputation: 21
Now I am writing a Spark streaming program to detect the abnormal of network in a data center. I try to use regression algorithm. For example, I use the training data set to compute the model (i.e., the coef), and then how can I use this previous computed model in the data stream. I use the following join, but get the exception.
Traceback (most recent call last):
File "/home/xiuli/PycharmProjects/benchmark/parx.py", line 98, in <module>
joinedStream = testRDD.join(trainingRDD)
File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 362, in join
File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 313, in transformWith
AttributeError: 'PipelinedRDD' object has no attribute '_jdstream'
I could see Spark streaming guide give an example, but it lack of the details.
Stream-dataset joins
This has already been shown earlier while explain DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
Following is my code:
from __future__ import print_function
import sys,os,datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm
def splitLine(line, delimiter='|'):
values = line.split(delimiter)
st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
return (values[0],st.hour), values[2:]
def reg_m(y, x):
ones = np.ones(len(x[0]))
X = sm.add_constant(np.column_stack((x[0], ones)))
for ele in x[1:]:
X = sm.add_constant(np.column_stack((ele, X)))
results = sm.OLS(y, X).fit()
return results
def train(line):
y,x = [],[]
y, x = [],[[],[],[],[],[],[]]
reading_tmp,temp_tmp = [],[]
i = 0
for reading, temperature in line[1]:
if i%4==0 and len(reading_tmp)==4:
y.append(reading_tmp.pop())
x[0].append(reading_tmp.pop())
x[1].append(reading_tmp.pop())
x[2].append(reading_tmp.pop())
temp = float(temp_tmp[0])
del temp_tmp[:]
x[3].append(temp-20.0 if temp>20.0 else 0.0)
x[4].append(16.0-temp if temp<16.0 else 0.0)
x[5].append(5.0-temp if temp<5.0 else 0.0)
reading_tmp.append(float(reading))
temp_tmp.append(float(temperature))
i = i + 1
return str(line[0]),reg_m(y, x).params.tolist()
def detect(line):
y,x = [],[]
y, x = [],[[],[],[],[],[],[]]
reading_tmp,temp_tmp = [],[]
i = 0
for reading, temperature in line[1]:
if i%4==0 and len(reading_tmp)==4:
y.append(reading_tmp.pop())
x[0].append(reading_tmp.pop())
x[1].append(reading_tmp.pop())
x[2].append(reading_tmp.pop())
temp = float(temp_tmp[0])
del temp_tmp[:]
x[3].append(temp-20.0 if temp>20.0 else 0.0)
x[4].append(16.0-temp if temp<16.0 else 0.0)
x[5].append(5.0-temp if temp<5.0 else 0.0)
reading_tmp.append(float(reading))
temp_tmp.append(float(temperature))
i = i + 1
return line[0],reg_m(y, x).params.tolist()
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: parx.py <checkpointDir> <trainingDataDir> <streamDataDir>", file=sys.stderr)
exit(-1)
checkpoint, trainingInput, streamInput = sys.argv[1:]
sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")
trainingLines = sc.textFile(trainingInput)
trainingRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
.groupByKey()\
.map(lambda line: train(line)).cache()
ssc = StreamingContext(sc, 1)
ssc.checkpoint(checkpoint)
lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))
testRDD = lines.groupByKeyAndWindow(1,1).map(lambda line:(str(line[0]), line[1]))
joinedStream = testRDD.join(trainingRDD)
joinedStream.pprint(20)
ssc.start()
ssc.awaitTermination()
Upvotes: 2
Views: 2689
Reputation: 141
According to the documentation that you referred to, try:
testRDD.transform(lambda rdd: rdd.join(trainingRDD))
Upvotes: 1