Skip to content

Commit c13eaa4

Browse files
authored
Merge pull request #82 from Podcastindex-org/unstable
memory leak defense in iroh gossip lib
2 parents 64aee64 + 478edad commit c13eaa4

10 files changed

Lines changed: 155 additions & 18 deletions

File tree

gossip-listener/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gossip-listener/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gossip-listener"
3-
version = "0.5.9"
3+
version = "0.6.0"
44
authors = ["Dave Jones"]
55
edition = "2021"
66

gossip-listener/src/main.rs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,24 @@ const ISOLATION_CHECK_INTERVAL_SECS: u64 = 300; // Check for topology isolation
3535
const ISOLATION_MIN_UNIQUE_PEERS: usize = 3; // Minimum unique source peers to consider healthy
3636
const ENDPOINT_RESET_AFTER_RECONNECTS: u32 = 3; // Create fresh endpoint after N consecutive reconnects
3737
const RECONNECT_AFTER_FAILURES: u64 = 5;
38+
const PERIODIC_RESET_INTERVAL_SECS: u64 = 12 * 3600; // Recycle iroh endpoint every 12h to bound memory growth
39+
const RSS_CEILING_BYTES: u64 = 1024 * 1024 * 1024; // 1 GB RSS ceiling — safety valve for endpoint recycle
3840
const BROADCAST_TIMEOUT_SECS: u64 = 10;
41+
42+
/// Read process resident set size in bytes. Returns 0 on non-Linux or read failure.
43+
fn read_rss_bytes() -> u64 {
44+
#[cfg(target_os = "linux")]
45+
{
46+
if let Ok(s) = std::fs::read_to_string("/proc/self/statm") {
47+
if let Some(pages) = s.split_whitespace().nth(1).and_then(|p| p.parse::<u64>().ok()) {
48+
return pages * 4096;
49+
}
50+
}
51+
0
52+
}
53+
#[cfg(not(target_os = "linux"))]
54+
{ 0 }
55+
}
3956
const ARCHIVE_SYNC_ALPN: &[u8] = b"/podping-archive-sync/1";
4057
const DEFAULT_SSE_BIND_ADDR: &str = "0.0.0.0:8089";
4158
const DEFAULT_SSE_BUFFER_SIZE: usize = 1000;
@@ -833,6 +850,7 @@ async fn main() -> anyhow::Result<()> {
833850
let shutdown_flag = Arc::new(AtomicBool::new(false));
834851
let reconnect_requested = Arc::new(AtomicBool::new(false));
835852
let reconnect_notify = Arc::new(Notify::new());
853+
let force_endpoint_reset = Arc::new(AtomicBool::new(false));
836854
let receive_generation = Arc::new(AtomicU64::new(0));
837855

838856
//Joining peers from the known peers file serves as fallback/insurance if DHT no work
@@ -1076,6 +1094,7 @@ async fn main() -> anyhow::Result<()> {
10761094
let reconnect_failures = broadcast_failures.clone();
10771095
let reconnect_requested = reconnect_requested.clone();
10781096
let reconnect_notify = reconnect_notify.clone();
1097+
let reconnect_force_reset = force_endpoint_reset.clone();
10791098
let reconnect_counter = reconnect_count.clone();
10801099
let reconnect_neighbor_count = neighbor_count.clone();
10811100
let reconnect_neighbor_ids = neighbor_ids.clone();
@@ -1114,7 +1133,8 @@ async fn main() -> anyhow::Result<()> {
11141133
}
11151134
let failures = reconnect_failures.load(Ordering::Relaxed);
11161135
let requested = reconnect_requested.swap(false, Ordering::Relaxed);
1117-
if !(failures >= RECONNECT_AFTER_FAILURES || requested) {
1136+
let forced = reconnect_force_reset.swap(false, Ordering::Relaxed);
1137+
if !(failures >= RECONNECT_AFTER_FAILURES || requested || forced) {
11181138
// No reconnect needed this cycle — gossip is stable
11191139
if consecutive_reconnects > 0
11201140
&& last_reconnect.elapsed() > std::time::Duration::from_secs(ISOLATION_CHECK_INTERVAL_SECS)
@@ -1127,12 +1147,16 @@ async fn main() -> anyhow::Result<()> {
11271147
// Reset failures immediately to prevent re-entrant reconnects
11281148
reconnect_failures.store(0, Ordering::Relaxed);
11291149

1130-
if last_reconnect.elapsed() < std::time::Duration::from_secs(30) {
1150+
if !forced && last_reconnect.elapsed() < std::time::Duration::from_secs(30) {
11311151
eprintln!("\x1b[33m[RECONNECT] Skipping — reconnect cooldown active\x1b[0m");
11321152
continue;
11331153
}
11341154

11351155
consecutive_reconnects += 1;
1156+
if forced {
1157+
// Health-thread-driven recycle: force the fresh-endpoint branch
1158+
consecutive_reconnects = ENDPOINT_RESET_AFTER_RECONNECTS + 1;
1159+
}
11361160

11371161
// If we've reconnected multiple times without recovery, the endpoint
11381162
// itself may be degraded (e.g., iroh path exhaustion). Create a fresh one.
@@ -1317,9 +1341,13 @@ async fn main() -> anyhow::Result<()> {
13171341
let h_notifs = notifications_received.clone();
13181342
let h_failures = broadcast_failures.clone();
13191343
let h_last_notif = last_notification_time.clone();
1344+
let h_force_reset = force_endpoint_reset.clone();
1345+
let h_reconnect_requested = reconnect_requested.clone();
1346+
let h_reconnect_notify = reconnect_notify.clone();
13201347
tokio::spawn(async move {
13211348
let mut prev_notifs: u64 = 0;
13221349
let mut prev_failures: u64 = 0;
1350+
let mut last_reset = std::time::Instant::now();
13231351
loop {
13241352
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
13251353
let notifs = h_notifs.load(Ordering::Relaxed);
@@ -1329,6 +1357,7 @@ async fn main() -> anyhow::Result<()> {
13291357
let since_last = now.saturating_sub(last_notif);
13301358
let delta_notifs = notifs - prev_notifs;
13311359
let delta_failures = failures.saturating_sub(prev_failures);
1360+
let rss_bytes = read_rss_bytes();
13321361

13331362
let status = if since_last > REBOOTSTRAP_TIMEOUT {
13341363
"\x1b[1;31mSTALLED\x1b[0m"
@@ -1339,10 +1368,26 @@ async fn main() -> anyhow::Result<()> {
13391368
};
13401369

13411370
println!(
1342-
"\x1b[90m[HEALTH] {} | recv={} | bcast_failures={} | last_notif={}s ago\x1b[0m",
1343-
status, delta_notifs, delta_failures, since_last,
1371+
"\x1b[90m[HEALTH] {} | recv={} | bcast_failures={} | last_notif={}s ago | rss={}MB\x1b[0m",
1372+
status, delta_notifs, delta_failures, since_last, rss_bytes / 1_048_576,
13441373
);
13451374

1375+
// Memory-bounded endpoint recycle: time-based (12h) + RSS-ceiling safety valve
1376+
let time_elapsed = last_reset.elapsed() >= std::time::Duration::from_secs(PERIODIC_RESET_INTERVAL_SECS);
1377+
let rss_exceeded = rss_bytes > RSS_CEILING_BYTES;
1378+
if time_elapsed || rss_exceeded {
1379+
let reason = if rss_exceeded {
1380+
format!("RSS {}MB exceeds ceiling {}MB", rss_bytes / 1_048_576, RSS_CEILING_BYTES / 1_048_576)
1381+
} else {
1382+
format!("{}h elapsed since last reset", last_reset.elapsed().as_secs() / 3600)
1383+
};
1384+
eprintln!("\x1b[1;35m[HEALTH] Triggering endpoint reset — {}\x1b[0m", reason);
1385+
h_force_reset.store(true, Ordering::Relaxed);
1386+
h_reconnect_requested.store(true, Ordering::Relaxed);
1387+
h_reconnect_notify.notify_one();
1388+
last_reset = std::time::Instant::now();
1389+
}
1390+
13461391
prev_notifs = notifs;
13471392
prev_failures = failures;
13481393
}

gossip-monitor/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gossip-monitor/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gossip-monitor"
3-
version = "0.5.9"
3+
version = "0.6.0"
44
authors = ["Dave Jones"]
55
edition = "2021"
66

gossip-monitor/src/main.rs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,23 @@ const BROADCAST_TIMEOUT_SECS: u64 = 10;
6060
const ISOLATION_CHECK_INTERVAL_SECS: u64 = 300;
6161
const ISOLATION_MIN_UNIQUE_PEERS: usize = 3;
6262
const ENDPOINT_RESET_AFTER_RECONNECTS: u32 = 3;
63+
const PERIODIC_RESET_INTERVAL_SECS: u64 = 12 * 3600; // Recycle iroh endpoint every 12h to bound memory growth
64+
const RSS_CEILING_BYTES: u64 = 1024 * 1024 * 1024; // 1 GB RSS ceiling — safety valve for endpoint recycle
65+
66+
/// Read process resident set size in bytes. Returns 0 on non-Linux or read failure.
67+
fn read_rss_bytes() -> u64 {
68+
#[cfg(target_os = "linux")]
69+
{
70+
if let Ok(s) = std::fs::read_to_string("/proc/self/statm") {
71+
if let Some(pages) = s.split_whitespace().nth(1).and_then(|p| p.parse::<u64>().ok()) {
72+
return pages * 4096;
73+
}
74+
}
75+
0
76+
}
77+
#[cfg(not(target_os = "linux"))]
78+
{ 0 }
79+
}
6380

6481
#[tokio::main]
6582
async fn main() -> anyhow::Result<()> {
@@ -148,6 +165,7 @@ async fn main() -> anyhow::Result<()> {
148165
let shutdown_flag = Arc::new(AtomicBool::new(false));
149166
let reconnect_requested = Arc::new(AtomicBool::new(false));
150167
let reconnect_notify = Arc::new(Notify::new());
168+
let force_endpoint_reset = Arc::new(AtomicBool::new(false));
151169
let receive_generation = Arc::new(AtomicU64::new(0));
152170

153171
let now_secs = SystemTime::now()
@@ -447,6 +465,7 @@ async fn main() -> anyhow::Result<()> {
447465
let reconnect_failures = broadcast_failures.clone();
448466
let reconnect_requested_flag = reconnect_requested.clone();
449467
let reconnect_notify_handle = reconnect_notify.clone();
468+
let reconnect_force_reset = force_endpoint_reset.clone();
450469
let reconnect_counter = reconnect_count.clone();
451470
let reconnect_neighbor_count = neighbor_count.clone();
452471
let reconnect_neighbor_ids = neighbor_ids.clone();
@@ -477,7 +496,8 @@ async fn main() -> anyhow::Result<()> {
477496
}
478497
let failures = reconnect_failures.load(Ordering::Relaxed);
479498
let requested = reconnect_requested_flag.swap(false, Ordering::Relaxed);
480-
if !(failures >= RECONNECT_AFTER_FAILURES || requested) {
499+
let forced = reconnect_force_reset.swap(false, Ordering::Relaxed);
500+
if !(failures >= RECONNECT_AFTER_FAILURES || requested || forced) {
481501
// No reconnect needed — gossip is stable
482502
if consecutive_reconnects > 0
483503
&& last_reconnect.elapsed()
@@ -491,12 +511,16 @@ async fn main() -> anyhow::Result<()> {
491511
// Reset failures immediately to prevent re-entrant reconnects
492512
reconnect_failures.store(0, Ordering::Relaxed);
493513

494-
if last_reconnect.elapsed() < std::time::Duration::from_secs(30) {
514+
if !forced && last_reconnect.elapsed() < std::time::Duration::from_secs(30) {
495515
eprintln!("[RECONNECT] Skipping — reconnect cooldown active");
496516
continue;
497517
}
498518

499519
consecutive_reconnects += 1;
520+
if forced {
521+
// Health-thread-driven recycle: force the fresh-endpoint branch
522+
consecutive_reconnects = ENDPOINT_RESET_AFTER_RECONNECTS + 1;
523+
}
500524

501525
// If we've reconnected multiple times without recovery, the endpoint
502526
// itself may be degraded. Create a fresh one.
@@ -682,9 +706,13 @@ async fn main() -> anyhow::Result<()> {
682706
let h_notifs = notifications_received.clone();
683707
let h_failures = broadcast_failures.clone();
684708
let h_last_notif = last_notification_time.clone();
709+
let h_force_reset = force_endpoint_reset.clone();
710+
let h_reconnect_requested = reconnect_requested.clone();
711+
let h_reconnect_notify = reconnect_notify.clone();
685712
tokio::spawn(async move {
686713
let mut prev_notifs: u64 = 0;
687714
let mut prev_failures: u64 = 0;
715+
let mut last_reset = std::time::Instant::now();
688716
loop {
689717
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
690718
let notifs = h_notifs.load(Ordering::Relaxed);
@@ -697,6 +725,7 @@ async fn main() -> anyhow::Result<()> {
697725
let since_last = now.saturating_sub(last_notif);
698726
let delta_notifs = notifs - prev_notifs;
699727
let delta_failures = failures.saturating_sub(prev_failures);
728+
let rss_bytes = read_rss_bytes();
700729

701730
let status = if since_last > REBOOTSTRAP_TIMEOUT {
702731
"STALLED"
@@ -707,10 +736,26 @@ async fn main() -> anyhow::Result<()> {
707736
};
708737

709738
println!(
710-
"[HEALTH] {} | recv={} | bcast_failures={} | last_notif={}s ago",
711-
status, delta_notifs, delta_failures, since_last,
739+
"[HEALTH] {} | recv={} | bcast_failures={} | last_notif={}s ago | rss={}MB",
740+
status, delta_notifs, delta_failures, since_last, rss_bytes / 1_048_576,
712741
);
713742

743+
// Memory-bounded endpoint recycle: time-based (12h) + RSS-ceiling safety valve
744+
let time_elapsed = last_reset.elapsed() >= std::time::Duration::from_secs(PERIODIC_RESET_INTERVAL_SECS);
745+
let rss_exceeded = rss_bytes > RSS_CEILING_BYTES;
746+
if time_elapsed || rss_exceeded {
747+
let reason = if rss_exceeded {
748+
format!("RSS {}MB exceeds ceiling {}MB", rss_bytes / 1_048_576, RSS_CEILING_BYTES / 1_048_576)
749+
} else {
750+
format!("{}h elapsed since last reset", last_reset.elapsed().as_secs() / 3600)
751+
};
752+
eprintln!("[HEALTH] Triggering endpoint reset — {}", reason);
753+
h_force_reset.store(true, Ordering::Relaxed);
754+
h_reconnect_requested.store(true, Ordering::Relaxed);
755+
h_reconnect_notify.notify_one();
756+
last_reset = std::time::Instant::now();
757+
}
758+
714759
prev_notifs = notifs;
715760
prev_failures = failures;
716761
}

gossip-monitor/src/swarm.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const PRUNE_THRESHOLD_SECS: u64 = 1800;
1414
// ---------------------------------------------------------------------------
1515

1616
#[derive(Debug, Clone, Deserialize)]
17+
#[allow(dead_code)]
1718
pub struct PeerAnnounce {
1819
#[serde(rename = "type")]
1920
pub msg_type: String,

gossip-writer/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gossip-writer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "gossip-writer"
3-
version = "0.5.9"
3+
version = "0.6.0"
44
authors = ["Dave Jones"]
55
edition = "2021"
66
build = "build.rs"

0 commit comments

Comments
 (0)