Reputation: 77
I'm trying to print a dataframe by looping through each of its rows. I applied the map() transformation to the dataframe's RDD with a lambda function and then tried converting the result back into a dataframe. I'm running this program in a Jupyter Notebook through a conda environment. My guess is that the problem lies in the rlike() call, because the mapping works fine without it. Here's the code:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate();
data = [("Sunday",20000), ("734",1000), ("Fruday",5001),("Tuesday",17000)]
rdd=spark.sparkContext.parallelize(data)
columns = ["day","users_count"]
df = spark.createDataFrame(data=data, schema=columns)
def func1(x):
    day = x.day
    users_count = x.users_count
    cond1 = F.when(F.col(day).rlike('^(Sun|Mon|Tues|Wednes|Thurs|Fri|Satur)day$'), "Success").otherwise("Error")
    return (day, users_count, id, cond1)
rdd2 = df.rdd.map(lambda x: func1(x))
Columns = ["day","users_count","cond1"]
dffinal = rdd2.toDF(Columns)
dffinal.show()
Here is the error :
Py4JJavaError Traceback (most recent call last)
Input In [7], in <cell line: 1>()
----> 1 dffinal = rdd2.toDF(Columns)
2 dffinal.show()
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:66, in _monkey_patch_RDD.<locals>.toDF(self, schema, sampleRatio)
39 def toDF(self, schema=None, sampleRatio=None):
40 """
41 Converts current :class:`RDD` into a :class:`DataFrame`
42
(...)
64 [Row(name='Alice', age=1)]
65 """
---> 66 return sparkSession.createDataFrame(self, schema, sampleRatio)
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:675, in SparkSession.createDataFrame(self, data, schema, samplingRatio, verifySchema)
671 if has_pandas and isinstance(data, pandas.DataFrame):
672 # Create a DataFrame from pandas DataFrame.
673 return super(SparkSession, self).createDataFrame(
674 data, schema, samplingRatio, verifySchema)
--> 675 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:698, in SparkSession._create_dataframe(self, data, schema, samplingRatio, verifySchema)
695 prepare = lambda obj: obj
697 if isinstance(data, RDD):
--> 698 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
699 else:
700 rdd, schema = self._createFromLocal(map(prepare, data), schema)
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:486, in SparkSession._createFromRDD(self, rdd, schema, samplingRatio)
482 """
483 Create an RDD for DataFrame from an existing RDD, returns the RDD and schema.
484 """
485 if schema is None or isinstance(schema, (list, tuple)):
--> 486 struct = self._inferSchema(rdd, samplingRatio, names=schema)
487 converter = _create_converter(struct)
488 rdd = rdd.map(converter)
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:460, in SparkSession._inferSchema(self, rdd, samplingRatio, names)
444 def _inferSchema(self, rdd, samplingRatio=None, names=None):
445 """
446 Infer schema from an RDD of Row, dict, or tuple.
447
(...)
458 :class:`pyspark.sql.types.StructType`
459 """
--> 460 first = rdd.first()
461 if not first:
462 raise ValueError("The first row in RDD is empty, "
463 "can not infer schema")
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\rdd.py:1586, in RDD.first(self)
1573 def first(self):
1574 """
1575 Return the first element in this RDD.
1576
(...)
1584 ValueError: RDD is empty
1585 """
-> 1586 rs = self.take(1)
1587 if rs:
1588 return rs[0]
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\rdd.py:1566, in RDD.take(self, num)
1563 taken += 1
1565 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1566 res = self.context.runJob(self, takeUpToNumLeft, p)
1568 items += res
1569 partsScanned += numPartsToTry
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\context.py:1233, in SparkContext.runJob(self, rdd, partitionFunc, partitions, allowLocal)
1229 # Implementation note: This is implemented as a mapPartitions followed
1230 # by runJob() in order to avoid having to pass a Python lambda into
1231 # SparkContext#runJob.
1232 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1233 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1234 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\py4j\java_gateway.py:1309, in JavaMember.__call__(self, *args)
1303 command = proto.CALL_COMMAND_NAME +\
1304 self.command_header +\
1305 args_command +\
1306 proto.END_COMMAND_PART
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1312 for temp_arg in temp_args:
1313 temp_arg._detach()
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (skdjsjkdksad.com executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\rdd.py", line 1561, in takeUpToNumLeft
except StopIteration:
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
return f(*args, **kwargs)
File "D:\Users\AJ\AppData\Local\Temp\1\ipykernel_4312\679190413.py", line -1, in <lambda>
File "D:\Users\AJ\AppData\Local\Temp\1\ipykernel_4312\679190413.py", line 5, in func1
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 106, in col
return _invoke_function("col", col)
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 57, in _invoke_function
jf = _get_get_jvm_function(name, SparkContext._active_spark_context)
File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 49, in _get_get_jvm_function
return getattr(sc._jvm.functions, name)
AttributeError: 'NoneType' object has no attribute '_jvm'
NOTE: Using map() and an RDD transformation is a must. If the problem is with rlike(), please suggest an alternative way to check the regex and print the results accordingly. Please help me mitigate this error, thanks in advance!
Upvotes: 0
Views: 976
Reputation: 6644
AFAIK, you can't use a pyspark sql function inside an rdd.map(): the function passed to map() runs on the workers as plain Python, where there is no active SparkContext, which is why F.col() fails with the AttributeError you see. The sql functions only work on dataframe columns. Since you want to keep the RDD transformation, you can solve your problem with Python's re module instead.
Here's an example.
def func1(x):
    import re
    # only keeping `day` for this example
    day = x.day
    if re.match('^(Sun|Mon|Tues|Wednes|Thurs|Fri|Satur)day$', day):
        cond1 = 'Success'
    else:
        cond1 = 'Error'
    return (day, cond1)
spark.sparkContext.parallelize([('Sunday',), ('Sun',)]).toDF(['day']). \
    rdd. \
    map(lambda x: func1(x)). \
    toDF(['day', 'cond1']). \
    show()
# +------+-------+
# | day| cond1|
# +------+-------+
# |Sunday|Success|
# | Sun| Error|
# +------+-------+
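For comparison only, since the question requires map() on the RDD: if that constraint were relaxed, the same check could be done directly with the dataframe API, where when() and rlike() do work. A minimal sketch, assuming the spark session and the data from the question:

import pyspark.sql.functions as F

data = [("Sunday", 20000), ("734", 1000), ("Fruday", 5001), ("Tuesday", 17000)]
df = spark.createDataFrame(data, ["day", "users_count"])

# flag rows whose `day` value is a valid weekday name
dffinal = df.withColumn(
    'cond1',
    F.when(F.col('day').rlike('^(Sun|Mon|Tues|Wednes|Thurs|Fri|Satur)day$'), 'Success')
     .otherwise('Error')
)
dffinal.show()

# expected output:
# +-------+-----------+-------+
# |    day|users_count|  cond1|
# +-------+-----------+-------+
# | Sunday|      20000|Success|
# |    734|       1000|  Error|
# | Fruday|       5001|  Error|
# |Tuesday|      17000|Success|
# +-------+-----------+-------+

Note that F.col() takes the column name ('day') here; the original func1 passed the row's value (x.day) to it, which would not refer to an existing column even if the sql functions were usable inside map().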
Upvotes: 1