From 36f7539a98e44ca8d71ffcd5cf53c739f70797fd Mon Sep 17 00:00:00 2001 From: Blake Li Date: Mon, 15 Jun 2026 17:54:14 +0000 Subject: [PATCH 1/5] refactor(test): migrate Thread.sleep wait loops to Awaitility in GAX tests --- .../api/gax/batching/BatcherImplTest.java | 103 +++++++++--------- .../api/gax/batching/Semaphore64Test.java | 23 ++-- .../ScheduledRetryingExecutorTest.java | 53 +++++---- 3 files changed, 93 insertions(+), 86 deletions(-) diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 1d77df8e0fe6..e0289e157586 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -33,6 +33,7 @@ import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -599,15 +600,14 @@ void testPushCurrentBatchRunnable() throws Exception { // Batcher present inside runnable should be GCed after following loop. batcher.close(); batcher = null; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - isExecutorCancelled = pushBatchRunnable.isCancelled(); - if (isExecutorCancelled) { - break; - } - Thread.sleep(DELAY_TIME * (1L << retry)); - } + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + return pushBatchRunnable.isCancelled(); + }); // ScheduledFuture should be isCancelled now. assertThat(pushBatchRunnable.isCancelled()).isTrue(); } @@ -735,15 +735,14 @@ public void splitResponse( void testUnclosedBatchersAreLogged() throws Exception { final long DELAY_TIME = 30L; int actualRemaining = 0; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - actualRemaining = BatcherReference.cleanQueue(); - if (actualRemaining == 0) { - break; - } - Thread.sleep(DELAY_TIME * (1L << retry)); - } + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + return BatcherReference.cleanQueue() == 0; + }); assertThat(actualRemaining).isAtMost(0); underTest = createDefaultBatcherImpl(batchingSettings, null); Batcher extraBatcher = createDefaultBatcherImpl(batchingSettings, null); @@ -771,20 +770,16 @@ public boolean isLoggable(LogRecord record) { underTest = null; // That *should* have been the last reference. Try to reclaim it. - boolean success = false; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - int orphans = BatcherReference.cleanQueue(); - if (orphans == 1) { - success = true; - break; - } - // Validates that there are no other batcher instance present while GC cleanup. - assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0); - Thread.sleep(DELAY_TIME * (1L << retry)); - } - assertWithMessage("Batcher was not garbage collected").that(success).isTrue(); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + int orphans = BatcherReference.cleanQueue(); + assertWithMessage("unexpected extra orphans").that(orphans).isAtMost(1); + return orphans == 1; + }); LogRecord lr; synchronized (records) { @@ -809,15 +804,14 @@ void testClosedBatchersAreNotLogged() throws Exception { // Clean out the existing instances final long DELAY_TIME = 30L; int actualRemaining = 0; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - actualRemaining = BatcherReference.cleanQueue(); - if (actualRemaining == 0) { - break; - } - Thread.sleep(DELAY_TIME * (1L << retry)); - } + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + return BatcherReference.cleanQueue() == 0; + }); assertThat(actualRemaining).isAtMost(0); // Capture logs @@ -849,12 +843,17 @@ public boolean isLoggable(LogRecord record) { } } // Run GC a few times to give the batchers a chance to be collected - for (int retry = 0; retry < 100; retry++) { - System.gc(); - System.runFinalization(); - BatcherReference.cleanQueue(); - Thread.sleep(10); - } + final AtomicInteger runs = new AtomicInteger(0); + await() + .pollInterval(Duration.ofMillis(10)) + .atMost(Duration.ofSeconds(2)) + .until( + () -> { + System.gc(); + System.runFinalization(); + BatcherReference.cleanQueue(); + return runs.incrementAndGet() >= 20; + }); synchronized (records) { assertThat(records).isEmpty(); @@ -990,10 +989,12 @@ void testThrottlingBlocking() throws Exception { // resulting in a shorter total_throttled_time at the verification of throttledTime // at the end of the test. // https://github.com/googleapis/sdk-platform-java/issues/1193 - do { - Thread.sleep(10); - } while (batcherAddThreadHolder.isEmpty() - || batcherAddThreadHolder.get(0).getState() != Thread.State.WAITING); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> + !batcherAddThreadHolder.isEmpty() + && batcherAddThreadHolder.get(0).getState() == Thread.State.WAITING); long beforeGetCall = System.currentTimeMillis(); executor.submit( diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java index 96fe3c81e943..d1ce30763d89 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java @@ -29,12 +29,14 @@ */ package com.google.api.gax.batching; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -68,8 +70,7 @@ void testBlocking() throws InterruptedException { Thread t = new Thread(() -> semaphore.acquire(1)); t.start(); - Thread.sleep(50); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); semaphore.release(1); t.join(); @@ -95,8 +96,7 @@ void testReducePermitLimitBlocking() throws InterruptedException { Thread t = new Thread(() -> semaphore.acquire(1)); t.start(); - Thread.sleep(50); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); semaphore.release(1); t.join(); @@ -124,8 +124,7 @@ void testAcquirePartialBlocking() throws Exception { Thread t1 = new Thread(() -> semaphore.acquire(1)); t1.start(); // wait for thread to start - Thread.sleep(100); - assertTrue(t1.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t1.getState() == Thread.State.WAITING); semaphore.release(6); t1.join(); @@ -133,8 +132,7 @@ void testAcquirePartialBlocking() throws Exception { Thread t2 = new Thread(() -> semaphore.acquirePartial(6)); t2.start(); // wait fo thread to start - Thread.sleep(100); - assertTrue(t2.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t2.getState() == Thread.State.WAITING); // limit should still be 5 and get limit should not block assertEquals(5, semaphore.getPermitLimit()); } @@ -158,8 +156,7 @@ void testIncreasePermitLimitBlocking() throws Exception { Thread t = new Thread(() -> semaphore.acquire(1)); t.start(); - Thread.sleep(50); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); semaphore.increasePermitLimit(1); t.join(); @@ -208,8 +205,7 @@ void testReleaseWontOverflowBlocking() throws Exception { semaphore.release(10); Thread t = new Thread(() -> semaphore.acquire(11)); t.start(); - Thread.sleep(100); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); } @Test @@ -239,7 +235,6 @@ void testPermitLimitUnderflowBlocking() throws Exception { semaphore.release(10); Thread t = new Thread(() -> semaphore.acquire(11)); t.start(); - Thread.sleep(100); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); } } diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java index 7d57733c5005..d5bb5fe2d537 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -30,6 +30,7 @@ package com.google.api.gax.retrying; import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -41,6 +42,7 @@ import com.google.api.core.NanoClock; import com.google.api.gax.retrying.FailingCallable.CustomException; import com.google.api.gax.rpc.testing.FakeCallContext; +import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -99,26 +101,30 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { future.setAttemptFuture(executor.submit(future)); - int failedAttempts = 0; - while (!future.isDone()) { - ApiFuture attemptResult = future.peekAttemptResult(); - if (attemptResult != null) { - assertTrue(attemptResult.isDone()); - assertFalse(attemptResult.isCancelled()); - try { - attemptResult.get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof CustomException) { - failedAttempts++; - } - } - } - Thread.sleep(0L, 100); - } + final int[] failedAttempts = {0}; + await() + .pollInterval(Duration.ofMillis(2)) + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + ApiFuture attemptResult = future.peekAttemptResult(); + if (attemptResult != null) { + assertTrue(attemptResult.isDone()); + assertFalse(attemptResult.isCancelled()); + try { + attemptResult.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof CustomException) { + failedAttempts[0]++; + } + } + } + return future.isDone(); + }); assertFutureSuccess(future); assertEquals(15, future.getAttemptSettings().getAttemptCount()); - assertTrue(failedAttempts > 0); + assertTrue(failedAttempts[0] > 0); } } @@ -260,9 +266,12 @@ void testCancelOuterFutureAfterStart() throws Exception { callable.setExternalFuture(future); future.setAttemptFuture(executor.submit(future)); - // The test sleeps a duration long enough to ensure that the future has been submitted for - // execution - Thread.sleep(150L); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> + future.getAttemptSettings() != null + && future.getAttemptSettings().getAttemptCount() > 0); boolean res = future.cancel(false); assertTrue(res); @@ -302,7 +311,9 @@ void testCancelProxiedFutureAfterStart() throws Exception { callable.setExternalFuture(future); future.setAttemptFuture(executor.submit(future)); - Thread.sleep(50L); + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> callable.getFirstAttemptFinishedLatch().getCount() == 0); // Note that shutdownNow() will not cancel internal FutureTasks automatically, which // may potentially cause another thread handing on RetryingFuture#get() call forever. From a23d73b76c9fb46944ee8a342087aa63c9adc24e Mon Sep 17 00:00:00 2001 From: Blake Li Date: Mon, 15 Jun 2026 20:15:58 +0000 Subject: [PATCH 2/5] refactor(test): remove redundant variables and assertions in BatcherImplTest --- .../java/com/google/api/gax/batching/BatcherImplTest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index e0289e157586..2c9828c67b72 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -733,8 +733,6 @@ public void splitResponse( */ @Test void testUnclosedBatchersAreLogged() throws Exception { - final long DELAY_TIME = 30L; - int actualRemaining = 0; await() .atMost(Duration.ofSeconds(5)) .until( @@ -743,7 +741,6 @@ void testUnclosedBatchersAreLogged() throws Exception { System.runFinalization(); return BatcherReference.cleanQueue() == 0; }); - assertThat(actualRemaining).isAtMost(0); underTest = createDefaultBatcherImpl(batchingSettings, null); Batcher extraBatcher = createDefaultBatcherImpl(batchingSettings, null); @@ -802,8 +799,6 @@ public boolean isLoggable(LogRecord record) { @Test void testClosedBatchersAreNotLogged() throws Exception { // Clean out the existing instances - final long DELAY_TIME = 30L; - int actualRemaining = 0; await() .atMost(Duration.ofSeconds(5)) .until( @@ -812,7 +807,6 @@ void testClosedBatchersAreNotLogged() throws Exception { System.runFinalization(); return BatcherReference.cleanQueue() == 0; }); - assertThat(actualRemaining).isAtMost(0); // Capture logs final List records = new ArrayList<>(1); From b08aa0d879953a2cd3493d21fc847b2bd446dd0d Mon Sep 17 00:00:00 2001 From: Blake Li Date: Mon, 15 Jun 2026 20:28:26 +0000 Subject: [PATCH 3/5] refactor(test): use AtomicInteger instead of int[] array for tracking failed attempts in ScheduledRetryingExecutorTest --- .../api/gax/retrying/ScheduledRetryingExecutorTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java index d5bb5fe2d537..479abb44d318 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -101,7 +102,7 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { future.setAttemptFuture(executor.submit(future)); - final int[] failedAttempts = {0}; + final AtomicInteger failedAttempts = new AtomicInteger(0); await() .pollInterval(Duration.ofMillis(2)) .atMost(Duration.ofSeconds(5)) @@ -115,7 +116,7 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { attemptResult.get(); } catch (ExecutionException e) { if (e.getCause() instanceof CustomException) { - failedAttempts[0]++; + failedAttempts.incrementAndGet(); } } } @@ -124,7 +125,7 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { assertFutureSuccess(future); assertEquals(15, future.getAttemptSettings().getAttemptCount()); - assertTrue(failedAttempts[0] > 0); + assertTrue(failedAttempts.get() > 0); } } From c81821d88b13b9d8c2a45f7f42048e68bdd0249d Mon Sep 17 00:00:00 2001 From: Blake Li Date: Mon, 15 Jun 2026 20:33:34 +0000 Subject: [PATCH 4/5] refactor(test): address PR review comments for during() and AtomicReference in GAX tests --- .../com/google/api/gax/batching/BatcherImplTest.java | 10 ++++------ .../gax/retrying/ScheduledRetryingExecutorTest.java | 5 ++++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 2c9828c67b72..5d4a0836ff4f 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -837,21 +837,19 @@ public boolean isLoggable(LogRecord record) { } } // Run GC a few times to give the batchers a chance to be collected - final AtomicInteger runs = new AtomicInteger(0); await() .pollInterval(Duration.ofMillis(10)) + .during(Duration.ofMillis(200)) .atMost(Duration.ofSeconds(2)) .until( () -> { System.gc(); System.runFinalization(); BatcherReference.cleanQueue(); - return runs.incrementAndGet() >= 20; + synchronized (records) { + return records.isEmpty(); + } }); - - synchronized (records) { - assertThat(records).isEmpty(); - } } finally { // reset logging batcherLogger.setFilter(oldFilter); diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java index 479abb44d318..89b807bd5610 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -50,6 +50,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -103,13 +104,15 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { future.setAttemptFuture(executor.submit(future)); final AtomicInteger failedAttempts = new AtomicInteger(0); + final AtomicReference> lastSeenAttempt = new AtomicReference<>(); await() .pollInterval(Duration.ofMillis(2)) .atMost(Duration.ofSeconds(5)) .until( () -> { ApiFuture attemptResult = future.peekAttemptResult(); - if (attemptResult != null) { + if (attemptResult != null && attemptResult != lastSeenAttempt.get()) { + lastSeenAttempt.set(attemptResult); assertTrue(attemptResult.isDone()); assertFalse(attemptResult.isCancelled()); try { From ddfa30c19fbc2966de7a7676bf23767f1cc9c0b8 Mon Sep 17 00:00:00 2001 From: Blake Li Date: Mon, 15 Jun 2026 20:45:14 +0000 Subject: [PATCH 5/5] refactor(test): increase during duration to 1s to match original test maximum in BatcherImplTest --- .../java/com/google/api/gax/batching/BatcherImplTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 5d4a0836ff4f..ce4f43b45c53 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -839,8 +839,8 @@ public boolean isLoggable(LogRecord record) { // Run GC a few times to give the batchers a chance to be collected await() .pollInterval(Duration.ofMillis(10)) - .during(Duration.ofMillis(200)) - .atMost(Duration.ofSeconds(2)) + .during(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(5)) .until( () -> { System.gc();