Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -5264,4 +5265,89 @@ private void loadDriverVersionProperties() {
throw ex;
}
}

// TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures.
// This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService
// directly.
static Future<?>[] wrapThread(final Thread thread) {
if (thread == null) {
return null;
}
return new Future<?>[] {
new Future<Object>() {
private volatile boolean cancelled = false;

@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
if (cancelled || thread.getState() == Thread.State.TERMINATED) {
return false;
}
cancelled = true;
if (mayInterruptIfRunning) {
thread.interrupt();
}
return true;
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public boolean isDone() {
return cancelled || thread.getState() == Thread.State.TERMINATED;
}

@Override
public Object get() throws InterruptedException, CancellationException {
if (isCancelled()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the code duplication, we can call get(timeout, unit) with some absurdly large limit, like 100 days.

throw new CancellationException();
}
while (thread.getState() != Thread.State.TERMINATED) {
if (isCancelled()) {
throw new CancellationException();
}
if (thread.getState() == Thread.State.NEW) {
Thread.sleep(50);
} else {
thread.join(50);
}
}
return null;
}

@Override
public Object get(long timeout, TimeUnit unit)
throws InterruptedException, CancellationException, TimeoutException {
if (isCancelled()) {
throw new CancellationException();
}
long remainingNanos = unit.toNanos(timeout);
long deadline = System.nanoTime() + remainingNanos;
while (thread.getState() != Thread.State.TERMINATED) {
if (isCancelled()) {
throw new CancellationException();
}
if (remainingNanos <= 0) {
throw new TimeoutException();
}
long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
if (remainingMillis == 0) {
remainingMillis = 1;
}

long delay = Math.min(remainingMillis, 50);
if (thread.getState() == Thread.State.NEW) {
Thread.sleep(delay);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the difference sleep vs join?

Also we can just call thread.join(1000);, there is no need to check every 50ms if object was cancelled or not. We have 10-60 second cleanup waiters either way (or worry about < 50 ms remaining time)

} else {
thread.join(delay);
}
remainingNanos = deadline - System.nanoTime();
}
return null;
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@
import java.sql.Types;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -3308,4 +3312,125 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy
assertEquals(
metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type);
}

@Test
public void testWrapThread_NullThread() {
assertNull(BigQueryDatabaseMetaData.wrapThread(null));
}

@Test
public void testWrapThread_BasicLifecycle() throws Exception {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(1);
Thread t =
new Thread(
() -> {
try {
startLatch.countDown();
finishLatch.await();
} catch (InterruptedException e) {
// ignore
}
});

Future<?>[] futures = BigQueryDatabaseMetaData.wrapThread(t);
assertNotNull(futures);
assertEquals(1, futures.length);
Future<?> f = futures[0];

// Thread is NEW (not started yet).
assertFalse(f.isDone());
assertFalse(f.isCancelled());

t.start();
startLatch.await();

// Thread is running.
assertFalse(f.isDone());
assertFalse(f.isCancelled());

finishLatch.countDown();
t.join();

// Thread is terminated.
assertTrue(f.isDone());
assertFalse(f.isCancelled());
assertNull(f.get());
}

@Test
public void testWrapThread_CancelBeforeStart() throws Exception {
Thread t =
new Thread(
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
});

Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
assertTrue(f.cancel(true));
assertTrue(f.isCancelled());
assertTrue(f.isDone());

// cancel on already cancelled should return false
assertFalse(f.cancel(true));

assertThrows(CancellationException.class, () -> f.get());
assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS));
}

@Test
public void testWrapThread_CancelRunningWithInterrupt() throws Exception {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch interruptedLatch = new CountDownLatch(1);
Thread t =
new Thread(
() -> {
startLatch.countDown();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
interruptedLatch.countDown();
}
});

t.start();
startLatch.await();

Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
assertTrue(f.cancel(true));
assertTrue(f.isCancelled());
assertTrue(f.isDone());

assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS));
assertThrows(CancellationException.class, () -> f.get());
}

@Test
public void testWrapThread_GetTimeout() throws Exception {
CountDownLatch startLatch = new CountDownLatch(1);
Thread t =
new Thread(
() -> {
startLatch.countDown();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}
});

t.start();
startLatch.await();

Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS));

// Cleanup: stop the thread
t.interrupt();
t.join();
}
}
Loading