diff --git a/src/spock_apply.c b/src/spock_apply.c index 2eb777ea..595687b1 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -1418,6 +1418,106 @@ handle_relation(StringInfo s) (void) spock_read_rel(s); } +/* Return the GUC name of the active exception behaviour mode. */ +static const char * +get_exception_behaviour_name(void) +{ + switch (exception_behaviour) + { + case DISCARD: + return "discard"; + case TRANSDISCARD: + return "transdiscard"; + case SUB_DISABLE: + return "sub_disable"; + default: + return "unknown"; + } +} + +/* + * Format an error message with its SQLSTATE prefix so the root cause recorded + * in spock.exception_log (and the apply log) is unambiguous. exception_log has + * no dedicated sqlstate column, so we carry it inline in the message text. + * + * The result is allocated in ApplyOperationContext (reset per row/message) + * rather than the long-lived TopTransactionContext that is current on the + * exception path, so repeated logging within one large transaction does not + * accumulate. Callers copy it into the durable tuple immediately. + */ +static char * +errmsg_with_sqlstate(ErrorData *edata) +{ + MemoryContext oldctx = MemoryContextSwitchTo(ApplyOperationContext); + char *msg; + int sqlerrcode = edata ? edata->sqlerrcode : ERRCODE_INTERNAL_ERROR; + const char *detail = (edata && edata->message) ? edata->message + : "(no error detail captured)"; + + /* + * Prefix the SQLSTATE only when it carries information. A bare + * elog(ERROR) defaults to ERRCODE_INTERNAL_ERROR (XX000), which is just + * noise (e.g. spock's own "did not find row to update"). Real errors -- + * constraint violations, deadlocks, connection failures -- keep their + * code, which is what makes the cause unambiguous. 
+ */ + if (sqlerrcode == ERRCODE_INTERNAL_ERROR) + msg = pstrdup(detail); + else + msg = psprintf("[SQLSTATE %s] %s", unpack_sql_state(sqlerrcode), detail); + + MemoryContextSwitchTo(oldctx); + return msg; +} + +/* + * Build the error_message for an entry discarded as collateral -- i.e. it was + * not itself the failing command (the caller passed a NULL message). 'what' + * names the discarded entry for the message: "tuple" on the DML path, + * "statement" on the SQL/DDL path (where no tuple is involved). + * + * Normally we just point the operator at the entry that carries the real error + * via its command_counter, which avoids repeating the same message on every + * discarded entry. But when the failure is not attributable to a specific row, + * failed_action is 0 and that pointer dangles (no entry's counter is 0), so we + * surface the captured root cause instead -- or note that none was captured. + */ +static char * +discard_collateral_message(const char *what) +{ + SpockExceptionLog *e; + MemoryContext oldctx; + char *msg; + + /* + * Match the paranoia level of the surrounding code: every caller reaches + * this only after exception logging has been set up for this worker. + */ + Assert(exception_log_ptr != NULL && my_exception_log_index >= 0); + + e = &exception_log_ptr[my_exception_log_index]; + + /* + * Allocate in ApplyOperationContext (reset per row/message) so logging + * every discarded row of a large transaction does not pile up in the + * long-lived TopTransactionContext current on the exception path. 
+ */ + oldctx = MemoryContextSwitchTo(ApplyOperationContext); + + if (e->failed_action != 0) + msg = psprintf("%s: %s discarded due to exception at command_counter %u", + get_exception_behaviour_name(), what, e->failed_action); + else if (e->initial_error_message[0] != '\0') + msg = psprintf("%s: %s discarded due to exception: %s", + get_exception_behaviour_name(), what, e->initial_error_message); + else + msg = psprintf("%s: %s discarded due to exception (root cause not captured)", + get_exception_behaviour_name(), what); + + MemoryContextSwitchTo(oldctx); + return msg; +} + static void log_insert_exception(bool failed, char *errmsg, SpockRelation *rel, SpockTupleData *oldtup, SpockTupleData *newtup, @@ -1433,6 +1533,14 @@ log_insert_exception(bool failed, char *errmsg, SpockRelation *rel, if (!should_log_exception(failed)) return; + /* + * A NULL message means this entry is collateral damage -- some other + * command in the same transaction failed, not this one. Build an + * informative message rather than recording the opaque "unavailable". + */ + if (errmsg == NULL) + errmsg = discard_collateral_message("tuple"); + /* * Run the exception-log work in ApplyOperationContext so its JSON and * tuple allocations get released by the per-message reset instead of @@ -1463,7 +1571,7 @@ log_insert_exception(bool failed, char *errmsg, SpockRelation *rel, rel, localtup, oldtup, newtup, NULL, NULL, action_name, - (failed) ? errmsg : NULL); + errmsg); MemoryContextSwitchTo(oldctx); } @@ -1597,7 +1705,7 @@ handle_insert(StringInfo s) /* Let's create an exception log entry if true. */ { - char *err_msg = failed ? (edata ? edata->message : NULL) : + char *err_msg = failed ? errmsg_with_sqlstate(edata) : (xact_action_counter == exception_log_ptr[my_exception_log_index].failed_action && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ? @@ -1764,7 +1872,7 @@ handle_update(StringInfo s) /* Let's create an exception log entry if true. 
*/ { - char *err_msg = failed ? (edata ? edata->message : NULL) : + char *err_msg = failed ? errmsg_with_sqlstate(edata) : (xact_action_counter == exception_log_ptr[my_exception_log_index].failed_action && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ? @@ -1869,7 +1977,7 @@ handle_delete(StringInfo s) /* Let's create an exception log entry if true. */ { - char *err_msg = failed ? (edata ? edata->message : NULL) : + char *err_msg = failed ? errmsg_with_sqlstate(edata) : (xact_action_counter == exception_log_ptr[my_exception_log_index].failed_action && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ? @@ -2521,12 +2629,20 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) if (should_log_exception(failed)) { MemoryContext oldctx; - char *err_msg = failed ? edata->message : + char *err_msg = failed ? errmsg_with_sqlstate(edata) : (xact_action_counter == exception_log_ptr[my_exception_log_index].failed_action && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ? exception_log_ptr[my_exception_log_index].initial_error_message : NULL; + /* + * A NULL message means this statement is collateral damage -- some + * other command in the same transaction failed. Build an + * informative message rather than recording "unavailable". + */ + if (err_msg == NULL) + err_msg = discard_collateral_message("statement"); + /* See note in log_insert_exception about the context switch. */ oldctx = MemoryContextSwitchTo(ApplyOperationContext); @@ -3540,20 +3656,39 @@ apply_work(PGconn *streamConn) AbortOutOfAnyTransaction(); MemoryContextSwitchTo(MessageContext); - elog(LOG, "SPOCK: caught initial exception - %s", edata->message); + elog(LOG, "SPOCK: caught initial exception - %s", + errmsg_with_sqlstate(edata)); /* * Save the initial error message and which action triggered it. 
- * On the retry pass, the matching row gets this message in - * exception_log; all other rows get NULL ("unavailable"). + * The message carries its SQLSTATE (via errmsg_with_sqlstate, which + * omits the uninformative XX000) so that whichever entry surfaces it + * later (the matching row, or every collateral row when the failure is + * not attributable) carries an unambiguous root cause. On the retry + * pass, the matching row gets this message in exception_log; other + * rows get an informative collateral message built by + * discard_collateral_message(). */ if (exception_log_ptr != NULL) { snprintf(exception_log_ptr[my_exception_log_index].initial_error_message, sizeof(exception_log_ptr[my_exception_log_index].initial_error_message), - "%s", edata->message); - exception_log_ptr[my_exception_log_index].failed_action = - xact_action_counter; + "%s", errmsg_with_sqlstate(edata)); + + /* + * A failure during COMMIT (e.g. a deferred constraint trigger that + * fires at commit) is not attributable to any replayed row: + * handle_commit has already bumped the action counter, so no row's + * command_counter would match and the pointer would dangle. Treat + * it as non-attributable (failed_action = 0) so the replay surfaces + * the captured root cause instead of a dangling command_counter. 
+ */ + if (errcallback_arg.action_name != NULL && + strcmp(errcallback_arg.action_name, "COMMIT") == 0) + exception_log_ptr[my_exception_log_index].failed_action = 0; + else + exception_log_ptr[my_exception_log_index].failed_action = + xact_action_counter; } FlushErrorState(); diff --git a/src/spock_exception_handler.c b/src/spock_exception_handler.c index 42661b44..8a1694b6 100644 --- a/src/spock_exception_handler.c +++ b/src/spock_exception_handler.c @@ -186,10 +186,15 @@ add_entry_to_exception_log(Oid remote_origin, TimestampTz remote_commit_ts, values[Anum_exception_log_ddl_user - 1] = CStringGetTextDatum(ddl_user); } - if (error_message == NULL) - values[Anum_exception_log_error_message - 1] = CStringGetTextDatum("unavailable"); - else - values[Anum_exception_log_error_message - 1] = CStringGetTextDatum(error_message); + /* + * All callers now build an informative message (the failing command's + * error, or a collateral-discard message) before reaching here. The Assert + * flags a future caller passing NULL in assert-enabled builds; the fallback + * makes production builds degrade to a placeholder rather than crashing. + */ + Assert(error_message != NULL); + values[Anum_exception_log_error_message - 1] = + CStringGetTextDatum(error_message != NULL ? 
error_message : "unavailable"); values[Anum_exception_log_retry_errored_at - 1] = TimestampTzGetDatum(GetCurrentTimestamp()); tup = heap_form_tuple(tupDesc, values, nulls); diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index d7cdd146..bca55445 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -449,14 +449,14 @@ SELECT remote_new_tup,error_message FROM spock.exception_log ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+------------+-----------+----------------------------------------------------+--------------------------- + command_counter | table_schema | table_name | operation | remote_new_tup | error_message +-----------------+--------------+------------+-----------+----------------------------------------------------+--------------------------------------------------------------------- 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | transdiscard: tuple discarded due to exception at command_counter 1 4 | | | INSERT | | Spock can't find relation 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | discard: tuple discarded due to exception at command_counter 1 (6 rows) \c :provider_dsn @@ -564,13 +564,13 @@ ORDER BY command_counter; -----------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- 
1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | transdiscard: tuple discarded due to exception at command_counter 1 4 | | | INSERT | | Spock can't find relation 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | discard: tuple discarded due to exception at command_counter 1 7 | | | UPDATE | | Spock can't find relation 8 | | | UPDATE | | Spock can't find relation - 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | unavailable + 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | discard: tuple discarded due to exception at command_counter 2 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) (10 rows) diff --git a/tests/tap/t/013_exception_handling.pl b/tests/tap/t/013_exception_handling.pl index 0d8a8798..6c4b9b99 100644 --- a/tests/tap/t/013_exception_handling.pl +++ b/tests/tap/t/013_exception_handling.pl @@ -307,12 +307,15 @@ sub set_exception_behaviour { # =========================================================================== # Part 5: TRANSDISCARD error_message quality (backport fix regression test) # =========================================================================== -# Before the fix, TRANSDISCARD logged bystander rows with error_message = -# 'unknown' because the NULL fallback in add_entry_to_exception_log was the -# string "unknown". 
After the fix: -# - The NULL fallback is "unavailable" -# - The originally-failing row carries the real constraint-violation message -# - Bystander rows carry "unavailable" +# Backport of ed17523 + b32ef95 to v5_STABLE. Before the fix, every row of a +# discarded transaction that was not itself the failing command was logged with +# the opaque placeholder error_message = 'unavailable' (the NULL fallback in +# add_entry_to_exception_log). After the fix: +# - No row carries the opaque 'unavailable' placeholder. +# - The originally-failing row carries the real constraint-violation message, +# prefixed with its SQLSTATE so the root cause is unambiguous. +# - Bystander (collateral-discard) rows carry an informative message that +# names the active behaviour and points at the command that failed. # # Scenario: three-table transaction where t_ehx_b has NOT NULL on n2 only. # The INSERT with v=NULL succeeds on n1 but fails on n2, triggering TRANSDISCARD. @@ -365,28 +368,41 @@ sub set_exception_behaviour { is($p5_unknown_cnt, '0', "P5: no exception_log entries with error_message = 'unknown' (regression guard)"); -# Failing row must carry the real NOT NULL constraint-violation message. +# Failing row must carry the real NOT NULL constraint-violation message, +# now prefixed with its SQLSTATE so the root cause is unambiguous. my $p5_b_err = scalar_query(2, "SELECT error_message FROM spock.exception_log " . "WHERE table_name = 't_ehx_b' " . "ORDER BY retry_errored_at DESC LIMIT 1"); like($p5_b_err, qr/null|not.null|violates/i, 'P5: t_ehx_b exception_log has real constraint-violation message'); +# scalar_query() strips whitespace, so match the SQLSTATE prefix space-agnostically. +like($p5_b_err, qr/\[SQLSTATE\s*23502\]/, + 'P5: t_ehx_b message carries SQLSTATE 23502 (not_null_violation)'); -# Bystander rows must have 'unavailable', not 'unknown'. +# The opaque 'unavailable' placeholder must be gone entirely. 
+my $p5_unavail_cnt = scalar_query(2, + "SELECT COUNT(*) FROM spock.exception_log " . + "WHERE error_message = 'unavailable' " . + "AND table_name IN ('t_ehx_a', 't_ehx_b', 't_ehx_c')"); +is($p5_unavail_cnt, '0', + "P5: no exception_log entries with opaque 'unavailable' placeholder"); + +# Bystander rows must now carry an informative collateral-discard message +# (names the behaviour and points at the failing command), not 'unavailable'. my $p5_a_err = scalar_query(2, "SELECT error_message FROM spock.exception_log " . "WHERE table_name = 't_ehx_a' " . "ORDER BY retry_errored_at DESC LIMIT 1"); -is($p5_a_err, 'unavailable', - "P5: bystander t_ehx_a has error_message = 'unavailable'"); +like($p5_a_err, qr/discarded\s*due\s*to\s*exception/, + 'P5: bystander t_ehx_a has informative discard message (not unavailable)'); my $p5_c_err = scalar_query(2, "SELECT error_message FROM spock.exception_log " . "WHERE table_name = 't_ehx_c' " . "ORDER BY retry_errored_at DESC LIMIT 1"); -is($p5_c_err, 'unavailable', - "P5: bystander t_ehx_c has error_message = 'unavailable'"); +like($p5_c_err, qr/discarded\s*due\s*to\s*exception/, + 'P5: bystander t_ehx_c has informative discard message (not unavailable)'); # TRANSDISCARD rolls back the entire transaction — no rows land on n2. 
sleep(3); diff --git a/tests/tap/t/104_iptables_conn_block.pl b/tests/tap/t/104_iptables_conn_block.pl new file mode 100644 index 00000000..707e54d4 --- /dev/null +++ b/tests/tap/t/104_iptables_conn_block.pl @@ -0,0 +1,237 @@ +use strict; +use warnings; +use Test::More; +use lib '.'; +use SpockTest qw( + create_cluster destroy_cluster + system_maybe + get_test_config scalar_query psql_or_bail + wait_for_sub_status +); + +# ============================================================================= +# Test 104: Connection block via iptables (firewall-style outage) +# ============================================================================= +# Reproduces the customer scenario: a firewall briefly blocks the replication +# connection, a transaction is committed on the provider during the outage, and +# the connection is then restored. +# +# Exercises the b75cb4fc connection-loss handling together with the +# exception_log message backport (ed17523 + b32ef95). Asserts that: +# +# 1. The block stops replication: rows committed during the outage do not +# reach the subscriber while blocked. +# 2. In SUB_DISABLE mode the outage does NOT disable the subscription (a +# connection error rethrows before the disable path; a stall just waits). +# 3. After the block is lifted the subscription returns to 'replicating' and +# the during-outage transaction is applied with NO row loss. +# 4. The connection blip produces NO spurious spock.exception_log entries. +# (Any that appear are dumped; their messages now carry [SQLSTATE ...].) +# +# Whether iptables tears the connection down or merely stalls it is host +# dependent (over loopback it often stalls); both are safe and the test reports +# which occurred as diag rather than asserting a specific teardown. +# +# Mechanics: the harness's control psql connects over the unix socket (no -h), +# while spock's replication connection uses host=127.0.0.1 (TCP). 
So we sever +# replication with iptables on the provider's TCP port at 127.0.0.1 while still +# driving the provider over the socket. We use REJECT --reject-with tcp-reset +# (not DROP) so the broken connection is detected immediately and reconnect +# attempts fail fast, rather than silently stalling. +# +# Teardown vs. stall is reported via a log-independent signal (the walsender +# PID change), because the harness's server-log location is environment +# dependent (logging_collector with a relative log_directory lands under each +# node's datadir). The outcome is emitted as diag only, never asserted. +# +# Requires iptables usable as root or via passwordless sudo; otherwise skipped. +# Not in the schedule -- run manually: +# PERL5LIB=t prove -v t/104_iptables_conn_block.pl +# ============================================================================= + +# --------------------------------------------------------------------------- +# Probe for a usable iptables before any expensive setup. +# --------------------------------------------------------------------------- +chomp(my $ipt_bin = `command -v iptables 2>/dev/null`); +plan skip_all => 'iptables not found in PATH' unless $ipt_bin; + +my @IPT; +if ($> == 0) { + @IPT = ($ipt_bin); +} +elsif (system("sudo -n $ipt_bin -S OUTPUT >/dev/null 2>&1") == 0) { + @IPT = ('sudo', '-n', $ipt_bin); +} +else { + plan skip_all => + 'iptables requires root or passwordless sudo (neither available)'; +} + +# Lexicals the helper subs close over; filled in after create_cluster(). 
+my (@RULES, %installed, $node_datadirs); + +sub block_conn { + for my $r (@RULES) { + my ($chain, @spec) = @$r; + my $rc = system(@IPT, '-I', $chain, @spec); + $installed{"@$r"} = $r if $rc == 0; + } + return scalar keys %installed; +} + +sub unblock_conn { + for my $key (keys %installed) { + my ($chain, @spec) = @{ $installed{$key} }; + system(@IPT, '-D', $chain, @spec) == 0 or warn "could not remove iptables rule: $key\n"; + delete $installed{$key}; + } +} + +# Safety net: remove our rules even if the test dies or bails out. +END { local $?; unblock_conn() if %installed; } # local $?: system() must not clobber the exit status + +sub set_guc_n2 { + my ($kv) = @_; + open(my $fh, '>>', "$node_datadirs->[1]/postgresql.conf") + or die "cannot append to n2 postgresql.conf: $!"; + print $fh "$kv\n"; + close($fh); + psql_or_bail(2, "SELECT pg_reload_conf()"); +} + +# --------------------------------------------------------------------------- +create_cluster(2, 'Create 2-node cluster for iptables connection-block test'); + +my $config = get_test_config(); +my $node_ports = $config->{node_ports}; +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $db_user = $config->{db_user}; +my $db_password = $config->{db_password}; +my $pg_bin = $config->{pg_bin}; +$node_datadirs = $config->{node_datadirs}; + +my $p1 = $node_ports->[0]; # n1 — provider +my $p2 = $node_ports->[1]; # n2 — subscriber + +my $conn_n1 = "host=$host dbname=$dbname port=$p1 user=$db_user password=$db_password"; + +# REJECT both directions of the apply<->walsender TCP flow with a TCP reset so +# the connection breaks immediately and reconnects fail fast. Control psql +# uses the unix socket and is unaffected. +@RULES = ( + ['OUTPUT', '-p', 'tcp', '-d', $host, '--dport', $p1, '-j', 'REJECT', '--reject-with', 'tcp-reset'], + ['INPUT', '-p', 'tcp', '-s', $host, '--sport', $p1, '-j', 'REJECT', '--reject-with', 'tcp-reset'], +); + +# Low ping timeout as a backstop in case a direction goes quiet rather than +# resetting. 
+set_guc_n2('wal_sender_timeout = 5s'); +set_guc_n2("spock.exception_behaviour = 'sub_disable'"); +sleep(1); + +psql_or_bail(1, "CREATE TABLE t_ipt (id INT PRIMARY KEY, v TEXT)"); +psql_or_bail(2, "CREATE TABLE t_ipt (id INT PRIMARY KEY, v TEXT)"); + +psql_or_bail(2, + "SELECT spock.sub_create('sub_ipt', '$conn_n1', " . + "ARRAY['default', 'default_insert_only'], false, false)"); + +ok(wait_for_sub_status(2, 'sub_ipt', 'replicating', 30), + 'sub_ipt reaches replicating state'); + +# Baseline rows must replicate before we cut the connection. +psql_or_bail(1, "INSERT INTO t_ipt SELECT g, 'pre_' || g FROM generate_series(1, 50) g"); + +my $baseline_ok = 0; +for (1 .. 15) { + last if ($baseline_ok = (scalar_query(2, "SELECT count(*) FROM t_ipt") == 50)); + sleep 1; +} +ok($baseline_ok, 'baseline 50 rows replicated n1->n2 before block'); + +# Capture the provider's walsender PID for this subscription before the block. +my $wal_pid_before = + scalar_query(1, "SELECT pid FROM pg_stat_replication WHERE state = 'streaming' LIMIT 1"); +ok($wal_pid_before =~ /^\d+$/, + "provider has a streaming walsender before the block (pid $wal_pid_before)"); + +# --------------------------------------------------------------------------- +# Block the replication connection. +# --------------------------------------------------------------------------- +is(block_conn(), scalar(@RULES), + 'iptables REJECT rules installed on both directions of provider TCP port'); + +# Commit a transaction on the provider DURING the outage (over the unix +# socket, which iptables does not touch). The walsender's attempt to ship it +# triggers the reset. +psql_or_bail(1, + "INSERT INTO t_ipt SELECT g, 'mid_' || g FROM generate_series(51, 100) g"); + +# Whether the connection is torn down or merely stalls is host dependent +# (over loopback, iptables often stalls rather than resets the established +# connection). Report which happened -- both are safe outcomes -- but do not +# gate the test on it. 
+sleep 8; +my $still = scalar_query(1, + "SELECT count(*) FROM pg_stat_replication WHERE pid = $wal_pid_before"); +diag($still eq '0' + ? "connection TORN DOWN during block (old walsender pid $wal_pid_before gone)" + : "connection STALLED during block (old walsender pid $wal_pid_before still present)"); + +# During-outage rows must NOT have reached the subscriber. +is(scalar_query(2, "SELECT count(*) FROM t_ipt WHERE v LIKE 'mid_%'"), '0', + 'during-outage rows are not on n2 while blocked'); + +# A *connection* error must not disable the subscription in SUB_DISABLE mode. +isnt(scalar_query(2, + "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'sub_ipt'"), + 'f', + 'SUB_DISABLE: connection error did NOT disable the subscription'); + +# --------------------------------------------------------------------------- +# Restore the connection. +# --------------------------------------------------------------------------- +unblock_conn(); +pass('iptables rules removed (connection restored)'); + +ok(wait_for_sub_status(2, 'sub_ipt', 'replicating', 60), + 'sub_ipt returns to replicating after the block is lifted'); + +# Report the post-recovery walsender (new PID => reconnected; same => stalled +# connection resumed). Informational -- the row-count check below is the real +# proof that replication recovered. +my $wal_pid_after = scalar_query(1, + "SELECT pid FROM pg_stat_replication WHERE state = 'streaming' LIMIT 1"); +diag(($wal_pid_after =~ /^\d+$/ && $wal_pid_after ne $wal_pid_before) + ? "RECONNECTED after unblock (new walsender pid $wal_pid_after, was $wal_pid_before)" + : "stalled connection RESUMED after unblock (walsender pid $wal_pid_after)"); + +# All 100 rows (50 pre + 50 mid) must eventually be present -- no loss. +my $final_count = 0; +for (1 .. 
30) { + $final_count = scalar_query(2, "SELECT count(*) FROM t_ipt"); + last if $final_count == 100; + sleep 2; +} +is($final_count, '100', + 'all 100 rows present on n2 after recovery (no row loss across disconnect)'); + +# The connection blip should not have produced spurious exception_log entries. +my $exc_cnt = scalar_query(2, "SELECT count(*) FROM spock.exception_log"); +is($exc_cnt, '0', + 'no spock.exception_log entries produced by the connection blip'); +if ($exc_cnt ne '0') { + diag("Unexpected exception_log entries after connection blip:"); + my $dump_sql = 'SELECT table_name, operation, error_message FROM spock.exception_log ORDER BY retry_errored_at'; + my $dump = `$pg_bin/psql -X -p $p2 -d $dbname -t -A -F'|' -c "$dump_sql"`; # query must sit inside the backticks to reach psql + diag(" $_") for split /\n/, ($dump // ''); +} + +# --------------------------------------------------------------------------- +system_maybe("$pg_bin/psql", '-p', $p2, '-d', $dbname, + '-c', "SELECT spock.sub_drop('sub_ipt')"); + +destroy_cluster('Destroy iptables connection-block test cluster'); + +done_testing();