Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package ai.docling.serve.api;

import java.time.Duration;

import org.jspecify.annotations.Nullable;

import ai.docling.serve.api.convert.request.ConvertDocumentRequest;

/**
* Docling Serve API interface.
*/
public interface DoclingServeApi
extends DoclingServeHealthApi, DoclingServeConvertApi, DoclingServeChunkApi, DoclingServeClearApi, DoclingServeTaskApi {

/**
* Creates and returns a builder instance capable of constructing a duplicate or modified
* version of the current API instance. The builder provides a customizable way to adjust
Expand Down Expand Up @@ -103,6 +108,30 @@ default B prettyPrint() {
return prettyPrint(true);
}

/**
* Sets the polling interval for async operations.
*
* <p>This configures how frequently the client will check the status of async
* conversion tasks when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}.
*
* @param asyncPollInterval the polling interval (must not be null or negative)
* @return this builder instance for method chaining
* @throws IllegalArgumentException if asyncPollInterval is null or negative
*/
B asyncPollInterval(Duration asyncPollInterval);

/**
* Sets the timeout for async operations.
*
* <p>This configures the maximum time to wait for an async conversion task to complete
* when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}.
*
* @param asyncTimeout the timeout duration (must not be null or negative)
* @return this builder instance for method chaining
* @throws IllegalArgumentException if asyncTimeout is null or negative
*/
B asyncTimeout(Duration asyncTimeout);

/**
* Builds and returns an instance of the specified type, representing the completed configuration
* of the builder. The returned instance is typically an implementation of the Docling API.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ai.docling.serve.api;

import java.util.concurrent.CompletableFuture;

import ai.docling.serve.api.convert.request.ConvertDocumentRequest;
import ai.docling.serve.api.convert.response.ConvertDocumentResponse;

Expand All @@ -9,11 +11,33 @@
* a structured or processed document format based on the specified conversion options.
*/
public interface DoclingServeConvertApi {
/**
/**
* Converts the provided document source(s) into a processed document based on the specified options.
*
* @param request the {@link ConvertDocumentRequest} containing the source(s), conversion options, and optional target.
* @return a {@link ConvertDocumentResponse} containing the processed document data, processing details, and any errors.
*/
ConvertDocumentResponse convertSource(ConvertDocumentRequest request);

/**
* Initiates an asynchronous conversion of the provided document source(s) and returns a
* {@link CompletableFuture} that completes when the conversion is done.
*
* <p>This method starts the conversion, polls the status in the background, and completes
* the future with the result when the conversion finishes.
*
* <p>Example usage:
* <pre>{@code
* client.convertSourceAsync(request)
* .thenApply(response -> response.getDocument().getMarkdownContent())
* .thenAccept(markdown -> System.out.println(markdown))
* .exceptionally(ex -> { ex.printStackTrace(); return null; });
* }</pre>
*
* @param request the {@link ConvertDocumentRequest} containing the source(s) and conversion options.
* @return a {@link CompletableFuture} that completes with the {@link ConvertDocumentResponse}
* when the conversion is finished, or completes exceptionally if the conversion fails
* or times out.
*/
CompletableFuture<ConvertDocumentResponse> convertSourceAsync(ConvertDocumentRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;

import org.jspecify.annotations.Nullable;
Expand Down Expand Up @@ -75,12 +77,14 @@ public abstract class DoclingServeClient extends HttpOperations implements Docli
private final boolean logResponses;
private final boolean prettyPrintJson;
private final @Nullable String apiKey;
private final Duration asyncPollInterval;
private final Duration asyncTimeout;

private final HealthOperations healthOps = new HealthOperations(this);
private final ConvertOperations convertOps = new ConvertOperations(this);
private final ChunkOperations chunkOps = new ChunkOperations(this);
private final ClearOperations clearOps = new ClearOperations(this);
private final TaskOperations taskOps = new TaskOperations(this);
private final HealthOperations healthOps;
private final ConvertOperations convertOps;
private final ChunkOperations chunkOps;
private final ClearOperations clearOps;
private final TaskOperations taskOps;

protected DoclingServeClient(DoclingServeClientBuilder builder) {
this.baseUrl = ensureNotNull(builder.baseUrl, "baseUrl");
Expand All @@ -97,6 +101,15 @@ protected DoclingServeClient(DoclingServeClientBuilder builder) {
this.logResponses = builder.logResponses;
this.prettyPrintJson = builder.prettyPrintJson;
this.apiKey = builder.apiKey;
this.asyncPollInterval = builder.asyncPollInterval;
this.asyncTimeout = builder.asyncTimeout;

// Initialize operations handlers
this.healthOps = new HealthOperations(this);
this.taskOps = new TaskOperations(this);
this.convertOps = new ConvertOperations(this, this.taskOps, this.asyncPollInterval, this.asyncTimeout);
this.chunkOps = new ChunkOperations(this);
this.clearOps = new ClearOperations(this);
}

/**
Expand Down Expand Up @@ -285,6 +298,11 @@ public ClearResponse clearResults(ClearResultsRequest request) {
return this.clearOps.clearResults(request);
}

@Override
public CompletableFuture<ConvertDocumentResponse> convertSourceAsync(ConvertDocumentRequest request) {
return this.convertOps.convertSourceAsync(request);
}

private class LoggingBodyPublisher<T> implements BodyPublisher {
private final BodyPublisher delegate;
private final String stringContent;
Expand Down Expand Up @@ -333,6 +351,8 @@ public abstract static class DoclingServeClientBuilder<C extends DoclingServeCli
private boolean logResponses = false;
private boolean prettyPrintJson = false;
private @Nullable String apiKey;
private Duration asyncPollInterval = Duration.ofSeconds(2);
private Duration asyncTimeout = Duration.ofMinutes(5);

/**
* Protected constructor for use by subclasses of {@link DoclingServeClientBuilder}.
Expand Down Expand Up @@ -424,5 +444,44 @@ public B prettyPrint(boolean prettyPrint) {
this.prettyPrintJson = prettyPrint;
return (B) this;
}

/**
* Sets the polling interval for async operations.
*
* <p>This configures how frequently the client will check the status of async
* conversion tasks when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}.
*
* @param asyncPollInterval the polling interval (must not be null or negative)
* @return this builder instance for method chaining
* @throws IllegalArgumentException if asyncPollInterval is null or negative
*/
@Override
public B asyncPollInterval(Duration asyncPollInterval) {
if (asyncPollInterval == null || asyncPollInterval.isNegative() || asyncPollInterval.isZero()) {
throw new IllegalArgumentException("asyncPollInterval must be a positive duration");
}

this.asyncPollInterval = asyncPollInterval;
return (B) this;
}

/**
* Sets the timeout for async operations.
*
* <p>This configures the maximum time to wait for an async conversion task to complete
* when using {@link DoclingServeApi#convertSourceAsync(ConvertDocumentRequest)} (ConvertDocumentRequest)}.
*
* @param asyncTimeout the timeout duration (must not be null or negative)
* @return this builder instance for method chaining
* @throws IllegalArgumentException if asyncTimeout is null or negative
*/
@Override
public B asyncTimeout(Duration asyncTimeout) {
if (asyncTimeout == null || asyncTimeout.isNegative() || asyncTimeout.isZero()) {
throw new IllegalArgumentException("asyncTimeout must be a positive duration");
}
this.asyncTimeout = asyncTimeout;
return (B) this;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
package ai.docling.serve.client.operations;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ai.docling.serve.api.DoclingServeConvertApi;
import ai.docling.serve.api.DoclingServeTaskApi;
import ai.docling.serve.api.convert.request.ConvertDocumentRequest;
import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
import ai.docling.serve.api.task.request.TaskResultRequest;
import ai.docling.serve.api.task.request.TaskStatusPollRequest;
import ai.docling.serve.api.task.response.TaskStatusPollResponse;
import ai.docling.serve.api.util.ValidationUtils;

/**
* Base class for document conversion API operations. Provides access to document
* conversion functionality.
* conversion functionality including both synchronous and asynchronous operations.
*/
public final class ConvertOperations implements DoclingServeConvertApi {
private final HttpOperations httpOperations;
private static final Logger LOG = LoggerFactory.getLogger(ConvertOperations.class);

public ConvertOperations(HttpOperations httpOperations) {
this.httpOperations = httpOperations;
}
private final HttpOperations httpOperations;
private final DoclingServeTaskApi taskApi;
private final Duration asyncPollInterval;
private final Duration asyncTimeout;

/**
* Converts the provided document source(s) into a processed document based on the specified options.
* Creates a new ConvertOperations instance.
*
* @param request the {@link ConvertDocumentRequest} containing the source(s), conversion options, and optional target.
* @return a {@link ConvertDocumentResponse} containing the processed document data, processing details, and any errors.
* @param httpOperations the HTTP operations handler for executing requests
* @param taskApi the task operations handler for polling and retrieving results
* @param asyncPollInterval the interval between status polls for async operations
* @param asyncTimeout the maximum time to wait for async operations to complete
*/
public ConvertOperations(HttpOperations httpOperations, DoclingServeTaskApi taskApi,
Duration asyncPollInterval, Duration asyncTimeout) {
this.httpOperations = httpOperations;
this.taskApi = taskApi;
this.asyncPollInterval = asyncPollInterval;
this.asyncTimeout = asyncTimeout;
}

@Override
public ConvertDocumentResponse convertSource(ConvertDocumentRequest request) {
ValidationUtils.ensureNotNull(request, "request");
return this.httpOperations.executePost(createRequestContext("/v1/convert/source", request));
Expand All @@ -34,4 +58,78 @@ private <I> RequestContext<I, ConvertDocumentResponse> createRequestContext(Stri
.uri(uri)
.build();
}

private <I> RequestContext<I, TaskStatusPollResponse> createAsyncRequestContext(String uri, I request) {
return RequestContext.<I, TaskStatusPollResponse>builder()
.request(request)
.responseType(TaskStatusPollResponse.class)
.uri(uri)
.build();
}

@Override
public CompletableFuture<ConvertDocumentResponse> convertSourceAsync(ConvertDocumentRequest request) {
ValidationUtils.ensureNotNull(request, "request");

// Start the async conversion and chain the polling logic
return CompletableFuture.supplyAsync(() ->
this.httpOperations.executePost(createAsyncRequestContext("/v1/convert/source/async", request))
).thenCompose(taskResponse -> {
var taskId = taskResponse.getTaskId();
LOG.info("Started async conversion with task ID: {}", taskId);

long startTime = System.currentTimeMillis();
return pollTaskUntilComplete(taskId, startTime);
});
}

/**
* Recursively polls a task until it completes, fails, or times out.
*
* @param taskId the ID of the task to poll
* @param startTime the timestamp when polling started (for timeout calculation)
* @return a CompletableFuture that completes with the conversion result
*/
private CompletableFuture<ConvertDocumentResponse> pollTaskUntilComplete(String taskId, long startTime) {
// Check if we've timed out
if (System.currentTimeMillis() - startTime > asyncTimeout.toMillis()) {
return CompletableFuture.failedFuture(
new RuntimeException("Async conversion timed out after " + asyncTimeout + " for task: " + taskId));
}

// Poll the task status
var pollRequest = TaskStatusPollRequest.builder().taskId(taskId).build();

return CompletableFuture.supplyAsync(() -> taskApi.pollTaskStatus(pollRequest))
.thenCompose(statusResponse -> pollTaskStatus(statusResponse, startTime));
}

private CompletionStage<ConvertDocumentResponse> pollTaskStatus(TaskStatusPollResponse statusResponse, long startTime) {
var status = statusResponse.getTaskStatus();
var taskId = statusResponse.getTaskId();
LOG.debug("Task {} status: {}", taskId, status);

return switch (statusResponse.getTaskStatus()) {
case SUCCESS -> {
LOG.info("Task {} completed successfully", taskId);
// Retrieve the result
var resultRequest = TaskResultRequest.builder().taskId(taskId).build();
yield CompletableFuture.supplyAsync(() -> taskApi.convertTaskResult(resultRequest));
}
case FAILURE -> {
var errorMessage = "Task failed";
if (statusResponse.getTaskStatusMetadata()!=null) {
errorMessage = "Task failed: " + statusResponse.getTaskStatusMetadata();
}
yield CompletableFuture.failedFuture(
new RuntimeException("Async conversion failed for task " + taskId + ": " + errorMessage));
}
default ->
// Still in progress (PENDING or STARTED), schedule next poll after delay
CompletableFuture.supplyAsync(
() -> null,
CompletableFuture.delayedExecutor(asyncPollInterval.toMillis(), TimeUnit.MILLISECONDS)
).thenCompose(v -> pollTaskUntilComplete(taskId, startTime));
};
}
}
Loading