diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 215176e7b46f..f508e6ce6567 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -19,6 +19,7 @@ import com.google.api.core.NanoClock; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.GaxProperties; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.TimedAttemptSettings; @@ -126,11 +127,6 @@ class ConnectionWorker implements AutoCloseable { */ private final FlowController.LimitExceededBehavior limitExceededBehavior; - /* - * TraceId for debugging purpose. - */ - private final String traceId; - /* * Enables compression on the wire. */ @@ -562,6 +558,7 @@ public ConnectionWorker( long maxInflightBytes, Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, + String clientlibId, String traceId, @Nullable String compressorName, BigQueryWriteSettings clientSettings, @@ -578,6 +575,7 @@ public ConnectionWorker( maxInflightBytes, maxRetryDuration, limitExceededBehavior, + clientlibId, traceId, compressorName, clientSettings, @@ -595,6 +593,7 @@ public ConnectionWorker( long maxInflightBytes, Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, + String clientlibId, String traceId, @Nullable String compressorName, BigQueryWriteSettings clientSettings, @@ -611,6 +610,7 @@ public ConnectionWorker( maxInflightBytes, maxRetryDuration, limitExceededBehavior, + clientlibId, traceId, compressorName, clientSettings, @@ -628,6 +628,7 @@ public ConnectionWorker( long maxInflightBytes, Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, + String clientlibId, String traceId, @Nullable String compressorName, BigQueryWriteSettings clientSettings, @@ -652,14 +653,14 @@ public ConnectionWorker( this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.limitExceededBehavior = limitExceededBehavior; - this.traceId = traceId; + String fullTraceId = getFullTraceId(clientlibId, this.writerId, traceId); this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.compressorName = compressorName; this.retrySettings = retrySettings; this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler); this.telemetryMetrics = - new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, traceId); + new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, fullTraceId); this.healthCheckMetrics = new HealthCheckMetrics(); this.isMultiplexing = isMultiplexing; @@ -987,6 +988,22 @@ static long calculateSleepTimeMilli(long retryCount) { return (long) Math.min(Math.pow(2, retryCount) * 50, 60000); } + private static String getFullTraceId(String clientlibId, String writerId, String traceId) { + String clientWithVersion = + GaxProperties.getLibraryVersion(StreamWriter.class).isEmpty() + ? clientlibId + : clientlibId + ":" + GaxProperties.getLibraryVersion(StreamWriter.class); + if (writerId != null && !writerId.isEmpty()) { + clientWithVersion = + clientWithVersion.isEmpty() ? writerId : clientWithVersion + ":" + writerId; + } + if (traceId == null || traceId.isEmpty()) { + return clientWithVersion; + } else { + return clientWithVersion + " " + traceId; + } + } + @VisibleForTesting void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) { this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime; @@ -1260,7 +1277,11 @@ private void appendLoop() { // If we are at the first request for every table switch, including the first request in // the connection, we will attach both stream name and table schema to the request. destinationSet.add(streamName); - originalRequestBuilder.setTraceId(wrapper.streamWriter.getFullTraceId()); + originalRequestBuilder.setTraceId( + getFullTraceId( + wrapper.streamWriter.getClientId(), + this.writerId, + wrapper.streamWriter.getTraceId())); } else if (!isMultiplexing) { // If we are not in multiplexing and not in the first request, clear the stream name. originalRequestBuilder.clearWriteStream(); diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index f9c722c270fb..edffe603ee9e 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -317,7 +317,8 @@ private ConnectionWorker createOrReuseConnectionWorker( streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getWriterSchema(), - streamWriter.getFullTraceId()); + streamWriter.getClientId(), + streamWriter.getTraceId()); } else { ConnectionWorker existingBestConnection = pickBestLoadConnection( @@ -337,7 +338,8 @@ private ConnectionWorker createOrReuseConnectionWorker( streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getWriterSchema(), - streamWriter.getFullTraceId()); + streamWriter.getClientId(), + streamWriter.getTraceId()); } else { // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { @@ -396,7 +398,8 @@ private ConnectionWorker createConnectionWorker( String streamName, String location, AppendFormats.AppendRowsSchema writeSchema, - String fullTraceId) + String clientId, + String traceId) throws IOException { if (enableTesting) { // Though atomic integer is super lightweight, add extra if check in case adding future logic. @@ -411,7 +414,8 @@ private ConnectionWorker createConnectionWorker( maxInflightBytes, maxRetryDuration, limitExceededBehavior, - fullTraceId, + clientId, + traceId, compressorName, clientSettings, retrySettings, diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 2da1b0f61791..6bf659eb173c 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -19,7 +19,6 @@ import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; -import com.google.api.gax.core.GaxProperties; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.Credentials; @@ -93,8 +92,11 @@ private static Cache allocateProjectLocationCache() { */ private final String streamName; - /** This is the library version may or may not include library version id. */ - private final String fullTraceId; + /** Client id of the writer. */ + private final String clientId; + + /** User provided trace id. */ + private final String traceId; /** Every writer has a fixed proto schema or arrow schema. */ private final AppendRowsSchema writerSchema; @@ -256,7 +258,8 @@ private StreamWriter(Builder builder) throws IOException { BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); - this.fullTraceId = builder.getFullTraceId(); + this.clientId = builder.clientId; + this.traceId = builder.traceId; if (builder.enableRequestProfiler) { // Request profiler is enabled on singleton level, from now on a periodical flush will be // started @@ -275,7 +278,8 @@ private StreamWriter(Builder builder) throws IOException { builder.maxInflightBytes, builder.maxRetryDuration, builder.limitExceededBehavior, - builder.getFullTraceId(), + builder.clientId, + builder.traceId, builder.compressorName, clientSettings, builder.retrySettings, @@ -395,8 +399,12 @@ static void recreateProjectLocationCache(long durationExpireMillis) { projectAndDatasetToLocation = allocateProjectLocationCache(); } - String getFullTraceId() { - return fullTraceId; + String getClientId() { + return clientId; + } + + String getTraceId() { + return traceId; } AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() { @@ -1119,18 +1127,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) { public StreamWriter build() throws IOException { return new StreamWriter(this); } - - String getFullTraceId() { - String clientWithVersion = - GaxProperties.getLibraryVersion(StreamWriter.class).isEmpty() - ? clientId - : clientId + ":" + GaxProperties.getLibraryVersion(StreamWriter.class); - if (traceId == null || traceId.isEmpty()) { - return clientWithVersion; - } else { - return clientWithVersion + " " + traceId; - } - } } private String generateRequestUniqueId() { diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 44bb25105d12..fdf3b6a169c4 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -514,6 +514,7 @@ void testAppendButInflightQueueFull() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -574,6 +575,7 @@ void testThrowExceptionWhileWithinAppendLoop() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -646,6 +648,7 @@ void testLocationMismatch() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -682,6 +685,7 @@ void testStreamNameMismatch() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -746,7 +750,8 @@ private ConnectionWorker createMultiplexedConnectionWorker( maxBytes, maxRetryDuration, FlowController.LimitExceededBehavior.Block, - TEST_TRACE_ID, + "java-streamwriter", + traceId, null, client.getSettings(), retrySettings, @@ -905,6 +910,7 @@ void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exceptio 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -983,6 +989,7 @@ void testLongTimeIdleWontFail() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -1030,6 +1037,7 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", null, null, client.getSettings(), @@ -1072,6 +1080,7 @@ void exerciseOpenTelemetryAttributesWithTraceId( 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", traceId, null, client.getSettings(), @@ -1143,6 +1152,7 @@ void testDoubleDisconnectWithShorterRetryDuration() throws Exception { 100000, Duration.ofMillis(1), // very small maxRetryDuration FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -1176,6 +1186,7 @@ void testHealthCheck() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -1251,6 +1262,7 @@ void testHealthCheckThresholds() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -1355,6 +1367,7 @@ void testInflightRetryCountHealthMetric() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -1450,6 +1463,7 @@ void testInflightRetryCountHealthMetricExactlyOnce() throws Exception { 100000, Duration.ofSeconds(100), FlowController.LimitExceededBehavior.Block, + "java-streamwriter", TEST_TRACE_ID, null, client.getSettings(), @@ -1504,4 +1518,48 @@ void testInflightRetryCountHealthMetricExactlyOnce() throws Exception { assertEquals(3, healthCheckFields.responseCodes.get(Status.Code.OK.value())); assertEquals("projects/p1/datasets/d1/tables/t1/streams/s1", healthCheckFields.streamName); } + + @Test + void testTraceIdContainsWriterId() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .setTraceId(TEST_TRACE_ID) + .build(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + try (ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 6, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + "java-streamwriter", + TEST_TRACE_ID, + null, + client.getSettings(), + retrySettings, + /* enableRequestProfiler= */ false, + /* enableOpenTelemetry= */ false, + /* isMultiplexing= */ false)) { + + ApiFuture future = + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(0)}), 0); + future.get(); + + String writerId = connectionWorker.getWriterId(); + assertThat(testBigQueryWrite.getAppendRequests()).hasSize(1); + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(0); + + String expectedTraceIdRegex = + "java-streamwriter(:[^\\s:]+)?:" + writerId + " " + TEST_TRACE_ID; + assertThat(serverRequest.getTraceId()).matches(expectedTraceIdRegex); + } + } }