Reputation: 382
I am working with Apache Spark in spark-shell, and when I perform a SQL query I get different outputs.
I have a Spark cluster with one master and one slave node. There is one worker on the master and one worker on the slave node, so I have two worker nodes in total.
Now, when I persist some data and then perform some filters on that persisted data, I get two different outputs. These two outputs are distinct but do not change between runs; in short, I get one of two different outputs for the same SQL query.
I have a MySQL database on both the master and the slave node, each containing a table with 50,000 records. These 50k + 50k records are different, not the same.
So when I query, the result differs between runs. Here is the code I am trying, along with a screenshot of the output.
spark-shell --conf spark.sql.warehouse.dir=C:\spark-warehouse --master spark://192.168.0.31:7077
val jdbcDF = spark.read.format("jdbc").options( Map("url" -> "jdbc:mysql://localhost:3306/cmanalytics?zeroDateTimeBehavior=convertToNull&user=root&password=root", "dbtable" -> "cmanalytics.demo_call", "fetchSize" -> "1000", "partitionColumn" -> "newpartition", "lowerBound" -> "0", "upperBound" -> "4", "numPartitions" -> "4")).load()
jdbcDF.createOrReplaceTempView("demo_call")
val sqlDF = sql("select * from demo_call").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
sqlDF.show()
val d = sqlDF.filter(sqlDF("campaign_id")===141).groupBy("classification_id").count
d.count
And the screenshot of the output is:
Can anyone help me solve this problem?
Thanks
Upvotes: 0
Views: 1966
Reputation: 13154
As you are probably aware, Spark does lazy evaluation, and your problem here is simply that you assume that show will force evaluation of your DataFrame, but that assumption is wrong. show has no such guarantee and may very well evaluate only a subset of rows. To force evaluation of the entire DataFrame, you would need to call an action like, say, count first.
val sqlDF = sql("select * from demo_call").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
sqlDF.count // to force evaluation of the entire DataFrame
Every call to show should give you the same result from now on.
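For completeness, here is a minimal sketch of the whole pattern, assuming the same demo_call temp view and the campaign_id / classification_id columns from your question:

```scala
// Persist the query result, then force a full materialization with an
// action (count) before relying on the cache. persist alone is lazy and
// caches nothing by itself.
val sqlDF = sql("select * from demo_call")
  .persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

sqlDF.count // evaluates every partition and fills the DISK_ONLY cache

// Later queries now read the same cached snapshot, so repeated runs
// of this filter return consistent counts.
val d = sqlDF.filter(sqlDF("campaign_id") === 141)
  .groupBy("classification_id")
  .count()
d.show()
```

The key point is that the cache is only populated when the first action runs, which is why forcing count up front matters here.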
Upvotes: 2