Willie S

Reputation: 83

IBM CDC - Data Replication for Kafka (customization on LiveAudit)

Our source tables are on an IBM AS400. What I would like to do is:

  1. use descriptive column_name (like "Closing Price") instead of system_column_name ("CPHKD001")
  2. convert timestamp format "2020-03-10 18:25:31.123456000000" to "2020-03-10T18:25:31.123Z"

I think I need to modify the file KcopLiveAuditSingleRowIntegrated.java. The timestamp conversion can probably be done there, but I could not work out how from the code itself.

Upvotes: 0

Views: 1147

Answers (1)

Shawn

Reputation: 288

The KCOP infrastructure gives you programmatic control over the Kafka producer record. For each operation on the source, you can determine how many messages are written to Kafka, which topics they go to, and what the key and value bytes are.

In the KCOP, you can reformat the timestamp into any format you like with Java. This is possible because each call to createProducerRecords provides you with a suggested Avro generic record whose schema lets you identify the table, the columns, and their types.

As per the Avro generic record behaviour, documented in the open source Avro documentation, you could then select the values you are interested in, create a new Avro generic record with the transformed values, and pass this new record on to the rest of the KCOP.

Note that the audit KCOPs include code that does this; the single row Avro audit KCOP is a good example. You can find the code for all of our integrated KCOPs in the samples.jar file of your product install.

The single row Avro audit KCOP takes the before and after Avro generic records and produces a new Avro generic record that is a composite of them. If you check the type of each column while copying values, you can identify the timestamp columns and alter the value you place into the new composite Avro generic record.

However, we do offer some flexibility for pre-formatting, as described at the following link:

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.mcadminguide.doc/refs/mirror_timestamp_write_format_Kafka.html

Note that to employ this optional pre-formatter, the documentation states: "To enable this parameter, you must set the mirror_write_format parameter to DYNAMIC."

As that page describes, you can set a datastore parameter that covers the typically desired customizations of the timestamp data your KCOP receives:

"AVRO (default)

Formats TIMESTAMP column values as the number of microseconds from the UNIX epoch, 1 January 1970 (ISO calendar). For the purpose of this calculation, the timestamps are assumed to be UTC.

For TIMESTAMP columns with precision greater than microseconds, the values are formatted as strings. You can specify the string format by using the timestamp_format datastore parameter. The default value of timestamp_format is yyyy-MM-dd HH:mm:ss.SSSNNNnnnppp.

TIMESTAMP WITH TIMEZONE column values are formatted as strings. You can specify the string format by using the timestamp_tz_format datastore parameter. The default value of timestamp_tz_format is yyyy-MM-dd HH:mm:ss.SSSNNNnnnppp T."

If this does not give you the exact format desired, then you can choose the closest format and modify the value of timestamp columns in the KCOP.
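
As a rough illustration of that last step, here is a minimal sketch of the string conversion asked about in the question, using only java.time. The class and method names (TimestampReformat, toIsoMillis) are hypothetical and not part of the CDC samples. It assumes the KCOP receives the timestamp as a string in the default yyyy-MM-dd HH:mm:ss.SSSNNNnnnppp layout and that the values are UTC. The fraction is truncated to milliseconds, not rounded.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the IBM CDC samples.
final class TimestampReformat {
    private static final DateTimeFormatter SECONDS_IN =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");
    private static final DateTimeFormatter SECONDS_OUT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss");

    // Converts "2020-03-10 18:25:31.123456000000" style strings to
    // "2020-03-10T18:25:31.123Z". Assumes the input is already UTC.
    static String toIsoMillis(String cdcTimestamp) {
        int dot = cdcTimestamp.indexOf('.');
        String seconds = dot < 0 ? cdcTimestamp : cdcTimestamp.substring(0, dot);
        String fraction = dot < 0 ? "" : cdcTimestamp.substring(dot + 1);
        if (!fraction.chars().allMatch(Character::isDigit)) {
            throw new IllegalArgumentException("Bad fraction: " + cdcTimestamp);
        }
        // Parsing validates the date and time part. The fraction is handled
        // as text because java.time supports at most 9 fractional digits.
        LocalDateTime parsed = LocalDateTime.parse(seconds, SECONDS_IN);
        // Pad with zeros, then keep the first three digits (truncation).
        String millis = (fraction + "000").substring(0, 3);
        return SECONDS_OUT.format(parsed) + "." + millis + "Z";
    }
}
```

Inside a KCOP you would call something like TimestampReformat.toIsoMillis(value.toString()) on each timestamp column value while copying it into the new record.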

Modifying the KCOP is done as follows...

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.mcadminguide.doc/tasks/createkafkacop.html

The answer to the column name question is the same. If the KCOP you are using works with an Avro schema registry, you can programmatically change the schema registered with the schema registry. If it is a JSON one, you can alter the JSON string after it is created, or make the change before invoking the Avro to JSON method.
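
To make the renaming idea concrete, here is a minimal sketch that applies a lookup table of system column names to descriptive names. It uses plain java.util maps as a stand-in for the record, so it only shows the mapping logic; in a real KCOP you would apply the same lookup while building the new Avro schema and record. ColumnRenamer and rename are hypothetical names. Note that Avro field names may only contain letters, digits, and underscores, so a name like "Closing Price" works for JSON output but would need to become something like Closing_Price in an Avro schema.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the IBM CDC samples.
final class ColumnRenamer {
    // Returns a copy of the row with keys replaced according to names.
    // Columns without an entry in names keep their original name.
    static Map<String, Object> rename(Map<String, Object> row,
                                      Map<String, String> names) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            String newName = names.getOrDefault(e.getKey(), e.getKey());
            out.put(newName, e.getValue());
        }
        return out;
    }
}
```

The lookup table itself (for example CPHKD001 to "Closing Price") is something you would have to maintain yourself or load from the source system's catalog.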

Alternatively, if your source supports derived columns, I believe you can define a derived column in the Management Console with a new name whose value is simply that of the original column. As I recall, you can then deselect the original column, which effectively renames it.

Upvotes: 3
