Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,8 @@ StandardSQLTypeName getSqlType(String name) {
}
return null;
}

int getParametersArraySize() {
return this.parametersArraySize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@
class BigQueryPreparedStatement extends BigQueryStatement implements PreparedStatement {
private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString());
private static final char POSITIONAL_PARAMETER_CHAR = '?';
// Making this protected so BigQueryCallableStatement subclass can access the parameters.
protected final BigQueryParameterHandler parameterHandler;
// parameterHandler is inherited from BigQueryStatement
protected int parameterCount = 0;
protected String currentQuery;
private Queue<ArrayList<BigQueryJdbcParameter>> batchParameters = new LinkedList<>();
Expand All @@ -91,48 +90,25 @@ private int getParameterCount(String query) {
@Override
public ResultSet executeQuery() throws SQLException {
logQueryExecutionStart(this.currentQuery);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
jobConfiguration.setParameterMode("POSITIONAL");
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
runQuery(this.currentQuery, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during executeQuery", ex);
}
return getCurrentResultSet();
return super.executeQuery(this.currentQuery);
}
Comment on lines 91 to 94

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The query execution start is logged twice because super.executeQuery(this.currentQuery) eventually delegates to executeQueryImpl(sql), which also calls logQueryExecutionStart(sql). You can safely remove the redundant call to logQueryExecutionStart here.

Suggested change
public ResultSet executeQuery() throws SQLException {
logQueryExecutionStart(this.currentQuery);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
jobConfiguration.setParameterMode("POSITIONAL");
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
runQuery(this.currentQuery, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during executeQuery", ex);
}
return getCurrentResultSet();
return super.executeQuery(this.currentQuery);
}
@Override
public ResultSet executeQuery() throws SQLException {
return super.executeQuery(this.currentQuery);
}


@Override
public long executeLargeUpdate() throws SQLException {
logQueryExecutionStart(this.currentQuery);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
jobConfiguration.setParameterMode("POSITIONAL");
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
runQuery(this.currentQuery, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during executeLargeUpdate", ex);
}
return this.currentUpdateCount;
return super.executeLargeUpdate(this.currentQuery);
}
Comment on lines 97 to 100

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The query execution start is logged twice because super.executeLargeUpdate(this.currentQuery) eventually delegates to executeLargeUpdateImpl(sql), which also calls logQueryExecutionStart(sql). You can safely remove the redundant call to logQueryExecutionStart here.

Suggested change
public long executeLargeUpdate() throws SQLException {
logQueryExecutionStart(this.currentQuery);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
jobConfiguration.setParameterMode("POSITIONAL");
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
runQuery(this.currentQuery, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during executeLargeUpdate", ex);
}
return this.currentUpdateCount;
return super.executeLargeUpdate(this.currentQuery);
}
@Override
public long executeLargeUpdate() throws SQLException {
return super.executeLargeUpdate(this.currentQuery);
}


@Override
public int executeUpdate() throws SQLException {
return checkUpdateCount(executeLargeUpdate());
logQueryExecutionStart(this.currentQuery);
return super.executeUpdate(this.currentQuery);
}
Comment on lines 103 to 106

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The query execution start is logged twice because super.executeUpdate(this.currentQuery) eventually delegates to executeLargeUpdateImpl(sql), which also calls logQueryExecutionStart(sql). You can safely remove the redundant call to logQueryExecutionStart here.

Suggested change
public int executeUpdate() throws SQLException {
return checkUpdateCount(executeLargeUpdate());
logQueryExecutionStart(this.currentQuery);
return super.executeUpdate(this.currentQuery);
}
@Override
public int executeUpdate() throws SQLException {
return super.executeUpdate(this.currentQuery);
}


@Override
public boolean execute() throws SQLException {
logQueryExecutionStart(this.currentQuery);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
jobConfiguration.setParameterMode("POSITIONAL");
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
runQuery(this.currentQuery, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during execute", ex);
}
return getCurrentResultSet() != null;
return super.execute(this.currentQuery);
}
Comment on lines 109 to 112

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The query execution start is logged twice because super.execute(this.currentQuery) eventually delegates to executeImpl(sql), which also calls logQueryExecutionStart(sql). You can safely remove the redundant call to logQueryExecutionStart here.

Suggested change
public boolean execute() throws SQLException {
logQueryExecutionStart(this.currentQuery);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
jobConfiguration.setParameterMode("POSITIONAL");
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
runQuery(this.currentQuery, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during execute", ex);
}
return getCurrentResultSet() != null;
return super.execute(this.currentQuery);
}
@Override
public boolean execute() throws SQLException {
return super.execute(this.currentQuery);
}


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement {
protected int currentJobIdIndex = -1;
protected List<String> batchQueries = new ArrayList<>();
protected BigQueryConnection connection;
protected BigQueryParameterHandler parameterHandler = null;
protected String connectionId;
protected int maxFieldSize = 0;
protected int maxRows = 0;
Expand Down Expand Up @@ -244,9 +245,10 @@ public ResultSet executeQuery(String sql) throws SQLException {
private ResultSet executeQueryImpl(String sql) throws SQLException {
logQueryExecutionStart(sql);
try {
QueryJobConfiguration jobConfiguration =
setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build());
runQuery(sql, jobConfiguration);
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
jobConfiguration = applyParametersIfPresent(jobConfiguration);
jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
runQuery(sql, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcException("Interrupted during executeQuery", ex);
}
Expand All @@ -268,6 +270,7 @@ private long executeLargeUpdateImpl(String sql) throws SQLException {
logQueryExecutionStart(sql);
try {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
jobConfiguration = applyParametersIfPresent(jobConfiguration);
runQuery(sql, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during executeLargeUpdate", ex);
Expand Down Expand Up @@ -303,12 +306,13 @@ public boolean execute(String sql) throws SQLException {
private boolean executeImpl(String sql) throws SQLException {
logQueryExecutionStart(sql);
try {
QueryJobConfiguration jobConfiguration = getJobConfig(sql).build();
// If Large Results are enabled, ensure query type is SELECT
if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) {
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
jobConfiguration = applyParametersIfPresent(jobConfiguration);
if (isLargeResultsEnabled()
&& getQueryType(jobConfiguration.build(), null) == SqlType.SELECT) {
jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
}
runQuery(sql, jobConfiguration);
runQuery(sql, jobConfiguration.build());
} catch (InterruptedException ex) {
throw new BigQueryJdbcRuntimeException("Interrupted during execute", ex);
}
Expand Down Expand Up @@ -625,35 +629,43 @@ void runQuery(String query, QueryJobConfiguration jobConfiguration)
}
}

private boolean isLargeResultsEnabled() {
protected QueryJobConfiguration.Builder applyParametersIfPresent(
QueryJobConfiguration.Builder jobConfigurationBuilder) throws SQLException {
if (this.parameterHandler != null && this.parameterHandler.getParametersArraySize() > 0) {
jobConfigurationBuilder.setParameterMode("POSITIONAL");
jobConfigurationBuilder = this.parameterHandler.configureParameters(jobConfigurationBuilder);
}
return jobConfigurationBuilder;
}

boolean isLargeResultsEnabled() {
String destinationTable = this.querySettings.getDestinationTable();
String destinationDataset = this.querySettings.getDestinationDataset();
return destinationDataset != null || destinationTable != null;
}

private QueryJobConfiguration setDestinationDatasetAndTableInJobConfig(
QueryJobConfiguration jobConfiguration) {
QueryJobConfiguration.Builder setDestinationDatasetAndTableInJobConfig(
QueryJobConfiguration.Builder jobConfigurationBuilder) {
String destinationTable = this.querySettings.getDestinationTable();
String destinationDataset = this.querySettings.getDestinationDataset();
if (destinationDataset != null || destinationTable != null) {
if (destinationDataset != null) {
checkIfDatasetExistElseCreate(destinationDataset);
}
if (jobConfiguration.useLegacySql() && destinationDataset == null) {
if (getUseLegacySql() && destinationDataset == null) {
checkIfDatasetExistElseCreate(DEFAULT_DATASET_NAME);
destinationDataset = DEFAULT_DATASET_NAME;
}
if (destinationTable == null) {
destinationTable = getDefaultDestinationTable();
}
return jobConfiguration.toBuilder()
return jobConfigurationBuilder
.setAllowLargeResults(this.querySettings.getAllowLargeResults())
.setDestinationTable(TableId.of(destinationDataset, destinationTable))
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
.build();
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE);
}
return jobConfiguration;
return jobConfigurationBuilder;
}

Job getNextJob() {
Expand Down Expand Up @@ -1349,14 +1361,16 @@ QueryJobConfiguration.Builder getJobConfig(String query) {
if (this.querySettings.getQueryProperties() != null) {
queryConfigBuilder.setConnectionProperties(this.querySettings.getQueryProperties());
}
boolean useLegacy =
QueryDialectType.BIG_QUERY.equals(
QueryDialectType.valueOf(this.querySettings.getQueryDialect()));
queryConfigBuilder.setUseLegacySql(useLegacy);
queryConfigBuilder.setUseLegacySql(getUseLegacySql());

return queryConfigBuilder;
}

private boolean getUseLegacySql() {
return QueryDialectType.BIG_QUERY.equals(
QueryDialectType.valueOf(this.querySettings.getQueryDialect()));
}

private void checkIfDatasetExistElseCreate(String datasetName) {
Dataset dataset = bigQuery.getDataset(DatasetId.of(datasetName));
if (dataset == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,4 +689,84 @@ public void testWrapperMethods() throws SQLException {
BigQueryJdbcException.class, () -> bigQueryStatement.unwrap(java.sql.Connection.class));
assertTrue(e.getMessage().contains("Cannot unwrap to java.sql.Connection"));
}

@Test
public void testPreparedStatementExecuteQueryWithLargeResults() throws Exception {
// Setup connection mocks to return large results settings
doReturn(true).when(bigQueryConnection).isAllowLargeResults();
doReturn("test_dataset").when(bigQueryConnection).getDestinationDataset();
doReturn("test_table").when(bigQueryConnection).getDestinationTable();

com.google.cloud.bigquery.Dataset dataset = mock(com.google.cloud.bigquery.Dataset.class);
doReturn(dataset).when(bigquery).getDataset(any(com.google.cloud.bigquery.DatasetId.class));

// Create PreparedStatement
BigQueryPreparedStatement preparedStatement =
new BigQueryPreparedStatement(bigQueryConnection, query);
BigQueryPreparedStatement preparedStatementSpy = Mockito.spy(preparedStatement);

TableResult result = Mockito.mock(TableResult.class);
BigQueryJsonResultSet jsonResultSet = mock(BigQueryJsonResultSet.class);
QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder(query).build();
Job job = getJobMock(result, jobConfiguration, StatementType.SELECT);

doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());
doReturn(jsonResultSet).when(preparedStatementSpy).processJsonResultSet(result);

Job dryRunJob = getJobMock(null, jobConfiguration, StatementType.SELECT);
doReturn(dryRunJob).when(bigquery).create(any(JobInfo.class));

// Act
preparedStatementSpy.executeQuery();

// Assert
ArgumentCaptor<QueryJobConfiguration> captor =
ArgumentCaptor.forClass(QueryJobConfiguration.class);
verify(bigquery).queryWithTimeout(captor.capture(), any(), any());
QueryJobConfiguration capturedConfig = captor.getValue();

assertThat(capturedConfig.getDestinationTable())
.isEqualTo(TableId.of("test_dataset", "test_table"));
assertThat(capturedConfig.allowLargeResults()).isTrue();
}

@Test
public void testPreparedStatementExecuteWithLargeResults() throws Exception {
// Setup connection mocks to return large results settings
doReturn(true).when(bigQueryConnection).isAllowLargeResults();
doReturn("test_dataset").when(bigQueryConnection).getDestinationDataset();
doReturn("test_table").when(bigQueryConnection).getDestinationTable();

com.google.cloud.bigquery.Dataset dataset = mock(com.google.cloud.bigquery.Dataset.class);
doReturn(dataset).when(bigquery).getDataset(any(com.google.cloud.bigquery.DatasetId.class));

// Create PreparedStatement
BigQueryPreparedStatement preparedStatement =
new BigQueryPreparedStatement(bigQueryConnection, query);
BigQueryPreparedStatement preparedStatementSpy = Mockito.spy(preparedStatement);

TableResult result = Mockito.mock(TableResult.class);
BigQueryJsonResultSet jsonResultSet = mock(BigQueryJsonResultSet.class);
QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder(query).build();
Job job = getJobMock(result, jobConfiguration, StatementType.SELECT);

doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());
doReturn(jsonResultSet).when(preparedStatementSpy).processJsonResultSet(result);

Job dryRunJob = getJobMock(null, jobConfiguration, StatementType.SELECT);
doReturn(dryRunJob).when(bigquery).create(any(JobInfo.class));

// Act
preparedStatementSpy.execute();

// Assert
ArgumentCaptor<QueryJobConfiguration> captor =
ArgumentCaptor.forClass(QueryJobConfiguration.class);
verify(bigquery).queryWithTimeout(captor.capture(), any(), any());
QueryJobConfiguration capturedConfig = captor.getValue();

assertThat(capturedConfig.getDestinationTable())
.isEqualTo(TableId.of("test_dataset", "test_table"));
assertThat(capturedConfig.allowLargeResults()).isTrue();
}
}
Loading