Reputation: 1785
Does the logical replication parse the WAL files in transaction unit? What about the rollback transaction?
And, what's API to ingress data changes at the receiver side, without replaying them in SQL level? Just like the built-in streaming replication receiver of postgresql, no matter logical or physical.
Edit:
Let me clarify my question more.
The logical streaming flow is shown as below:
postgresql instance (sender, create slot with specific output plugin) -------streaming protocol----------> postgresql instance (receiver, get copy data)
Here the format of the copy data is determined by the output plugin, let's assume it's plain text. Then in straightforward way, we could treat it as SQL statements and replay them in the receiver postgresql, but it's obviously low-efficiency. Is there any low-level API to ingress the copy data?
Upvotes: 1
Views: 815
Reputation: 1785
I take some time to investigate the source codes and I try to answer the question myself.
The related code path and snippet:
pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() ->
ReorderBufferCommit() -> ReorderBufferIterTXNNext()
DecodeXactOp():
switch (info)
{
case XLOG_XACT_COMMIT:
case XLOG_XACT_COMMIT_PREPARED:
{
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, &parsed, xid);
break;
}
https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
ExecOpenIndices(estate->es_result_relation_info, false);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();
The logical replication of PG10 uses pgoutput
as the output plugin, which is in binary format and suitable to ingress data directly.
https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c
Upvotes: 1