Kardu

Reputation: 895

Apache Spark - sqlContext.sql to pandas

Hi, I have a Spark DataFrame and I made some transformations using the SQL context, for example selecting only two columns from all the data:

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

but now I want to transform this into a pandas dataframe, and I'm using

pddf = df_oraAS.toPandas()

but the output stops here and I need to restart the IDE (Spyder):

16/01/22 16:04:01 INFO DAGScheduler: Got job 0 (toPandas at <stdin>:1) with 3 output partitions
16/01/22 16:04:01 INFO DAGScheduler: Final stage: ResultStage 0 (toPandas at <stdin>:1)
16/01/22 16:04:01 INFO DAGScheduler: Parents of final stage: List()
16/01/22 16:04:01 INFO DAGScheduler: Missing parents: List()
16/01/22 16:04:01 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1), which has no missing parents
16/01/22 16:04:01 INFO SparkContext: Starting job: toPandas at <stdin>:1
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KB, free 9.4 KB)
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.9 KB, free 14.3 KB)
16/01/22 16:04:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:50877 (size: 4.9 KB, free: 511.1 MB)
16/01/22 16:04:01 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/22 16:04:01 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1)
16/01/22 16:04:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
16/01/22 16:04:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 119523958 bytes)
16/01/22 16:04:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 117876401 bytes)
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:200)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:462)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:252)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:247)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:317)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:63)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

What did I do wrong? Thanks.

EDIT, to be more complete: I load the data from an Oracle database (cx_Oracle) and put it in a pandas dataframe:

df_ora = pd.read_sql('SELECT * FROM DEC_CLIENTES', con=connection)

Next I created an SQLContext to manipulate the dataframe:

sqlContext = SQLContext(sc)

df_oraAS = sqlContext.createDataFrame(df_ora)

df_oraAS.registerTempTable("df_oraAS")

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

and I want to convert this back to a pandas dataframe:

pddf = df_oraAS.toPandas()

Upvotes: 0

Views: 5515

Answers (2)

Hamel Kothari

Reputation: 737

Your pd.read_sql call reads the entire DEC_CLIENTES table into a pandas dataframe, which lives locally on the driver. When you call createDataFrame, Spark then builds a Spark DataFrame from that local pandas dataframe, which results in a really large task size (see the log line below):

16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.

Even though you are selecting only 5 rows, you are actually first loading the full table into driver memory with that pd.read_sql call. Since you are reading from an Oracle database, why not use Spark's JDBC data source, perform your select and limit there, and only then call toPandas?

As written, your code reads the whole table into pandas, ships it to Spark, filters it, and reads it back into pandas; reading through JDBC instead keeps the full table out of the driver, as sketched below.
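A minimal sketch of the JDBC route (the connection URL, credentials, and driver class name are placeholders; it also assumes the Oracle JDBC jar is on Spark's classpath):

df_ora = sqlContext.read.format("jdbc").options(
    url="jdbc:oracle:thin:@//dbhost:1521/service",  # placeholder connection URL
    dbtable="DEC_CLIENTES",
    user="your_user",                               # placeholder credentials
    password="your_password",
    driver="oracle.jdbc.driver.OracleDriver"
).load()

# The select and limit now run in Spark, so only 5 narrow rows reach the driver.
pddf = df_ora.select("ENT_EMAIL", "MES_ART_ID").limit(5).toPandas()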

Upvotes: 0

zero323

Reputation: 330393

toPandas is basically collect in disguise. The output is a local Pandas DataFrame, and if the data doesn't fit into driver memory it will simply fail, hence the error you see.
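To illustrate, a simplified sketch of roughly what a toPandas call amounts to (not the exact implementation, just the idea), using the question's df_oraAS:

import pandas as pd

# Every row is pulled across to the driver, exactly like collect(),
# and only then assembled into a local pandas DataFrame.
rows = df_oraAS.collect()
pddf = pd.DataFrame.from_records(rows, columns=df_oraAS.columns)

So a toPandas call can only work if the collected result comfortably fits in driver memory.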

Upvotes: 3
