Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion aikido_zen/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,20 @@ def set_body_internal(self, body):
# Make sure that empty bodies like b"" don't get sent.
self.body = None
if isinstance(self.body, bytes):
self.body = self.body.decode("utf-8") # Decode byte input to string.
# json.loads on bytes uses surrogatepass internally, so try it first.
# This handles bodies with surrogate/invalid bytes that would otherwise
# cause decode("utf-8") to raise and leave the JSON unparsed.
try:
parsed_body = json.loads(self.body)
if parsed_body:
self.body = parsed_body
return
except (JSONDecodeError, ValueError):
Copy link
Copy Markdown

@aikido-pr-checks aikido-pr-checks Bot May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty except clause swallowing JSONDecodeError/ValueError during body parsing; log or handle the exception instead of using 'pass'.

Suggested change
except (JSONDecodeError, ValueError):
except (JSONDecodeError, ValueError) as e:
# JSON parsing failed, will fall back to UTF-8 decoding
logger.debug("Failed to parse body as JSON: %s", e)
Details

✨ AI Reasoning
​A new try/except was added around JSON parsing of the request body. The except clause catches JSONDecodeError and ValueError but contains only pass, silently swallowing parsing failures. Silently ignoring errors during body parsing can hide parsing issues and make debugging or security analysis harder, especially since this code manipulates user-controlled input. The try block attempts to json.loads bytes and then falls through to other decoding logic; swallowing errors with no logging or handling loses visibility into why parsing failed.

Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info

pass
# Use errors="replace" so invalid bytes become � instead of raising.
# A strict decode would let attackers bypass detection by prepending a
# single invalid byte to any payload.
self.body = self.body.decode("utf-8", errors="replace")
if not isinstance(self.body, str):
return
if self.body.strip()[0] in ["{", "[", '"']:
Expand Down
25 changes: 23 additions & 2 deletions aikido_zen/context/init_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ def test_set_normal_byte_string():


def test_set_byte_string_wrong_encoding():
    """Bytes that are not valid UTF-8 are stored as a lossily decoded string."""
    # UTF-16 bytes are not valid UTF-8
    body = "hello world! 😊".encode("utf-16")
    context = Context(req=basic_wsgi_req, body=body, source="flask")
    # Invalid bytes are replaced with U+FFFD so the body is still scannable.
    # (The earlier expectation that the raw bytes stay untouched no longer
    # holds and contradicted this assertion.)
    assert context.body == body.decode("utf-8", errors="replace")


def test_set_none():
Expand Down Expand Up @@ -296,3 +297,23 @@ def test_set_protection_forced_off():
assert context.protection_forced_off is False
context.set_force_protection_off(None)
assert context.protection_forced_off is None


def test_set_bytes_with_invalid_utf8_prefix():
    """Regression for AIKIDO-5RDTZW1V.

    One invalid UTF-8 byte (\\xff) placed in front of a path traversal payload
    must not hide the payload: the stored body has to be a string that still
    contains the traversal sequence.
    """
    raw_body = b"\xff/../../../../../etc/passwd"
    ctx = Context(req=basic_wsgi_req, body=raw_body, source="flask")
    stored = ctx.body
    # The body must have been decoded rather than left as bytes.
    assert isinstance(stored, str)
    # Everything after the invalid byte must remain visible to sinks.
    assert "/../../../../../etc/passwd" in stored


def test_set_bytes_json_with_surrogate_bytes():
    """Regression for AIKIDO-B3YABOSP.

    A JSON body that embeds surrogate bytes (\\xed\\xa0\\x80) must still be
    parsed into a dict so the injected operator ({"$regex": ".*"}) is visible.
    """
    raw_body = b'{"username": {"$regex": ".*"}, "bypass": "\xed\xa0\x80"}'
    ctx = Context(req=basic_wsgi_req, body=raw_body, source="flask")
    parsed = ctx.body
    # The body is expected to be the parsed JSON object, not a str/bytes blob.
    assert isinstance(parsed, dict)
    assert parsed.get("username") == {"$regex": ".*"}
8 changes: 4 additions & 4 deletions aikido_zen/helpers/path_to_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ def path_to_string(path):
return path

