Parzival

Reputation: 2064

type error when using Sparkit-Learn's SparkCountVectorizer()

I want to use Sparkit-Learn to vectorize a collection of texts. I read the texts from SQL Server. What I get back is a DataFrame, which I convert to an RDD (as Sparkit-Learn doesn't handle DataFrames) and then to an ArrayRDD. Problem is, I get a type error when I try to vectorize the ArrayRDD:

from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkCountVectorizer

jdbcDF = spark.read.format('jdbc').option('url', 'jdbc:sqlserver://XXX:1433;database=XXX;user=XXX;password=XXX;').option('dbtable', 'XXX.XXX').load()

my_rdd = jdbcDF.select(jdbcDF.columnoftexts).rdd
my_rdd.take(2)
# [Row(columnoftexts='some perfectly reasonable string'), Row(columnoftexts='another perfectly reasonable string')]

array_rdd = ArrayRDD(my_rdd)
counter = SparkCountVectorizer()
tf = counter.fit_transform(array_rdd)

# 17/01/09 15:07:50 WARN BlockManager: Putting block rdd_5_0 failed
# 17/01/09 15:07:50 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
# org.apache.spark.api.python.PythonException: Traceback (most recent call last):
# File "/home/cgu.local/thiagovm/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
# process()
# File "/home/cgu.local/thiagovm/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
# serializer.dump_stream(func(split_index, iterator), outfile)
# File "/home/cgu.local/thiagovm/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
# vs = list(itertools.islice(iterator, batch))
# File "/usr/local/lib/python3.5/site-packages/splearn/feature_extraction/text.py", line 289, in <lambda>
# A = Z.transform(lambda X: list(map(analyze, X)), column='X').persist()
# File "/usr/local/lib/python3.5/site-packages/sklearn/feature_extraction/text.py", line 238, in <lambda>
# tokenize(preprocess(self.decode(doc))), stop_words)
# File "/usr/local/lib/python3.5/site-packages/sklearn/feature_extraction/text.py", line 204, in <lambda>
# return lambda x: strip_accents(x.lower())
# AttributeError: 'numpy.ndarray' object has no attribute 'lower'

#     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
#     at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
#     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
#     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
#     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
#     at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
#     at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
#     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
#     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
#     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
#     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
#     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
#     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
#     at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
#     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
#     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
#     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
#     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
#     at org.apache.spark.scheduler.Task.run(Task.scala:85)
#     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
#     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
#     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
#     at java.lang.Thread.run(Thread.java:745)

Oddly enough, this works:

my_rdd2 = sc.parallelize(['some perfectly reasonable string', 'another perfectly reasonable string'])
array_rdd2 = ArrayRDD(my_rdd2)
tf = counter.fit_transform(array_rdd2)
tf
# <class 'splearn.rdd.SparseRDD'> from PythonRDD[20] at RDD at PythonRDD.scala:48

Even though array_rdd2.dtype is <class 'numpy.ndarray'>.

What am I doing wrong? Why does it work with a manually created RDD but not with an RDD created from a JDBC source?

Upvotes: 1

Views: 76

Answers (1)

zero323

Reputation: 330453

The source of the problem is right in front of your eyes. Let's simplify this a bit.

my_rdd_of_rows = sc.parallelize([("some text ", )]).toDF(["columnoftexts"]).rdd

When you apply ArrayRDD here, you get a two-dimensional array for each record:

ArrayRDD(my_rdd_of_rows).first().shape
(1, 1)

By comparison, the required structure is one-dimensional:

ArrayRDD(sc.parallelize(["some text "])).first().shape
(1,)

This is because a PySpark Row is a tuple, so each "document" that reaches the scikit-learn analyzer is a length-1 ndarray rather than a string, which is exactly why lower() fails. Just flatten the input and it'll work fine:

array_rdd = ArrayRDD(my_rdd_of_rows.flatMap(lambda x: x))
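Putting it together with the variables from your question (just a sketch reusing your my_rdd, counter and the columnoftexts column), the whole pipeline becomes:

# flatten the RDD of Rows into an RDD of strings before wrapping it
array_rdd = ArrayRDD(my_rdd.flatMap(lambda x: x))
counter = SparkCountVectorizer()
tf = counter.fit_transform(array_rdd)  # should yield a SparseRDD, as in your parallelize example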

To handle None (SQL NULL) values you can add a filter on the RDD:

my_rdd_of_rows.flatMap(lambda x: x).filter(lambda x: x is not None)
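or, plugged into the same pipeline (again reusing my_rdd from the question):

array_rdd = ArrayRDD(my_rdd.flatMap(lambda x: x).filter(lambda x: x is not None))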

or drop NULLs before converting to an RDD:

jdbcDF.na.drop(subset=["columnoftexts"])
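In context that would look roughly like this (a sketch assuming the jdbcDF and column name from the question):

# drop NULL rows on the DataFrame side, then flatten the Rows as before
clean_rdd = jdbcDF.na.drop(subset=["columnoftexts"]).select("columnoftexts").rdd.flatMap(lambda x: x)
tf = SparkCountVectorizer().fit_transform(ArrayRDD(clean_rdd))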

Upvotes: 1
