Reputation: 1100
Consider there are two tables or table references in Spark which you want to compare, e.g. to ensure that your backup worked correctly. Is there a way to do that remotely in Spark? Because it is not useful to copy all the data to R using collect().
library(sparklyr)
library(dplyr)
library(DBI)
##### create spark connection here
# sc <- spark_connect(<yourcodehere>)
spark_connection(sc)
spark_context(sc)
trees1_tbl <- sdf_copy_to(sc, trees, "trees1")
trees2_tbl <- sdf_copy_to(sc, trees, "trees2")
identical(trees1_tbl, trees2_tbl) # FALSE
identical(collect(trees1_tbl), collect(trees2_tbl)) # TRUE
setequal(trees1_tbl, trees2_tbl) # FALSE
setequal(collect(trees1_tbl), collect(trees2_tbl)) # TRUE
spark_disconnect(sc)
It would be nice if dplyr::setequal() could be used directly.
Upvotes: 6
Views: 886
Reputation: 1100
Thanks @Cosmin for the hint!
First use setdiff(), which (unlike setequal()) has a method for tbl_lazy objects provided by dplyr, then count the remaining rows and compare the count with 0.
trees1_tbl %>% setdiff(trees2_tbl) %>% sdf_nrow() == 0
## TRUE
This results in TRUE if all data from trees1_tbl is contained in trees2_tbl.
If they differ, one can leave out the == 0 to get the number of rows missing from trees2_tbl.
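For a symmetric check, a sketch along the same lines (reusing the trees1_tbl / trees2_tbl references) runs setdiff() in both directions and also compares the raw row counts; SQL EXCEPT ignores how often a row is repeated, so the count comparison makes the check stricter, although it still cannot catch every difference in duplicate multiplicities.
# hedged sketch: both-direction setdiff() plus a row-count comparison
sdf_nrow(setdiff(trees1_tbl, trees2_tbl)) == 0 &&
  sdf_nrow(setdiff(trees2_tbl, trees1_tbl)) == 0 &&
  sdf_nrow(trees1_tbl) == sdf_nrow(trees2_tbl)
## TRUE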
Upvotes: 2
Reputation: 716
I wrote an example of how I think you can do it. Basically, you just union both tables, then apply distinct() to the result of the union, and finally compare the number of rows of the resulting DataFrame with the initial number of rows.
>>> rdd = spark.sparkContext.parallelize([("test","test1")])
>>> rdd.collect()
[('test', 'test1')]
>>> df1 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df1.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+
>>> df2 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df2.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+
>>> df3 = df1.union(df2)
>>> df3.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
|test|test1|
+----+-----+
>>> df3.distinct().show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+
>>> df1.count()
1
>>> df3.distinct().count()
1
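Since the question uses sparklyr, a rough R equivalent of this union + distinct idea might look like the following (a sketch reusing the trees1_tbl / trees2_tbl references from the question; note that dplyr::union() already de-duplicates when translated to SQL UNION, and the check can be fooled if a table contains duplicated rows of its own):
n_original <- sdf_nrow(trees1_tbl)
n_deduplicated_union <- trees1_tbl %>%
  union(trees2_tbl) %>%   # translated to SQL UNION, which removes duplicates
  sdf_nrow()
n_original == n_deduplicated_union
## TRUE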
Upvotes: 1
Reputation: 330423
It is just not going to work. The main point to remember here is that Spark DataFrames* are not data containers. They are descriptions of the transformations that will be applied to the data once the pipeline is executed, which means the result can differ every time you evaluate the data. The only meaningful question you can ask here is whether both DataFrames describe the same execution plan, and that is obviously not useful in your case.
So how to compare the data? There is really no universal answer here.
Testing
If it is part of a unit test, collecting the data and comparing local objects is the way to go (although please keep in mind that using sets can miss some subtle but common problems, such as differing numbers of duplicated rows).
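For instance, a minimal sketch of such a test with sparklyr, assuming the data is small enough to collect() and using sorting rather than sets so that duplicated rows are not silently ignored:
local1 <- trees1_tbl %>% collect() %>% arrange(across(everything()))
local2 <- trees2_tbl %>% collect() %>% arrange(across(everything()))
identical(local1, local2)   # or testthat::expect_equal(local1, local2) inside a test
## TRUE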
Production
Outside unit tests you can try to check if the full contents of both datasets are actually equivalent.
This, however, is very expensive and, if feasible at all, might significantly increase the cost of the process. So in practice you might prefer methods which don't provide strict guarantees but have a better performance profile. These will differ depending on the input and output sources as well as the failure model (for example, file-based sources are more reliable than ones using databases or message queues).
In the simplest case you can manually inspect basic invariants, like the number of rows read and written, using the Spark web UI. For more advanced monitoring you can implement your own Spark listeners (check, for example, Spark: how to get the number of written rows?), query listeners, or accumulators, but all these components are not exposed in sparklyr and will require writing native (Scala or Java) code.
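The cheapest of those invariants can still be checked directly from sparklyr without native code, for example by comparing the row counts of the source and the backup (a sketch using the table references from the question; matching counts of course do not prove that the contents match):
sdf_nrow(trees1_tbl) == sdf_nrow(trees2_tbl)
## TRUE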
* I refer here to Spark, but using dplyr with a database backend is not that different.
Upvotes: 1