if isinstance(path, bytes):
try:
return path.decode("utf-8")
except UnicodeDecodeError:
return None
# Use errors="replace" so invalid bytes (e.g. \xff, surrogate sequences)
# don't silently suppress path traversal detection — the replacement char
# preserves the traversal components that follow.
return path.decode("utf-8", errors="replace")
if isinstance(path, PurePath):
# Stringify PurePath. This can still allow path traversal but in extremely
# limited cases so it's safe to just stringify for now.
Expand Down
6 changes: 5 additions & 1 deletion aikido_zen/helpers/path_to_string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ def test_path_to_string_with_valid_url():
def test_path_to_string_with_bytes():
    """Bytes paths are decoded as UTF-8; invalid bytes become U+FFFD."""
    assert path_to_string(b"test.txt") == "test.txt"
    assert path_to_string(b"/home/user/file.txt") == "/home/user/file.txt"
    # Invalid bytes are replaced with U+FFFD (written as an escape so the
    # expectation does not depend on this file's encoding) and the traversal
    # components that follow are preserved. The former `is None` expectation
    # for b"\xff" contradicted this and has been dropped.
    assert path_to_string(b"\xff") == "\ufffd"
    assert path_to_string(b"\xff/../../../etc/passwd") == "\ufffd/../../../etc/passwd"
    # Surrogate bytes (AIKIDO-B3YABOSP pattern) also survive as replacement
    # chars: each of the three bytes is replaced individually.
    assert (
        path_to_string(b"\xed\xa0\x80/../etc/passwd")
        == "\ufffd\ufffd\ufffd/../etc/passwd"
    )


def test_path_to_string_with_empty_string():
Expand Down
2 changes: 1 addition & 1 deletion aikido_zen/sources/flask/extract_form_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def extract_form_data_from_flask_request_and_save_data(req):
if req.form:
context.set_body(req.form)
else:
context.set_body(req.data.decode("utf-8"))
context.set_body(req.data)
context.set_as_current_context()
except Exception as e:
logger.debug("Exception occurred whilst extracting flask body data: %s", e)
2 changes: 1 addition & 1 deletion aikido_zen/sources/quart.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def _handle_request_before(func, instance, args, kwargs):
context.set_body(form)
else:
data = await request.data
context.set_body(data.decode("utf-8"))
context.set_body(data)
context.cookies = request.cookies.to_dict()
context.set_as_current_context()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,21 @@ def test_path_normalization():
# Combined slashes and dot: ///.///etc/passwd should normalize to /etc/passwd
assert detect_path_traversal("///.///etc/passwd", "///.///etc") is True
assert detect_path_traversal("///.///etc/passwd", "///.///etc/passwd") is True


def test_replacement_char_prefix_does_not_hide_traversal():
    """Regression for AIKIDO-5RDTZW1V / AIKIDO-B3YABOSP.

    An attacker prepends invalid UTF-8 bytes (\\xff or surrogate sequences) to
    a traversal payload. After decode("utf-8", errors="replace") both the
    stored body string and the path_to_string() output start with the
    replacement character, so the user input is still found in the file path
    and the traversal is detected.
    """
    replacement = "�"
    traversal = "/../../../../../etc/passwd"
    # 1 -> a single bad byte such as \xff.
    # 3 -> \xed\xa0\x80, which decodes to three separate replacement chars.
    for prefix_len in (1, 3):
        candidate = replacement * prefix_len + traversal
        assert detect_path_traversal(candidate, candidate) is True
42 changes: 42 additions & 0 deletions end2end/django_mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,45 @@ def test_initial_heartbeat():
assert req_stats["rateLimited"] == 0
assert req_stats["attacksDetected"] == {"blocked": 2, "total": 2}
assert req_stats["attackWaves"] == {"total": 0, "blocked": 0}


# --- AIKIDO-5RDTZW1V regression: invalid UTF-8 bytes must not bypass detection ---

