kingluo

Reputation: 1785

PostgreSQL: does logical replication include rolled-back transactions?

Does logical replication decode the WAL in units of transactions? What happens to transactions that are rolled back?

Also, what API is there to ingest data changes on the receiver side without replaying them at the SQL level, the way PostgreSQL's built-in streaming replication receiver does, whether logical or physical?

Edit:

Let me clarify my question further.

The logical streaming flow looks like this:

postgresql instance (sender, create slot with specific output plugin) -------streaming protocol----------> postgresql instance (receiver, get copy data)

Here the format of the copy data is determined by the output plugin; let's assume it is plain text. The straightforward approach would be to treat it as SQL statements and replay them on the receiving PostgreSQL instance, but that is obviously inefficient. Is there a lower-level API for ingesting the copy data?

Upvotes: 1

Views: 815

Answers (1)

kingluo

Reputation: 1785

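I spent some time investigating the source code, so I will try to answer the question myself.

  • The PostgreSQL walsender only sends committed transactions; aborted ones are ignored. Logical decoding buffers each transaction's changes and passes them to the output plugin only when that transaction's commit record is decoded.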

The related code path and snippet:

pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() -> DecodeCommit() -> ReorderBufferCommit() -> ReorderBufferIterTXNNext()

DecodeXactOp():

switch (info)
{
    case XLOG_XACT_COMMIT:
    case XLOG_XACT_COMMIT_PREPARED:
        {
            xl_xact_commit *xlrec;
            xl_xact_parsed_commit parsed;
            TransactionId xid;

            xlrec = (xl_xact_commit *) XLogRecGetData(r);
            ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);

            if (!TransactionIdIsValid(parsed.twophase_xid))
                xid = XLogRecGetXid(r);
            else
                xid = parsed.twophase_xid;

            DecodeCommit(ctx, buf, &parsed, xid);
            break;
        }
    /* ... remaining cases omitted ... */
}
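To make the commit-only behaviour concrete, here is a minimal, self-contained sketch of the idea: changes are queued per transaction ID and handed on only at commit, while an abort simply drops the queue. This is a toy model, not PostgreSQL's reorder buffer; every name in it (ToyTxn, ToyReorderBuffer, toy_add_change, toy_commit, toy_abort) and the fixed array sizes are assumptions made for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_TXNS    8
#define TOY_MAX_CHANGES 16
#define TOY_MAX_EMITTED 128

/* Hypothetical per-transaction queue; xid 0 marks a free slot. */
typedef struct {
    unsigned    xid;
    int         nchanges;
    const char *changes[TOY_MAX_CHANGES];
} ToyTxn;

/* Hypothetical buffer: open transactions plus what was handed to the "plugin". */
typedef struct {
    ToyTxn      txns[TOY_MAX_TXNS];
    int         nemitted;
    const char *emitted[TOY_MAX_EMITTED];
} ToyReorderBuffer;

static void toy_init(ToyReorderBuffer *rb)
{
    assert(rb != NULL);
    memset(rb, 0, sizeof(*rb));
}

/* Find the queue for xid; optionally claim a free slot if none exists. */
static ToyTxn *toy_lookup(ToyReorderBuffer *rb, unsigned xid, int create)
{
    ToyTxn *free_slot = NULL;

    assert(rb != NULL && xid != 0);
    for (int i = 0; i < TOY_MAX_TXNS; i++) {
        if (rb->txns[i].xid == xid)
            return &rb->txns[i];
        if (rb->txns[i].xid == 0 && free_slot == NULL)
            free_slot = &rb->txns[i];
    }
    if (create && free_slot != NULL) {
        free_slot->xid = xid;
        free_slot->nchanges = 0;
        return free_slot;
    }
    return NULL;
}

/* Queue one change for a transaction; returns 0 on success, -1 if full. */
static int toy_add_change(ToyReorderBuffer *rb, unsigned xid, const char *change)
{
    ToyTxn *txn = toy_lookup(rb, xid, 1);

    if (txn == NULL || txn->nchanges == TOY_MAX_CHANGES)
        return -1;
    txn->changes[txn->nchanges++] = change;
    return 0;
}

/* Commit: emit the queued changes in order, then release the slot. */
static int toy_commit(ToyReorderBuffer *rb, unsigned xid)
{
    ToyTxn *txn = toy_lookup(rb, xid, 0);
    int     sent = 0;

    if (txn == NULL)
        return 0;
    for (int i = 0; i < txn->nchanges && rb->nemitted < TOY_MAX_EMITTED; i++) {
        rb->emitted[rb->nemitted++] = txn->changes[i];
        sent++;
    }
    txn->xid = 0;
    txn->nchanges = 0;
    return sent;
}

/* Abort: release the slot without emitting anything. */
static void toy_abort(ToyReorderBuffer *rb, unsigned xid)
{
    ToyTxn *txn = toy_lookup(rb, xid, 0);

    if (txn != NULL) {
        txn->xid = 0;
        txn->nchanges = 0;
    }
}
```

With two interleaved transactions, calling toy_abort() on one and toy_commit() on the other leaves only the committed transaction's changes in emitted, in the order they were queued.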
  • The built-in logical apply worker does not replay SQL; it applies changes on the receiver side through internal executor APIs. For example, the insert path:

https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585

/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());

/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);

ExecOpenIndices(estate->es_result_relation_info, false);

/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);

/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();

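Logical replication in PG10 uses pgoutput as its output plugin. It emits a binary message protocol (the column values inside each message are still sent in text form, which is why the apply worker calls slot_store_cstrings() above), so the receiver can ingest the data directly.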

https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c
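As a rough illustration of what ingesting that stream directly involves, the sketch below decodes a single pgoutput Insert message as I understand the version 1 layout from the protocol documentation: byte 'I', a 4-byte relation ID, byte 'N', a 2-byte column count, then per column a kind byte ('n' null, 'u' unchanged TOAST, 't' text followed by a 4-byte length and the bytes). The names ToyColumn, ToyInsert and toy_parse_insert are hypothetical, the 32-column limit is arbitrary, and the code is not taken from PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_MAX_COLS 32

/* Hypothetical decoded column; data points into the message buffer. */
typedef struct {
    char           kind;   /* 'n' null, 'u' unchanged TOAST, 't' text */
    uint32_t       len;    /* byte length for 't', otherwise 0 */
    const uint8_t *data;   /* do not assume NUL termination */
} ToyColumn;

/* Hypothetical decoded Insert message. */
typedef struct {
    uint32_t  relid;
    uint16_t  ncols;
    ToyColumn cols[TOY_MAX_COLS];
} ToyInsert;

/* Integers on the wire are in network (big-endian) byte order. */
static uint16_t read_u16(const uint8_t *p)
{
    return (uint16_t) ((p[0] << 8) | p[1]);
}

static uint32_t read_u32(const uint8_t *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

/* Returns 0 on success, -1 if the message is malformed or truncated. */
static int toy_parse_insert(const uint8_t *buf, size_t len, ToyInsert *out)
{
    size_t pos;

    assert(buf != NULL && out != NULL);
    if (len < 8 || buf[0] != 'I' || buf[5] != 'N')
        return -1;
    out->relid = read_u32(buf + 1);
    out->ncols = read_u16(buf + 6);
    if (out->ncols > TOY_MAX_COLS)
        return -1;
    pos = 8;

    for (uint16_t i = 0; i < out->ncols; i++) {
        ToyColumn *c = &out->cols[i];

        if (pos >= len)
            return -1;
        c->kind = (char) buf[pos++];
        c->len = 0;
        c->data = NULL;
        if (c->kind == 'n' || c->kind == 'u')
            continue;               /* no payload follows */
        if (c->kind != 't' || len - pos < 4)
            return -1;
        c->len = read_u32(buf + pos);
        pos += 4;
        if (len - pos < c->len)
            return -1;
        c->data = buf + pos;
        pos += c->len;
    }
    return pos == len ? 0 : -1;     /* reject trailing garbage */
}
```

A real receiver would also need the Relation messages to map the relation ID and column positions to a table, and would then convert each text value with the column type's input function, which is roughly what slot_store_cstrings() does in the apply worker.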

Upvotes: 1
