Reputation: 45
My buckets are:
Also, I have a Kafka Couchbase connector that pushes data from MyEventingBucket to a Kafka topic.
When we had a single data center, there wasn't any problem. Now, we have three data centers. We replicate our data with XDCR between data centers and we work as active-active. So, write requests can be from any data center.
When data is replicated to the other data centers, the Eventing service runs in every data center, so the Kafka connector pushes the same data to Kafka three times (once per data center).
How can we avoid pushing duplicate data to Kafka?
PS: Of course, we could run the Eventing service or the Kafka connector in only one data center, so that data is published to Kafka just once. But this is not a good solution, because we would be affected whenever a problem occurs in that data center. Avoiding that was the main reason for using multiple data centers.
Upvotes: 3
Views: 801
Reputation: 586
Obviously in a perfect world XDCR would just work with Eventing on the replicated bucket.
I put together an Eventing-based workaround to overcome issues in an active/active XDCR configuration. It is a bit complex, so I thought working code would be best. This is one way to implement the solution that Matthew Groves alluded to.
Documents are tagged, and a "cluster_state" document shared via XDCR (see the comments in the code) coordinates which cluster is "primary", as you only want one cluster to fire the Eventing function.
I will give the code for an Eventing function "xcdr_supression_700" for version 7.0.0; with a minor change it will also work for 6.6.5.
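The ownership rule can be illustrated as a small pure function. This is only a sketch of the decision, assuming the "cluster_state" layout used in the code of this answer (ci_offline and ci_backups); the helper name shouldProcess is hypothetical and is not part of Eventing.

```javascript
// Hypothetical helper (not an Eventing API): decide whether this cluster
// should act on a document, given the shared "cluster_state" document.
function shouldProcess(docOwner, thisCluster, clusterState) {
  // A cluster that is marked offline never processes anything.
  if (clusterState.ci_offline[thisCluster] === true) return false;
  // The owning cluster always processes its own documents.
  if (docOwner === thisCluster) return true;
  // Otherwise process only when the owner is the cluster we back up
  // and that cluster is currently marked offline.
  var backedUp = clusterState.ci_backups[thisCluster];
  return docOwner === backedUp && clusterState.ci_offline[backedUp] === true;
}

var state = {
  type: "cluster_state",
  ci_offline: { couch01: false, couch03: true },
  ci_backups: { couch03: "couch01", couch01: "couch03" }
};
console.log(shouldProcess("couch03", "couch01", state)); // true: couch01 covers for couch03
console.log(shouldProcess("couch01", "couch03", state)); // false: couch03 is marked offline
```

With both clusters online, each document is processed only by the cluster named in its "owner" property, which is what keeps the action from firing once per cluster.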
Note, newer Couchbase releases have more functionality WRT Eventing and allow the Eventing function to be simplified for example:
Setting up XDCR and Eventing
The following code will successfully suppress all extra Eventing mutations on a bucket called "common" or in 7.0.X a keyspace of "common._default._default" with an active/active XDCR replication.
The example is for two (2) clusters but may be extended. This code is 7.0 specific (I can supply a 6.5.1 variant if needed - please DM me)
PS: The only thing it does is log a message (in the cluster that is processing the function). You can just set up two one-node clusters; I named mine "couch01" and "couch03". It is pretty easy to set up and test, to ensure that mutations in your bucket are only processed once across two clusters with active/active XDCR.
The Eventing Function is generic WRT the JavaScript BUT it does require a different constant alias on each cluster, see the comment just under the OnUpdate(doc,meta) entry point.
/*
PURPOSE suppress duplicate mutations by Eventing when we use an Active/Active XDCR setup
Make two clusters "couch01" and "couch03" each with bucket "common" (if 7.0.0 keyspace "common._default._default")
On cluster "couch01", setup XDCR replication of common from "couch01" => "couch03"
On cluster "couch03", setup XDCR replication of common from "couch03" => "couch01"
This is an active / active XDCR configuration.
We process all documents in "common" except those with "type": "cluster_state". The documents can contain anything:
{
"data": "...something..."
}
We add "owner": "cluster" to every document, in this sample I have two clusters "couch01" and "couch03"
We add "crc": "crc" to every document, this is a checksum of the document that excludes "owner" and "crc"
If either the "owner" or "crc" property does not exist we will add the properties ourselves to the document
{
"data": "...something...",
"owner": "couch01",
"crc": "a63a0af9428f6d2d"
}
A document must exist with KEY "cluster_state", when things are perfect it looks like the following:
{
"type": "cluster_state",
"ci_offline": {"couch01": false, "couch03": false },
"ci_backups": {"couch03": "couch01", "couch01": "couch03" }
}
Note ci_offline is an indicator that the cluster is down, for example if a document has "owner": "couch01"
and "ci_offline": {"couch01": true, "couch03": false } then the cluster "couch03" will take ownership and the
documents will be updated accordingly. An external process (ping/verify CB is running, etc.) runs every minute
or so and then updates the "cluster_state" if a change in cluster state occurs, however prior to updating
ci_offline to "true" the Eventing Function on that cluster should either be undeployed or paused. In addition,
when re-enabling the cluster, setting the flag ci_offline to "false" must be done before the Function is resumed
or re-deployed.
The ci_backups tells which cluster is a backup for which cluster, pretty simple for two clusters.
If you have timers, when the timer fires you MUST check if the doc.owner is correct; if not, ignore the timer, i.e.
do nothing. In addition, when you "take ownership" you will need to create a new timer. Finally, all timers should
have an id such that if we ping pong ci_offline the timer will be overwritten, this implies 6.6.0+ else you
need to do even more work to suppress orphaned timers.
The 'near' identical Function will be deployed on both clusters "couch01" and "couch03", make sure you have
a constant binding for 7.0.0 THIS_CLUSTER "couch01" or THIS_CLUSTER "couch03", or for 6.6.0 uncomment the
appropriate var statement at the top of OnUpdate(). Next you should have a bucket binding of src_bkt to
keyspace "common._default._default" for 7.0.0 or to bucket "common" in 6.6.0 in mode read+write.
*/
function OnUpdate(doc, meta) {
    // ********************************
    // MUST MATCH THE CLUSTER AND ALSO THE DOC "cluster_state"
    // *********
    // var THIS_CLUSTER = "couch01"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match the cluster name
    // var THIS_CLUSTER = "couch03"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match the cluster name
    // ********************************
    if (doc.type === "cluster_state") return;
    var cs = src_bkt["cluster_state"]; // extra bucket op read the state of the clusters
    if (cs.ci_offline[THIS_CLUSTER] === true) return; // this cluster is marked offline do nothing.
    // ^^^^^^^^
    // IMPORTANT: when an external process marks the cs.ci_offline[THIS_CLUSTER] back to false (as
    // in this cluster becomes online) it is assumed that the Eventing function was undeployed
    // (or was paused) when it was set "true" and will be redeployed or resumed AFTER it is set "false".
    // The order of this procedure is very important else mutations will be lost.
    var orig_owner = doc.owner;
    var fallback_cluster = cs.ci_backups[THIS_CLUSTER]; // this cluster is the fallback for the fallback_cluster
    /*
    if (!doc.crc && !doc.owner) {
        doc.owner = fallback_cluster;
        src_bkt[meta.id] = doc;
        return; // the fallback cluster NOT THIS CLUSTER is now the owner, the fallback
                // cluster will then add the crc property, as we just made a mutation in that
                // cluster via XDCR
    }
    */
    if (!doc.crc && !doc.owner) {
        doc.owner = THIS_CLUSTER;
        orig_owner = doc.owner;
        // use CAS to avoid a potential 'race' between clusters
        var result = couchbase.replace(src_bkt,meta,doc);
        if (result.success) {
            // log('success adv. replace: result',result);
        } else {
            // log('lost to other cluster failure adv. replace: id',meta.id,'result',result);
            // re-read
            doc = src_bkt[meta.id];
            orig_owner = doc.owner;
        }
    }
    // logic to take over a failed cluster's data, requires updating "cluster_state"
    if (orig_owner !== THIS_CLUSTER) {
        if ( orig_owner === fallback_cluster && cs.ci_offline[fallback_cluster] === true) {
            doc.owner = THIS_CLUSTER; // Here update the doc's owner
            src_bkt[meta.id] = doc;   // This cluster will now process this doc's mutations.
        } else {
            return; // this isn't the fallback cluster.
        }
    }
    var crc_changed = false;
    if (!doc.crc) {
        var cur_owner = doc.owner;
        delete doc.owner;
        doc.crc = crc64(doc); // crc DOES NOT include doc.owner && doc.crc
        doc.owner = cur_owner;
        crc_changed = true;
    } else {
        var cur_owner = doc.owner;
        var cur_crc = doc.crc;
        delete doc.owner;
        delete doc.crc;
        doc.crc = crc64(doc); // crc DOES NOT include doc.owner && doc.crc
        doc.owner = cur_owner;
        if (cur_crc != doc.crc) {
            crc_changed = true;
        } else {
            return;
        }
    }
    if (crc_changed) {
        // update the data with the new crc, to suppress duplicate XDCR processing, and re-deploy from Everything
        // we could use CAS here but at this point only one cluster will update the doc, so we can not have races.
        src_bkt[meta.id] = doc;
    }
    // This is the action on a fresh unprocessed mutation, here it is just a log message.
    log("A. Doc created/updated", meta.id, 'THIS_CLUSTER', THIS_CLUSTER, 'offline', cs.ci_offline[THIS_CLUSTER],
        'orig_owner', orig_owner, 'owner', doc.owner, 'crc_changed', crc_changed,doc.crc);
}
Make sure you have two buckets prior to importing "xcdr_supression_700.json" or "xcdr_supression_660.json"
The 1st cluster's (couch01) setup: pay attention to the constant alias, as you will need to ensure that THIS_CLUSTER is set to "couch01".
The 2nd cluster's (couch03) setup: pay attention to the constant alias, as you will need to ensure that THIS_CLUSTER is set to "couch03".
Now, if you are running version 6.6.5 you do not have Constant Alias bindings (which act as globals in your Eventing function's JavaScript), hence the requirement to uncomment the appropriate variable. Example for cluster couch01:
function OnUpdate(doc, meta) {
    // ********************************
    // MUST MATCH THE CLUSTER AND ALSO THE DOC "cluster_state"
    // *********
    var THIS_CLUSTER = "couch01"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match the cluster name
    // var THIS_CLUSTER = "couch03"; // this could be a constant binding in 7.0.0, in 6.X we uncomment one of these to match the cluster name
    // ********************************
    // .... code removed (see prior code example) ....
}
Some comments/details:
You may wonder why we need to use a CRC function and store its result in the document undergoing XDCR.
The CRC function, crc64(), built into Eventing is used to detect a non-change, i.e. a mutation that is possibly due only to an XDCR document update. The use of the CRC and the properties "owner" and "crc" allows a) the determination of the owning cluster and b) the suppression of the Eventing function when the mutation is due to an XDCR cluster-to-cluster copy based on the "active" cluster.
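The checksum comparison can be tried outside Couchbase with a small sketch. It assumes a stand-in hash (32-bit FNV-1a over the JSON text) in place of Eventing's crc64(), and the helper names checksum, payloadChecksum and isFreshMutation are hypothetical.

```javascript
// Stand-in for Eventing's built-in crc64(); this is a 32-bit FNV-1a hash
// over the JSON text, used here only so the sketch runs outside Couchbase.
function checksum(obj) {
  var text = JSON.stringify(obj);
  var h = 0x811c9dc5;
  for (var i = 0; i < text.length; i++) {
    h ^= text.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h.toString(16);
}

// Hypothetical helper: checksum of the payload only, ignoring the
// bookkeeping properties "owner" and "crc".
function payloadChecksum(doc) {
  var copy = Object.assign({}, doc);
  delete copy.owner;
  delete copy.crc;
  return checksum(copy);
}

// True when the payload differs from what the stored crc describes,
// i.e. this is a real change and not a replicated echo of our own update.
function isFreshMutation(doc) {
  return doc.crc !== payloadChecksum(doc);
}

var doc = { data: "...something...", owner: "couch01" };
doc.crc = payloadChecksum(doc);    // what the owning cluster writes back
console.log(isFreshMutation(doc)); // false: nothing new, so no action is taken
doc.data = "...changed...";
console.log(isFreshMutation(doc)); // true: the application changed the payload
```

Because "owner" and "crc" are excluded from the checksum, a change of ownership or the write-back of the crc itself does not look like new data.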
Note that when the CRC in the document is updated as part of a timer function, the OnUpdate(doc,meta) entry point of the Eventing function will be triggered again. If you have timers, then when a timer fires you MUST check that doc.owner is correct; if it is not, ignore the timer, i.e. do nothing. In addition, when you "take ownership" you will need to create a new timer. Finally, all timers should have an id such that if cluster_state.ci_offline ping-pongs the timer will be overwritten. This implies you must use version 6.6.0+, else you need to do even more work to determine, when a timer fires, that the timer is orphaned and then suppress any action. Be very careful in older Couchbase versions, because in 6.5 you cannot overwrite a timer by its id and all timer ids should be unique.
Any mutation made to the source bucket by an Eventing function is suppressed, i.e. not seen by that Eventing function, whether the document is mutated by the main JavaScript code or by a timer callback. Yet these mutations will be seen in the other cluster via active/active XDCR replication.
As to using Eventing timers, pay attention to the comment in the prior paragraph about overwriting and suppressing, especially if you insist on using Couchbase Server 6.5, which is getting a bit long in the tooth, so to speak.
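A minimal sketch of the owner check inside a timer callback follows. The plain objects standing in for src_bkt and THIS_CLUSTER, the fired array, and the helper timerReference are assumptions made so the snippet runs outside Couchbase; only the shape of the callback mirrors Eventing.

```javascript
// Stubs so the sketch runs outside Couchbase; in Eventing, src_bkt would be a
// bucket binding and THIS_CLUSTER a constant binding (or a var in 6.x).
var THIS_CLUSTER = "couch01";
var src_bkt = {
  "doc::1": { data: "a", owner: "couch01" },
  "doc::2": { data: "b", owner: "couch03" }
};
var fired = [];

// Timer callback: act only if this cluster still owns the document.
function TimerCallback(context) {
  var doc = src_bkt[context.id];
  if (!doc || doc.owner !== THIS_CLUSTER) return; // orphaned timer, do nothing
  fired.push(context.id); // placeholder for the real timer action
}

// Hypothetical helper: derive the timer reference from the document key so
// that re-creating the timer for the same document reuses the same id.
function timerReference(docId) {
  return "owner_timer::" + docId;
}

TimerCallback({ id: "doc::1" });
TimerCallback({ id: "doc::2" });
console.log(fired); // [ 'doc::1' ]
console.log(timerReference("doc::1")); // owner_timer::doc::1
```

Inside Eventing the timer itself would be created with createTimer(callback, date, reference, context), passing a reference such as timerReference(meta.id) so that a timer re-created after an ownership change replaces the earlier one on 6.6.0+.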
Concerning the responsibility to update the cluster_state document, it is envisioned that this would be a periodic script outside of Couchbase run in a Linux cron that does "aliveness" tests with a manual override. Be careful here as you can easily go "split brain" due to a network partitioning issue.
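The ordering such a script has to respect (stop the function before marking a cluster offline, mark it online before resuming) can be sketched as a planner that only returns the steps. The function name planTransition and the step names are hypothetical; how the function is actually paused or resumed and how the document is written are deliberately left out.

```javascript
// Hypothetical planner for the external watchdog: it only returns the ordered
// steps; pausing/resuming the function and writing the document are left out.
function planTransition(clusterState, cluster, isAlive) {
  var markedOffline = clusterState.ci_offline[cluster] === true;
  if (!isAlive && !markedOffline) {
    // Going offline: stop Eventing on that cluster first, then flip the flag.
    return ["pause_or_undeploy_function", "set_ci_offline_true"];
  }
  if (isAlive && markedOffline) {
    // Coming back: flip the flag first, then resume Eventing.
    return ["set_ci_offline_false", "resume_or_deploy_function"];
  }
  return []; // no change in state, nothing to do
}

var clusterState = {
  type: "cluster_state",
  ci_offline: { couch01: false, couch03: false },
  ci_backups: { couch03: "couch01", couch01: "couch03" }
};
console.log(planTransition(clusterState, "couch03", false));
// [ 'pause_or_undeploy_function', 'set_ci_offline_true' ]
```

A manual override would simply bypass the aliveness result and pass the desired state as isAlive.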
A comment about cluster_state: this document is subject to XDCR. It is a persistent document that the active/active replication makes appear to be a single inter-cluster document. If a cluster is "down", changing the document on the live cluster will result in it replicating when the "down" cluster recovers.
Deploy/Undeploy will either process all current documents via the DCP mutation stream all over again (feed boundary == Everything) -or- only process items or mutations occurring after the time of deployment (feed boundary == From now). So you need careful coding in the first case to prevent acting on the same document twice and you will miss mutations in the second case.
It is best to design our Eventing Functions to be idempotent, where there is no additional effect if it is called more than once with the same input parameters. This can be achieved by storing state in the documents that are processed so you never reprocess them on a re-deploy.
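As a minimal illustration of storing state in the document, the sketch below uses a plain object as a stand-in for a read+write bucket binding. The "processed" property and the handleOnce helper are hypothetical; the function in this answer uses the "crc" property for the same purpose, which additionally detects later changes to the payload.

```javascript
var bucket = {};      // stand-in for a read+write bucket binding
var sideEffects = []; // stand-in for the real action (e.g. a notification)

// Hypothetical handler: records a "processed" marker in the document so a
// replay of the same document (e.g. a re-deploy with "Everything") is a no-op.
function handleOnce(doc, id) {
  if (doc.processed === true) return false; // already handled, skip
  sideEffects.push(id);                     // the action, performed once
  doc.processed = true;
  bucket[id] = doc;                         // persist the marker with the doc
  return true;
}

var d = { data: "...something..." };
console.log(handleOnce(d, "doc::1"));                // true
console.log(handleOnce(bucket["doc::1"], "doc::1")); // false
console.log(sideEffects.length);                     // 1
```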
Pause/Resume: invoking Pause will create a checkpoint and shut down Eventing processing. Then on a Resume the DCP stream will start from the checkpoint (for each vBucket), so you will not miss a single mutation, subject to DCP dedup. Furthermore, all "active" timers that would have fired during the "pause" will fire as soon as possible (typically within the next 7-second timer scan interval).
Best
Jon Strabala Principal Product Manager - Couchbase
Upvotes: 3