Reputation: 895
Hy, I have a Spark DataFrame and I made some transformation using SQL context, for example, select only two Columns in all data.
df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")
but now I want transform this sqlcontext a pandas dataframe, and I'm using
pddf = df_oraAS.toPandas()
but the output stop here and I need restart the IDE (spyder)
6/01/22 16:04:01 INFO DAGScheduler: Got job 0 (toPandas at <stdin>:1) with 3 output partitions
16/01/22 16:04:01 INFO DAGScheduler: Final stage: ResultStage 0 (toPandas at <stdin>:1)
16/01/22 16:04:01 INFO DAGScheduler: Parents of final stage: List()
16/01/22 16:04:01 INFO DAGScheduler: Missing parents: List()
16/01/22 16:04:01 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1), which has no missing parents
16/01/22 16:04:01 INFO SparkContext: Starting job: toPandas at <stdin>:1
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KB, free 9.4 KB)
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.9 KB, free 14.3 KB)
16/01/22 16:04:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:50877 (size: 4.9 KB, free: 511.1 MB)
16/01/22 16:04:01 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/22 16:04:01 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1)
16/01/22 16:04:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
16/01/22 16:04:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 119523958 bytes)
16/01/22 16:04:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 117876401 bytes)
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.io.ByteArrayOutputStream.grow(Unknown Source)
at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
at java.io.ByteArrayOutputStream.write(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:200)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:462)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:252)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:247)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:317)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:315)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:315)
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:63)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
What I did wrong? thanks
EDIT: more completed: I load the date from Oracle Database (cx_Oracle) and put the data in a pandas dataframe
df_ora = pd.read_sql('SELECT* FROM DEC_CLIENTES', con=connection)
Next I created a sparkContext to manipulate the dataframe
sqlContext = SQLContext(sc)
df_oraAS = sqlContext.createDataFrame(df_ora)
df_oraAS.registerTempTable("df_oraAS")
df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")
and I want convert again from sqlcontext to a pandas dataframe
pddf = df_oraAS.toPandas()
Upvotes: 0
Views: 5515
Reputation: 737
Your pd.read_sql call reads the full database into a pandas dataframe. This is local to the driver. When you call createDataFrame, it then creates a Spark DataFrame from your python pandas dataframe, which results in a really large task size (see the log line below):
16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
Even though you are selecting only 5 rows, you're actually first loading the full database into memory using that pd.read_sql call. If you're reading from an Oracle SQL database, why don't you use the spark JDBC driver and then perform your select filters and then call toPandas?
What your code is doing is reading the whole DB to pandas, writing to Spark, filtering and reading back to Pandas.
Upvotes: 0
Reputation: 330393
toPandas
is basically collect
in disguise. An output is a local Pandas DataFrame. If data doesn't fit into driver memory it will simply fail hence the error you see.
Upvotes: 3