Jay Prajapati

Reputation: 382

Spark giving incorrect output for some values and correct output for others

I am working with Apache Spark in spark-shell, and when I run a SQL query I get different outputs.

I have a Spark cluster with one master node and one slave node. There is one worker on the master and one worker on the slave, so I have two worker nodes in total.

Now, when I persist some data and then run filters on that persisted data, I get one of two different outputs on each run. These two outputs are fixed, they are not changing every time; in short, I am getting two distinct results for one and the same SQL query.

I have a MySQL database on both the master and the slave node, each containing a table with 50,000 records. These 50k + 50k records are different, not the same.
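One detail worth flagging in this setup: the JDBC url in the code below says `localhost`, and Spark opens JDBC connections from the executors, so a task scheduled on the slave reads the slave's MySQL while a task on the master reads the master's. As a sketch only (reusing the master's IP from the spark-shell command is an assumption; substitute whichever single host should be the source of truth), the same read pointed at one explicit host would make every partition see the same table:

```scala
// Sketch, not the poster's code: an explicit host in the url
// (192.168.0.31 is assumed here) makes all four partitions read
// the same 50k-row table instead of each node's local MySQL.
val oneSourceDF = spark.read.format("jdbc").options(Map(
  "url"             -> "jdbc:mysql://192.168.0.31:3306/cmanalytics?zeroDateTimeBehavior=convertToNull&user=root&password=root",
  "dbtable"         -> "cmanalytics.demo_call",
  "fetchSize"       -> "1000",
  "partitionColumn" -> "newpartition",
  "lowerBound"      -> "0",
  "upperBound"      -> "4",
  "numPartitions"   -> "4"
)).load()
```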

So when I query, the result comes out different. Here is the code I am trying, along with a screenshot of the output.

spark-shell --conf spark.sql.warehouse.dir=C:\spark-warehouse --master spark://192.168.0.31:7077

val jdbcDF = spark.read.format("jdbc").options( Map("url" ->  "jdbc:mysql://localhost:3306/cmanalytics?zeroDateTimeBehavior=convertToNull&user=root&password=root", "dbtable" -> "cmanalytics.demo_call", "fetchSize" -> "1000", "partitionColumn" -> "newpartition", "lowerBound" -> "0", "upperBound" -> "4", "numPartitions" -> "4")).load()

jdbcDF.createOrReplaceTempView("demo_call")

val sqlDF = sql("select * from demo_call").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

sqlDF.show()

val d = sqlDF.filter(sqlDF("campaign_id")===141).groupBy("classification_id").count

d.count

And the screenshot of the output is:

[screenshot omitted]

Can anyone help me solve this problem?

Thanks

Upvotes: 0

Views: 1966

Answers (1)

Glennie Helles Sindholt

Reputation: 13154

As you are probably aware, Spark evaluates lazily, and your problem here is simply that you assume `show` will force evaluation of your DataFrame. That assumption is wrong: `show` makes no such guarantee and may very well evaluate only a subset of rows. To force evaluation of the entire DataFrame, you need to call an action such as `count` first:
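To see the mechanism, here is an analogy in plain Scala (not Spark, and not the poster's code): a `LazyList` is likewise evaluated on demand, so peeking at a prefix, which is roughly what `show` does, computes only that prefix, while a full traversal, like `count`, computes everything.

```scala
// Plain-Scala analogy for Spark's lazy evaluation (not Spark code).
var evaluated = 0
val lazyData = LazyList.tabulate(100) { i => evaluated += 1; i }

lazyData.take(5).toList       // "show"-like peek: only the prefix is computed
assert(evaluated == 5)

val total = lazyData.length   // "count"-like action: forces every element
assert(total == 100 && evaluated == 100)
```

The same shape explains the question: `show` materializes only some partitions of the persisted DataFrame, so later actions still go back to the (differing) JDBC sources, whereas `count` touches every partition and completes the cache.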

val sqlDF = sql("select count(*) from demo_call where classification_id = 141").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

sqlDF.count // to force evaluation

Every call to `show` should give you the same result from now on.

Upvotes: 2
