Radhwane Chebaane

Reputation: 864

Spark refuses to create an empty DataFrame when using pyarrow

I want to create an empty DataFrame from an existing Spark DataFrame. I use pyarrow support (enabled in the Spark conf). When I create an empty DataFrame from an empty RDD with the same schema as my existing DataFrame and convert it to pandas, I get a java.lang.NegativeArraySizeException. Here is the complete code to reproduce the error:

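from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .config("spark.sql.execution.arrow.enabled", "true") \
                    .getOrCreate()
df = spark.createDataFrame(["10","11","13"], "string").toDF("age")
# Empty DataFrame with the same schema as df
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
# Raises java.lang.NegativeArraySizeException when Arrow is enabled
empty_pandas_df = empty_df.toPandas()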

And here is the full stack trace:

/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
  An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
    at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
    at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)

  warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
   2120                         _check_dataframe_localize_timestamps
   2121                     import pyarrow
-> 2122                     batches = self._collectAsArrow()
   2123                     if len(batches) > 0:
   2124                         table = pyarrow.Table.from_batches(batches)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
   2182                 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
   2183             finally:
-> 2184                 jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
   2185
   2186     ##########################################################################################

/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

The error disappears when I disable pyarrow with:

spark.conf.set("spark.sql.execution.arrow.enabled","false")

Is this a known issue with pyspark, or is it related to pyarrow?

N.B.: This error is reproducible only with pyspark>=2.4.4.
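For readers who want to keep Arrow on everywhere else, the flag toggle described above can be scoped to a single call. The following is only a minimal sketch: `arrow_disabled` and `DictConf` are hypothetical names, and `DictConf` is a stand-in for `spark.conf` so the snippet runs without a Spark session. It assumes the real `spark.conf` object is used through its `get(key, default)` and `set(key, value)` methods.

```python
from contextlib import contextmanager

ARROW_KEY = "spark.sql.execution.arrow.enabled"


@contextmanager
def arrow_disabled(conf):
    """Temporarily set the Arrow flag to "false" on a spark.conf-like object."""
    # Fall back to "false" if the key was never set explicitly.
    previous = conf.get(ARROW_KEY, "false")
    conf.set(ARROW_KEY, "false")
    try:
        yield
    finally:
        # Restore the earlier value even if the body raised.
        conf.set(ARROW_KEY, previous)


class DictConf:
    """Hypothetical stand-in for spark.conf, backed by a plain dict."""

    def __init__(self, values):
        self._values = dict(values)

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value


conf = DictConf({ARROW_KEY: "true"})
with arrow_disabled(conf):
    # With a real session this is where empty_df.toPandas() would go.
    inside = conf.get(ARROW_KEY)
after = conf.get(ARROW_KEY)
print(inside, after)
```

With a real session, the call would look like `with arrow_disabled(spark.conf): empty_pandas_df = empty_df.toPandas()`.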

Upvotes: 3

Views: 1827

Answers (1)

oetzi

Reputation: 1052

A workaround is to collect the rows and build the pandas DataFrame from the result, as shown below. The other issue in your code was a ':' that had to be replaced with ','.

from pyspark.sql import SparkSession
import pandas as pd


spark = SparkSession.builder.config("spark.sql.execution.arrow.enabled", "true").getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")

empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema, verifySchema=True)
# collect() does not go through the Arrow path that toPandas() uses
rows = empty_df.collect()
empty_pandas_df = pd.DataFrame(rows)

print(empty_pandas_df)
df.show()

Output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/22 11:08:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Empty DataFrame
Columns: []
Index: []
+---+
|age|
+---+
| 10|
| 11|
| 13|
+---+
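Note that the pandas frame printed above shows `Columns: []`: with no rows to inspect, pandas cannot infer the column names. A minimal sketch of one way to keep them is to pass the names explicitly. `rows_to_pandas` is a hypothetical helper name, and the tuples below stand in for the `Row` objects that `collect()` returns, so the snippet runs without Spark.

```python
import pandas as pd


def rows_to_pandas(rows, columns):
    """Build a pandas DataFrame from collected rows, keeping column names.

    rows:    list of tuples (pyspark Row objects are tuples as well)
    columns: list of column names, e.g. empty_df.columns
    """
    # Passing columns explicitly keeps the header even when rows is empty.
    return pd.DataFrame.from_records(rows, columns=columns)


# With Spark this would be called as (not run here):
#   rows_to_pandas(empty_df.collect(), empty_df.columns)
empty_pdf = rows_to_pandas([], ["age"])
full_pdf = rows_to_pandas([("10",), ("11",), ("13",)], ["age"])

print(list(empty_pdf.columns), len(empty_pdf))
print(full_pdf["age"].tolist())
```

The resulting empty frame has the `age` column but no rows. Column dtypes are not carried over from the Spark schema in this sketch.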

Upvotes: 2
