Reputation: 231
I'm trying to build a streaming Dataflow job which reads events from Pub/Sub and writes them to BigQuery.
According to the documentation, Dataflow can detect duplicate message delivery if a Record ID is used (see: https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids).
But even using this Record ID, I still have some duplicates (around 0.0002%).
Did I miss something?
EDIT:
I use the Spotify Async PubSub client to publish messages with the following snippet:
Message
  .builder()
  .data(new String(Base64.encodeBase64(json.getBytes())))
  .attributes("myid", id, "mytimestamp", timestamp.toString)
  .build()
Then I use Spotify scio in the Dataflow job to read the messages from Pub/Sub and save them to BigQuery:
val input = sc.withName("ReadFromSubscription")
  .pubsubSubscription(subscriptionName, "myid", "mytimestamp")

input
  .withName("FixedWindow")
  .withFixedWindows(windowSize) // apply windowing logic
  .toWindowed // convert to WindowedSCollection
  //
  .withName("ParseJson")
  .map { wv =>
    wv.copy(value = TableRow(
      "message_id" -> (Json.parse(wv.value) \ "id").as[String],
      "message" -> wv.value)
    )
  }
  //
  .toSCollection // convert back to normal SCollection
  //
  .withName("SaveToBigQuery")
  .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)
The window size is 1 minute.
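For reference, a minimal sketch of how that windowSize value might be defined (its definition is not shown above; this assumes it is a Joda-Time Duration, which is what scio's withFixedWindows expects):

import org.joda.time.Duration

// Assumed definition of the windowSize used in the pipeline above:
// a fixed window of one minute.
val windowSize: Duration = Duration.standardMinutes(1)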
After only a few seconds of injecting messages, I already have duplicates in BigQuery.
I use this query to count duplicates:
SELECT
  COUNT(message_id) AS TOTAL,
  COUNT(DISTINCT message_id) AS DISTINCT_TOTAL
FROM my_dataset.my_table
// returning 273666, 273564
And this one to look at them:
SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
  SELECT message_id
  FROM my_dataset.my_table
  GROUP BY message_id
  HAVING COUNT(*) > 1
)
ORDER BY message_id
// returning, for instance:
row | id                                   | processed_at            | processed_at_epoch | message
1   | 00166a5c-9143-3b9e-92c6-aab52601b0be | 2017-02-02 14:06:50 UTC | 1486044410367      | { ...json1... }
2   | 00166a5c-9143-3b9e-92c6-aab52601b0be | 2017-02-02 14:06:50 UTC | 1486044410368      | { ...json1... }
3   | 00354cc4-4794-3878-8762-f8784187c843 | 2017-02-02 13:59:33 UTC | 1486043973907      | { ...json2... }
4   | 00354cc4-4794-3878-8762-f8784187c843 | 2017-02-02 13:59:33 UTC | 1486043973741      | { ...json2... }
5   | 0047284e-0e89-3d57-b04d-ebe4c673cc1a | 2017-02-02 14:09:10 UTC | 1486044550489      | { ...json3... }
6   | 0047284e-0e89-3d57-b04d-ebe4c673cc1a | 2017-02-02 14:08:52 UTC | 1486044532680      | { ...json3... }
Upvotes: 4
Views: 4375
Reputation: 6130
The BigQuery documentation states that there may be rare cases where duplicates arrive.
You may want to try the instructions for manually removing duplicates. This will also allow you to see the insertId that was used with each row, to determine whether the problem was on the Dataflow side (generating different insertIds for the same record) or on the BigQuery side (failing to deduplicate rows based on their insertId).
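For reference, a minimal sketch of the kind of after-the-fact deduplication those instructions describe, keyed here on the message_id column from the question (rather than the insertId), assuming standard SQL and the table name used above; writing the result back to a table is left out:

#standardSQL
-- Keep exactly one row per message_id and drop the extra copies.
SELECT * EXCEPT(dup_rank)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY message_id) AS dup_rank
  FROM `my_dataset.my_table`
)
WHERE dup_rank = 1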
Upvotes: 2