feat: add async pagination support#156
Merged
Merged
Conversation
Pagination was sync-only: Paginator walks pages over an HttpClient and a consumer must block a thread per page. This adds AsyncPaginator, the non-blocking counterpart that drives the same page-to-page walk over an AsyncHttpClient and returns a CompletableFuture, so no thread parks waiting on a page. AsyncPaginator reuses the existing PaginationStrategy/Page wire-convention seam unchanged, so any strategy written for Paginator works here. It exposes forEachAsync(Consumer) and collectAllAsync(), preserves the maxPages safety cap, and closes each Response after the strategy parses it (including on the exceptional path). The driver is trampolined so a long run of synchronously-completed page futures stays stack-safe. The surface stays transport-agnostic and in sdk-core with no new dependencies (java.util.concurrent only); Reactor/coroutines bridges belong in the adapter modules.
Wire the future returned by forEachAsync/collectAllAsync back into the walk so completing it from the outside (cancel, complete, completeExceptionally, orTimeout) halts pagination: the driver stops before the next fetch and best-effort aborts the in-flight page exchange by cancelling its transport future. The cancellation contract is documented on the class. Harden the driver's failure handling: - surface a throwing Page.hasNext/nextPageRequest() through the result future instead of letting it escape the driver and strand the walk - fail cleanly if a transport violates the SPI by completing with a null Response, rather than NPE-ing inside parse/close Narrow collectAllAsync's return type to CompletableFuture<List<T>> so the accumulator is not handed back as mutable. Add tests for cancellation propagation, external completion stopping the walk before the next fetch, the null-Response guard, and the throwing-nextPageRequest path.
The page consumer runs inline on the thread that completes each page — a transport callback thread — so a consumer that blocks ties up the transport's callback pool. Add forEachAsync(Consumer, Executor) and collectAllAsync(Executor) overloads that run the page-draining driver, and therefore every consumer invocation, on the supplied executor; parse and close stay bound to each transport future's own completion. Ordering, single-page back-pressure, the single-entry trampoline, and cancellation semantics are unchanged, and a rejected executor dispatch fails the walk rather than stranding it. Also covers the eager-throw path of executeAsync and makes the consumer-threading and page-granularity cancellation contracts explicit in the KDoc.
The Cancellation KDoc claimed an in-flight page's response is always closed and discarded on abort. That holds only when the response is delivered before the cancel settles the transport future; if the cancel wins that race, the transport may have built a Response that never reaches the paginator's close path. Document this as an SPI-level limitation rather than promising more than the AsyncHttpClient contract can deliver. Also fix the inFlight field comment, which described the field as null between fetches. It is null only before the first fetch; afterward it retains the last (now-completed) transport future, and cancelling that completed future is a harmless no-op.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Pagination was sync-only.
Paginatorwalks pages over anHttpClient, which means a consumer must block a thread per page. This addsAsyncPaginator— the non-blocking counterpart that drives the same page-to-page walk over anAsyncHttpClientand returns aCompletableFuture, so no thread parks waiting on a page.What this adds
org.dexpace.sdk.core.pagination.AsyncPaginator<T>, mirroringPaginator<T>:forEachAsync(Consumer<in T>): CompletableFuture<Void>— walks every item across every page, invoking the consumer per item; the future completes when the walk finishes or completes exceptionally on transport/parse/consumer failure.forEachAsync(Consumer<in T>, Executor)/collectAllAsync(Executor)— the same walk, but the page-draining driver and therefore every consumer invocation run on the supplied executor, so a consumer that blocks does not tie up the transport's callback threads. Ordering and one-page-in-flight back-pressure are unchanged; a rejected dispatch fails the walk.collectAllAsync(): CompletableFuture<List<T>>— convenience that buffers all items.@JvmOverloadsconstructor shape asPaginator(client, initialRequest, strategy, optionalmaxPages).Design notes
PaginationStrategy/Pagewire-convention types, so any strategy written for the syncPaginator(cursor, page-number,Link-header) works here with no changes.maxPagesstill bounds the number of HTTP exchanges, and eachResponseis closed after the strategy parses it — including on the exceptional path, before the result future fails.thenCompose, so a long run of already-complete pages does not overflow the stack. A 5,000-page synchronous-completion test covers this, and a second covers the same depth when the walk is entered through an executor.sdk-coreand uses onlyjava.util.concurrent(CompletableFuture,Executor). Reactor/coroutinesFlow/Fluxbridges remain the responsibility of the adapter modules, per the layering inCLAUDE.md.Tests
A new
AsyncPaginatorTestexercises multi-page walks (both synchronously-completed and completed on a background executor), themaxPagescap, transport failures (including a transport that throws synchronously out ofexecuteAsync), strategy-parse failures (asserting the response is still closed), consumer-thrown aborts, response-close counting, empty terminal pages, external cancellation/completion, and deep stack-safety. The executor overloads add cases that pin the consumer to a dedicated executor and prove it never runs on a transport callback thread, keep the trampoline stack-safe when entered through an executor, and fail the walk when the executor rejects a dispatch. Backed by a newStubAsyncHttpClientfake that can complete inline or via an executor.Gated build (run locally; root build skipped per repo guidance to avoid R8 + kover aggregate)
All green.
:sdk-core:apiDumpwas run and the regeneratedsdk-core/api/sdk-core.apiis committed.Dependency note
Issue #34 lists #30 (pagination-stack unification) as a blocker. That unification (PR #145) is still open and not on
main, so this targets the currentpaginationsurface (Paginator+PaginationStrategy+Page). If #145 lands,AsyncPaginatorshould be re-pointed at the unified surface; because it only depends onPaginationStrategy/Page, that follow-up is mechanical.Closes #34