Skip to content

Commit d0b862c

Browse files
committed
GH-62: fixing missing concurrent close
1 parent 7fa86f0 commit d0b862c

3 files changed

Lines changed: 24 additions & 4 deletions

File tree

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.arrow.util.AutoCloseables;
3333
import org.apache.arrow.vector.VectorSchemaRoot;
3434
import org.apache.arrow.vector.types.pojo.Schema;
35+
import org.apache.calcite.avatica.AvaticaConnection;
3536
import org.apache.calcite.avatica.AvaticaResultSet;
3637
import org.apache.calcite.avatica.AvaticaResultSetMetaData;
3738
import org.apache.calcite.avatica.AvaticaStatement;
@@ -209,6 +210,13 @@ public boolean next() throws SQLException {
209210
continue;
210211
}
211212

213+
// No more data. If the queue was closed concurrently (e.g. statement.cancel()
214+
// racing with the reader past super.next()), surface as "Statement canceled"
215+
// to match Avatica's cancellation semantics.
216+
if (flightEndpointDataQueue.isClosed()) {
217+
throw AvaticaConnection.HELPER.createException("Statement canceled");
218+
}
219+
212220
if (statement != null && statement.isCloseOnCompletion()) {
213221
statement.close();
214222
}

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,17 @@ public void enqueue(final Collection<CloseableEndpointStreamPair> endpointReques
177177
/** Adds given {@link FlightStream} to the queue. */
178178
public synchronized void enqueue(final CloseableEndpointStreamPair endpointRequest) {
179179
checkNotNull(endpointRequest);
180-
checkOpen();
180+
if (isClosed()) {
181+
// Concurrent close() raced with enqueue (e.g. statement.cancel() while the reader
182+
// was mid-loop). Treat as end-of-stream: close the endpoint and return silently.
183+
// The reader detects cancellation via a subsequent isClosed() check.
184+
try {
185+
endpointRequest.close();
186+
} catch (final Exception e) {
187+
LOGGER.error("Failed to close endpoint after queue was closed.", e);
188+
}
189+
return;
190+
}
181191
endpointsToClose.add(endpointRequest);
182192
futures.add(
183193
completionService.submit(

flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueueTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.hamcrest.MatcherAssert.assertThat;
2222
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2323
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.verify;
2425

2526
import java.util.concurrent.CompletionService;
2627
import org.apache.arrow.driver.jdbc.client.CloseableEndpointStreamPair;
@@ -54,10 +55,11 @@ public void testNextShouldReturnNullUponClose() throws Exception {
5455
}
5556

5657
@Test
57-
public void testEnqueueShouldThrowExceptionUponClose() throws Exception {
58+
public void testEnqueueShouldCloseEndpointSilentlyAfterClose() throws Exception {
5859
queue.close();
59-
ThrowableAssertionUtils.simpleAssertThrowableClass(
60-
IllegalStateException.class, () -> queue.enqueue(mock(CloseableEndpointStreamPair.class)));
60+
final CloseableEndpointStreamPair endpoint = mock(CloseableEndpointStreamPair.class);
61+
assertDoesNotThrow(() -> queue.enqueue(endpoint));
62+
verify(endpoint).close();
6163
}
6264

6365
@Test

0 commit comments

Comments
 (0)