diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 32ed62d91fd6..8ed1fd8df325 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -72,6 +72,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -5264,4 +5265,80 @@ private void loadDriverVersionProperties() { throw ex; } } + + // TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures. + // This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService + // directly. + static Future[] wrapThread(final Thread thread) { + if (thread == null) { + return null; + } + return new Future[] { + new Future() { + private volatile boolean cancelled = false; + + @Override + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + if (cancelled || thread.getState() == Thread.State.TERMINATED) { + return false; + } + cancelled = true; + if (mayInterruptIfRunning) { + thread.interrupt(); + } + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return cancelled || thread.getState() == Thread.State.TERMINATED; + } + + @Override + public Object get() throws InterruptedException, CancellationException { + try { + return get(365, TimeUnit.DAYS); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object get(long timeout, TimeUnit unit) + throws InterruptedException, CancellationException, TimeoutException { + if (isCancelled()) { + throw new CancellationException(); + } + long remainingNanos = unit.toNanos(timeout); + long deadline = System.nanoTime() + remainingNanos; + while (thread.getState() != Thread.State.TERMINATED) { + if (isCancelled()) { + throw new CancellationException(); + } + if (remainingNanos <= 0) { + throw new TimeoutException(); + } + long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + if (remainingMillis == 0) { + remainingMillis = 1; + } + + long delay = Math.min(remainingMillis, 50); + if (thread.getState() == Thread.State.NEW) { + Thread.sleep(delay); + } else { + thread.join(delay); + } + remainingNanos = deadline - System.nanoTime(); + } + return null; + } + } + }; + } } diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 58a5a7212066..9b2b82644c35 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -44,9 +44,13 @@ import java.sql.Types; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -3308,4 +3312,125 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy assertEquals( metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type); } + + @Test + public void testWrapThread_NullThread() { + assertNull(BigQueryDatabaseMetaData.wrapThread(null)); + } + + @Test + public void testWrapThread_BasicLifecycle() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch finishLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + try { + startLatch.countDown(); + finishLatch.await(); + } catch (InterruptedException e) { + // ignore + } + }); + + Future[] futures = BigQueryDatabaseMetaData.wrapThread(t); + assertNotNull(futures); + assertEquals(1, futures.length); + Future f = futures[0]; + + // Thread is NEW (not started yet). + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + + t.start(); + startLatch.await(); + + // Thread is running. + assertFalse(f.isDone()); + assertFalse(f.isCancelled()); + + finishLatch.countDown(); + t.join(); + + // Thread is terminated. + assertTrue(f.isDone()); + assertFalse(f.isCancelled()); + assertNull(f.get()); + } + + @Test + public void testWrapThread_CancelBeforeStart() throws Exception { + Thread t = + new Thread( + () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + }); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertTrue(f.cancel(true)); + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + + // cancel on already cancelled should return false + assertFalse(f.cancel(true)); + + assertThrows(CancellationException.class, () -> f.get()); + assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS)); + } + + @Test + public void testWrapThread_CancelRunningWithInterrupt() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch interruptedLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + startLatch.countDown(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + interruptedLatch.countDown(); + } + }); + + t.start(); + startLatch.await(); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertTrue(f.cancel(true)); + assertTrue(f.isCancelled()); + assertTrue(f.isDone()); + + assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS)); + assertThrows(CancellationException.class, () -> f.get()); + } + + @Test + public void testWrapThread_GetTimeout() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + Thread t = + new Thread( + () -> { + startLatch.countDown(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // ignore + } + }); + + t.start(); + startLatch.await(); + + Future f = BigQueryDatabaseMetaData.wrapThread(t)[0]; + assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS)); + + // Cleanup: stop the thread + t.interrupt(); + t.join(); + } }