Skip to content

Commit 9a641bd

Browse files
committed
Fix HTTP/2 stream priority memory leak by dropping unused PriorityValue cache from AbstractH2StreamMultiplexer. (#617)
(cherry picked from commit 4e76bac)
1 parent dbc8788 commit 9a641bd

3 files changed

Lines changed: 93 additions & 19 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@
3535
import java.util.Deque;
3636
import java.util.Iterator;
3737
import java.util.List;
38-
import java.util.Map;
3938
import java.util.Queue;
40-
import java.util.concurrent.ConcurrentHashMap;
4139
import java.util.concurrent.ConcurrentLinkedDeque;
4240
import java.util.concurrent.ConcurrentLinkedQueue;
4341
import java.util.concurrent.atomic.AtomicInteger;
@@ -141,7 +139,6 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
141139
private EndpointDetails endpointDetails;
142140
private boolean goAwayReceived;
143141

144-
private final Map<Integer, PriorityValue> priorities = new ConcurrentHashMap<>();
145142
private volatile boolean peerNoRfc7540Priorities;
146143

147144
AbstractH2StreamMultiplexer(
@@ -1011,6 +1008,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
10111008
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PRIORITY_UPDATE payload");
10121009
}
10131010
final int prioritizedId = payload.getInt() & 0x7fffffff;
1011+
if (prioritizedId == 0) {
1012+
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "PRIORITY_UPDATE stream id is 0");
1013+
}
10141014
final int len = payload.remaining();
10151015
final String field;
10161016
if (len > 0) {
@@ -1020,9 +1020,11 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
10201020
} else {
10211021
field = "";
10221022
}
1023-
final PriorityValue pv = PriorityParamsParser.parse(field).toValueWithDefaults();
1024-
priorities.put(prioritizedId, pv);
1025-
requestSessionOutput();
1023+
final PriorityValue pv = parsePriorityValue(field);
1024+
final H2Stream prioritizedStream = streams.lookupSeen(prioritizedId);
1025+
if (prioritizedStream != null) {
1026+
prioritizedStream.setPriorityValue(pv);
1027+
}
10261028
}
10271029
break;
10281030
}
@@ -1097,7 +1099,7 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr
10971099
if (streamListener != null) {
10981100
streamListener.onHeaderInput(this, streamId, headers);
10991101
}
1100-
recordPriorityFromHeaders(streamId, headers);
1102+
recordPriorityFromHeaders(stream, headers);
11011103
stream.consumeHeader(headers, frame.isFlagSet(FrameFlag.END_STREAM));
11021104
} else {
11031105
continuation.copyPayload(payload);
@@ -1116,7 +1118,7 @@ private void consumeContinuationFrame(final RawFrame frame, final H2Stream strea
11161118
if (streamListener != null) {
11171119
streamListener.onHeaderInput(this, streamId, headers);
11181120
}
1119-
recordPriorityFromHeaders(streamId, headers);
1121+
recordPriorityFromHeaders(stream, headers);
11201122
if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
11211123
stream.consumePromise(headers);
11221124
} else {
@@ -1368,19 +1370,27 @@ H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler strea
13681370
return streams.createActive(channel, streamHandler);
13691371
}
13701372

1371-
private void recordPriorityFromHeaders(final int streamId, final List<? extends Header> headers) {
1373+
private void recordPriorityFromHeaders(final H2Stream stream, final List<? extends Header> headers) {
13721374
if (headers == null || headers.isEmpty()) {
13731375
return;
13741376
}
13751377
for (final Header h : headers) {
13761378
if (HttpHeaders.PRIORITY.equalsIgnoreCase(h.getName())) {
1377-
final PriorityValue pv = PriorityParamsParser.parse(h.getValue()).toValueWithDefaults();
1378-
priorities.put(streamId, pv);
1379+
final PriorityValue pv = parsePriorityValue(h);
1380+
stream.setPriorityValue(pv);
13791381
break;
13801382
}
13811383
}
13821384
}
13831385

1386+
private PriorityValue parsePriorityValue(final Header header) {
1387+
return PriorityParamsParser.parse(header).toValueWithDefaults();
1388+
}
1389+
1390+
private PriorityValue parsePriorityValue(final String field) {
1391+
return PriorityParamsParser.parse(field).toValueWithDefaults();
1392+
}
1393+
13841394
class H2StreamChannelImpl implements H2StreamChannel {
13851395

13861396
private final int id;
@@ -1428,18 +1438,21 @@ public void submit(final List<Header> headers, final boolean endStream) throws I
14281438
return;
14291439
}
14301440
ensureNotClosed();
1431-
if (peerNoRfc7540Priorities && streams.isSameSide(id)) {
1441+
if (peerNoRfc7540Priorities) {
14321442
for (final Header h : headers) {
14331443
if (HttpHeaders.PRIORITY.equalsIgnoreCase(h.getName())) {
14341444
final byte[] ascii = h.getValue() != null
14351445
? h.getValue().getBytes(StandardCharsets.US_ASCII)
14361446
: new byte[0];
1447+
1448+
final int sid = id & 0x7fffffff;
14371449
final ByteArrayBuffer b = new ByteArrayBuffer(4 + ascii.length);
1438-
b.append((byte) (id >> 24));
1439-
b.append((byte) (id >> 16));
1440-
b.append((byte) (id >> 8));
1441-
b.append((byte) id);
1450+
b.append((byte) (sid >> 24));
1451+
b.append((byte) (sid >> 16));
1452+
b.append((byte) (sid >> 8));
1453+
b.append((byte) sid);
14421454
b.append(ascii, 0, ascii.length);
1455+
14431456
final ByteBuffer pl = ByteBuffer.wrap(b.array(), 0, b.length());
14441457
final RawFrame priUpd = new RawFrame(FrameType.PRIORITY_UPDATE.getValue(), 0, 0, pl);
14451458
commitFrameInternal(priUpd);

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.apache.hc.core5.http.nio.HandlerFactory;
4545
import org.apache.hc.core5.http2.H2Error;
4646
import org.apache.hc.core5.http2.H2StreamResetException;
47+
import org.apache.hc.core5.http2.priority.PriorityValue;
48+
import org.apache.hc.core5.util.Timeout;
4749

4850
class H2Stream {
4951

@@ -61,6 +63,11 @@ enum State { RESERVED, OPEN, CLOSED }
6163
private volatile boolean reserved;
6264
private volatile boolean remoteClosed;
6365

66+
private volatile long lastActivityNanos;
67+
68+
private volatile Timeout idleTimeout;
69+
private volatile PriorityValue priorityValue;
70+
6471
H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer<State> stateChangeCallback) {
6572
this.channel = channel;
6673
this.handler = handler;
@@ -285,11 +292,30 @@ public String toString() {
285292
buf.append("[")
286293
.append("id=").append(channel.getId())
287294
.append(", reserved=").append(reserved)
288-
.append(", removeClosed=").append(remoteClosed)
295+
.append(", remoteClosed=").append(remoteClosed)
289296
.append(", localClosed=").append(channel.isLocalClosed())
290297
.append(", localReset=").append(channel.isLocalReset())
291298
.append("]");
292299
return buf.toString();
293300
}
294301

302+
private void touch() {
303+
this.lastActivityNanos = System.nanoTime();
304+
}
305+
306+
long getLastActivityNanos() {
307+
return lastActivityNanos;
308+
}
309+
310+
Timeout getIdleTimeout() {
311+
return idleTimeout;
312+
}
313+
314+
PriorityValue getPriorityValue() {
315+
return priorityValue;
316+
}
317+
318+
void setPriorityValue(final PriorityValue priorityValue) {
319+
this.priorityValue = priorityValue;
320+
}
295321
}

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,10 @@ void testPriorityUpdateInputAccepted() throws Exception {
789789
h2StreamListener,
790790
() -> streamHandler);
791791

792+
// Make stream id=1 "seen" so lookupSeen(1) does not fail.
793+
final H2StreamChannel channel = mux.createChannel(1);
794+
mux.createStream(channel, streamHandler);
795+
792796
final WritableByteChannelMock writable = new WritableByteChannelMock(1024);
793797
final FrameOutputBuffer fob = new FrameOutputBuffer(16 * 1024);
794798

@@ -802,16 +806,47 @@ void testPriorityUpdateInputAccepted() throws Exception {
802806
fob.write(priUpd, writable);
803807
final byte[] bytes = writable.toByteArray();
804808

805-
// Should NOT throw; server must accept PRIORITY_UPDATE from client
806809
Assertions.assertDoesNotThrow(() -> mux.onInput(ByteBuffer.wrap(bytes)));
807810

808-
// Listener sees the incoming frame
809811
Mockito.verify(h2StreamListener).onFrameInput(
810812
ArgumentMatchers.same(mux),
811813
ArgumentMatchers.eq(0),
812814
ArgumentMatchers.any());
813815
}
814816

817+
@Test
818+
void testPriorityUpdateInputRejectedForUnseenStream() throws Exception {
819+
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
820+
protocolIOSession,
821+
FRAME_FACTORY,
822+
StreamIdGenerator.ODD,
823+
httpProcessor,
824+
CharCodingConfig.DEFAULT,
825+
H2Config.custom().setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE).build(),
826+
h2StreamListener,
827+
() -> streamHandler);
828+
829+
final WritableByteChannelMock writable = new WritableByteChannelMock(1024);
830+
final FrameOutputBuffer fob = new FrameOutputBuffer(16 * 1024);
831+
832+
final byte[] ascii = "u=3,i".getBytes(StandardCharsets.US_ASCII);
833+
final ByteBuffer payload = ByteBuffer.allocate(4 + ascii.length);
834+
payload.putInt(1); // prioritized stream id = 1 (unseen)
835+
payload.put(ascii);
836+
payload.flip();
837+
838+
final RawFrame priUpd = new RawFrame(FrameType.PRIORITY_UPDATE.getValue(), 0, 0, payload);
839+
fob.write(priUpd, writable);
840+
final byte[] bytes = writable.toByteArray();
841+
842+
final H2ConnectionException ex = Assertions.assertThrows(
843+
H2ConnectionException.class,
844+
() -> mux.onInput(ByteBuffer.wrap(bytes)));
845+
846+
Assertions.assertEquals("Unexpected stream id: 1", ex.getMessage());
847+
}
848+
849+
815850
// Helper: minimal stream handler that sends our headers once
816851
static final class PriorityHeaderSender implements H2StreamHandler {
817852
private final H2StreamChannel channel;

0 commit comments

Comments
 (0)