def test_bypass_invalid_utf8_bytes_path_traversal():
    """A leading \\xff byte must not let a path traversal payload slip through.

    Before the fix, decode("utf-8") raised UnicodeDecodeError and the body was
    never stored, so the firewall saw nothing. With errors="replace" the body
    is kept and the traversal is still detected.
    """
    payload = b"\xff/../../../../../etc/passwd"
    response = requests.post(base_url_fw + "/read", data=payload)
    assert response.status_code == 500

    # Give the agent time to report the event to the mock server.
    time.sleep(5)
    reported = filter_on_event_type(
        fetch_events_from_mock("http://localhost:5000"), "detected_attack"
    )

    # NOTE(review): the expected count presumably relies on earlier tests in
    # this module having already produced two attacks — confirm ordering.
    assert len(reported) == 3
    newest = reported[2]["attack"]
    assert newest["kind"] == "path_traversal"
    assert newest["blocked"] is True
    assert newest["source"] == "body"


# --- AIKIDO-B3YABOSP regression: surrogate bytes in JSON must not bypass detection ---

def test_bypass_surrogate_bytes_sql_injection():
    """Surrogate bytes in a JSON body must not hide an SQL injection payload.

    The bytes \\xed\\xa0\\x80 make decode("utf-8") raise, so the old code never
    parsed the body as JSON and the payload was invisible. json.loads(bytes)
    is now tried first, so the dict is extracted and the injection is caught
    when the cursor executes.
    """
    payload = b'{"dog_name": "Dangerous bobby\\", 1); -- ", "bypass": "\xed\xa0\x80"}'
    response = requests.post(base_url_fw + "/json-sql", data=payload)
    assert response.status_code == 500

    # Give the agent time to report the event to the mock server.
    time.sleep(5)
    reported = filter_on_event_type(
        fetch_events_from_mock("http://localhost:5000"), "detected_attack"
    )

    # NOTE(review): the expected count presumably relies on the preceding
    # tests in this module having produced three attacks — confirm ordering.
    assert len(reported) == 4
    newest = reported[3]["attack"]
    assert newest["kind"] == "sql_injection"
    assert newest["blocked"] is True
    assert newest["source"] == "body"
42 changes: 42 additions & 0 deletions end2end/flask_mongo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,45 @@ def test_dangerous_auth_fw_force():
'source': "body",
'user': None
}


# --- AIKIDO-5RDTZW1V regression: invalid UTF-8 bytes must not bypass detection ---

def test_bypass_invalid_utf8_bytes_path_traversal():
    """A leading \\xff byte must not let a path traversal payload slip through.

    Before the fix, decode("utf-8") raised UnicodeDecodeError and the body was
    never stored, so the firewall saw nothing. With errors="replace" the body
    is kept and the traversal is still detected.
    """
    payload = b"\xff/../../../../../etc/passwd"
    response = requests.post("http://localhost:8094/read", data=payload)
    assert response.status_code == 500

    # Give the agent time to report the event to the mock server.
    time.sleep(5)
    reported = filter_on_event_type(
        fetch_events_from_mock("http://localhost:5000"), "detected_attack"
    )

    # NOTE(review): the expected count presumably relies on earlier tests in
    # this module having already produced two attacks — confirm ordering.
    assert len(reported) == 3
    newest = reported[2]["attack"]
    assert newest["kind"] == "path_traversal"
    assert newest["blocked"] is True
    assert newest["source"] == "body"


# --- AIKIDO-B3YABOSP regression: surrogate bytes in JSON must not bypass detection ---

def test_bypass_surrogate_bytes_nosql_injection():
    """Surrogate bytes in a JSON body must not hide a NoSQL injection payload.

    The bytes \\xed\\xa0\\x80 make decode("utf-8") raise, so the old code never
    parsed the JSON and the {"$ne": ""} operator was invisible. json.loads(bytes)
    is now tried first, so the dict body is fully parsed and the injection is
    caught.
    """
    payload = b'{"dog_name": "bobby_tables", "pswd": {"$ne": ""}, "bypass": "\xed\xa0\x80"}'
    response = requests.post("http://localhost:8094/auth-raw", data=payload)
    assert response.status_code == 500

    # Give the agent time to report the event to the mock server.
    time.sleep(5)
    reported = filter_on_event_type(
        fetch_events_from_mock("http://localhost:5000"), "detected_attack"
    )

    # NOTE(review): the expected count presumably relies on the preceding
    # tests in this module having produced three attacks — confirm ordering.
    assert len(reported) == 4
    newest = reported[3]["attack"]
    assert newest["kind"] == "nosql_injection"
    assert newest["blocked"] is True
    assert newest["source"] == "body"
