Henrique Viana

Reputation: 1

Issue with Debezium Snapshot: DateTimeParseException in PostgreSQL Connector

I'm encountering an issue with Debezium version 2.3.1 while processing a snapshot from a PostgreSQL database. The error occurs during the snapshot phase and seems to be related to parsing a date/time value. The specific error message is:

Caused by: org.apache.kafka.connect.errors.ConnectException:     
java.time.format.DateTimeParseException: Text 'f' could not be parsed at index 0
        at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.format(DateTimeFormat.java:166)
        at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.timestampToInstant(DateTimeFormat.java:172)
        at io.debezium.connector.postgresql.connection.AbstractColumnValue.asInstant(AbstractColumnValue.java:81)
        at io.debezium.connector.postgresql.connection.ReplicationMessageColumnValueResolver.resolveValue(ReplicationMessageColumnValueResolver.java:110)
        at io.debezium.connector.postgresql.connection.pgoutput.PgOutputReplicationMessage.getValue(PgOutputReplicationMessage.java:92)
        at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder$1.getValue(PgOutputMessageDecoder.java:748)
        at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.columnValues(PostgresChangeRecordEmitter.java:179)
        at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.getNewColumnValues(PostgresChangeRecordEmitter.java:125)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
        at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:94)
        at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:296)
        ... 17 more
    Caused by: java.time.format.DateTimeParseException: Text 'f' could not be parsed at index 0
        at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2046)
        at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1874)
        at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.lambda$timestampToInstant$3(DateTimeFormat.java:172)
        at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.format(DateTimeFormat.java:162)

This issue arises when using a table configured for incremental snapshots. Here is an example of the connector configuration I’m using:

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "connector.displayName": "PostgreSQL",
  "database.user": "docker",
  "database.dbname": "exampledb",
  "transforms": "unwrap",
  "database.server.name": "localhost",
  "heartbeat.interval.ms": "60000",
  "database.port": "5432",
  "plugin.name": "pgoutput",
  "slot.max.retries": "10",
  "schema.include.list": "public",
  "slot.retry.delay.ms": "15000",
  "heartbeat.action.query": "INSERT INTO public.debezium_heartbeat VALUES ('debezium', now())",
  "decimal.handling.mode": "string",
  "database.hostname": "postgres",
  "database.password": "docker",
  "transforms.unwrap.drop.tombstones": "false",
  "signal.data.collection": "public.debezium_signal",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "table.include.list": "public.table, public.debezium_signal",
  "max.batch.size": "65000",
  "max.queue.size": "275000",
  "incremental.snapshot.chunk.size": "6500",
  "connector.id": "postgres",
  "topic.prefix": "db-prod"
}

The error seems related to a date/time field in the snapshot data, as indicated by the DateTimeParseException with the text 'f', suggesting a potential issue with date/time formatting or parsing.
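For illustration only (this is not the connector's code): PostgreSQL's text protocol renders boolean false as 'f', so if a decoder maps values to the wrong columns, a boolean literal such as the one from is_company can land in a timestamp slot and fail to parse at index 0, exactly as in the stack trace. A minimal Python sketch of that failure mode:

```python
from datetime import datetime

def try_parse_timestamp(text):
    """Attempt ISO-8601 parsing; return None if the text is not a timestamp."""
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None

# 'f' is PostgreSQL's text rendering of boolean false.
# A boolean value shifted into a timestamp column slot cannot be parsed.
print(try_parse_timestamp("f"))                    # None
print(try_parse_timestamp("2024-08-10 22:09:43"))  # a datetime object
```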

DDL for my table:

CREATE TABLE IF NOT EXISTS public.table
(
    id bigint NOT NULL DEFAULT nextval('table_id_seq'::regclass),
    federal_tax_id character varying(255) COLLATE pg_catalog."default",
    external_id character varying(255) COLLATE pg_catalog."default",
    first_name character varying(255) COLLATE pg_catalog."default",
    last_name character varying(255) COLLATE pg_catalog."default",
    email character varying(255) COLLATE pg_catalog."default",
    phone character varying(255) COLLATE pg_catalog."default",
    cellphone character varying(255) COLLATE pg_catalog."default",
    address_street character varying(255) COLLATE pg_catalog."default",
    address_street_number character varying(255) COLLATE pg_catalog."default",
    address_complement character varying(255) COLLATE pg_catalog."default",
    address_city_district character varying(255) COLLATE pg_catalog."default",
    address_post_code character varying(255) COLLATE pg_catalog."default",
    address_city character varying(255) COLLATE pg_catalog."default",
    address_city_code character varying(255) COLLATE pg_catalog."default",
    address_state_code character varying(255) COLLATE pg_catalog."default",
    address_country character varying(255) COLLATE pg_catalog."default",
    address_latitude numeric,
    address_longitude numeric,
    address_geo geometry(Point,4326) GENERATED ALWAYS AS (st_setsrid(st_makepoint((address_longitude)::double precision, (address_latitude)::double precision), 4326)) STORED,
    created_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL,
    is_company boolean NOT NULL DEFAULT false,
    state_tax_id character varying(255) COLLATE pg_catalog."default",
    official_name character varying(255) COLLATE pg_catalog."default",
    CONSTRAINT table_pkey PRIMARY KEY (id)
)

TABLESPACE pg_default;

Applications Versions:

Debezium: 2.3.1
PostgreSQL: 13.15

I tried to reproduce the error with the same Debezium version: it occurred after I triggered a snapshot and then performed an update with the following query:

UPDATE public.table
SET updated_at = current_timestamp
WHERE id = 28948709;
Example incremental snapshot signal:
INSERT INTO debezium_signal (id, type, data)
VALUES (
  'sdadsada-sdasdas-9b45-12608f88b2a6',
  'execute-snapshot',
  '{"data-collections": ["public.table"]}'
);

CDC streaming itself works, but I need the historical data from the snapshot.

Update: I enabled logging in Debezium and found this:

2024-08-10 22:09:43,120 ERROR  Postgres|company-db-prod|streaming  Failed to properly convert data value for 'public.table.is_company' of type bool   [io.debezium.relational.TableSchemaBuilder]
(the same ERROR line repeats many times)
java.lang.IllegalArgumentException: Column 'address_geo' not found in result set 'id, federal_tax_id, external_id, first_name, last_name, email, phone, cellphone, address_street, address_street_number, address_complement, address_city_district, address_post_code, address_city, address_city_code, address_state_code, address_country, address_latitude, address_longitude, address_geo, created_at, updated_at, is_company, state_tax_id, official_name' for table 'public.table', columns: {

address_geo is not in the Schema Registry. Could this cause problems?
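If the generated address_geo column turns out to be the trigger, one possible workaround (assuming downstream consumers do not need the computed point, since it can be rebuilt from address_latitude/address_longitude) would be to exclude it from capture with Debezium's column.exclude.list option:

```
"column.exclude.list": "public.table.address_geo"
```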

Upvotes: -1

Views: 78

Answers (1)

Henrique Viana

Reputation: 1

I discovered the cause of the problem: it only happens with the pgoutput plugin; with decoderbufs I could not reproduce the same scenario. I managed to trace it and find the problem, and I was advised to open a bug in Jira: https://issues.redhat.com/browse/DBZ-8150
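Given that finding, a stopgap until the bug is fixed would be to switch the logical decoding plugin; assuming decoderbufs is installed on the PostgreSQL server, the relevant connector setting is:

```
"plugin.name": "decoderbufs"
```

Note that changing the plugin requires recreating the replication slot.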

For the discussion of this case, see this link:

https://debezium.zulipchat.com/#narrow/stream/348249-community-postgresql/topic/Issue.20with.20Debezium.20Snapshot.3A.20DateTimeParseException.20in.20Post

Upvotes: 0
