Skip to content

Commit d3bd231

Browse files
authored
Merge pull request #5672 from BryanFauble/plfm-9726-search-convergence-timeout
PLFM-9726: Fix SearchIndex stuck in CREATING when AOSS convergence probe times out
2 parents a7bd1f3 + 5100b40 commit d3bd231

7 files changed

Lines changed: 56 additions & 356 deletions

File tree

integration-test/src/test/java/org/sagebionetworks/ITSearchQueryTest.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.sagebionetworks.repo.model.search.dsl.TermsAggregation;
6262
import org.sagebionetworks.repo.model.search.table.SearchAutocompleteRequest;
6363
import org.sagebionetworks.repo.model.search.table.SearchIndexQuery;
64+
import org.sagebionetworks.util.TimeUtils;
6465

6566
/**
6667
* Integration tests for the SearchIndex query path.
@@ -420,8 +421,13 @@ public void testAutocompleteWithEdgeNgram() throws Exception {
420421
.setQuery(new Query().setMatch_bool_prefix(
421422
Map.of("geneName", new MatchBoolPrefixFieldOptions().setQuery("BRC")))));
422423

423-
// call under test
424-
SearchQueryResults autocompleteResults = synapse.searchAutocomplete(autocompleteRequest);
424+
// call under test — AOSS is eventually consistent per query type and per replica, so
425+
// the match_all wait above does not guarantee this match_bool_prefix autocomplete sees
426+
// every document yet. Poll the autocomplete itself until both BRCA hits surface.
427+
SearchQueryResults autocompleteResults = waitForQuery(
428+
() -> synapse.searchAutocomplete(autocompleteRequest),
429+
r -> r.getHits() != null && r.getHits().size() >= 2,
430+
"at least 2 autocomplete hits for 'BRC' (BRCA1, BRCA2)");
425431
assertNotNull(autocompleteResults);
426432
assertNotNull(autocompleteResults.getHits());
427433
assertTrue(autocompleteResults.getHits().size() >= 2,
@@ -453,6 +459,44 @@ public void testStartSearchQueryWithUnsupportedKey() {
453459
assertEquals("JSON Element in Entity is Unsupported: notPartOfSpecification", message);
454460
}
455461

462+
/**
463+
* A search/autocomplete call against the live cluster that may throw {@link SynapseException}.
464+
*/
465+
@FunctionalInterface
466+
private interface QueryCall {
467+
SearchQueryResults call() throws SynapseException;
468+
}
469+
470+
/**
471+
* Poll a synchronous search/autocomplete call until {@code condition} holds, then return the
472+
* matching result. AOSS is eventually consistent per query type and per replica, so a freshly
473+
* built index that already answers a {@code match_all} probe may transiently under-return for a
474+
* different query shape (autocomplete, match, post_filter). Tests asserting on a specific
475+
* synchronous query must poll that exact query rather than trusting a prior wait. Mirrors the
476+
* {@code waitForSearch}/{@code waitForSearchHits} helpers in {@code OpenSearchManagerImplAutoWiredTest}.
477+
*
478+
* @param queryCall the call under test
479+
* @param condition the readiness predicate on the result (e.g. expected hit count reached)
480+
* @param description what is being waited for, used in the timeout message
481+
* @return the result that satisfied {@code condition}
482+
*/
483+
private SearchQueryResults waitForQuery(QueryCall queryCall,
484+
java.util.function.Predicate<SearchQueryResults> condition, String description) {
485+
SearchQueryResults[] holder = { null };
486+
boolean ready = TimeUtils.waitForExponential(MAX_QUERY_TIMEOUT_MS, 1000L, null, (v) -> {
487+
try {
488+
SearchQueryResults r = queryCall.call();
489+
holder[0] = r;
490+
return r != null && condition.test(r);
491+
} catch (SynapseException e) {
492+
// index_not_found / still-propagating — keep polling
493+
return false;
494+
}
495+
});
496+
assertTrue(ready, "Timed out waiting for " + description);
497+
return holder[0];
498+
}
499+
456500
private void grantPublicRead(String entityId) throws SynapseException {
457501
AccessControlList acl = synapse.getACL(entityId);
458502
ResourceAccess publicAccess = new ResourceAccess();

services/repository-managers/src/main/java/org/sagebionetworks/repo/manager/search/OpenSearchManager.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -93,28 +93,6 @@ Optional<String> createIndex(String indexName, List<ColumnModel> columns,
9393
*/
9494
void waitForIndexWritable(String indexName) throws RecoverableMessageException;
9595

96-
/**
97-
* Block until {@code indexName} reports a document count of at least
98-
* {@code expectedCount}. AOSS is eventually consistent: a {@code bulkIndex} call can
99-
* return success while the documents are not yet visible to {@code _search} or
100-
* {@code _count}. Polling {@code _count} (allowed on AOSS under
101-
* {@code aoss:ReadDocument}) is the only convergence signal available.
102-
*
103-
* <p>The comparison is {@code actual >= expectedCount} rather than strict equality so
104-
* a leftover readiness-probe sentinel (whose cleanup is best-effort, see
105-
* {@link #waitForIndexWritable}) does not strand convergence one short.</p>
106-
*
107-
* <p>No-op when {@code expectedCount == 0} — an empty index reports zero immediately
108-
* and skipping the call avoids an unnecessary round-trip.</p>
109-
*
110-
* @param indexName the OpenSearch index name
111-
* @param expectedCount the document count the index must reach before returning
112-
* @throws RecoverableMessageException when the index does not converge within the
113-
* retry budget, so the SearchIndex lifecycle message goes back on SQS for a
114-
* later attempt instead of being marked permanently FAILED
115-
*/
116-
void waitForDocumentCount(String indexName, long expectedCount) throws RecoverableMessageException;
117-
11896
/**
11997
* Execute a search query against the OpenSearch index. The {@code options} set controls
12098
* which sections of the OpenSearch request are populated: omitting HITS switches the

services/repository-managers/src/main/java/org/sagebionetworks/repo/manager/search/OpenSearchManagerImpl.java

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import org.opensearch.client.opensearch._types.mapping.Property;
3131
import org.opensearch.client.opensearch.core.BulkRequest;
3232
import org.opensearch.client.opensearch.core.BulkResponse;
33-
import org.opensearch.client.opensearch.core.CountRequest;
34-
import org.opensearch.client.opensearch.core.CountResponse;
3533
import org.opensearch.client.opensearch.core.DeleteRequest;
3634
import org.opensearch.client.opensearch.core.IndexRequest;
3735
import org.opensearch.client.opensearch.core.SearchResponse;
@@ -105,13 +103,6 @@ public class OpenSearchManagerImpl implements OpenSearchManager {
105103
static int VALIDATE_MAX_RETRIES = 10;
106104
static long VALIDATE_INITIAL_BACKOFF_MS = 1000L;
107105

108-
// Convergence probe for a freshly-finished bulk index. AOSS acknowledges bulk writes
109-
// before the documents are visible to _search or _count, so we poll _count until the
110-
// index reports the expected number of documents. Same budget as the writability probe
111-
// for consistency. Non-final so unit tests can lower the values and avoid real sleeps.
112-
static int COUNT_PROBE_MAX_RETRIES = 10;
113-
static long COUNT_PROBE_INITIAL_BACKOFF_MS = 10000L;
114-
115106
// Cleanup retry for the readiness-probe sentinel. AOSS doesn't honor refresh=wait_for,
116107
// so a single delete that fails on a transient network blip would orphan the sentinel
117108
// (visible only to MATCH_ALL queries since _row_id = -1 cannot collide with real ids,
@@ -543,55 +534,6 @@ public void waitForIndexWritable(String indexName) throws RecoverableMessageExce
543534
}
544535
}
545536

546-
@Override
547-
public void waitForDocumentCount(String indexName, long expectedCount) throws RecoverableMessageException {
548-
// Empty indexes report zero immediately; skip the round-trip.
549-
if (expectedCount <= 0L) {
550-
return;
551-
}
552-
final int[] attempt = {0};
553-
final long[] lastObserved = {-1L};
554-
try {
555-
TimeUtils.waitForExponentialMaxRetry(COUNT_PROBE_MAX_RETRIES, COUNT_PROBE_INITIAL_BACKOFF_MS,
556-
() -> {
557-
attempt[0]++;
558-
try {
559-
CountResponse response = openSearchClient.count(CountRequest.of(r -> r
560-
.index(indexName)));
561-
long actual = response.count();
562-
lastObserved[0] = actual;
563-
// >= rather than == so a leftover readiness-probe sentinel cannot
564-
// permanently strand convergence one short.
565-
if (actual >= expectedCount) {
566-
return Boolean.TRUE;
567-
}
568-
LOG.warn("Index {} not yet converged (attempt {}/{}): {} of {} documents visible",
569-
indexName, attempt[0], COUNT_PROBE_MAX_RETRIES, actual, expectedCount);
570-
throw new RetryException("count " + actual + " of " + expectedCount);
571-
} catch (OpenSearchException e) {
572-
LOG.warn("Index {} count probe failed (attempt {}/{}): {}",
573-
indexName, attempt[0], COUNT_PROBE_MAX_RETRIES, describeError(e.error()));
574-
throw new RetryException(e);
575-
} catch (IOException e) {
576-
LOG.warn("Index {} count probe failed (attempt {}/{}): {}",
577-
indexName, attempt[0], COUNT_PROBE_MAX_RETRIES, e.getMessage());
578-
throw new RetryException(e);
579-
}
580-
});
581-
} catch (RetryException e) {
582-
LOG.error("Index {} did not converge to expected count after {} attempts ({} of {})",
583-
indexName, COUNT_PROBE_MAX_RETRIES, lastObserved[0], expectedCount);
584-
throw new RecoverableMessageException(
585-
"AOSS index " + indexName + " did not converge to expected count ("
586-
+ lastObserved[0] + " of " + expectedCount + ") within the retry budget",
587-
e.getCause());
588-
} catch (RuntimeException e) {
589-
throw e;
590-
} catch (Exception e) {
591-
throw new RuntimeException("Failed convergence probe for search index: " + indexName, e);
592-
}
593-
}
594-
595537
@Override
596538
public long bulkIndex(String indexName, List<BulkOperation> operations) {
597539
// Callers must hand us idempotent ops (index/delete with explicit _id). When a

services/repository-managers/src/main/java/org/sagebionetworks/repo/manager/search/SearchIndexLifecycleManagerImpl.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -313,13 +313,6 @@ private void buildIndex(ProgressCallback progressCallback, String entityId, Long
313313
// failure is not in the search index's configuration.
314314
throw e;
315315
} catch (Throwable e) {
316-
// SearchIndexRowHandler.close() tunnels RecoverableMessageException through
317-
// IOException because RowHandler.close() can't declare anything else. Unwrap
318-
// here so a convergence-probe timeout reaches the do-not-mark-FAILED branch
319-
// above on retry instead of permanently failing the index.
320-
if (e instanceof IOException && e.getCause() instanceof RecoverableMessageException) {
321-
throw (RecoverableMessageException) e.getCause();
322-
}
323316
// Another worker is currently deleting this same AOSS index. Translate
324317
// to a recoverable SQS retry: by the time the retry runs, the winning
325318
// delete has finished and our deleteIndex on retry no-ops via
@@ -616,17 +609,6 @@ private void flush() {
616609
@Override
617610
public void close() throws IOException {
618611
flush();
619-
// AOSS bulk writes acknowledge before the documents are visible to _search or
620-
// _count. Block until the index reports the row count we streamed, so the
621-
// SearchIndex isn't flipped to ACTIVE while a query would still under-return.
622-
try {
623-
client.waitForDocumentCount(indexName, totalRows);
624-
} catch (RecoverableMessageException e) {
625-
// RowHandler.close() can only declare IOException; tunnel the recoverable
626-
// signal through it. The lifecycle build path unwraps before its FAILED
627-
// branch so the message goes back on SQS instead of marking the index FAILED.
628-
throw new IOException(e);
629-
}
630612
}
631613
}
632614

services/repository-managers/src/test/java/org/sagebionetworks/repo/manager/search/OpenSearchManagerImplAutoWiredTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,9 +1236,10 @@ public void testSearchWithPostFilter() {
12361236
.setSize(10L)
12371237
.setFrom(0L);
12381238

1239-
// call under test — post_filter narrows hits; aggregations stay at full population
1240-
SearchQueryResults results = openSearchManager.search(indexName, body, columns,
1241-
EnumSet.of(SearchQueryPart.HITS, SearchQueryPart.TOTAL_HITS));
1239+
// call under test — post_filter narrows hits; aggregations stay at full population.
1240+
// Poll until post_filter returns the expected 2 ACTIVE hits.
1241+
SearchQueryResults results = waitForSearchHits(body, columns,
1242+
EnumSet.of(SearchQueryPart.HITS, SearchQueryPart.TOTAL_HITS), 2);
12421243

12431244
assertEquals(2L, results.getTotalHits(),
12441245
"totalHits must reflect post_filter narrowing — only ACTIVE rows");
@@ -1285,9 +1286,10 @@ public void testSearchWithHighlight() {
12851286
.setSize(10L)
12861287
.setFrom(0L);
12871288

1288-
// call under test — highlight payload round-trips and SearchHit.highlights is populated
1289-
SearchQueryResults results = openSearchManager.search(indexName, body, columns,
1290-
EnumSet.of(SearchQueryPart.HITS, SearchQueryPart.TOTAL_HITS));
1289+
// call under test — highlight payload round-trips and SearchHit.highlights is populated.
1290+
// Poll until the match query returns all 3 hits.
1291+
SearchQueryResults results = waitForSearchHits(body, columns,
1292+
EnumSet.of(SearchQueryPart.HITS, SearchQueryPart.TOTAL_HITS), 3);
12911293

12921294
assertEquals(3L, results.getTotalHits());
12931295
assertNotNull(results.getHits());

services/repository-managers/src/test/java/org/sagebionetworks/repo/manager/search/OpenSearchManagerImplTest.java

Lines changed: 0 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ private static IndexSettingsAnalysis toAnalysis(String settingsJson) {
136136

137137
private long originalBulkInitialBackoffMs;
138138
private long originalProbeInitialBackoffMs;
139-
private long originalCountProbeInitialBackoffMs;
140139
private long originalSentinelCleanupInitialBackoffMs;
141140

142141
@BeforeEach
@@ -147,8 +146,6 @@ public void setUp() {
147146
OpenSearchManagerImpl.BULK_INDEX_INITIAL_BACKOFF_MS = 1L;
148147
originalProbeInitialBackoffMs = OpenSearchManagerImpl.INDEX_WRITABLE_INITIAL_BACKOFF_MS;
149148
OpenSearchManagerImpl.INDEX_WRITABLE_INITIAL_BACKOFF_MS = 1L;
150-
originalCountProbeInitialBackoffMs = OpenSearchManagerImpl.COUNT_PROBE_INITIAL_BACKOFF_MS;
151-
OpenSearchManagerImpl.COUNT_PROBE_INITIAL_BACKOFF_MS = 1L;
152149
originalSentinelCleanupInitialBackoffMs = OpenSearchManagerImpl.SENTINEL_CLEANUP_INITIAL_BACKOFF_MS;
153150
OpenSearchManagerImpl.SENTINEL_CLEANUP_INITIAL_BACKOFF_MS = 1L;
154151
}
@@ -157,7 +154,6 @@ public void setUp() {
157154
public void tearDown() {
158155
OpenSearchManagerImpl.BULK_INDEX_INITIAL_BACKOFF_MS = originalBulkInitialBackoffMs;
159156
OpenSearchManagerImpl.INDEX_WRITABLE_INITIAL_BACKOFF_MS = originalProbeInitialBackoffMs;
160-
OpenSearchManagerImpl.COUNT_PROBE_INITIAL_BACKOFF_MS = originalCountProbeInitialBackoffMs;
161157
OpenSearchManagerImpl.SENTINEL_CLEANUP_INITIAL_BACKOFF_MS = originalSentinelCleanupInitialBackoffMs;
162158
}
163159

@@ -1718,117 +1714,6 @@ public void testWaitForIndexWritableSentinelCleanupRetriesAndSucceeds() throws E
17181714
verify(openSearchClient, times(2)).delete(argThat((DeleteRequest req) -> req != null));
17191715
}
17201716

1721-
// --- waitForDocumentCount ---
1722-
1723-
@Test
1724-
public void testWaitForDocumentCountWithImmediateMatchReturns() throws Exception {
1725-
// Happy path: AOSS reports the expected count on the first attempt; no retry, no log noise.
1726-
when(openSearchClient.count(any(org.opensearch.client.opensearch.core.CountRequest.class)))
1727-
.thenReturn(org.opensearch.client.opensearch.core.CountResponse.of(b -> b
1728-
.count(5L)
1729-
.shards(s -> s.total(1).successful(1).failed(0))));
1730-
1731-
// call under test
1732-
manager.waitForDocumentCount("search-index-syn1", 5L);
1733-
1734-
verify(openSearchClient, times(1))
1735-
.count(any(org.opensearch.client.opensearch.core.CountRequest.class));
1736-
}
1737-
1738-
@Test
1739-
public void testWaitForDocumentCountConvergesAfterUnderCount() throws Exception {
1740-
// AOSS is eventually consistent: the first probe sees fewer documents than were
1741-
// bulk-indexed; the second sees the full count and returns.
1742-
when(openSearchClient.count(any(org.opensearch.client.opensearch.core.CountRequest.class)))
1743-
.thenReturn(countResponse(2L))
1744-
.thenReturn(countResponse(5L));
1745-
1746-
// call under test
1747-
manager.waitForDocumentCount("search-index-syn1", 5L);
1748-
1749-
verify(openSearchClient, times(2))
1750-
.count(any(org.opensearch.client.opensearch.core.CountRequest.class));
1751-
}
1752-
1753-
@Test
1754-
public void testWaitForDocumentCountTreatsExcessAsConverged() throws Exception {
1755-
// >= rather than == so a leftover readiness-probe sentinel cannot strand convergence
1756-
// one short. An excess count is therefore a successful return, not a retry.
1757-
when(openSearchClient.count(any(org.opensearch.client.opensearch.core.CountRequest.class)))
1758-
.thenReturn(countResponse(6L));
1759-
1760-
// call under test
1761-
manager.waitForDocumentCount("search-index-syn1", 5L);
1762-
1763-
verify(openSearchClient, times(1))
1764-
.count(any(org.opensearch.client.opensearch.core.CountRequest.class));
1765-
}
1766-
1767-
@Test
1768-
public void testWaitForDocumentCountExhaustsRetriesAndThrowsRecoverableMessageException() throws Exception {
1769-
// Persistent under-count: every attempt sees fewer documents than expected. The probe
1770-
// must throw RecoverableMessageException so the lifecycle worker re-queues the message
1771-
// without writing FAILED — convergence is transient by definition.
1772-
when(openSearchClient.count(any(org.opensearch.client.opensearch.core.CountRequest.class)))
1773-
.thenReturn(countResponse(2L));
1774-
1775-
// call under test
1776-
RecoverableMessageException ex = assertThrows(RecoverableMessageException.class,
1777-
() -> manager.waitForDocumentCount("search-index-syn1", 5L));
1778-
1779-
assertTrue(ex.getMessage().contains("did not converge"), ex.getMessage());
1780-
assertTrue(ex.getMessage().contains("2 of 5"), ex.getMessage());
1781-
verify(openSearchClient, times(OpenSearchManagerImpl.COUNT_PROBE_MAX_RETRIES))
1782-
.count(any(org.opensearch.client.opensearch.core.CountRequest.class));
1783-
}
1784-
1785-
@Test
1786-
public void testWaitForDocumentCountWithIOExceptionExhaustsRetries() throws Exception {
1787-
when(openSearchClient.count(any(org.opensearch.client.opensearch.core.CountRequest.class)))
1788-
.thenThrow(new IOException("connection reset"));
1789-
1790-
// call under test
1791-
assertThrows(RecoverableMessageException.class,
1792-
() -> manager.waitForDocumentCount("search-index-syn1", 5L));
1793-
1794-
verify(openSearchClient, times(OpenSearchManagerImpl.COUNT_PROBE_MAX_RETRIES))
1795-
.count(any(org.opensearch.client.opensearch.core.CountRequest.class));
1796-
}
1797-
1798-
@Test
1799-
public void testWaitForDocumentCountWithOpenSearchExceptionExhaustsRetries() throws Exception {
1800-
// AOSS may briefly return index_not_found_exception (or any other transient error) from
1801-
// _count while shards are still propagating the freshly-written documents.
1802-
ErrorResponse notFound = ErrorResponse.of(er -> er
1803-
.error(ErrorCause.of(e -> e.type("index_not_found_exception").reason("no such index")))
1804-
.status(404));
1805-
when(openSearchClient.count(any(org.opensearch.client.opensearch.core.CountRequest.class)))
1806-
.thenThrow(new OpenSearchException(notFound));
1807-
1808-
// call under test
1809-
assertThrows(RecoverableMessageException.class,
1810-
() -> manager.waitForDocumentCount("search-index-syn1", 5L));
1811-
1812-
verify(openSearchClient, times(OpenSearchManagerImpl.COUNT_PROBE_MAX_RETRIES))
1813-
.count(any(org.opensearch.client.opensearch.core.CountRequest.class));
1814-
}
1815-
1816-
@Test
1817-
public void testWaitForDocumentCountWithZeroExpectedShortCircuits() throws Exception {
1818-
// An empty SearchIndex needs no probe — _count would just return 0 and we'd skip the
1819-
// retry path anyway. Avoid the round-trip entirely.
1820-
// call under test
1821-
manager.waitForDocumentCount("search-index-syn1", 0L);
1822-
1823-
verifyZeroInteractions(openSearchClient);
1824-
}
1825-
1826-
private static org.opensearch.client.opensearch.core.CountResponse countResponse(long count) {
1827-
return org.opensearch.client.opensearch.core.CountResponse.of(b -> b
1828-
.count(count)
1829-
.shards(s -> s.total(1).successful(1).failed(0)));
1830-
}
1831-
18321717
// --- per-document fallback on partial batch failure ---
18331718

18341719
@Test

0 commit comments

Comments
 (0)