Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.NanoClock;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedAttemptSettings;
Expand Down Expand Up @@ -126,11 +127,6 @@ class ConnectionWorker implements AutoCloseable {
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;

/*
* TraceId for debugging purpose.
*/
private final String traceId;

/*
* Enables compression on the wire.
*/
Expand Down Expand Up @@ -562,6 +558,7 @@ public ConnectionWorker(
long maxInflightBytes,
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String clientlibId,
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings,
Expand All @@ -578,6 +575,7 @@ public ConnectionWorker(
maxInflightBytes,
maxRetryDuration,
limitExceededBehavior,
clientlibId,
traceId,
compressorName,
clientSettings,
Expand All @@ -595,6 +593,7 @@ public ConnectionWorker(
long maxInflightBytes,
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String clientlibId,
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings,
Expand All @@ -611,6 +610,7 @@ public ConnectionWorker(
maxInflightBytes,
maxRetryDuration,
limitExceededBehavior,
clientlibId,
traceId,
compressorName,
clientSettings,
Expand All @@ -628,6 +628,7 @@ public ConnectionWorker(
long maxInflightBytes,
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String clientlibId,
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings,
Expand All @@ -652,14 +653,14 @@ public ConnectionWorker(
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
String fullTraceId = getFullTraceId(clientlibId, this.writerId, traceId);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the constructor, writerId is available as a local variable (as seen in the TelemetryMetrics constructor call on line 663). Using this.writerId here might refer to an uninitialized field (which would be null) if the field assignment this.writerId = writerId; has not occurred yet. Please use the local variable writerId instead of this.writerId to ensure the correct value is used.

Suggested change
String fullTraceId = getFullTraceId(clientlibId, this.writerId, traceId);
String fullTraceId = getFullTraceId(clientlibId, writerId, traceId);

this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
this.telemetryMetrics =
new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, traceId);
new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, fullTraceId);
this.healthCheckMetrics = new HealthCheckMetrics();
this.isMultiplexing = isMultiplexing;

Expand Down Expand Up @@ -987,6 +988,22 @@ static long calculateSleepTimeMilli(long retryCount) {
return (long) Math.min(Math.pow(2, retryCount) * 50, 60000);
}

private static String getFullTraceId(String clientlibId, String writerId, String traceId) {
String clientWithVersion =
GaxProperties.getLibraryVersion(StreamWriter.class).isEmpty()
? clientlibId
: clientlibId + ":" + GaxProperties.getLibraryVersion(StreamWriter.class);
if (writerId != null && !writerId.isEmpty()) {
clientWithVersion =
clientWithVersion.isEmpty() ? writerId : clientWithVersion + ":" + writerId;
}
if (traceId == null || traceId.isEmpty()) {
return clientWithVersion;
} else {
return clientWithVersion + " " + traceId;
}
}

@VisibleForTesting
void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) {
this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime;
Expand Down Expand Up @@ -1260,7 +1277,11 @@ private void appendLoop() {
// If we are at the first request for every table switch, including the first request in
// the connection, we will attach both stream name and table schema to the request.
destinationSet.add(streamName);
originalRequestBuilder.setTraceId(wrapper.streamWriter.getFullTraceId());
originalRequestBuilder.setTraceId(
getFullTraceId(
wrapper.streamWriter.getClientId(),
this.writerId,
wrapper.streamWriter.getTraceId()));
} else if (!isMultiplexing) {
// If we are not in multiplexing and not in the first request, clear the stream name.
originalRequestBuilder.clearWriteStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ private ConnectionWorker createOrReuseConnectionWorker(
streamWriter.getStreamName(),
streamWriter.getLocation(),
streamWriter.getWriterSchema(),
streamWriter.getFullTraceId());
streamWriter.getClientId(),
streamWriter.getTraceId());
} else {
ConnectionWorker existingBestConnection =
pickBestLoadConnection(
Expand All @@ -337,7 +338,8 @@ private ConnectionWorker createOrReuseConnectionWorker(
streamWriter.getStreamName(),
streamWriter.getLocation(),
streamWriter.getWriterSchema(),
streamWriter.getFullTraceId());
streamWriter.getClientId(),
streamWriter.getTraceId());
} else {
// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
Expand Down Expand Up @@ -396,7 +398,8 @@ private ConnectionWorker createConnectionWorker(
String streamName,
String location,
AppendFormats.AppendRowsSchema writeSchema,
String fullTraceId)
String clientId,
String traceId)
throws IOException {
if (enableTesting) {
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
Expand All @@ -411,7 +414,8 @@ private ConnectionWorker createConnectionWorker(
maxInflightBytes,
maxRetryDuration,
limitExceededBehavior,
fullTraceId,
clientId,
traceId,
compressorName,
clientSettings,
retrySettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
Expand Down Expand Up @@ -93,8 +92,11 @@ private static Cache<String, String> allocateProjectLocationCache() {
*/
private final String streamName;

/** This is the library version may or may not include library version id. */
private final String fullTraceId;
/** Client id of the writer. */
private final String clientId;

/** User provided trace id. */
private final String traceId;

/** Every writer has a fixed proto schema or arrow schema. */
private final AppendRowsSchema writerSchema;
Expand Down Expand Up @@ -256,7 +258,8 @@ private StreamWriter(Builder builder) throws IOException {
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
this.requestProfilerHook =
new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
this.fullTraceId = builder.getFullTraceId();
this.clientId = builder.clientId;
this.traceId = builder.traceId;
if (builder.enableRequestProfiler) {
// Request profiler is enabled on singleton level, from now on a periodical flush will be
// started
Expand All @@ -275,7 +278,8 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxInflightBytes,
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.getFullTraceId(),
builder.clientId,
builder.traceId,
builder.compressorName,
clientSettings,
builder.retrySettings,
Expand Down Expand Up @@ -395,8 +399,12 @@ static void recreateProjectLocationCache(long durationExpireMillis) {
projectAndDatasetToLocation = allocateProjectLocationCache();
}

String getFullTraceId() {
return fullTraceId;
String getClientId() {
return clientId;
}

String getTraceId() {
return traceId;
}

AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() {
Expand Down Expand Up @@ -1119,18 +1127,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
public StreamWriter build() throws IOException {
return new StreamWriter(this);
}

String getFullTraceId() {
String clientWithVersion =
GaxProperties.getLibraryVersion(StreamWriter.class).isEmpty()
? clientId
: clientId + ":" + GaxProperties.getLibraryVersion(StreamWriter.class);
if (traceId == null || traceId.isEmpty()) {
return clientWithVersion;
} else {
return clientWithVersion + " " + traceId;
}
}
}

private String generateRequestUniqueId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ void testAppendButInflightQueueFull() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -574,6 +575,7 @@ void testThrowExceptionWhileWithinAppendLoop() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -646,6 +648,7 @@ void testLocationMismatch() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -682,6 +685,7 @@ void testStreamNameMismatch() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -746,7 +750,8 @@ private ConnectionWorker createMultiplexedConnectionWorker(
maxBytes,
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
"java-streamwriter",
traceId,
null,
client.getSettings(),
retrySettings,
Expand Down Expand Up @@ -905,6 +910,7 @@ void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exceptio
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -983,6 +989,7 @@ void testLongTimeIdleWontFail() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -1030,6 +1037,7 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
null,
null,
client.getSettings(),
Expand Down Expand Up @@ -1072,6 +1080,7 @@ void exerciseOpenTelemetryAttributesWithTraceId(
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
traceId,
null,
client.getSettings(),
Expand Down Expand Up @@ -1143,6 +1152,7 @@ void testDoubleDisconnectWithShorterRetryDuration() throws Exception {
100000,
Duration.ofMillis(1), // very small maxRetryDuration
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -1176,6 +1186,7 @@ void testHealthCheck() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -1251,6 +1262,7 @@ void testHealthCheckThresholds() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -1355,6 +1367,7 @@ void testInflightRetryCountHealthMetric() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -1450,6 +1463,7 @@ void testInflightRetryCountHealthMetricExactlyOnce() throws Exception {
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
Expand Down Expand Up @@ -1504,4 +1518,48 @@ void testInflightRetryCountHealthMetricExactlyOnce() throws Exception {
assertEquals(3, healthCheckFields.responseCodes.get(Status.Code.OK.value()));
assertEquals("projects/p1/datasets/d1/tables/t1/streams/s1", healthCheckFields.streamName);
}

@Test
void testTraceIdContainsWriterId() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setLocation("us")
.setWriterSchema(schema1)
.setTraceId(TEST_TRACE_ID)
.build();
testBigQueryWrite.addResponse(createAppendResponse(0));

try (ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
"us",
createProtoSchema("foo"),
6,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
"java-streamwriter",
TEST_TRACE_ID,
null,
client.getSettings(),
retrySettings,
/* enableRequestProfiler= */ false,
/* enableOpenTelemetry= */ false,
/* isMultiplexing= */ false)) {

ApiFuture<AppendRowsResponse> future =
sendTestMessage(
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(0)}), 0);
future.get();

String writerId = connectionWorker.getWriterId();
assertThat(testBigQueryWrite.getAppendRequests()).hasSize(1);
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(0);

String expectedTraceIdRegex =
"java-streamwriter(:[^\\s:]+)?:" + writerId + " " + TEST_TRACE_ID;
assertThat(serverRequest.getTraceId()).matches(expectedTraceIdRegex);
}
}
}
Loading