4 changes: 3 additions & 1 deletion sample-apps/django-mysql/sample_app/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
path("", views.index, name="index"),
path("dogpage/<int:dog_id>", views.dog_page, name="dog_page"),
path("shell/<str:user_command>", views.shell_url, name="shell"),
path("create", views.create_dogpage, name="create")
path("create", views.create_dogpage, name="create"),
path("read", views.read_file, name="read"),
path("json-sql", views.json_sql, name="json_sql"),
]
31 changes: 30 additions & 1 deletion sample-apps/django-mysql/sample_app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .models import Dogs
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
# Create your views here.
import json
import subprocess

def index(request):
Expand Down Expand Up @@ -37,3 +37,32 @@ def create_dogpage(request):
print("QUERY : ", query)
cursor.execute(query)
return HttpResponse("Dog page created")


# --- bypass regression endpoints ---

@csrf_exempt
def read_file(request):
    """Open the raw request body as a file path and return the file contents.

    Deliberately unsafe: the bytes body goes straight into open(), making this
    a path traversal sink for the AIKIDO-5RDTZW1V regression test (a leading
    \\xff byte must not stop the firewall from seeing the traversal that
    follows). Do not sanitise the path here.
    """
    if request.method != 'POST':
        return HttpResponse("Use POST")
    with open(request.body) as handle:
        return HttpResponse(handle.read())


@csrf_exempt
def json_sql(request):
    """Insert a dog whose name comes from a JSON body parsed with json.loads(bytes).

    The body is parsed without relying on Content-Type. Deliberately unsafe:
    the query is built by string interpolation so it acts as an SQL injection
    sink for the AIKIDO-B3YABOSP regression test (surrogate bytes such as
    \\xed\\xa0\\x80 in the body must not stop the firewall from detecting the
    payload). Do not parameterise the query here.
    """
    if request.method != 'POST':
        return HttpResponse("Use POST")
    payload = json.loads(request.body)
    dog_name = payload.get('dog_name', '')
    with connection.cursor() as cursor:
        # Intentional string-built SQL — this is the sink under test.
        statement = 'INSERT INTO sample_app_dogs (dog_name, dog_boss) VALUES ("%s", "N/A")' % dog_name
        cursor.execute(statement)
    return HttpResponse("OK")
28 changes: 28 additions & 0 deletions sample-apps/flask-mongo/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,31 @@ def post_auth2():
return f'Dog with name {dog_name} authenticated successfully'
else:
return f'Auth failed'


# --- bypass regression endpoints ---

@app.route("/read", methods=['POST'])
def read_file():
    """Open the raw request body as a file path and return the file contents.

    Deliberately unsafe: the bytes body goes straight into open(), making this
    a path traversal sink for the AIKIDO-5RDTZW1V regression test (a leading
    \\xff byte must not stop the firewall from seeing the traversal that
    follows). Do not sanitise the path here.
    """
    with open(request.data) as handle:
        contents = handle.read()
    return contents


@app.route("/auth-raw", methods=['POST'])
def post_auth_raw():
    """Authenticate a dog from a JSON body parsed with json.loads(bytes).

    The body is parsed without relying on Content-Type. Deliberately unsafe:
    the parsed values are passed unvalidated into the Mongo filter, so this is
    a NoSQL injection sink for the AIKIDO-B3YABOSP regression test (surrogate
    bytes such as \\xed\\xa0\\x80 in the body must not stop the firewall from
    detecting the payload).
    """
    payload = json.loads(request.data)
    # Only these two fields are forwarded, exactly as the client sent them.
    lookup = {field: payload.get(field) for field in ('dog_name', 'pswd')}
    match = mongo.db.dogs.find_one(lookup)
    if not match:
        return 'Auth failed'
    return f'Dog with name {match["dog_name"]} authenticated successfully'
Loading