Reputation: 864
I want to create an empty dataframe from an existing Spark dataframe. I use pyarrow support (enabled in the Spark conf). When I create an empty dataframe from an empty RDD with the same schema as my existing dataframe and then convert it to pandas, I get a java.lang.NegativeArraySizeException. Here is the entire code to reproduce the error:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.enabled", "true") \
    .getOrCreate()
df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
empty_pandas_df = empty_df.toPandas()
And here is the full stack trace:
/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)
warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
2120 _check_dataframe_localize_timestamps
2121 import pyarrow
-> 2122 batches = self._collectAsArrow()
2123 if len(batches) > 0:
2124 table = pyarrow.Table.from_batches(batches)
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
2182 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
2183 finally:
-> 2184 jsocket_auth_server.getResult() # Join serving thread and raise any exceptions
2185
2186 ##########################################################################################
/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
The error disappears when I disable pyarrow with:
spark.conf.set("spark.sql.execution.arrow.enabled","false")
Is this a known issue with pyspark, or is it related to pyarrow?
N.B.: this error is reproducible only with pyspark>=2.4.4.
Upvotes: 3
Views: 1827
Reputation: 1052
A workaround for the issue is to collect the rows and create the pandas dataframe from the result, as below. The other issue in your code was a ':' that had to be replaced with ','.
from pyspark.sql import SparkSession
import pyarrow as pa
import pandas as pd
spark = SparkSession.builder.config("spark.sql.execution.arrow.enabled", "true").getOrCreate()
df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema, verifySchema=True)
empty_pandas_df = empty_df.collect()
empty_pandas_df = pd.DataFrame(empty_pandas_df)
print(empty_pandas_df)
df.show()
Output:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/22 11:08:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Empty DataFrame
Columns: []
Index: []
+---+
|age|
+---+
| 10|
| 11|
| 13|
+---+
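Note that the output above shows `Columns: []`: building the pandas dataframe from an empty list of rows loses the column names. A minimal sketch of one way to keep them, assuming you pass the Spark dataframe's column names explicitly. The helper name `rows_to_pandas` is hypothetical, and the plain lists and tuples below are stand-ins for `empty_df.collect()` and `empty_df.columns` so the snippet runs without a Spark session:

```python
import pandas as pd


def rows_to_pandas(rows, columns):
    """Build a pandas DataFrame from collected rows, keeping the column names.

    rows:    list of tuple-like rows (stand-in for spark_df.collect())
    columns: list of column names (stand-in for spark_df.columns)
    """
    # Passing columns explicitly preserves the header even when rows is empty.
    return pd.DataFrame(rows, columns=columns)


# Stand-in for the empty Spark dataframe: no rows, one "age" column.
empty_pdf = rows_to_pandas([], ["age"])

# Stand-in for the non-empty dataframe from the question.
full_pdf = rows_to_pandas([("10",), ("11",), ("13",)], ["age"])

print(list(empty_pdf.columns), len(empty_pdf))
print(full_pdf)
```

With a real session this would presumably be called as `rows_to_pandas(empty_df.collect(), empty_df.columns)`. Like the workaround above, it goes through `collect()` rather than the Arrow path, so it is only suitable for results that fit in driver memory.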
Upvotes: 2