diff --git a/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeApi.java b/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeApi.java index d6c8eb9..e496380 100644 --- a/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeApi.java +++ b/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeApi.java @@ -1,12 +1,17 @@ package ai.docling.serve.api; +import java.time.Duration; + import org.jspecify.annotations.Nullable; +import ai.docling.serve.api.convert.request.ConvertDocumentRequest; + /** * Docling Serve API interface. */ public interface DoclingServeApi extends DoclingServeHealthApi, DoclingServeConvertApi, DoclingServeChunkApi, DoclingServeClearApi, DoclingServeTaskApi { + /** * Creates and returns a builder instance capable of constructing a duplicate or modified * version of the current API instance. The builder provides a customizable way to adjust @@ -103,6 +108,30 @@ default B prettyPrint() { return prettyPrint(true); } + /** + * Sets the polling interval for async operations. + * + *

This configures how frequently the client will check the status of async + * conversion tasks when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}. + * + * @param asyncPollInterval the polling interval (must not be null or negative) + * @return this builder instance for method chaining + * @throws IllegalArgumentException if asyncPollInterval is null or negative + */ + B asyncPollInterval(Duration asyncPollInterval); + + /** + * Sets the timeout for async operations. + * + *

This configures the maximum time to wait for an async conversion task to complete + * when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}. + * + * @param asyncTimeout the timeout duration (must not be null or negative) + * @return this builder instance for method chaining + * @throws IllegalArgumentException if asyncTimeout is null or negative + */ + B asyncTimeout(Duration asyncTimeout); + /** * Builds and returns an instance of the specified type, representing the completed configuration * of the builder. The returned instance is typically an implementation of the Docling API. diff --git a/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeConvertApi.java b/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeConvertApi.java index 6c28631..fbe626c 100644 --- a/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeConvertApi.java +++ b/docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeConvertApi.java @@ -1,5 +1,7 @@ package ai.docling.serve.api; +import java.util.concurrent.CompletableFuture; + import ai.docling.serve.api.convert.request.ConvertDocumentRequest; import ai.docling.serve.api.convert.response.ConvertDocumentResponse; @@ -9,11 +11,33 @@ * a structured or processed document format based on the specified conversion options. */ public interface DoclingServeConvertApi { - /** + /** * Converts the provided document source(s) into a processed document based on the specified options. * * @param request the {@link ConvertDocumentRequest} containing the source(s), conversion options, and optional target. * @return a {@link ConvertDocumentResponse} containing the processed document data, processing details, and any errors. */ ConvertDocumentResponse convertSource(ConvertDocumentRequest request); + + /** + * Initiates an asynchronous conversion of the provided document source(s) and returns a + * {@link CompletableFuture} that completes when the conversion is done. + * + *

This method starts the conversion, polls the status in the background, and completes + * the future with the result when the conversion finishes. + * + *

Example usage: + *

{@code
+   * client.convertSourceAsync(request)
+   *     .thenApply(response -> response.getDocument().getMarkdownContent())
+   *     .thenAccept(markdown -> System.out.println(markdown))
+   *     .exceptionally(ex -> { ex.printStackTrace(); return null; });
+   * }
+ * + * @param request the {@link ConvertDocumentRequest} containing the source(s) and conversion options. + * @return a {@link CompletableFuture} that completes with the {@link ConvertDocumentResponse} + * when the conversion is finished, or completes exceptionally if the conversion fails + * or times out. + */ + CompletableFuture convertSourceAsync(ConvertDocumentRequest request); } diff --git a/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/DoclingServeClient.java b/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/DoclingServeClient.java index 843b633..2feeafc 100644 --- a/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/DoclingServeClient.java +++ b/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/DoclingServeClient.java @@ -12,10 +12,12 @@ import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Subscriber; import org.jspecify.annotations.Nullable; @@ -75,12 +77,14 @@ public abstract class DoclingServeClient extends HttpOperations implements Docli private final boolean logResponses; private final boolean prettyPrintJson; private final @Nullable String apiKey; + private final Duration asyncPollInterval; + private final Duration asyncTimeout; - private final HealthOperations healthOps = new HealthOperations(this); - private final ConvertOperations convertOps = new ConvertOperations(this); - private final ChunkOperations chunkOps = new ChunkOperations(this); - private final ClearOperations clearOps = new ClearOperations(this); - private final TaskOperations taskOps = new TaskOperations(this); + private final HealthOperations healthOps; + private final ConvertOperations convertOps; + private final ChunkOperations chunkOps; + private final ClearOperations clearOps; + private final TaskOperations taskOps; protected DoclingServeClient(DoclingServeClientBuilder builder) { this.baseUrl = ensureNotNull(builder.baseUrl, "baseUrl"); @@ -97,6 +101,15 @@ protected DoclingServeClient(DoclingServeClientBuilder builder) { this.logResponses = builder.logResponses; this.prettyPrintJson = builder.prettyPrintJson; this.apiKey = builder.apiKey; + this.asyncPollInterval = builder.asyncPollInterval; + this.asyncTimeout = builder.asyncTimeout; + + // Initialize operations handlers + this.healthOps = new HealthOperations(this); + this.taskOps = new TaskOperations(this); + this.convertOps = new ConvertOperations(this, this.taskOps, this.asyncPollInterval, this.asyncTimeout); + this.chunkOps = new ChunkOperations(this); + this.clearOps = new ClearOperations(this); } /** @@ -285,6 +298,11 @@ public ClearResponse clearResults(ClearResultsRequest request) { return this.clearOps.clearResults(request); } + @Override + public CompletableFuture convertSourceAsync(ConvertDocumentRequest request) { + return this.convertOps.convertSourceAsync(request); + } + private class LoggingBodyPublisher implements BodyPublisher { private final BodyPublisher delegate; private final String stringContent; @@ -333,6 +351,8 @@ public abstract static class DoclingServeClientBuilderThis configures how frequently the client will check the status of async + * conversion tasks when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}. + * + * @param asyncPollInterval the polling interval (must not be null or negative) + * @return this builder instance for method chaining + * @throws IllegalArgumentException if asyncPollInterval is null or negative + */ + @Override + public B asyncPollInterval(Duration asyncPollInterval) { + if (asyncPollInterval == null || asyncPollInterval.isNegative() || asyncPollInterval.isZero()) { + throw new IllegalArgumentException("asyncPollInterval must be a positive duration"); + } + + this.asyncPollInterval = asyncPollInterval; + return (B) this; + } + + /** + * Sets the timeout for async operations. + * + *

This configures the maximum time to wait for an async conversion task to complete + * when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}. + * + * @param asyncTimeout the timeout duration (must not be null or negative) + * @return this builder instance for method chaining + * @throws IllegalArgumentException if asyncTimeout is null or negative + */ + @Override + public B asyncTimeout(Duration asyncTimeout) { + if (asyncTimeout == null || asyncTimeout.isNegative() || asyncTimeout.isZero()) { + throw new IllegalArgumentException("asyncTimeout must be a positive duration"); + } + this.asyncTimeout = asyncTimeout; + return (B) this; + } } } diff --git a/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/operations/ConvertOperations.java b/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/operations/ConvertOperations.java index 87e1839..14e866d 100644 --- a/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/operations/ConvertOperations.java +++ b/docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/operations/ConvertOperations.java @@ -1,27 +1,51 @@ package ai.docling.serve.client.operations; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import ai.docling.serve.api.DoclingServeConvertApi; +import ai.docling.serve.api.DoclingServeTaskApi; import ai.docling.serve.api.convert.request.ConvertDocumentRequest; import ai.docling.serve.api.convert.response.ConvertDocumentResponse; +import ai.docling.serve.api.task.request.TaskResultRequest; +import ai.docling.serve.api.task.request.TaskStatusPollRequest; +import ai.docling.serve.api.task.response.TaskStatusPollResponse; import ai.docling.serve.api.util.ValidationUtils; /** * Base class for document conversion API operations. Provides access to document - * conversion functionality. + * conversion functionality including both synchronous and asynchronous operations. */ public final class ConvertOperations implements DoclingServeConvertApi { - private final HttpOperations httpOperations; + private static final Logger LOG = LoggerFactory.getLogger(ConvertOperations.class); - public ConvertOperations(HttpOperations httpOperations) { - this.httpOperations = httpOperations; - } + private final HttpOperations httpOperations; + private final DoclingServeTaskApi taskApi; + private final Duration asyncPollInterval; + private final Duration asyncTimeout; /** - * Converts the provided document source(s) into a processed document based on the specified options. + * Creates a new ConvertOperations instance. * - * @param request the {@link ConvertDocumentRequest} containing the source(s), conversion options, and optional target. - * @return a {@link ConvertDocumentResponse} containing the processed document data, processing details, and any errors. + * @param httpOperations the HTTP operations handler for executing requests + * @param taskApi the task operations handler for polling and retrieving results + * @param asyncPollInterval the interval between status polls for async operations + * @param asyncTimeout the maximum time to wait for async operations to complete */ + public ConvertOperations(HttpOperations httpOperations, DoclingServeTaskApi taskApi, + Duration asyncPollInterval, Duration asyncTimeout) { + this.httpOperations = httpOperations; + this.taskApi = taskApi; + this.asyncPollInterval = asyncPollInterval; + this.asyncTimeout = asyncTimeout; + } + + @Override public ConvertDocumentResponse convertSource(ConvertDocumentRequest request) { ValidationUtils.ensureNotNull(request, "request"); return this.httpOperations.executePost(createRequestContext("/v1/convert/source", request)); @@ -34,4 +58,78 @@ private RequestContext createRequestContext(Stri .uri(uri) .build(); } + + private RequestContext createAsyncRequestContext(String uri, I request) { + return RequestContext.builder() + .request(request) + .responseType(TaskStatusPollResponse.class) + .uri(uri) + .build(); + } + + @Override + public CompletableFuture convertSourceAsync(ConvertDocumentRequest request) { + ValidationUtils.ensureNotNull(request, "request"); + + // Start the async conversion and chain the polling logic + return CompletableFuture.supplyAsync(() -> + this.httpOperations.executePost(createAsyncRequestContext("/v1/convert/source/async", request)) + ).thenCompose(taskResponse -> { + var taskId = taskResponse.getTaskId(); + LOG.info("Started async conversion with task ID: {}", taskId); + + long startTime = System.currentTimeMillis(); + return pollTaskUntilComplete(taskId, startTime); + }); + } + + /** + * Recursively polls a task until it completes, fails, or times out. + * + * @param taskId the ID of the task to poll + * @param startTime the timestamp when polling started (for timeout calculation) + * @return a CompletableFuture that completes with the conversion result + */ + private CompletableFuture pollTaskUntilComplete(String taskId, long startTime) { + // Check if we've timed out + if (System.currentTimeMillis() - startTime > asyncTimeout.toMillis()) { + return CompletableFuture.failedFuture( + new RuntimeException("Async conversion timed out after " + asyncTimeout + " for task: " + taskId)); + } + + // Poll the task status + var pollRequest = TaskStatusPollRequest.builder().taskId(taskId).build(); + + return CompletableFuture.supplyAsync(() -> taskApi.pollTaskStatus(pollRequest)) + .thenCompose(statusResponse -> pollTaskStatus(statusResponse, startTime)); + } + + private CompletionStage pollTaskStatus(TaskStatusPollResponse statusResponse, long startTime) { + var status = statusResponse.getTaskStatus(); + var taskId = statusResponse.getTaskId(); + LOG.debug("Task {} status: {}", taskId, status); + + return switch (statusResponse.getTaskStatus()) { + case SUCCESS -> { + LOG.info("Task {} completed successfully", taskId); + // Retrieve the result + var resultRequest = TaskResultRequest.builder().taskId(taskId).build(); + yield CompletableFuture.supplyAsync(() -> taskApi.convertTaskResult(resultRequest)); + } + case FAILURE -> { + var errorMessage = "Task failed"; + if (statusResponse.getTaskStatusMetadata()!=null) { + errorMessage = "Task failed: " + statusResponse.getTaskStatusMetadata(); + } + yield CompletableFuture.failedFuture( + new RuntimeException("Async conversion failed for task " + taskId + ": " + errorMessage)); + } + default -> + // Still in progress (PENDING or STARTED), schedule next poll after delay + CompletableFuture.supplyAsync( + () -> null, + CompletableFuture.delayedExecutor(asyncPollInterval.toMillis(), TimeUnit.MILLISECONDS) + ).thenCompose(v -> pollTaskUntilComplete(taskId, startTime)); + }; + } } diff --git a/docling-serve/docling-serve-client/src/test/java/ai/docling/serve/client/AbstractDoclingServeClientTests.java b/docling-serve/docling-serve-client/src/test/java/ai/docling/serve/client/AbstractDoclingServeClientTests.java index 9ef3776..74c36ad 100644 --- a/docling-serve/docling-serve-client/src/test/java/ai/docling/serve/client/AbstractDoclingServeClientTests.java +++ b/docling-serve/docling-serve-client/src/test/java/ai/docling/serve/client/AbstractDoclingServeClientTests.java @@ -82,10 +82,10 @@ public void testFailed(ExtensionContext context, @Nullable Throwable cause) { Container logs: %s """.formatted( - getClass().getName(), - context.getTestMethod().map(Method::getName).orElse(""), - Optional.ofNullable(cause).map(Throwable::getMessage).orElse(""), - doclingContainer.getLogs()); + getClass().getName(), + context.getTestMethod().map(Method::getName).orElse(""), + Optional.ofNullable(cause).map(Throwable::getMessage).orElse(""), + doclingContainer.getLogs()); LOG.error(message); } @@ -415,6 +415,75 @@ void shouldHandleResponseWithDoclingDocument() { assertThat(doclingDocument.getName()).isNotEmpty(); assertThat(doclingDocument.getTexts().get(0).getLabel()).isEqualTo(DocItemLabel.TITLE); } + + @Test + void shouldConvertSourceAsync() { + ConvertDocumentRequest request = ConvertDocumentRequest.builder() + .source(HttpSource.builder().url(URI.create("https://docs.arconia.io/arconia-cli/latest/development/dev/")).build()) + .build(); + + ConvertDocumentResponse response = getDoclingClient().convertSourceAsync(request).join(); + + assertThat(response).isNotNull(); + assertThat(response.getStatus()).isNotEmpty(); + assertThat(response.getDocument()).isNotNull(); + assertThat(response.getDocument().getMarkdownContent()).isNotEmpty(); + } + + @Test + void shouldConvertFileSourceAsync() throws IOException { + var fileResource = readFileFromClasspath("story.pdf"); + ConvertDocumentRequest request = ConvertDocumentRequest.builder() + .source(FileSource.builder() + .filename("story.pdf") + .base64String(Base64.getEncoder().encodeToString(fileResource)) + .build() + ) + .build(); + + ConvertDocumentResponse response = getDoclingClient().convertSourceAsync(request).join(); + + assertThat(response).isNotNull(); + assertThat(response.getStatus()).isNotEmpty(); + assertThat(response.getDocument()).isNotNull(); + assertThat(response.getDocument().getFilename()).isEqualTo("story.pdf"); + assertThat(response.getDocument().getMarkdownContent()).isNotEmpty(); + } + + @Test + void shouldHandleAsyncConversionWithDifferentDocumentOptions() { + ConvertDocumentOptions options = ConvertDocumentOptions.builder() + .doOcr(true) + .includeImages(true) + .tableMode(TableFormerMode.FAST) + .documentTimeout(Duration.ofMinutes(1)) + .build(); + + ConvertDocumentRequest request = ConvertDocumentRequest.builder() + .source(HttpSource.builder().url(URI.create("https://docs.arconia.io/arconia-cli/latest/development/dev/")).build()) + .options(options) + .build(); + + ConvertDocumentResponse response = getDoclingClient().convertSourceAsync(request).join(); + + assertThat(response).isNotNull(); + assertThat(response.getStatus()).isNotEmpty(); + assertThat(response.getDocument()).isNotNull(); + } + + @Test + void shouldChainAsyncOperations() { + ConvertDocumentRequest request = ConvertDocumentRequest.builder() + .source(HttpSource.builder().url(URI.create("https://docs.arconia.io/arconia-cli/latest/development/dev/")).build()) + .build(); + + // Test chaining with thenApply + String markdownContent = getDoclingClient().convertSourceAsync(request) + .thenApply(response -> response.getDocument().getMarkdownContent()) + .join(); + + assertThat(markdownContent).isNotEmpty(); + } } @Nested diff --git a/docs/src/doc/docs/whats-new.md b/docs/src/doc/docs/whats-new.md index 64088c0..ffff11c 100644 --- a/docs/src/doc/docs/whats-new.md +++ b/docs/src/doc/docs/whats-new.md @@ -10,6 +10,7 @@ Docling Java {{ gradle.project_version }} provides a number of new features, enh * Adding `pretty-print` configuration option to `DoclingServeClient` to enable pretty printing of JSON requests and responses. * Adding the ability to specify an api key for the Docling Serve Testcontainer. * Adding the ability to specify the api key for the Docling Serve requests. +* Adding async conversion support via `convertSourceAsync(ConvertDocumentRequest)` which returns a `CompletableFuture`. The method handles all polling internally and completes the future when the conversion is done. Configurable via `asyncPollInterval(Duration)` and `asyncTimeout(Duration)` builder methods. * Added Javadoc comments for classes in the `ai.docling.serve.api` package. ### 0.3.0