diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 1d77df8e0fe6..ce4f43b45c53 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -33,6 +33,7 @@ import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -599,15 +600,14 @@ void testPushCurrentBatchRunnable() throws Exception { // Batcher present inside runnable should be GCed after following loop. batcher.close(); batcher = null; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - isExecutorCancelled = pushBatchRunnable.isCancelled(); - if (isExecutorCancelled) { - break; - } - Thread.sleep(DELAY_TIME * (1L << retry)); - } + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + return pushBatchRunnable.isCancelled(); + }); // ScheduledFuture should be isCancelled now. assertThat(pushBatchRunnable.isCancelled()).isTrue(); } @@ -733,18 +733,14 @@ public void splitResponse( */ @Test void testUnclosedBatchersAreLogged() throws Exception { - final long DELAY_TIME = 30L; - int actualRemaining = 0; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - actualRemaining = BatcherReference.cleanQueue(); - if (actualRemaining == 0) { - break; - } - Thread.sleep(DELAY_TIME * (1L << retry)); - } - assertThat(actualRemaining).isAtMost(0); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + return BatcherReference.cleanQueue() == 0; + }); underTest = createDefaultBatcherImpl(batchingSettings, null); Batcher extraBatcher = createDefaultBatcherImpl(batchingSettings, null); @@ -771,20 +767,16 @@ public boolean isLoggable(LogRecord record) { underTest = null; // That *should* have been the last reference. Try to reclaim it. - boolean success = false; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - int orphans = BatcherReference.cleanQueue(); - if (orphans == 1) { - success = true; - break; - } - // Validates that there are no other batcher instance present while GC cleanup. - assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0); - Thread.sleep(DELAY_TIME * (1L << retry)); - } - assertWithMessage("Batcher was not garbage collected").that(success).isTrue(); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + int orphans = BatcherReference.cleanQueue(); + assertWithMessage("unexpected extra orphans").that(orphans).isAtMost(1); + return orphans == 1; + }); LogRecord lr; synchronized (records) { @@ -807,18 +799,14 @@ public boolean isLoggable(LogRecord record) { @Test void testClosedBatchersAreNotLogged() throws Exception { // Clean out the existing instances - final long DELAY_TIME = 30L; - int actualRemaining = 0; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - actualRemaining = BatcherReference.cleanQueue(); - if (actualRemaining == 0) { - break; - } - Thread.sleep(DELAY_TIME * (1L << retry)); - } - assertThat(actualRemaining).isAtMost(0); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + return BatcherReference.cleanQueue() == 0; + }); // Capture logs final List records = new ArrayList<>(1); @@ -849,16 +837,19 @@ public boolean isLoggable(LogRecord record) { } } // Run GC a few times to give the batchers a chance to be collected - for (int retry = 0; retry < 100; retry++) { - System.gc(); - System.runFinalization(); - BatcherReference.cleanQueue(); - Thread.sleep(10); - } - - synchronized (records) { - assertThat(records).isEmpty(); - } + await() + .pollInterval(Duration.ofMillis(10)) + .during(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + System.gc(); + System.runFinalization(); + BatcherReference.cleanQueue(); + synchronized (records) { + return records.isEmpty(); + } + }); } finally { // reset logging batcherLogger.setFilter(oldFilter); @@ -990,10 +981,12 @@ void testThrottlingBlocking() throws Exception { // resulting in a shorter total_throttled_time at the verification of throttledTime // at the end of the test. // https://github.com/googleapis/sdk-platform-java/issues/1193 - do { - Thread.sleep(10); - } while (batcherAddThreadHolder.isEmpty() - || batcherAddThreadHolder.get(0).getState() != Thread.State.WAITING); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> + !batcherAddThreadHolder.isEmpty() + && batcherAddThreadHolder.get(0).getState() == Thread.State.WAITING); long beforeGetCall = System.currentTimeMillis(); executor.submit( diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java index 96fe3c81e943..d1ce30763d89 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/batching/Semaphore64Test.java @@ -29,12 +29,14 @@ */ package com.google.api.gax.batching; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -68,8 +70,7 @@ void testBlocking() throws InterruptedException { Thread t = new Thread(() -> semaphore.acquire(1)); t.start(); - Thread.sleep(50); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); semaphore.release(1); t.join(); @@ -95,8 +96,7 @@ void testReducePermitLimitBlocking() throws InterruptedException { Thread t = new Thread(() -> semaphore.acquire(1)); t.start(); - Thread.sleep(50); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); semaphore.release(1); t.join(); @@ -124,8 +124,7 @@ void testAcquirePartialBlocking() throws Exception { Thread t1 = new Thread(() -> semaphore.acquire(1)); t1.start(); // wait for thread to start - Thread.sleep(100); - assertTrue(t1.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t1.getState() == Thread.State.WAITING); semaphore.release(6); t1.join(); @@ -133,8 +132,7 @@ void testAcquirePartialBlocking() throws Exception { Thread t2 = new Thread(() -> semaphore.acquirePartial(6)); t2.start(); // wait fo thread to start - Thread.sleep(100); - assertTrue(t2.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t2.getState() == Thread.State.WAITING); // limit should still be 5 and get limit should not block assertEquals(5, semaphore.getPermitLimit()); } @@ -158,8 +156,7 @@ void testIncreasePermitLimitBlocking() throws Exception { Thread t = new Thread(() -> semaphore.acquire(1)); t.start(); - Thread.sleep(50); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); semaphore.increasePermitLimit(1); t.join(); @@ -208,8 +205,7 @@ void testReleaseWontOverflowBlocking() throws Exception { semaphore.release(10); Thread t = new Thread(() -> semaphore.acquire(11)); t.start(); - Thread.sleep(100); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); } @Test @@ -239,7 +235,6 @@ void testPermitLimitUnderflowBlocking() throws Exception { semaphore.release(10); Thread t = new Thread(() -> semaphore.acquire(11)); t.start(); - Thread.sleep(100); - assertTrue(t.isAlive()); + await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING); } } diff --git a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java index 7d57733c5005..89b807bd5610 100644 --- a/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java +++ b/sdk-platform-java/gax-java/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -30,6 +30,7 @@ package com.google.api.gax.retrying; import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -41,12 +42,15 @@ import com.google.api.core.NanoClock; import com.google.api.gax.retrying.FailingCallable.CustomException; import com.google.api.gax.rpc.testing.FakeCallContext; +import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -99,26 +103,32 @@ void testSuccessWithFailuresPeekAttempt() throws Exception { future.setAttemptFuture(executor.submit(future)); - int failedAttempts = 0; - while (!future.isDone()) { - ApiFuture attemptResult = future.peekAttemptResult(); - if (attemptResult != null) { - assertTrue(attemptResult.isDone()); - assertFalse(attemptResult.isCancelled()); - try { - attemptResult.get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof CustomException) { - failedAttempts++; - } - } - } - Thread.sleep(0L, 100); - } + final AtomicInteger failedAttempts = new AtomicInteger(0); + final AtomicReference> lastSeenAttempt = new AtomicReference<>(); + await() + .pollInterval(Duration.ofMillis(2)) + .atMost(Duration.ofSeconds(5)) + .until( + () -> { + ApiFuture attemptResult = future.peekAttemptResult(); + if (attemptResult != null && attemptResult != lastSeenAttempt.get()) { + lastSeenAttempt.set(attemptResult); + assertTrue(attemptResult.isDone()); + assertFalse(attemptResult.isCancelled()); + try { + attemptResult.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof CustomException) { + failedAttempts.incrementAndGet(); + } + } + } + return future.isDone(); + }); assertFutureSuccess(future); assertEquals(15, future.getAttemptSettings().getAttemptCount()); - assertTrue(failedAttempts > 0); + assertTrue(failedAttempts.get() > 0); } } @@ -260,9 +270,12 @@ void testCancelOuterFutureAfterStart() throws Exception { callable.setExternalFuture(future); future.setAttemptFuture(executor.submit(future)); - // The test sleeps a duration long enough to ensure that the future has been submitted for - // execution - Thread.sleep(150L); + await() + .atMost(Duration.ofSeconds(5)) + .until( + () -> + future.getAttemptSettings() != null + && future.getAttemptSettings().getAttemptCount() > 0); boolean res = future.cancel(false); assertTrue(res); @@ -302,7 +315,9 @@ void testCancelProxiedFutureAfterStart() throws Exception { callable.setExternalFuture(future); future.setAttemptFuture(executor.submit(future)); - Thread.sleep(50L); + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> callable.getFirstAttemptFinishedLatch().getCount() == 0); // Note that shutdownNow() will not cancel internal FutureTasks automatically, which // may potentially cause another thread handing on RetryingFuture#get() call forever.