From 997e55cc53c86bc55372714aaeb125ec7eceb87b Mon Sep 17 00:00:00 2001 From: Mihir-Choudhary Date: Sat, 4 Apr 2026 15:47:36 +0530 Subject: [PATCH] Add Windows API hook detection plugin --- test/plugins/windows/apihooks_case_data.py | 1048 ++++++++ test/plugins/windows/apihooks_test_support.py | 390 +++ test/plugins/windows/test_apihooks.py | 323 +++ test/plugins/windows/test_apihooks_corpus.py | 488 ++++ .../windows/test_apihooks_fp_reduction.py | 391 +++ .../windows/test_data/apihooks/gap_ledger.md | 11 + .../test_data/apihooks/inline_fixtures.json | 8 + .../test_data/apihooks/patch_fixtures.json | 10 + .../test_data/apihooks/pe_layouts.json | 28 + .../plugins/windows/malware/apihooks.py | 2275 +++++++++++++++++ 10 files changed, 4972 insertions(+) create mode 100644 test/plugins/windows/apihooks_case_data.py create mode 100644 test/plugins/windows/apihooks_test_support.py create mode 100644 test/plugins/windows/test_apihooks.py create mode 100644 test/plugins/windows/test_apihooks_corpus.py create mode 100644 test/plugins/windows/test_apihooks_fp_reduction.py create mode 100644 test/plugins/windows/test_data/apihooks/gap_ledger.md create mode 100644 test/plugins/windows/test_data/apihooks/inline_fixtures.json create mode 100644 test/plugins/windows/test_data/apihooks/patch_fixtures.json create mode 100644 test/plugins/windows/test_data/apihooks/pe_layouts.json create mode 100644 volatility3/framework/plugins/windows/malware/apihooks.py diff --git a/test/plugins/windows/apihooks_case_data.py b/test/plugins/windows/apihooks_case_data.py new file mode 100644 index 0000000000..6ed846c9cc --- /dev/null +++ b/test/plugins/windows/apihooks_case_data.py @@ -0,0 +1,1048 @@ +from typing import Dict, Iterable, List, Sequence + +from test.plugins.windows.apihooks_test_support import ( + ApiHookCase, + PE_FIXTURES, + ReportObservation, + SourceRecord, + apihooks, + build_case_id, + load_inline_fixture, + load_patch_fixture, +) + + +SOURCE_RECORDS: List[SourceRecord] = [ + 
SourceRecord( + "attack_t1056_004", + "Credential API Hooking", + "MITRE ATT&CK T1056.004 Credential API Hooking", + "https://attack.mitre.org/techniques/T1056/004/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_t0874", + "Hooking", + "MITRE ATT&CK T0874 Hooking", + "https://attack.mitre.org/techniques/T0874/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0484", + "Carberp", + "MITRE ATT&CK S0484 Carberp", + "https://attack.mitre.org/software/S0484/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0363", + "Empire", + "MITRE ATT&CK S0363 Empire", + "https://attack.mitre.org/software/S0363/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0182", + "FinFisher", + "MITRE ATT&CK S0182 FinFisher", + "https://attack.mitre.org/software/S0182/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0353", + "NOKKI", + "MITRE ATT&CK S0353 NOKKI", + "https://attack.mitre.org/software/S0353/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_g0068", + "PLATINUM", + "MITRE ATT&CK G0068 PLATINUM", + "https://attack.mitre.org/groups/G0068/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0416", + "RDFSNIFFER", + "MITRE ATT&CK S0416 RDFSNIFFER", + "https://attack.mitre.org/software/S0416/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0266", + "TrickBot", + "MITRE ATT&CK S0266 TrickBot", + "https://attack.mitre.org/software/S0266/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0386", + "Ursnif", + "MITRE ATT&CK S0386 Ursnif", + "https://attack.mitre.org/software/S0386/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s1154", + "VersaMem", + "MITRE ATT&CK S1154 VersaMem", + "https://attack.mitre.org/software/S1154/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0251", + "Zebrocy", + "MITRE ATT&CK S0251 Zebrocy", + "https://attack.mitre.org/software/S0251/", + "2026-04-01", + 
"MITRE ATT&CK", + ), + SourceRecord( + "attack_s0330", + "Zeus Panda", + "MITRE ATT&CK S0330 Zeus Panda", + "https://attack.mitre.org/software/S0330/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0412", + "ZxShell", + "MITRE ATT&CK S0412 ZxShell", + "https://attack.mitre.org/software/S0412/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s0603", + "Stuxnet", + "MITRE ATT&CK S0603 Stuxnet", + "https://attack.mitre.org/software/S0603/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "attack_s1009", + "Triton", + "MITRE ATT&CK S1009 Triton", + "https://attack.mitre.org/software/S1009/", + "2026-04-01", + "MITRE ATT&CK", + ), + SourceRecord( + "cyberark_amsi", + "AMSI Bypass", + "AMSI Bypass: Patching Technique", + "https://www.cyberark.com/resources/threat-research-blog/amsi-bypass-patching-technique", + "2018-02-06", + "CyberArk", + ), + SourceRecord( + "msft_finfisher", + "FinFisher", + "FinFisher exposed: A researchers tale of defeating traps, tricks, and complex virtual machines", + "https://www.microsoft.com/en-us/security/blog/2018/03/01/finfisher-exposed-a-researchers-tale-of-defeating-traps-tricks-and-complex-virtual-machines/", + "2018-03-01", + "Microsoft", + ), + SourceRecord( + "msft_ursnif", + "Ursnif", + "TrojanSpy:Win32/Ursnif.FY threat description", + "https://www.microsoft.com/en-us/wdsi/threats/malware-encyclopedia-description?Name=TrojanSpy%3AWin32%2FUrsnif.FY", + "2017-09-15", + "Microsoft", + ), + SourceRecord( + "msft_rootkitdrv", + "Rootkitdrv.HB", + "VirTool:WinNT/Rootkitdrv.HB threat description", + "https://www.microsoft.com/en-us/wdsi/threats/malware-encyclopedia-description?Name=VirTool%3AWinNT%2FRootkitdrv.HB", + "2010-01-14", + "Microsoft", + ), + SourceRecord( + "elastic_ghostpulse", + "GHOSTPULSE", + "GHOSTPULSE haunts victims using defense evasion bag o' tricks", + "https://www.elastic.co/security-labs/ghostpulse-haunts-victims-using-defense-evasion-bag-o-tricks", + "2024-05-03", + 
"Elastic", + ), + SourceRecord( + "msft_office_amsi", + "Office VBA AMSI", + "Office VBA + AMSI: Parting the veil on malicious macros", + "https://www.microsoft.com/en-us/security/blog/2018/09/12/office-vba-amsi-parting-the-veil-on-malicious-macros/", + "2018-09-12", + "Microsoft", + ), + SourceRecord( + "oldnewthing_iat", + "IAT Write Protection", + "The Import Address Table is now write-protected, and what that means for rogue patching", + "https://devblogs.microsoft.com/oldnewthing/20221006-07/?p=107257", + "2022-10-06", + "Microsoft", + ), + SourceRecord( + "insideyourkernel_etw", + "ETW Bypass", + "A Novel Method for Bypassing ETW", + "https://insideyourkernel.com/2023-03-15-a-novel-method-for-bypass-ETW.html", + "2023-03-15", + "Inside Your Kernel", + ), + SourceRecord( + "elastic_remcos", + "REMCOS", + "Dissecting REMCOS RAT: An in-depth analysis of a widespread 2024 malware, Part Three", + "https://www.elastic.co/cn/security-labs/dissecting-remcos-rat-part-three", + "2024-05-03", + "Elastic", + ), +] + +SOURCE_INDEX: Dict[str, SourceRecord] = {record.source_id: record for record in SOURCE_RECORDS} + + +def _take_sources(source_ids: Sequence[str]) -> List[SourceRecord]: + return [SOURCE_INDEX[source_id] for source_id in source_ids] + + +INLINE_SOURCES = _take_sources( + [ + "attack_t1056_004", + "attack_s0484", + "attack_s0182", + "attack_s0266", + "attack_s0386", + "attack_s0416", + "attack_s0330", + "attack_s0412", + "msft_finfisher", + "msft_ursnif", + "elastic_ghostpulse", + "elastic_remcos", + "attack_s0603", + "attack_s1009", + "attack_t0874", + ] +) +PATCH_SOURCES = _take_sources( + [ + "cyberark_amsi", + "msft_office_amsi", + "insideyourkernel_etw", + "elastic_ghostpulse", + "attack_t1056_004", + "attack_s0363", + "attack_s0266", + "attack_s0386", + "attack_s0412", + "attack_s0182", + ] +) +IAT_SOURCES = _take_sources( + [ + "attack_t1056_004", + "attack_s0182", + "attack_s0330", + "attack_s0603", + "attack_s0386", + "attack_s0266", + 
"msft_finfisher", + "msft_ursnif", + "oldnewthing_iat", + "elastic_ghostpulse", + "elastic_remcos", + ] +) +EAT_SOURCES = _take_sources( + [ + "attack_t0874", + "attack_s0603", + "attack_s1009", + "attack_s0412", + "oldnewthing_iat", + ] +) +SSDT_SOURCES = _take_sources( + [ + "attack_t0874", + "attack_s0603", + "attack_s1009", + "msft_rootkitdrv", + "attack_s0412", + ] +) +SCORING_SOURCES = _take_sources( + [ + "attack_t1056_004", + "cyberark_amsi", + "oldnewthing_iat", + "insideyourkernel_etw", + "elastic_ghostpulse", + "msft_rootkitdrv", + "msft_office_amsi", + "attack_t0874", + ] +) + + +INLINE_SCENARIOS = [ + "jmp_rel32_unbacked", + "jmp_rel8_external", + "jmp_indirect_rip_bytes", + "jmp_indirect_rip_layer", + "jmp_reg_movabs", + "call_rel32_external", + "push_ret_external", + "self_target_clean", + "missing_slot_clean", +] +PATCH_SCENARIOS = [ + "early_ret", + "ret_imm", + "xor_eax_ret", + "sub_eax_ret", + "mov_eax_ret", + "push0_pop_eax_ret", +] +IAT_SCENARIOS = [ + "legit_import_same_module", + "api_set_forward_legit", + "delay_import_legit", + "unexpected_module_hook", + "unbacked_hook", + "ordinal_hook", + "missing_address_clean", + "unreadable_thunk_clean", + "kernelbase_chain_legit", + "self_resolution_legit", + "delay_import_unexpected", +] +EAT_SCENARIOS = [ + "forwarded_export_clean", + "in_image_export_clean", + "out_of_image_export_hook", + "ordinal_out_of_image_hook", + "zero_address_clean", +] +SSDT_SCENARIOS = [ + "x64_non_nt_owner", + "x64_unknown_owner", + "x64_nt_owner_clean", + "x86_non_nt_owner", + "x86_unknown_owner", +] +SCORING_SCENARIOS = [ + "score_unbacked_high", + "score_security_module_low", + "score_suspicious_backed_high_value", + "score_system_forward_low", + "module_cache_eviction", + "directory_sanity", + "followup_unbacked_note", + "followup_backed_note", + "quick_mode_iat_only", + "dedup_order_forward", + "dedup_order_reverse", + "dedup_threshold_ten", + "score_wow64_suppression", + "score_self_target_low", + 
"score_patch_target_high", +] +GAP_SCENARIOS = [ + "setwindowshookex_gui_chain", + "veh_hwbp_breakpoint_hook", + "irp_major_function_hook", +] + + +def _build_observations( + *, + detector: str, + count: int, + sources: Sequence[SourceRecord], + scenarios: Sequence[str], + fixture_observation_count: int, + report_backed: bool = True, +) -> List[ReportObservation]: + observations: List[ReportObservation] = [] + for index in range(count): + observations.append( + ReportObservation( + observation_id=f"{detector}_{index:03d}", + source=sources[index % len(sources)], + detector=detector, + scenario=scenarios[index % len(scenarios)], + fixture_kind="mini_fixture" + if index < fixture_observation_count + else "synthetic", + report_backed=report_backed, + ) + ) + return observations + + +INLINE_OBSERVATIONS = _build_observations( + detector="inline", + count=45, + sources=INLINE_SOURCES, + scenarios=INLINE_SCENARIOS, + fixture_observation_count=10, +) +PATCH_OBSERVATIONS = _build_observations( + detector="patch", + count=30, + sources=PATCH_SOURCES, + scenarios=PATCH_SCENARIOS, + fixture_observation_count=8, +) +IAT_OBSERVATIONS = _build_observations( + detector="iat", + count=22, + sources=IAT_SOURCES, + scenarios=IAT_SCENARIOS, + fixture_observation_count=5, +) +EAT_OBSERVATIONS = _build_observations( + detector="eat", + count=10, + sources=EAT_SOURCES, + scenarios=EAT_SCENARIOS, + fixture_observation_count=3, +) +SSDT_OBSERVATIONS = _build_observations( + detector="ssdt", + count=15, + sources=SSDT_SOURCES, + scenarios=SSDT_SCENARIOS, + fixture_observation_count=3, +) +SCORING_OBSERVATIONS = _build_observations( + detector="scoring", + count=15, + sources=SCORING_SOURCES, + scenarios=SCORING_SCENARIOS, + fixture_observation_count=1, + report_backed=False, +) +GAP_LEDGER_OBSERVATIONS = _build_observations( + detector="gap", + count=3, + sources=SCORING_SOURCES[:3], + scenarios=GAP_SCENARIOS, + fixture_observation_count=0, + report_backed=False, +) + +REPORT_OBSERVATIONS: 
List[ReportObservation] = ( + INLINE_OBSERVATIONS + + PATCH_OBSERVATIONS + + IAT_OBSERVATIONS + + EAT_OBSERVATIONS + + SSDT_OBSERVATIONS + + SCORING_OBSERVATIONS + + GAP_LEDGER_OBSERVATIONS +) + +assert len(REPORT_OBSERVATIONS) == 140 + + +def _owner_for_target(module_map: Dict[str, tuple], target: int) -> str: + for name, (start, end) in module_map.items(): + if start <= target < end: + return name + return "" + + +def _rel32_bytes(opcode: int, func_va: int, target: int) -> bytes: + disp = target - (func_va + 5) + return bytes([opcode]) + int(disp).to_bytes(4, "little", signed=True) + + +def _rel8_bytes(func_va: int, target: int) -> bytes: + disp = target - (func_va + 2) + return b"\xEB" + int(disp).to_bytes(1, "little", signed=True) + b"\x90" * 3 + + +def _case_ordinal(observation: ReportObservation, variant: int) -> int: + return (int(observation.observation_id.rsplit("_", 1)[1]) * 10) + variant + + +def _make_inline_case(observation: ReportObservation, variant: int) -> ApiHookCase: + is_64bit = observation.scenario in { + "jmp_indirect_rip_bytes", + "jmp_indirect_rip_layer", + "jmp_reg_movabs", + } or variant % 2 == 1 + mod_start = 0x180000000 if is_64bit else 0x10000000 + mod_end = mod_start + 0x1000 + func_va = mod_start + 0x200 + (variant * 0x20) + target = mod_end + 0x400 + (variant * 0x80) + module_map = { + "hooked.dll": (mod_start, mod_end), + "evilhook.dll": (target & ~0xFFF, (target & ~0xFFF) + 0x2000), + "kernel32.dll": (0x50000000, 0x50020000), + } + memory = {} + hook_type = None + detected = True + disasm_contains = "" + + if observation.scenario == "jmp_rel32_unbacked": + func_bytes = _rel32_bytes(0xE9, func_va, target) + hook_type = "JMP_REL32" + disasm_contains = "jmp" + elif observation.scenario == "jmp_rel8_external": + func_va = mod_end - 0x60 + variant + target = mod_end + 0x20 + variant + func_bytes = _rel8_bytes(func_va, target) + hook_type = "JMP_REL8" + disasm_contains = "jmp" + elif observation.scenario == "jmp_indirect_rip_bytes": + 
target = 0x600000000 + (variant * 0x1000) + func_bytes = load_inline_fixture("jmp_indirect_rip_template") + func_bytes = func_bytes[:6] + target.to_bytes(8, "little") + hook_type = "JMP_INDIRECT" + disasm_contains = "jmp" + elif observation.scenario == "jmp_indirect_rip_layer": + slot_va = func_va + 6 + 0x10 + target = 0x700000000 + (variant * 0x1000) + func_bytes = load_inline_fixture("jmp_indirect_rip_layer_template") + memory[slot_va] = target.to_bytes(8, "little") + hook_type = "JMP_INDIRECT" + disasm_contains = "jmp" + elif observation.scenario == "jmp_reg_movabs": + target = 0x710000000 + (variant * 0x1000) + func_bytes = load_inline_fixture("movabs_jmp_rax_template") + func_bytes = func_bytes[:2] + target.to_bytes(8, "little") + func_bytes[10:] + hook_type = "JMP_REG64" + disasm_contains = "jmp" + elif observation.scenario == "call_rel32_external": + func_bytes = _rel32_bytes(0xE8, func_va, target) + hook_type = "CALL" + disasm_contains = "call" + elif observation.scenario == "push_ret_external": + target = 0x20004000 + (variant * 0x100) + func_bytes = bytes([0x68]) + int(target & 0xFFFFFFFF).to_bytes( + 4, "little" + ) + b"\xC3" + is_64bit = False + mod_start = 0x10000000 + mod_end = mod_start + 0x1000 + func_va = mod_start + 0x200 + (variant * 0x20) + module_map = { + "hooked.dll": (mod_start, mod_end), + "evilhook.dll": (0x20000000, 0x20020000), + } + hook_type = "PUSH+RET" + disasm_contains = "push" + elif observation.scenario == "self_target_clean": + target = mod_start + 0x500 + func_bytes = _rel32_bytes(0xE9, func_va, target) + detected = False + else: + func_bytes = load_inline_fixture("jmp_indirect_rip_layer_template") + detected = False + target = None + + owner = _owner_for_target(module_map, target) if target is not None else "" + expected = { + "detected": detected, + "hook_type": hook_type if detected else None, + "target": target if detected else None, + "owner": owner if detected else None, + "confidence": None, + "note": "", + 
"disasm_contains": disasm_contains if detected else "", + } + payload = { + "is_64bit": is_64bit, + "func_bytes": func_bytes, + "func_va": func_va, + "mod_start": mod_start, + "mod_end": mod_end, + "memory": memory, + } + return ApiHookCase( + case_id=build_case_id( + observation.source, + "inline", + observation.scenario, + _case_ordinal(observation, variant), + ), + family=observation.source.family, + report_title=observation.source.report_title, + source_url=observation.source.source_url, + source_date=observation.source.source_date, + detector="INLINE", + scenario=observation.scenario, + fixture_kind=observation.fixture_kind, + expected_result=expected, + payload=payload, + report_backed=observation.report_backed, + ) + + +def _make_patch_case(observation: ReportObservation, variant: int) -> ApiHookCase: + mapping = { + "early_ret": (load_patch_fixture("early_ret"), "EARLY_RET", True), + "ret_imm": (load_patch_fixture("ret_imm"), "RET_IMM", True), + "xor_eax_ret": (load_patch_fixture("xor_eax_ret"), "XOR_EAX_RET", True), + "sub_eax_ret": (load_patch_fixture("sub_eax_ret"), "SUB_EAX_RET", True), + "mov_eax_ret": (load_patch_fixture("mov_eax_hresult_ret"), "MOV_EAX_RET", True), + "push0_pop_eax_ret": ( + load_patch_fixture("push0_pop_eax_ret"), + "PUSH0_POP_EAX_RET", + True, + ), + } + func_bytes, patch_type, detected = mapping[observation.scenario] + if variant == 3 and observation.scenario == "mov_eax_ret": + func_bytes = load_patch_fixture("mov_eax_zero_ret") + if variant == 3 and observation.scenario == "early_ret": + func_bytes = load_patch_fixture("benign_near_miss") + patch_type = None + detected = False + expected = { + "detected": detected, + "hook_type": f"PATCH/{patch_type}" if detected else None, + "target": None, + "owner": None, + "confidence": "HIGH" if detected else None, + "note": "", + } + return ApiHookCase( + case_id=build_case_id( + observation.source, + "patch", + observation.scenario, + _case_ordinal(observation, variant), + ), + 
family=observation.source.family, + report_title=observation.source.report_title, + source_url=observation.source.source_url, + source_date=observation.source.source_date, + detector="PATCH", + scenario=observation.scenario, + fixture_kind=observation.fixture_kind, + expected_result=expected, + payload={"func_bytes": func_bytes}, + report_backed=observation.report_backed, + ) + + +def _make_iat_case(observation: ReportObservation, variant: int) -> ApiHookCase: + image_base = 0x10000000 + module_base = 0x50000000 + (variant * 0x100000) + slot_rva = 0x1200 + (variant * 8) + slot_va = module_base + slot_rva + resolved = 0x60000000 + (variant * 0x10000) + imports = [ + { + "dll": "kernel32.dll", + "imports": [ + { + "address": image_base + slot_rva, + "name": b"CreateFileW", + "ordinal": 0, + } + ], + } + ] + module_map = { + "kernel32.dll": (0x60000000, 0x60040000), + "kernelbase.dll": (0x61000000, 0x61040000), + "evilhook.dll": (0x62000000, 0x62040000), + "app.dll": (module_base, module_base + 0x9000), + } + memory = {slot_va: resolved.to_bytes(8, "little")} + detected = False + owner = "kernel32.dll" + function = "CreateFileW" + delay = False + + if observation.scenario == "legit_import_same_module": + resolved = module_map["kernel32.dll"][0] + 0x1200 + variant + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "api_set_forward_legit": + imports[0]["dll"] = "api-ms-win-core-file-l1-1-0.dll" + resolved = module_map["kernelbase.dll"][0] + 0x2200 + variant + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "delay_import_legit": + delay = True + resolved = module_map["kernel32.dll"][0] + 0x1800 + variant + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "unexpected_module_hook": + detected = True + resolved = module_map["evilhook.dll"][0] + 0x50 + variant + owner = "evilhook.dll" + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == 
"unbacked_hook": + detected = True + resolved = 0x73000000 + (variant * 0x100) + owner = "" + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "ordinal_hook": + detected = True + resolved = module_map["evilhook.dll"][0] + 0x90 + variant + owner = "evilhook.dll" + imports[0]["imports"][0]["name"] = None + imports[0]["imports"][0]["ordinal"] = 117 + variant + function = f"Ordinal#{117 + variant}" + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "missing_address_clean": + imports[0]["imports"][0]["address"] = None + memory = {} + elif observation.scenario == "unreadable_thunk_clean": + memory = {} + elif observation.scenario == "kernelbase_chain_legit": + resolved = module_map["kernelbase.dll"][0] + 0x400 + variant + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "self_resolution_legit": + imports[0]["dll"] = "app.dll" + resolved = module_map["app.dll"][0] + 0x500 + variant + owner = "app.dll" + memory = {slot_va: resolved.to_bytes(8, "little")} + elif observation.scenario == "delay_import_unexpected": + delay = True + detected = True + resolved = module_map["evilhook.dll"][0] + 0x20 + variant + owner = "evilhook.dll" + memory = {slot_va: resolved.to_bytes(8, "little")} + + if delay: + imports[0]["delay"] = True + + expected = { + "detected": detected, + "hook_type": "IAT" if detected else None, + "target": resolved if detected else None, + "owner": owner if detected else None, + "confidence": "HIGH" + if owner == "" + else "MEDIUM" + if detected + else None, + "note": "", + "function": function, + } + return ApiHookCase( + case_id=build_case_id( + observation.source, + "iat", + observation.scenario, + _case_ordinal(observation, variant), + ), + family=observation.source.family, + report_title=observation.source.report_title, + source_url=observation.source.source_url, + source_date=observation.source.source_date, + detector="IAT", + scenario=observation.scenario, + 
fixture_kind=observation.fixture_kind, + expected_result=expected, + payload={ + "entries": imports, + "module_base": module_base, + "module_map": module_map, + "memory": memory, + "is_64bit": True, + }, + report_backed=observation.report_backed, + ) + + +def _make_eat_case(observation: ReportObservation, variant: int) -> ApiHookCase: + module_base = 0x50000000 + module_size = 0x4000 + export_dir_va = 0x200 + export_dir_size = 0x80 + address = 0x1000 + (variant * 0x10) + function = f"ExportedFunc{variant}" + detected = False + + if observation.scenario == "forwarded_export_clean": + address = export_dir_va + 0x10 + elif observation.scenario == "in_image_export_clean": + address = 0x1200 + variant + elif observation.scenario == "out_of_image_export_hook": + address = module_size + 0x100 + variant + detected = True + elif observation.scenario == "ordinal_out_of_image_hook": + address = module_size + 0x200 + variant + function = f"Ordinal#{10 + variant}" + detected = True + elif observation.scenario == "zero_address_clean": + address = 0 + + expected = { + "detected": detected, + "hook_type": "EAT" if detected else None, + "target": module_base + address if detected else None, + "owner": "" if detected else None, + "confidence": "MEDIUM" if detected else None, + "note": "", + "function": function, + } + return ApiHookCase( + case_id=build_case_id( + observation.source, + "eat", + observation.scenario, + _case_ordinal(observation, variant), + ), + family=observation.source.family, + report_title=observation.source.report_title, + source_url=observation.source.source_url, + source_date=observation.source.source_date, + detector="EAT", + scenario=observation.scenario, + fixture_kind=observation.fixture_kind, + expected_result=expected, + payload={ + "symbols": [ + { + "address": address, + "name": function.encode("utf-8") + if not function.startswith("Ordinal#") + else None, + "ordinal": 10 + variant, + } + ], + "module_base": module_base, + "module_size": module_size, + 
"export_dir_va": export_dir_va, + "export_dir_size": export_dir_size, + }, + report_backed=observation.report_backed, + ) + + +def _make_ssdt_case(observation: ReportObservation, variant: int) -> ApiHookCase: + is_64bit = "x64" in observation.scenario + target = 0x3000 + (variant * 0x100) + raw = (target - 0x1000) << 4 if is_64bit else target + owners = [] + detected = False + owner = None + + if observation.scenario in {"x64_non_nt_owner", "x86_non_nt_owner"}: + owners = [("evilhook.sys", target, target + 0x100)] + detected = True + owner = "evilhook.sys" + elif observation.scenario in {"x64_unknown_owner", "x86_unknown_owner"}: + detected = True + owner = "" + else: + owners = [("ntoskrnl.exe", target, target + 0x100)] + + expected = { + "detected": detected, + "hook_type": "SSDT" if detected else None, + "target": target if detected else None, + "owner": owner, + "confidence": "HIGH" + if detected and owner == "" + else "MEDIUM" + if detected + else None, + "note": "", + } + return ApiHookCase( + case_id=build_case_id( + observation.source, + "ssdt", + observation.scenario, + _case_ordinal(observation, variant), + ), + family=observation.source.family, + report_title=observation.source.report_title, + source_url=observation.source.source_url, + source_date=observation.source.source_date, + detector="SSDT", + scenario=observation.scenario, + fixture_kind=observation.fixture_kind, + expected_result=expected, + payload={ + "is_64bit": is_64bit, + "service_limit": 1, + "raw_functions": [raw], + "owners": owners, + }, + report_backed=observation.report_backed, + ) + + +def _make_scoring_case(observation: ReportObservation, variant: int) -> ApiHookCase: + expected = { + "detected": None, + "hook_type": None, + "target": None, + "owner": None, + "confidence": None, + "note": "", + } + payload = {"kind": observation.scenario, "variant": variant} + + if observation.scenario == "score_unbacked_high": + expected["confidence"] = "HIGH" + payload["hook"] = { + "type": 
"INLINE", + "function": "AmsiScanBuffer", + "target_module": "", + } + elif observation.scenario == "score_security_module_low": + expected["confidence"] = "LOW" + payload["hook"] = { + "type": "INLINE", + "function": "CreateFileW", + "target_module": "crowdstrike.dll", + } + elif observation.scenario == "score_suspicious_backed_high_value": + expected["confidence"] = "HIGH" + payload["hook"] = { + "type": "INLINE", + "function": "AmsiScanBuffer", + "target_module": "evilhook.dll", + "suspicious_backed_target": True, + } + elif observation.scenario == "score_system_forward_low": + expected["confidence"] = "LOW" + payload["hook"] = { + "type": "INLINE", + "function": "CreateFileW", + "target_module": "kernel32.dll", + } + elif observation.scenario == "module_cache_eviction": + expected["target"] = 2 + payload["timestamps"] = [variant + 1, variant + 2, variant + 3] + elif observation.scenario == "directory_sanity": + payload["directories"] = PE_FIXTURES["reasonable_directory_sets"][ + variant % len(PE_FIXTURES["reasonable_directory_sets"]) + ] + expected["target"] = sum( + 1 + for entry in payload["directories"]["directories"].values() + if entry["virtual_address"] > 0 + and entry["size"] > 0 + and entry["virtual_address"] + entry["size"] + <= payload["directories"]["size_of_image"] + ) + elif observation.scenario == "followup_unbacked_note": + expected["note"] = "follow-up: inspect target with malfind" + payload["owner"] = "" + payload["suspicious"] = False + elif observation.scenario == "followup_backed_note": + expected["note"] = "follow-up: inspect target module with malfind" + payload["owner"] = "evilhook.dll" + payload["suspicious"] = True + elif observation.scenario == "quick_mode_iat_only": + expected["hook_type"] = "IAT" + elif observation.scenario in { + "dedup_order_forward", + "dedup_order_reverse", + "dedup_threshold_ten", + }: + expected["confidence"] = "LOW" + elif observation.scenario == "score_wow64_suppression": + expected["confidence"] = "LOW" + 
payload["hook"] = { + "type": "INLINE", + "function": "NtTraceEvent", + "target_module": "wow64cpu.dll", + } + elif observation.scenario == "score_self_target_low": + expected["confidence"] = "LOW" + payload["hook"] = { + "type": "INLINE", + "function": "LdrLoadDll", + "target_module": "ntdll.dll", + "source_module": "ntdll.dll", + } + else: + expected["confidence"] = "HIGH" + payload["hook"] = { + "type": "PATCH/EARLY_RET", + "function": "EtwEventWrite", + "target_module": "", + } + + return ApiHookCase( + case_id=build_case_id( + observation.source, + "scoring", + observation.scenario, + _case_ordinal(observation, variant), + ), + family=observation.source.family, + report_title=observation.source.report_title, + source_url=observation.source.source_url, + source_date=observation.source.source_date, + detector="SCORING", + scenario=observation.scenario, + fixture_kind=observation.fixture_kind, + expected_result=expected, + payload=payload, + report_backed=observation.report_backed, + ) + + +def _expand_cases( + observations: Iterable[ReportObservation], + *, + variants: int, + builder, +) -> List[ApiHookCase]: + cases = [] + for observation in observations: + for variant in range(variants): + cases.append(builder(observation, variant)) + return cases + + +INLINE_CASES = _expand_cases( + INLINE_OBSERVATIONS, variants=4, builder=_make_inline_case +) +PATCH_CASES = _expand_cases(PATCH_OBSERVATIONS, variants=4, builder=_make_patch_case) +IAT_CASES = _expand_cases(IAT_OBSERVATIONS, variants=5, builder=_make_iat_case) +EAT_CASES = _expand_cases(EAT_OBSERVATIONS, variants=3, builder=_make_eat_case) +SSDT_CASES = _expand_cases(SSDT_OBSERVATIONS, variants=3, builder=_make_ssdt_case) +SCORING_CASES = _expand_cases( + SCORING_OBSERVATIONS, variants=5, builder=_make_scoring_case +) + +ALL_EXPLICIT_CASES = ( + INLINE_CASES + + PATCH_CASES + + IAT_CASES + + EAT_CASES + + SSDT_CASES + + SCORING_CASES +) + +assert len(INLINE_CASES) == 180 +assert len(PATCH_CASES) == 120 +assert 
len(IAT_CASES) == 110 +assert len(EAT_CASES) == 30 +assert len(SSDT_CASES) == 45 +assert len(SCORING_CASES) == 75 +assert len(ALL_EXPLICIT_CASES) == 560 +assert len({case.case_id for case in ALL_EXPLICIT_CASES}) == len(ALL_EXPLICIT_CASES) +assert sum(case.report_backed for case in ALL_EXPLICIT_CASES) >= 420 +assert sum(case.fixture_kind == "mini_fixture" for case in ALL_EXPLICIT_CASES) == 120 diff --git a/test/plugins/windows/apihooks_test_support.py b/test/plugins/windows/apihooks_test_support.py new file mode 100644 index 0000000000..7aece64243 --- /dev/null +++ b/test/plugins/windows/apihooks_test_support.py @@ -0,0 +1,390 @@ +import importlib.util +import json +import sys +from dataclasses import dataclass +from pathlib import Path +from types import SimpleNamespace +from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple + +from volatility3.framework.interfaces.configuration import HierarchicalDict + + +REPO_ROOT = Path(__file__).resolve().parents[3] +FIXTURE_DIR = REPO_ROOT / "test" / "plugins" / "windows" / "test_data" / "apihooks" + +if str(REPO_ROOT.parent) not in sys.path: + sys.path.insert(0, str(REPO_ROOT.parent)) + + +def _resolve_apihooks_path() -> Path: + candidates = ( + REPO_ROOT / "framework" / "plugins" / "windows" / "malware" / "apihooks.py", + REPO_ROOT + / "volatility3" + / "framework" + / "plugins" + / "windows" + / "malware" + / "apihooks.py", + ) + for candidate in candidates: + if candidate.exists(): + return candidate + return candidates[0] + + +APIHOOKS_PATH = _resolve_apihooks_path() +APIHOOKS_SPEC = importlib.util.spec_from_file_location( + "apihooks_mass_corpus_under_test", APIHOOKS_PATH +) +assert APIHOOKS_SPEC is not None and APIHOOKS_SPEC.loader is not None +apihooks = importlib.util.module_from_spec(APIHOOKS_SPEC) +sys.modules[APIHOOKS_SPEC.name] = apihooks +APIHOOKS_SPEC.loader.exec_module(apihooks) + + +def _load_json_fixture(name: str) -> dict: + with (FIXTURE_DIR / name).open("r", encoding="utf-8") as 
handle: + return json.load(handle) + + +INLINE_FIXTURES = _load_json_fixture("inline_fixtures.json") +PATCH_FIXTURES = _load_json_fixture("patch_fixtures.json") +PE_FIXTURES = _load_json_fixture("pe_layouts.json") + + +@dataclass(frozen=True) +class SourceRecord: + source_id: str + family: str + report_title: str + source_url: str + source_date: str + vendor: str + + +@dataclass(frozen=True) +class ReportObservation: + observation_id: str + source: SourceRecord + detector: str + scenario: str + fixture_kind: str + report_backed: bool = True + + +@dataclass(frozen=True) +class ApiHookCase: + case_id: str + family: str + report_title: str + source_url: str + source_date: str + detector: str + scenario: str + fixture_kind: str + expected_result: dict + payload: dict + report_backed: bool + + +def slugify(value: str) -> str: + lowered = value.lower() + chars = [] + prev_sep = False + for char in lowered: + if char.isalnum(): + chars.append(char) + prev_sep = False + elif not prev_sep: + chars.append("_") + prev_sep = True + return "".join(chars).strip("_") + + +def decode_hex_blob(hex_blob: str) -> bytes: + return bytes.fromhex(hex_blob) + + +def load_inline_fixture(name: str) -> bytes: + return decode_hex_blob(INLINE_FIXTURES[name]) + + +def load_patch_fixture(name: str) -> bytes: + return decode_hex_blob(PATCH_FIXTURES[name]) + + +def build_case_id( + source: SourceRecord, detector: str, scenario: str, ordinal: int +) -> str: + return "__".join( + [ + slugify(source.family), + slugify(source.report_title), + slugify(detector), + slugify(scenario), + f"{ordinal:03d}", + ] + ) + + +class FakeLayer: + def __init__( + self, memory: Optional[Mapping[int, bytes]] = None, bits_per_register: int = 64 + ) -> None: + self._memory = { + int(address): bytes(blob) for address, blob in (memory or {}).items() + } + self.bits_per_register = bits_per_register + + def read(self, address: int, size: int, pad: bool = False) -> bytes: + address = int(address) + size = int(size) + + for 
base, blob in self._memory.items(): + end = base + len(blob) + if base <= address < end: + start = address - base + data = blob[start : start + size] + if len(data) == size: + return data + if pad: + return data.ljust(size, b"\x00") + raise apihooks.exceptions.InvalidAddressException("FakeLayer", address) + + if pad: + return b"\x00" * size + + raise apihooks.exceptions.InvalidAddressException("FakeLayer", address) + + +def make_import_entry(dll_name: str, imports: Sequence[dict]): + import_objects = [] + for entry in imports: + import_objects.append( + SimpleNamespace( + address=entry.get("address"), + name=entry.get("name"), + ordinal=entry.get("ordinal", 0), + ) + ) + return SimpleNamespace(dll=dll_name.encode("utf-8"), imports=import_objects) + + +def _directory_entry_indexes() -> List[int]: + if not apihooks.HAS_PEFILE: + return [0] + + indexes = [0] + for directory_name in ( + "IMAGE_DIRECTORY_ENTRY_EXPORT", + "IMAGE_DIRECTORY_ENTRY_IMPORT", + "IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT", + ): + indexes.append(apihooks.pefile.DIRECTORY_ENTRY[directory_name]) + return indexes + + +def _make_directory_entries(default_virtual_address: int, default_size: int): + return [ + SimpleNamespace( + VirtualAddress=default_virtual_address, + Size=default_size, + ) + for _ in range(max(_directory_entry_indexes()) + 1) + ] + + +def make_import_pe( + entries: Sequence[dict], + *, + image_base: int = 0x10000000, + size_of_image: int = 0x4000, +) -> object: + pe_obj = SimpleNamespace( + FILE_HEADER=SimpleNamespace(TimeDateStamp=0x12345678), + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=image_base, + SizeOfImage=size_of_image, + DATA_DIRECTORY=_make_directory_entries(0x200, 0x80), + ), + sections=[], + ) + normal_entries = [] + delay_entries = [] + for entry in entries: + built = make_import_entry(entry["dll"], entry["imports"]) + if entry.get("delay", False): + delay_entries.append(built) + else: + normal_entries.append(built) + if normal_entries: + pe_obj.DIRECTORY_ENTRY_IMPORT = 
normal_entries + if delay_entries: + pe_obj.DIRECTORY_ENTRY_DELAY_IMPORT = delay_entries + return pe_obj + + +def make_export_pe( + symbols: Sequence[dict], + *, + export_dir_va: int, + export_dir_size: int, + size_of_image: int, +) -> object: + directory_entries = _make_directory_entries(0, 0) + directory_entries[0] = SimpleNamespace( + VirtualAddress=export_dir_va, Size=export_dir_size + ) + return SimpleNamespace( + OPTIONAL_HEADER=SimpleNamespace( + DATA_DIRECTORY=directory_entries, + SizeOfImage=size_of_image, + ), + DIRECTORY_ENTRY_EXPORT=SimpleNamespace( + symbols=[ + SimpleNamespace( + address=symbol["address"], + name=symbol.get("name"), + ordinal=symbol.get("ordinal", 0), + ) + for symbol in symbols + ] + ), + ) + + +class FakeKernelSymbol: + def __init__(self, address: int): + self.address = address + + +class FakeKernel: + def __init__( + self, + *, + is_64bit: bool, + service_limit: int, + raw_functions: Sequence[int], + symbol_map: Optional[Mapping[str, int]] = None, + ) -> None: + self.layer_name = "kernel_layer" + self.symbol_table_name = "fake_kernel_symbols" + self.offset = 0 + self._is_64bit = is_64bit + self._service_limit = service_limit + self._raw_functions = list(raw_functions) + self._symbol_map = { + "KiServiceTable": 0x1000, + "KiServiceLimit": 0x2000, + } + if symbol_map: + self._symbol_map.update(symbol_map) + + def get_symbol(self, name: str) -> FakeKernelSymbol: + if name not in self._symbol_map: + raise apihooks.exceptions.SymbolError(name, "fake") + return FakeKernelSymbol(self._symbol_map[name]) + + def object( + self, object_type: str, offset: int, subtype=None, count: Optional[int] = None + ): + if object_type == "int": + return self._service_limit + if object_type == "array": + return self._raw_functions[:count] + raise TypeError(f"Unsupported fake kernel object type: {object_type}") + + def get_type(self, type_name: str) -> str: + return type_name + + +class FakeContext: + def __init__(self, kernel_module_name: str, 
kernel_module: FakeKernel, layers=None): + self.modules = {kernel_module_name: kernel_module} + self.layers = layers or {} + + +class FakeModuleCollection: + def __init__(self, owners: Sequence[Tuple[str, int, int]]) -> None: + self._owners = list(owners) + + def get_module_symbols_by_absolute_location(self, address: int): + for name, start, end in self._owners: + if start <= address < end: + return [(name, iter(()))] + return [] + + +class FakeProcess: + def __init__( + self, pid: int, name: str, layer_name: str, ppid: int = 0 + ) -> None: + self.UniqueProcessId = pid + self.InheritedFromUniqueProcessId = ppid + self.ImageFileName = name.encode("utf-8") + self._layer_name = layer_name + + def add_process_layer(self) -> str: + return self._layer_name + + +def make_plugin(config: dict, context) -> object: + plugin = apihooks.ApiHooks.__new__(apihooks.ApiHooks) + config_path = "test.plugins.windows.apihooks" + root_config = getattr(context, "config", None) + if root_config is None: + root_config = HierarchicalDict() + context.config = root_config + for key, value in config.items(): + root_config[f"{config_path}.{key}"] = value + plugin._context = context + plugin._config_path = config_path + plugin._config_cache = None + plugin._progress_callback = lambda _f, _s: None + return plugin + + +def make_iat_hook( + import_dll: str, + function: str, + resolved_addr: int, + target_module: str, +) -> dict: + return { + "type": "IAT", + "import_dll": import_dll, + "function": function, + "resolved_addr": resolved_addr, + "target_module": target_module, + } + + +def make_eat_hook(function: str, rva: int, absolute_addr: int, target_module: str) -> dict: + return { + "type": "EAT", + "function": function, + "rva": rva, + "absolute_addr": absolute_addr, + "target_module": target_module, + } + + +def collect_treegrid_rows(generator: Iterable[Tuple[int, tuple]]) -> List[tuple]: + return [row for _depth, row in generator] + + +def assert_case_schema(case: ApiHookCase) -> None: + 
required = ( + case.case_id, + case.family, + case.report_title, + case.source_url, + case.source_date, + case.detector, + case.scenario, + case.fixture_kind, + ) + assert all(required) + assert isinstance(case.expected_result, dict) + assert isinstance(case.payload, dict) diff --git a/test/plugins/windows/test_apihooks.py b/test/plugins/windows/test_apihooks.py new file mode 100644 index 0000000000..2fa111ff0e --- /dev/null +++ b/test/plugins/windows/test_apihooks.py @@ -0,0 +1,323 @@ +import importlib.util +import sys +from pathlib import Path +from types import SimpleNamespace + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parents[3] +if str(REPO_ROOT.parent) not in sys.path: + sys.path.insert(0, str(REPO_ROOT.parent)) + +from volatility3.framework.configuration import requirements + + +def _resolve_apihooks_path() -> Path: + candidates = ( + REPO_ROOT / "framework" / "plugins" / "windows" / "malware" / "apihooks.py", + REPO_ROOT + / "volatility3" + / "framework" + / "plugins" + / "windows" + / "malware" + / "apihooks.py", + ) + for candidate in candidates: + if candidate.exists(): + return candidate + return candidates[0] + + +APIHOOKS_PATH = _resolve_apihooks_path() +APIHOOKS_SPEC = importlib.util.spec_from_file_location( + "apihooks_under_test", APIHOOKS_PATH +) +assert APIHOOKS_SPEC is not None and APIHOOKS_SPEC.loader is not None +apihooks = importlib.util.module_from_spec(APIHOOKS_SPEC) +sys.modules[APIHOOKS_SPEC.name] = apihooks +APIHOOKS_SPEC.loader.exec_module(apihooks) + + +def _make_fake_pe( + timestamp: int, + *, + export_dir_va: int = 0x200, + export_dir_size: int = 0x100, + size_of_image: int = 0x4000, +): + return SimpleNamespace( + FILE_HEADER=SimpleNamespace(TimeDateStamp=timestamp), + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=0x10000000, + SizeOfImage=size_of_image, + DATA_DIRECTORY=[ + SimpleNamespace(VirtualAddress=export_dir_va, Size=export_dir_size) + ], + ), + DIRECTORY_ENTRY_EXPORT=SimpleNamespace( + symbols=[ + 
SimpleNamespace(name=b"ExportedFunc", address=0x1000), + ] + ), + sections=[ + SimpleNamespace( + Characteristics=0x20000000, + Name=b".text\x00\x00\x00", + VirtualAddress=0x1000, + Misc_VirtualSize=0x1000, + ) + ], + ) + + +@pytest.mark.skipif(not apihooks.HAS_CAPSTONE, reason="capstone is required") +@pytest.mark.parametrize( + ("func_bytes", "expected_type"), + [ + (b"\xC3", "EARLY_RET"), + (b"\xC2\x14\x00", "RET_IMM"), + (b"\x31\xC0\xC3", "XOR_EAX_RET"), + (b"\x29\xC0\xC3", "SUB_EAX_RET"), + (b"\x6A\x00\x58\xC3", "PUSH0_POP_EAX_RET"), + ], +) +def test_check_patch_semantic_detection(func_bytes, expected_type): + detector = apihooks.InlineHookDetector(is_64bit=False) + + patch = detector.check_patch(func_bytes) + + assert patch is not None + assert patch[0] == expected_type + + +@pytest.mark.skipif(not apihooks.HAS_CAPSTONE, reason="capstone is required") +def test_check_patch_ignores_benign_stub(): + detector = apihooks.InlineHookDetector(is_64bit=False) + + assert detector.check_patch(b"\x55\x8B\xEC\x83\xEC\x08") is None + + +def test_module_cache_evicts_least_recent_entry(): + cache = apihooks.ModuleCache(max_entries=2) + + cache.get_or_parse("a.dll", _make_fake_pe(1)) + cache.get_or_parse("b.dll", _make_fake_pe(2)) + cache.get_or_parse("c.dll", _make_fake_pe(3)) + + assert len(cache) == 2 + assert ("a.dll", 1) not in cache._cache + assert ("b.dll", 2) in cache._cache + assert ("c.dll", 3) in cache._cache + + +@pytest.mark.skipif(not apihooks.HAS_PEFILE, reason="pefile is required") +def test_reasonable_directory_indexes_skip_unreasonable_ranges(): + export_index = apihooks.pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"] + import_index = apihooks.pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_IMPORT"] + delay_index = apihooks.pefile.DIRECTORY_ENTRY[ + "IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT" + ] + max_index = max(export_index, import_index, delay_index) + directories = [SimpleNamespace(VirtualAddress=0, Size=0) for _ in range(max_index + 1)] + 
directories[export_index] = SimpleNamespace(VirtualAddress=0x200, Size=0x80) + directories[import_index] = SimpleNamespace(VirtualAddress=0x5000, Size=0x80) + directories[delay_index] = SimpleNamespace(VirtualAddress=0x300, Size=0x5000) + pe_obj = SimpleNamespace( + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=0x180000000, + SizeOfImage=0x1000, + DATA_DIRECTORY=directories, + ) + ) + + safe_indexes = apihooks.ApiHooks._iter_reasonable_directory_indexes(pe_obj) + + assert safe_indexes == [export_index] + + +def test_eat_detector_ignores_none_export_entry(): + pe_obj = SimpleNamespace( + OPTIONAL_HEADER=SimpleNamespace( + DATA_DIRECTORY=[SimpleNamespace(VirtualAddress=0x200, Size=0x80)] + ), + DIRECTORY_ENTRY_EXPORT=None, + ) + + assert apihooks.EATHookDetector().check_eat(pe_obj, 0x10000000, 0x4000) == [] + + +def test_extract_process_identity_tolerates_invalid_process_metadata(): + class BadProcess: + UniqueProcessId = 4242 + + @property + def InheritedFromUniqueProcessId(self): + raise apihooks.exceptions.InvalidAddressException("fake", 0) + + @property + def ImageFileName(self): + raise apihooks.exceptions.InvalidAddressException("fake", 0) + + assert apihooks.ApiHooks._extract_process_identity(BadProcess()) == ( + 4242, + 0, + "", + ) + + +def test_suspicious_backed_target_heuristic_stays_conservative_for_system_dlls(): + module_map = { + "kernel32.dll": (0x10000000, 0x10020000), + "evil.dll": (0x20000000, 0x20020000), + } + exec_metadata = { + "sections": [{"name": ".text", "rva": 0x1000, "vsize": 0x2000}], + } + + assert not apihooks.ApiHooks._is_suspicious_backed_inline_target( + "amsi.dll", + "AmsiScanBuffer", + "kernel32.dll", + 0x10001000, + module_map, + exec_metadata, + ) + assert apihooks.ApiHooks._is_suspicious_backed_inline_target( + "amsi.dll", + "AmsiScanBuffer", + "evil.dll", + 0x20001000, + module_map, + exec_metadata, + ) + + +def test_hook_scorer_boosts_only_high_value_suspicious_backed_targets(): + scorer = apihooks.HookScorer() + + 
suspicious_high_value = scorer.score( + { + "type": "INLINE", + "function": "AmsiScanBuffer", + "target_module": "evil.dll", + "suspicious_backed_target": True, + }, + {}, + ) + suspicious_low_value = scorer.score( + { + "type": "INLINE", + "function": "CreateFileW", + "target_module": "evil.dll", + "suspicious_backed_target": True, + }, + {}, + ) + system_target = scorer.score( + { + "type": "INLINE", + "function": "AmsiScanBuffer", + "target_module": "kernel32.dll", + "suspicious_backed_target": False, + }, + {}, + ) + + assert suspicious_high_value >= 70 + assert suspicious_high_value > suspicious_low_value + assert suspicious_high_value > system_target + + +def test_followup_notes_cover_unbacked_and_backed_suspicious_targets(): + assert ( + apihooks.ApiHooks._build_inline_followup_note("", False) + == "follow-up: inspect target with malfind" + ) + assert ( + apihooks.ApiHooks._build_inline_followup_note("evil.dll", True) + == "follow-up: inspect target module with malfind" + ) + assert apihooks.ApiHooks._build_inline_followup_note("kernel32.dll", False) == "" + + +def test_get_requirements_exposes_quick_mode(): + quick_reqs = [ + req + for req in apihooks.ApiHooks.get_requirements() + if isinstance(req, requirements.BooleanRequirement) and req.name == "quick" + ] + + assert len(quick_reqs) == 1 + assert "inline" in quick_reqs[0].description.lower() + + +def test_run_sorts_buffered_rows_and_preserves_process_lineage(): + plugin = apihooks.ApiHooks.__new__(apihooks.ApiHooks) + plugin._generator = lambda: iter( + [ + ( + 0, + ( + 300, + 100, + "child-b.exe", + "parent.exe", + "IAT", + "child_b.dll", + "CreateFileW", + 0x3000, + 0x4000, + "evil.dll", + "MEDIUM", + "", + b"", + ), + ), + ( + 0, + ( + 100, + 50, + "parent.exe", + "root.exe", + "INLINE/JMP_REL32", + "parent.dll", + "AmsiScanBuffer", + 0x1000, + 0x2000, + "", + "HIGH", + "jmp 0x2000", + b"\xE9", + ), + ), + ( + 0, + ( + 200, + 100, + "child-a.exe", + "parent.exe", + "EAT", + "child_a.dll", + 
"Ordinal#1", + 0x2000, + 0x2100, + "helper.dll", + "LOW", + "", + b"", + ), + ), + ] + ) + + grid = plugin.run() + rows = [row for _depth, row in grid._generator] + + assert [row[0] for row in rows] == [100, 200, 300] + assert [row[1] for row in rows] == [50, 100, 100] + assert [row[3] for row in rows] == ["root.exe", "parent.exe", "parent.exe"] + assert all(len(row) == 13 for row in rows) diff --git a/test/plugins/windows/test_apihooks_corpus.py b/test/plugins/windows/test_apihooks_corpus.py new file mode 100644 index 0000000000..b8709f3ce3 --- /dev/null +++ b/test/plugins/windows/test_apihooks_corpus.py @@ -0,0 +1,488 @@ +from types import SimpleNamespace + +import pytest + +from test.plugins.windows.apihooks_case_data import ( + ALL_EXPLICIT_CASES, + EAT_CASES, + GAP_LEDGER_OBSERVATIONS, + IAT_CASES, + INLINE_CASES, + PATCH_CASES, + REPORT_OBSERVATIONS, + SCORING_CASES, + SSDT_CASES, +) +from test.plugins.windows.apihooks_test_support import ( + FakeContext, + FakeKernel, + FakeLayer, + FakeModuleCollection, + FakeProcess, + apihooks, + assert_case_schema, + collect_treegrid_rows, + make_export_pe, + make_import_pe, + make_plugin, +) + + +def _fake_cache_pe(timestamp: int): + return SimpleNamespace( + FILE_HEADER=SimpleNamespace(TimeDateStamp=timestamp), + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=0x10000000, + SizeOfImage=0x4000, + DATA_DIRECTORY=[SimpleNamespace(VirtualAddress=0x200, Size=0x80)], + ), + sections=[ + SimpleNamespace( + Characteristics=0x20000000, + Name=b".text\x00\x00\x00", + VirtualAddress=0x1000, + Misc_VirtualSize=0x1000, + ) + ], + DIRECTORY_ENTRY_EXPORT=SimpleNamespace( + symbols=[SimpleNamespace(name=b"ExportedFunc", address=0x1000)] + ), + ) + + +def _make_directory_pe(layout: dict): + indexes = [ + 0, + apihooks.pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"], + apihooks.pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_IMPORT"], + apihooks.pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT"], + ] + directories = [ + 
SimpleNamespace(VirtualAddress=0, Size=0) for _ in range(max(indexes) + 1) + ] + mapping = { + "export": "IMAGE_DIRECTORY_ENTRY_EXPORT", + "import": "IMAGE_DIRECTORY_ENTRY_IMPORT", + "delay_import": "IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT", + } + for key, entry in layout["directories"].items(): + directories[apihooks.pefile.DIRECTORY_ENTRY[mapping[key]]] = SimpleNamespace( + VirtualAddress=entry["virtual_address"], Size=entry["size"] + ) + return SimpleNamespace( + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=0x180000000, + SizeOfImage=layout["size_of_image"], + DATA_DIRECTORY=directories, + ) + ) + + +def _make_export_cache_pe(function_name: str): + return SimpleNamespace( + FILE_HEADER=SimpleNamespace(TimeDateStamp=0x1234), + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=0x180000000, + SizeOfImage=0x4000, + DATA_DIRECTORY=[SimpleNamespace(VirtualAddress=0x200, Size=0x80)], + ), + DIRECTORY_ENTRY_EXPORT=SimpleNamespace( + symbols=[SimpleNamespace(name=function_name.encode("utf-8"), address=0x1000)] + ), + sections=[ + SimpleNamespace( + Characteristics=0x20000000, + Name=b".text\x00\x00\x00", + VirtualAddress=0x1000, + Misc_VirtualSize=0x1000, + ) + ], + ) + + +def _run_quick_mode_case(monkeypatch, variant: int): + fake_kernel = SimpleNamespace(layer_name="kernel_layer") + context = SimpleNamespace( + modules={"kernel": fake_kernel}, + layers={"proc_1": FakeLayer(bits_per_register=64)}, + ) + plugin = make_plugin( + { + "kernel": "kernel", + "quick": True, + "low": True, + "ssdt": False, + "skip-kernel": True, + "pid": None, + }, + context, + ) + process = FakeProcess(1337 + variant, "quickproc.exe", "proc_1") + pe_obj = make_import_pe( + [ + { + "dll": "kernel32.dll", + "imports": [ + { + "address": 0x10001200, + "name": b"CreateFileW", + "ordinal": 0, + } + ], + } + ] + ) + + class FakeIATDetector: + def __init__(self, module_map, proc_layer, is_64bit): + self.module_map = module_map + + def check_iat(self, pe_obj, module_base): + return [ + { + "type": "IAT", + 
"import_dll": "kernel32.dll", + "function": f"CreateFileW_{variant}", + "resolved_addr": 0x62000000 + variant, + "target_module": "evilhook.dll", + } + ] + + def find_owner(self, addr): + return "evilhook.dll" + + class FakeEATDetector: + def check_eat(self, pe_obj, module_base, module_size): + return [] + + class FailInlineDetector: + def __init__(self, *args, **kwargs): + raise AssertionError("inline detector should not be created in quick mode") + + monkeypatch.setattr(apihooks, "HAS_PEFILE", True) + monkeypatch.setattr(apihooks, "IATHookDetector", FakeIATDetector) + monkeypatch.setattr(apihooks, "EATHookDetector", FakeEATDetector) + monkeypatch.setattr(apihooks, "InlineHookDetector", FailInlineDetector) + monkeypatch.setattr( + apihooks.intermed.IntermediateSymbolTable, + "create", + staticmethod(lambda *args, **kwargs: "fake_pe"), + ) + monkeypatch.setattr( + apihooks.pslist.PsList, + "create_pid_filter", + staticmethod(lambda value: lambda proc: False), + ) + monkeypatch.setattr( + apihooks.pslist.PsList, + "list_processes", + staticmethod(lambda **kwargs: [process]), + ) + monkeypatch.setattr( + apihooks.utility, + "array_to_string", + lambda value: value.decode("utf-8") + if isinstance(value, (bytes, bytearray)) + else str(value), + ) + monkeypatch.setattr( + apihooks.ApiHooks, + "_build_module_map", + staticmethod( + lambda proc: ( + { + "hooked.dll": (0x50000000, 0x50009000), + "evilhook.dll": (0x62000000, 0x62010000), + }, + [("hooked.dll", 0x50000000, 0x9000)], + ) + ), + ) + monkeypatch.setattr( + apihooks.ApiHooks, + "_reconstruct_pe", + classmethod(lambda cls, context, pe_table_name, dll_base, layer_name: pe_obj), + ) + + return collect_treegrid_rows(plugin._generator()) + + +def _run_dedup_case(monkeypatch, reverse_order: bool = False): + processes = [ + FakeProcess(3000 + index, f"proc{index}.exe", f"proc_{index}") + for index in range(10) + ] + if reverse_order: + processes = list(reversed(processes)) + layers = { + process.add_process_layer(): 
FakeLayer(bits_per_register=64) for process in processes + } + fake_kernel = SimpleNamespace(layer_name="kernel_layer") + context = SimpleNamespace(modules={"kernel": fake_kernel}, layers=layers) + plugin = make_plugin( + { + "kernel": "kernel", + "quick": False, + "low": True, + "ssdt": False, + "skip-kernel": True, + "pid": None, + }, + context, + ) + pe_obj = _make_export_cache_pe("CreateFileW") + + class FakeIATDetector: + def __init__(self, module_map, proc_layer, is_64bit): + self.module_map = module_map + + def check_iat(self, pe_obj, module_base): + return [] + + def find_owner(self, addr): + return "" + + class FakeEATDetector: + def check_eat(self, pe_obj, module_base, module_size): + return [] + + class FakeInlineDetector: + def __init__(self, is_64bit, layer): + self.layer = layer + + def check_patch(self, func_bytes): + return None + + def check_inline(self, func_bytes, func_va, mod_start, mod_end): + return (0x73000000, "JMP_REL32", "jmp 0x73000000") + + monkeypatch.setattr(apihooks, "HAS_CAPSTONE", True) + monkeypatch.setattr(apihooks, "HAS_PEFILE", True) + monkeypatch.setattr(apihooks, "IATHookDetector", FakeIATDetector) + monkeypatch.setattr(apihooks, "EATHookDetector", FakeEATDetector) + monkeypatch.setattr(apihooks, "InlineHookDetector", FakeInlineDetector) + monkeypatch.setattr( + apihooks.intermed.IntermediateSymbolTable, + "create", + staticmethod(lambda *args, **kwargs: "fake_pe"), + ) + monkeypatch.setattr( + apihooks.pslist.PsList, + "create_pid_filter", + staticmethod(lambda value: lambda proc: False), + ) + monkeypatch.setattr( + apihooks.pslist.PsList, + "list_processes", + staticmethod(lambda **kwargs: processes), + ) + monkeypatch.setattr( + apihooks.utility, + "array_to_string", + lambda value: value.decode("utf-8") + if isinstance(value, (bytes, bytearray)) + else str(value), + ) + monkeypatch.setattr( + apihooks.ApiHooks, + "_build_module_map", + staticmethod( + lambda proc: ( + {"hooked.dll": (0x180000000, 0x180004000)}, + 
[("hooked.dll", 0x180000000, 0x4000)], + ) + ), + ) + monkeypatch.setattr( + apihooks.ApiHooks, + "_reconstruct_pe", + classmethod(lambda cls, context, pe_table_name, dll_base, layer_name: pe_obj), + ) + monkeypatch.setattr( + apihooks, + "batch_read_prologues", + lambda proc_layer, exports, module_base, prologue_size=32: { + "CreateFileW": b"\xE9\x10\x00\x00\x00" + (b"\x90" * 11) + }, + ) + + return collect_treegrid_rows(plugin._generator()) + + +def test_apihooks_corpus_invariants(): + assert len(REPORT_OBSERVATIONS) == 140 + assert len(ALL_EXPLICIT_CASES) == 560 + assert len(GAP_LEDGER_OBSERVATIONS) == 3 + assert sum(case.report_backed for case in ALL_EXPLICIT_CASES) >= 420 + assert sum(case.fixture_kind == "mini_fixture" for case in ALL_EXPLICIT_CASES) == 120 + assert all(case.source_url for case in ALL_EXPLICIT_CASES) + + +@pytest.mark.skipif(not apihooks.HAS_CAPSTONE, reason="capstone is required") +@pytest.mark.parametrize("case", INLINE_CASES, ids=[case.case_id for case in INLINE_CASES]) +def test_apihooks_inline_corpus(case): + assert_case_schema(case) + layer = FakeLayer( + memory=case.payload["memory"], + bits_per_register=64 if case.payload["is_64bit"] else 32, + ) + detector = apihooks.InlineHookDetector(case.payload["is_64bit"], layer) + + result = detector.check_inline( + case.payload["func_bytes"], + case.payload["func_va"], + case.payload["mod_start"], + case.payload["mod_end"], + ) + + if not case.expected_result["detected"]: + assert result is None + return + + assert result is not None + target, hook_type, disasm = result + assert target == case.expected_result["target"] + assert hook_type == case.expected_result["hook_type"] + assert case.expected_result["disasm_contains"] in disasm.lower() + + +@pytest.mark.skipif(not apihooks.HAS_CAPSTONE, reason="capstone is required") +@pytest.mark.parametrize("case", PATCH_CASES, ids=[case.case_id for case in PATCH_CASES]) +def test_apihooks_patch_corpus(case): + assert_case_schema(case) + detector = 
apihooks.InlineHookDetector(is_64bit=False) + + result = detector.check_patch(case.payload["func_bytes"]) + + if not case.expected_result["detected"]: + assert result is None + return + + assert result is not None + assert result[0] == case.expected_result["hook_type"].split("/", 1)[1] + + +@pytest.mark.skipif(not apihooks.HAS_PEFILE, reason="pefile is required") +@pytest.mark.parametrize("case", IAT_CASES, ids=[case.case_id for case in IAT_CASES]) +def test_apihooks_iat_corpus(case): + assert_case_schema(case) + pe_obj = make_import_pe(case.payload["entries"]) + detector = apihooks.IATHookDetector( + case.payload["module_map"], + FakeLayer(case.payload["memory"]), + case.payload["is_64bit"], + ) + + hooks = detector.check_iat(pe_obj, case.payload["module_base"]) + + if not case.expected_result["detected"]: + assert hooks == [] + return + + assert len(hooks) == 1 + assert hooks[0]["type"] == "IAT" + assert hooks[0]["target_module"] == case.expected_result["owner"] + assert hooks[0]["function"] == case.expected_result["function"] + + +@pytest.mark.skipif(not apihooks.HAS_PEFILE, reason="pefile is required") +@pytest.mark.parametrize("case", EAT_CASES, ids=[case.case_id for case in EAT_CASES]) +def test_apihooks_eat_corpus(case): + assert_case_schema(case) + pe_obj = make_export_pe( + case.payload["symbols"], + export_dir_va=case.payload["export_dir_va"], + export_dir_size=case.payload["export_dir_size"], + size_of_image=case.payload["module_size"], + ) + detector = apihooks.EATHookDetector() + + hooks = detector.check_eat( + pe_obj, case.payload["module_base"], case.payload["module_size"] + ) + + if not case.expected_result["detected"]: + assert hooks == [] + return + + assert len(hooks) == 1 + assert hooks[0]["type"] == "EAT" + assert hooks[0]["function"] == case.expected_result["function"] + + +@pytest.mark.parametrize("case", SSDT_CASES, ids=[case.case_id for case in SSDT_CASES]) +def test_apihooks_ssdt_corpus(case, monkeypatch): + assert_case_schema(case) + 
kernel = FakeKernel( + is_64bit=case.payload["is_64bit"], + service_limit=case.payload["service_limit"], + raw_functions=case.payload["raw_functions"], + ) + context = FakeContext("kernel", kernel) + owners = FakeModuleCollection(case.payload["owners"]) + monkeypatch.setattr( + apihooks.symbols, + "symbol_table_is_64bit", + lambda context, symbol_table_name: case.payload["is_64bit"], + ) + detector = apihooks.SSDTHookDetector() + + hooks = detector.check_ssdt(context, "kernel", owners) + + if not case.expected_result["detected"]: + assert hooks == [] + return + + assert len(hooks) == 1 + assert hooks[0]["type"] == "SSDT" + assert hooks[0]["owner"] == case.expected_result["owner"] + + +@pytest.mark.parametrize( + "case", SCORING_CASES, ids=[case.case_id for case in SCORING_CASES] +) +def test_apihooks_scoring_and_hardening_corpus(case, monkeypatch): + assert_case_schema(case) + scorer = apihooks.HookScorer() + kind = case.payload["kind"] + + if kind.startswith("score_"): + score = scorer.score(case.payload["hook"], {}) + assert scorer.label(score) == case.expected_result["confidence"] + return + + if kind == "module_cache_eviction": + cache = apihooks.ModuleCache(max_entries=2) + for timestamp in case.payload["timestamps"]: + cache.get_or_parse(f"mod_{timestamp}.dll", _fake_cache_pe(timestamp)) + assert len(cache) == case.expected_result["target"] + return + + if kind == "directory_sanity": + pe_obj = _make_directory_pe(case.payload["directories"]) + safe_indexes = apihooks.ApiHooks._iter_reasonable_directory_indexes(pe_obj) + assert len(safe_indexes) == case.expected_result["target"] + return + + if kind.startswith("followup_"): + note = apihooks.ApiHooks._build_inline_followup_note( + case.payload["owner"], case.payload["suspicious"] + ) + assert note == case.expected_result["note"] + return + + if kind == "quick_mode_iat_only": + rows = _run_quick_mode_case(monkeypatch, case.payload["variant"]) + assert any(row[4] == "IAT" for row in rows) + assert not 
any(str(row[4]).startswith("INLINE/") for row in rows) + return + + if kind in {"dedup_order_forward", "dedup_order_reverse", "dedup_threshold_ten"}: + rows = _run_dedup_case(monkeypatch, reverse_order=kind == "dedup_order_reverse") + confidences = {row[10] for row in rows if str(row[4]).startswith("INLINE/")} + assert len(confidences) == 1 + assert confidences == {case.expected_result["confidence"]} + assert len(rows) == 10 + return + + raise AssertionError(f"Unhandled scoring case kind: {kind}") diff --git a/test/plugins/windows/test_apihooks_fp_reduction.py b/test/plugins/windows/test_apihooks_fp_reduction.py new file mode 100644 index 0000000000..269681f94a --- /dev/null +++ b/test/plugins/windows/test_apihooks_fp_reduction.py @@ -0,0 +1,391 @@ +from types import SimpleNamespace + +from test.plugins.windows.apihooks_test_support import ( + FakeLayer, + FakeProcess, + apihooks, + collect_treegrid_rows, + make_plugin, +) + + +def _make_pe(function_names=()): + return SimpleNamespace( + FILE_HEADER=SimpleNamespace(TimeDateStamp=0x1234), + OPTIONAL_HEADER=SimpleNamespace( + ImageBase=0x180000000, + SizeOfImage=0x4000, + DATA_DIRECTORY=[SimpleNamespace(VirtualAddress=0x200, Size=0x80)], + ), + sections=[ + SimpleNamespace( + Characteristics=0x20000000, + Name=b".text\x00\x00\x00", + VirtualAddress=0x1000, + Misc_VirtualSize=0x1000, + ) + ], + DIRECTORY_ENTRY_EXPORT=SimpleNamespace( + symbols=[ + SimpleNamespace(name=name.encode("utf-8"), address=0x1000 + index * 0x40) + for index, name in enumerate(function_names) + ] + ) + if function_names + else None, + ) + + +def test_apihooks_pair_seedlists_match_expected_pairs(): + assert apihooks.ApiHooks._is_benign_pair_match( + "IAT", "umpo.dll", "pcwum.dll", "PerfStartProviderEx" + ) + assert apihooks.ApiHooks._is_benign_pair_match( + "INLINE", "d3d10_1.dll", "d3d10_1core.dll", "D3D10GetVersion" + ) + assert not apihooks.ApiHooks._is_benign_pair_match( + "INLINE", "esscli.dll", "wbemcomn.dll", "?Empty@CSortedArray@@QAEXXZ" 
+ ) + assert not apihooks.ApiHooks._is_benign_pair_match( + "IAT", "shell32.dll", "ieframe.dll", "Ordinal#159" + ) + + +def test_apihooks_target_vad_evidence_classification(): + vad_cache = [ + { + "start": 0x1000, + "end": 0x1FFF, + "private_memory": False, + "execute": True, + "file_name": r"\Windows\System32\pcwum.dll", + }, + { + "start": 0x3000, + "end": 0x3FFF, + "private_memory": True, + "execute": True, + "file_name": "", + }, + { + "start": 0x5000, + "end": 0x5FFF, + "private_memory": False, + "execute": False, + "file_name": r"\Windows\System32\foo.dll", + }, + { + "start": 0x7000, + "end": 0x7FFF, + "private_memory": True, + "execute": False, + "file_name": "", + }, + ] + + assert ( + apihooks.ApiHooks._classify_target_vad_evidence(0x1200, vad_cache) + == "mapped_exec_image" + ) + assert ( + apihooks.ApiHooks._classify_target_vad_evidence(0x3200, vad_cache) + == "private_exec_vad" + ) + assert ( + apihooks.ApiHooks._classify_target_vad_evidence(0x5200, vad_cache) + == "mapped_nonexec" + ) + assert ( + apihooks.ApiHooks._classify_target_vad_evidence(0x7200, vad_cache) + == "private_nonexec" + ) + assert ( + apihooks.ApiHooks._classify_target_vad_evidence(0x9200, vad_cache) + == "missing_vad" + ) + + +def test_apihooks_scorer_nist_fp_reduction_rules(): + scorer = apihooks.HookScorer() + + benign_pair_score = scorer.score( + { + "type": "IAT", + "function": "PerfStartProviderEx", + "source_module": "umpo.dll", + "target_module": "pcwum.dll", + "benign_pair_match": True, + "backed_exec_image": True, + "target_vad_evidence": "mapped_exec_image", + }, + {}, + ) + assert scorer.label(benign_pair_score) == "LOW" + + weak_unknown_score = scorer.score( + { + "type": "IAT", + "function": "CreateFileW", + "target_module": "", + "target_vad_evidence": "missing_vad", + }, + {}, + ) + assert scorer.label(weak_unknown_score) == "LOW" + + weak_unknown_inline_score = scorer.score( + { + "type": "INLINE", + "function": "GetUserDefaultLangID", + "target_module": "", + 
"target_vad_evidence": "missing_vad", + }, + {}, + ) + assert scorer.label(weak_unknown_inline_score) == "LOW" + + private_exec_unknown_score = scorer.score( + { + "type": "INLINE", + "function": "GetUserDefaultLangID", + "target_module": "", + "target_vad_evidence": "private_exec_vad", + "private_exec_vad": True, + }, + {}, + ) + assert scorer.label(private_exec_unknown_score) == "HIGH" + + high_value_unknown_score = scorer.score( + { + "type": "INLINE", + "function": "AmsiScanBuffer", + "target_module": "", + "target_vad_evidence": "missing_vad", + }, + {}, + ) + assert scorer.label(high_value_unknown_score) == "HIGH" + + unsuppressed_pair_score = scorer.score( + { + "type": "INLINE", + "function": "?Empty@CSortedArray@@QAEXXZ", + "source_module": "esscli.dll", + "target_module": "wbemcomn.dll", + "target_vad_evidence": "mapped_exec_image", + "backed_exec_image": True, + }, + {}, + ) + assert scorer.label(unsuppressed_pair_score) == "MEDIUM" + + +def test_apihooks_default_output_suppresses_seeded_pairs_but_keeps_uncorroborated_unknown( + monkeypatch, +): + process = FakeProcess(4242, "svchost.exe", "proc_4242") + context = SimpleNamespace( + modules={ + "kernel": SimpleNamespace(layer_name="kernel_layer", symbol_table_name="fake") + }, + layers={"proc_4242": FakeLayer(bits_per_register=64)}, + ) + plugin = make_plugin( + { + "kernel": "kernel", + "quick": False, + "low": False, + "ssdt": False, + "skip-kernel": True, + "pid": None, + }, + context, + ) + + module_layout = { + "umpo.dll": 0x50000000, + "shell32.dll": 0x50100000, + "d3d10_1.dll": 0x50200000, + "esscli.dll": 0x50300000, + "kernelbase.dll": 0x50400000, + "pcwum.dll": 0x62000000, + "ieframe.dll": 0x62100000, + "d3d10_1core.dll": 0x62200000, + "wbemcomn.dll": 0x62300000, + } + module_map = { + name: (base, base + 0x4000) for name, base in module_layout.items() + } + module_list = [ + ("umpo.dll", module_layout["umpo.dll"], 0x4000), + ("shell32.dll", module_layout["shell32.dll"], 0x4000), + 
def test_apihooks_default_output_suppresses_seeded_pairs_but_keeps_uncorroborated_unknown(
    monkeypatch,
):
    """End-to-end _generator() run over a fake process with stubbed detectors.

    Expectations asserted at the bottom:
      * seeded benign pairs (umpo→pcwum, d3d10_1→d3d10_1core) are suppressed;
      * unseeded cross-module findings (shell32, esscli) surface as MEDIUM;
      * an unknown-target inline finding with no corroboration (kernelbase)
        is dropped from default output.
    """
    process = FakeProcess(4242, "svchost.exe", "proc_4242")
    context = SimpleNamespace(
        modules={
            "kernel": SimpleNamespace(layer_name="kernel_layer", symbol_table_name="fake")
        },
        layers={"proc_4242": FakeLayer(bits_per_register=64)},
    )
    # Default output: no --quick/--low/--ssdt, kernel scan skipped.
    plugin = make_plugin(
        {
            "kernel": "kernel",
            "quick": False,
            "low": False,
            "ssdt": False,
            "skip-kernel": True,
            "pid": None,
        },
        context,
    )

    # Synthetic module layout: five scanned modules plus four hook-target
    # modules, each 0x4000 bytes.
    module_layout = {
        "umpo.dll": 0x50000000,
        "shell32.dll": 0x50100000,
        "d3d10_1.dll": 0x50200000,
        "esscli.dll": 0x50300000,
        "kernelbase.dll": 0x50400000,
        "pcwum.dll": 0x62000000,
        "ieframe.dll": 0x62100000,
        "d3d10_1core.dll": 0x62200000,
        "wbemcomn.dll": 0x62300000,
    }
    module_map = {
        name: (base, base + 0x4000) for name, base in module_layout.items()
    }
    module_list = [
        ("umpo.dll", module_layout["umpo.dll"], 0x4000),
        ("shell32.dll", module_layout["shell32.dll"], 0x4000),
        ("d3d10_1.dll", module_layout["d3d10_1.dll"], 0x4000),
        ("esscli.dll", module_layout["esscli.dll"], 0x4000),
        ("kernelbase.dll", module_layout["kernelbase.dll"], 0x4000),
    ]
    # Per-module fake PE objects; exports drive the inline scan.
    pe_map = {
        module_layout["umpo.dll"]: _make_pe(),
        module_layout["shell32.dll"]: _make_pe(),
        module_layout["d3d10_1.dll"]: _make_pe(["D3D10GetVersion"]),
        module_layout["esscli.dll"]: _make_pe(["?Empty@CSortedArray@@QAEXXZ"]),
        module_layout["kernelbase.dll"]: _make_pe(["GetUserDefaultLangID"]),
    }

    # Stub IAT detector: one seeded-benign finding (umpo→pcwum) and one
    # unseeded finding (shell32→ieframe).
    class FakeIATDetector:
        def __init__(self, module_map, proc_layer, is_64bit):
            self.module_map = module_map

        def check_iat(self, pe_obj, module_base):
            if module_base == module_layout["umpo.dll"]:
                return [
                    {
                        "type": "IAT",
                        "function": "PerfStartProviderEx",
                        "resolved_addr": module_layout["pcwum.dll"] + 0x120,
                        "target_module": "pcwum.dll",
                    }
                ]
            if module_base == module_layout["shell32.dll"]:
                return [
                    {
                        "type": "IAT",
                        "function": "Ordinal#159",
                        "resolved_addr": module_layout["ieframe.dll"] + 0x220,
                        "target_module": "ieframe.dll",
                    }
                ]
            return []

        def find_owner(self, addr):
            for name, (start, end) in self.module_map.items():
                if start <= addr < end:
                    return name
            return ""

    # Stub EAT detector: no findings.
    class FakeEATDetector:
        def check_eat(self, pe_obj, module_base, module_size):
            return []

    # Stub inline detector: seeded pair (d3d10_1→d3d10_1core), unseeded pair
    # (esscli→wbemcomn), and an unknown-target jump from kernelbase.
    class FakeInlineDetector:
        def __init__(self, is_64bit, layer):
            self.layer = layer

        def check_patch(self, func_bytes):
            return None

        def check_inline(self, func_bytes, func_va, mod_start, mod_end):
            if func_va == module_layout["d3d10_1.dll"] + 0x1000:
                return (
                    module_layout["d3d10_1core.dll"] + 0x100,
                    "JMP_INDIRECT",
                    "jmp dword ptr [0x10]",
                )
            if func_va == module_layout["esscli.dll"] + 0x1000:
                return (
                    module_layout["wbemcomn.dll"] + 0x180,
                    "JMP_INDIRECT",
                    "jmp dword ptr [0x20]",
                )
            if func_va == module_layout["kernelbase.dll"] + 0x1000:
                return (0x7605B420, "JMP_REL32", "jmp 0x7605b420")
            return None

    # VAD evidence stub: targets inside the four known target modules look
    # image-backed; everything else (the 0x7605B420 jump) has no VAD.
    def classify_target(target, vad_cache):
        if module_layout["pcwum.dll"] <= target < module_layout["pcwum.dll"] + 0x4000:
            return "mapped_exec_image"
        if module_layout["ieframe.dll"] <= target < module_layout["ieframe.dll"] + 0x4000:
            return "mapped_exec_image"
        if (
            module_layout["d3d10_1core.dll"]
            <= target
            < module_layout["d3d10_1core.dll"] + 0x4000
        ):
            return "mapped_exec_image"
        if (
            module_layout["wbemcomn.dll"]
            <= target
            < module_layout["wbemcomn.dll"] + 0x4000
        ):
            return "mapped_exec_image"
        return "missing_vad"

    # Wire all stubs into the plugin module.
    monkeypatch.setattr(apihooks, "HAS_CAPSTONE", True)
    monkeypatch.setattr(apihooks, "HAS_PEFILE", True)
    monkeypatch.setattr(apihooks, "IATHookDetector", FakeIATDetector)
    monkeypatch.setattr(apihooks, "EATHookDetector", FakeEATDetector)
    monkeypatch.setattr(apihooks, "InlineHookDetector", FakeInlineDetector)
    monkeypatch.setattr(
        apihooks.intermed.IntermediateSymbolTable,
        "create",
        staticmethod(lambda *args, **kwargs: "fake_pe"),
    )
    monkeypatch.setattr(
        apihooks.pslist.PsList,
        "create_pid_filter",
        staticmethod(lambda value: lambda proc: False),
    )
    monkeypatch.setattr(
        apihooks.pslist.PsList,
        "list_processes",
        staticmethod(lambda **kwargs: [process]),
    )
    monkeypatch.setattr(
        apihooks.utility,
        "array_to_string",
        lambda value: value.decode("utf-8")
        if isinstance(value, (bytes, bytearray))
        else str(value),
    )
    monkeypatch.setattr(
        apihooks.ApiHooks,
        "_build_module_map",
        staticmethod(lambda proc: (module_map, module_list)),
    )
    monkeypatch.setattr(
        apihooks.ApiHooks,
        "_get_vad_protect_values",
        classmethod(lambda cls, context, kernel: tuple()),
    )
    monkeypatch.setattr(
        apihooks.ApiHooks,
        "_build_vad_cache",
        classmethod(lambda cls, proc, protect_values: []),
    )
    monkeypatch.setattr(
        apihooks.ApiHooks,
        "_classify_target_vad_evidence",
        staticmethod(classify_target),
    )
    monkeypatch.setattr(
        apihooks.ApiHooks,
        "_reconstruct_pe",
        classmethod(lambda cls, context, pe_table_name, dll_base, layer_name: pe_map[dll_base]),
    )
    # Every export prologue reads as a JMP rel32 stub so the fake inline
    # detector is consulted for each one.
    monkeypatch.setattr(
        apihooks,
        "batch_read_prologues",
        lambda proc_layer, exports, module_base, prologue_size=32: {
            name: b"\xE9\x10\x00\x00\x00" + (b"\x90" * 11) for name in exports
        },
    )

    rows = collect_treegrid_rows(plugin._generator())

    # Seeded benign pairs are suppressed entirely.
    assert not any(row[5] == "umpo.dll" for row in rows)
    assert not any(row[5] == "d3d10_1.dll" for row in rows)

    # Unseeded cross-module findings survive at MEDIUM (row[5]=module,
    # row[10]=confidence label).
    shell_rows = [row for row in rows if row[5] == "shell32.dll"]
    assert len(shell_rows) == 1
    assert shell_rows[0][10] == "MEDIUM"

    ess_rows = [row for row in rows if row[5] == "esscli.dll"]
    assert len(ess_rows) == 1
    assert ess_rows[0][10] == "MEDIUM"

    # Unknown-target finding with no VAD corroboration is dropped by default.
    unknown_rows = [
        row
        for row in rows
        if row[5] == "kernelbase.dll" and row[6] == "GetUserDefaultLangID"
    ]
    assert unknown_rows == []
diff --git a/test/plugins/windows/test_data/apihooks/inline_fixtures.json b/test/plugins/windows/test_data/apihooks/inline_fixtures.json new file mode 100644 index 0000000000..b9c99290b9 --- /dev/null +++ b/test/plugins/windows/test_data/apihooks/inline_fixtures.json @@ -0,0 +1,8 @@ +{ + "jmp_rel32_template": "e900000000", + "jmp_rel8_template": "eb00", + "jmp_indirect_rip_template": "ff25000000000000000000000000", + "jmp_indirect_rip_layer_template": "ff25100000009090909090909090", + "movabs_jmp_rax_template": "48b80000000000000000ffe0", + "clean_standard_prologue": "558bec83ec08" +} diff --git a/test/plugins/windows/test_data/apihooks/patch_fixtures.json b/test/plugins/windows/test_data/apihooks/patch_fixtures.json new file mode 100644 index 0000000000..760583c5ac --- /dev/null +++ b/test/plugins/windows/test_data/apihooks/patch_fixtures.json @@ -0,0 +1,10 @@ +{ + "early_ret": "c3", + "ret_imm": "c21400", + "xor_eax_ret": "31c0c3", + "sub_eax_ret": "29c0c3", + "mov_eax_zero_ret": "b800000000c3", + "mov_eax_hresult_ret": "b857000780c3", + "push0_pop_eax_ret": "6a0058c3", + "benign_near_miss": "558bec83ec08" +} diff --git a/test/plugins/windows/test_data/apihooks/pe_layouts.json b/test/plugins/windows/test_data/apihooks/pe_layouts.json new file mode 100644 index 0000000000..9ae7f7e775 --- /dev/null +++ b/test/plugins/windows/test_data/apihooks/pe_layouts.json @@ -0,0 +1,28 @@ +{ + "reasonable_directory_sets": [ + { + "size_of_image": 4096, + "directories": { + "export": {"virtual_address": 512, "size": 128}, + "import": {"virtual_address": 768, "size": 128}, + "delay_import": {"virtual_address": 1024, "size": 128} + } + }, + { + "size_of_image": 8192, + "directories": { + "export": {"virtual_address": 768, "size": 192}, + "import": {"virtual_address": 1536, "size": 256}, + "delay_import": {"virtual_address": 2048, "size": 128} + } + }, + { + "size_of_image": 4096, + "directories": { + "export": {"virtual_address": 512, "size": 128}, + "import": {"virtual_address": 
20480, "size": 128}, + "delay_import": {"virtual_address": 768, "size": 20480} + } + } + ] +} diff --git a/volatility3/framework/plugins/windows/malware/apihooks.py b/volatility3/framework/plugins/windows/malware/apihooks.py new file mode 100644 index 0000000000..db9f0ff06b --- /dev/null +++ b/volatility3/framework/plugins/windows/malware/apihooks.py @@ -0,0 +1,2275 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the +# Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# +"""Production-grade API hook detection for Volatility 3. + +Detects inline, IAT, EAT, and SSDT hooks in Windows memory images with +multi-factor confidence scoring to suppress false positives. + +``--quick`` skips inline disassembly for faster triage. +Inline findings are buffered until the scan completes so cross-process +confidence stays stable instead of depending on process traversal order. +Default output is tuned to reduce NIST-style false positives: benign +Windows helper/forwarder behavior surfacing as suspicious. 
+ +Soft dependencies (degrade gracefully if missing): + pip install capstone pefile +""" + +import fnmatch +import io +import logging +import os +from collections import OrderedDict +from typing import Dict, Iterator, List, Optional, Set, Tuple + +from volatility3.framework import ( + constants, + contexts, + exceptions, + interfaces, + renderers, + symbols, +) +from volatility3.framework.configuration import requirements +from volatility3.framework.objects import utility +from volatility3.framework.renderers import format_hints +from volatility3.framework.symbols import intermed +from volatility3.framework.symbols.windows.extensions import pe +from volatility3.plugins.windows import modules, pslist, vadinfo + +try: + import capstone + from capstone import x86_const + + HAS_CAPSTONE = True +except ImportError: + HAS_CAPSTONE = False + +try: + import pefile + + HAS_PEFILE = True +except ImportError: + HAS_PEFILE = False + +vollog = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Type aliases +# --------------------------------------------------------------------------- +ModuleMap = Dict[str, Tuple[int, int]] # {dll_name_lower: (base, end)} + + +# --------------------------------------------------------------------------- +# Inline hook detection +# --------------------------------------------------------------------------- + + +class InlineHookDetector: + """Detects inline hooks at function entry points using Capstone.""" + + # Pre-filter: first bytes that could start a hook + SUSPICIOUS_FIRST_BYTES: Set[int] = { + 0xE9, # JMP rel32 + 0xE8, # CALL rel32 + 0xEB, # JMP rel8 + 0xFF, # JMP/CALL indirect + 0x68, # PUSH imm32 + 0xEA, # JMP FAR (x86) + 0x48, # REX.W prefix (x64 mov/jmp patterns) + 0x49, # REX.WB prefix (x64 mov r11) + } + # MOV reg, imm: B8-BF for EAX-EDI (x86) and RAX-RDI (x64 with REX) + _MOV_REG_LOW = 0xB8 + _MOV_REG_HIGH = 0xBF + + def __init__(self, is_64bit: bool, layer=None) -> None: + 
if not HAS_CAPSTONE: + raise RuntimeError( + "capstone is required for inline hook detection: pip install capstone" + ) + if is_64bit: + self._md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64) + else: + self._md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32) + self._md.detail = True + self.is_64bit = is_64bit + self.layer = layer + self._pointer_size = 8 if is_64bit else 4 + + def check_inline( + self, + func_bytes: bytes, + func_va: int, + mod_start: int, + mod_end: int, + ) -> Optional[Tuple[int, str, str]]: + """Check function bytes for an inline hook. + + Returns: + (hook_target_va, hook_type_str, disasm_str) or None if clean. + """ + if len(func_bytes) < 5: + return None + + # Fast path: eliminate clean prologues before disassembly + if self._is_standard_prologue(func_bytes): + return None + + # First-byte pre-filter + fb = func_bytes[0] + if fb not in self.SUSPICIOUS_FIRST_BYTES and not ( + self._MOV_REG_LOW <= fb <= self._MOV_REG_HIGH + ): + return None + + regs: Dict[int, int] = {} # {capstone_reg_id: value} + push_val: Optional[int] = None + + try: + insns = list(self._md.disasm(func_bytes, func_va)) + except (capstone.CsError, ValueError) as exc: + vollog.debug("Skipping inline disassembly at %#x: %s", func_va, exc) + return None + + for insn in insns[:4]: + # capstone 6: groups available directly; operands via insn.operands + groups = insn.groups + + # ── Unconditional branch (JMP) ───────────────────────────────── + if x86_const.X86_GRP_JUMP in groups: + target = self._resolve_branch(insn, regs, func_bytes, func_va) + if target is not None and not (mod_start <= target < mod_end): + return (target, self._classify_jmp(insn), f"{insn.mnemonic} {insn.op_str}") + + # ── Call (suspicious only as first instruction) ──────────────── + elif x86_const.X86_GRP_CALL in groups: + target = self._resolve_branch(insn, regs, func_bytes, func_va) + if target is not None and not (mod_start <= target < mod_end): + return (target, "CALL", 
f"{insn.mnemonic} {insn.op_str}") + push_val = None + + # ── Return — check PUSH+RET pattern ─────────────────────────── + elif x86_const.X86_GRP_RET in groups: + if push_val is not None and not (mod_start <= push_val < mod_end): + return (push_val, "PUSH+RET", "push addr; ret") + + # ── Track MOV reg, imm and PUSH imm ─────────────────────────── + else: + self._track_operands(insn, regs) + if insn.mnemonic.upper() == "PUSH": + ops = insn.operands + if ( + len(ops) == 1 + and ops[0].type == x86_const.X86_OP_IMM + and insn.size == 5 + ): + mask = ( + 0xFFFFFFFFFFFFFFFF if self.is_64bit else 0xFFFFFFFF + ) + push_val = ops[0].imm & mask + + return None + + # ── Helpers ────────────────────────────────────────────────────────────── + + def _resolve_branch( + self, + insn, + regs: Dict[int, int], + func_bytes: bytes, + func_va: int, + ) -> Optional[int]: + """Resolve a branch target. Falls back to tracked register values. + + Compatible with capstone 4/5/6 — accesses operands via insn.operands. + """ + try: + ops = insn.operands + except (AttributeError, capstone.CsError, IndexError, TypeError): + return None + for op in ops: + if op.type == x86_const.X86_OP_IMM: + return op.imm # Capstone computes the effective address + elif op.type == x86_const.X86_OP_MEM: + if op.mem.base == x86_const.X86_REG_RIP: + # RIP-relative: the target pointer lives at insn_end + disp. + slot_va = insn.address + insn.size + op.mem.disp + return self._dereference_pointer(slot_va, func_bytes, func_va) + elif op.mem.base == 0 and op.mem.index == 0: + return self._dereference_pointer( + op.mem.disp, func_bytes, func_va + ) + elif op.type == x86_const.X86_OP_REG: + return regs.get(op.reg) # MOV reg, imm tracked earlier + return None + + def _dereference_pointer( + self, slot_va: int, func_bytes: bytes, func_va: int + ) -> Optional[int]: + """Read the pointer stored at ``slot_va`` from the current process layer. + + If the slot falls inside the already-read function bytes, use that buffer + first. 
This keeps common in-prologue trampoline stubs testable even + without a live process layer. + """ + buf_end = func_va + len(func_bytes) + if func_va <= slot_va and slot_va + self._pointer_size <= buf_end: + start = slot_va - func_va + end = start + self._pointer_size + return int.from_bytes(func_bytes[start:end], "little") + + if self.layer is None: + return None + + try: + data = self.layer.read(slot_va, self._pointer_size) + except ( + exceptions.InvalidAddressException, + KeyError, + TypeError, + ValueError, + ): + return None + + if len(data) != self._pointer_size: + return None + + return int.from_bytes(data, "little") + + def _track_operands(self, insn, regs: Dict[int, int]) -> None: + """Record MOV/MOVABS reg, imm so register-indirect jumps can be resolved. + + Capstone 6 disassembles ``MOV RAX, imm64`` as ``movabs`` (AT&T style), + so we handle both mnemonics. + """ + try: + ops = insn.operands + except (AttributeError, capstone.CsError, IndexError, TypeError): + return + if ( + insn.mnemonic.upper() in ("MOV", "MOVABS") + and len(ops) == 2 + and ops[0].type == x86_const.X86_OP_REG + and ops[1].type == x86_const.X86_OP_IMM + ): + regs[ops[0].reg] = ops[1].imm + + def _is_standard_prologue(self, b: bytes) -> bool: + """Return True for well-known clean function prologues.""" + if not self.is_64bit: + # PUSH EBP; MOV EBP,ESP + if b[:3] == b"\x55\x8b\xec": + return True + # Hotpatch: MOV EDI,EDI; PUSH EBP; MOV EBP,ESP + if b[:5] == b"\x8b\xff\x55\x8b\xec": + return True + # PUSH ESI/EDI/EBX + MOV EBP,ESP + if b[0] in (0x53, 0x56, 0x57) and b[1:3] == b"\x8b\xec": + return True + else: + # MOV [RSP+8],RBX + if b[:5] == b"\x48\x89\x5c\x24\x08": + return True + # MOV [RSP+8],RCX (leaf) + if b[:5] == b"\x48\x89\x4c\x24\x08": + return True + # SUB RSP, imm8 (48 83 EC xx) + if b[:3] == b"\x48\x83\xec": + return True + # SUB RSP, imm32 (48 81 EC xx xx xx xx) — large stack frames + if b[:3] == b"\x48\x81\xec": + return True + # PUSH RBX (40 53) or PUSH R12 (41 54) etc. 
REX + PUSH + if b[:2] == b"\x40\x53": + return True + if b[0] in (0x41, 0x42, 0x43) and b[1] in (0x54, 0x55, 0x56, 0x57): + return True # REX.B + PUSH R12-R15 + # PUSH RBP + if b[0] == 0x55: + return True + # Syscall stub: MOV R10,RCX + if b[:3] == b"\x4c\x8b\xd1": + return True + # MOV [RSP+10h],RDX or MOV [RSP+8],R8 + if b[:4] in (b"\x48\x89\x54\x24", b"\x4c\x89\x44\x24"): + return True + # MOV [RSP+xx],RSI / RDI / R8 / R9 — common leaf function saves + if b[:3] in (b"\x48\x89\x74", b"\x48\x89\x7c", b"\x4c\x89\x4c", + b"\x4c\x89\x54", b"\x4c\x89\x44"): + return True + # PUSH R14 / PUSH R15 (41 56 / 41 57) + if b[:2] in (b"\x41\x56", b"\x41\x57", b"\x41\x54", b"\x41\x55"): + return True + return False + + @staticmethod + def _classify_jmp(insn) -> str: + b = insn.bytes + if b[0] == 0xE9: + return "JMP_REL32" + if b[0] == 0xEB: + return "JMP_REL8" + if len(b) >= 2 and b[0] == 0xFF and b[1] == 0x25: + return "JMP_INDIRECT" + if b[0] == 0xEA: + return "JMP_FAR" + # Register-indirect JMP: opcode 0xFF, ModRM in 0xE0-0xE7 (FF /4, mod=11) + # e.g. JMP RAX = FF E0, JMP R11 = 41 FF E3 (REX.B extends rm field) + for i in range(len(b) - 1): + if b[i] == 0xFF and (b[i + 1] & 0xF8) == 0xE0: + return "JMP_REG64" + return "JMP_OTHER" + + def check_patch( + self, func_bytes: bytes + ) -> Optional[Tuple[str, str]]: + """Detect static byte-patch bypasses that don't redirect execution flow. + + These patterns (early RET, XOR+RET, MOV EAX+RET) don't contain a branch + to an external address so ``check_inline`` cannot catch them. They are + characteristic of AMSI and ETW bypass techniques. + + Should be called only for functions whose names are in + ``HookScorer.HIGH_VALUE_TARGETS`` to avoid FPs on legitimate empty stubs. + + Returns: + (patch_type_str, disasm_str) or None if clean. 
+ """ + if len(func_bytes) < 1: + return None + + b = func_bytes + + # ── Single-byte RET — ETW bypass (EtwEventWrite → C3) ────────────── + if b[0] == 0xC3: + return ("EARLY_RET", "ret") + + # ── RET imm16 — e.g. C2 14 00 = "ret 20" ETW bypass ──────────────── + if b[0] == 0xC2 and len(b) >= 3: + imm = int.from_bytes(b[1:3], "little") + return ("RET_IMM", f"ret {imm:#x}") + + # ── XOR EAX,EAX; RET — AMSI bypass returning S_OK ────────────────── + try: + insns = list(self._md.disasm(func_bytes[:16], 0)) + except (capstone.CsError, ValueError) as exc: + vollog.debug("Skipping semantic patch analysis: %s", exc) + insns = [] + + if len(insns) >= 2 and self._is_ret_instruction(insns[1]): + patch_type = self._match_zeroing_patch(insns[0]) + if patch_type: + return ( + patch_type, + f"{self._format_instruction(insns[0])}; " + f"{self._format_instruction(insns[1])}", + ) + + patch_type = self._match_immediate_return_patch(insns[0]) + if patch_type: + return ( + patch_type, + f"{self._format_instruction(insns[0])}; " + f"{self._format_instruction(insns[1])}", + ) + + if len(insns) >= 3 and self._is_ret_instruction(insns[2]): + patch_type = self._match_push_zero_pop_patch(insns[0], insns[1]) + if patch_type: + return ( + patch_type, + f"{self._format_instruction(insns[0])}; " + f"{self._format_instruction(insns[1])}; " + f"{self._format_instruction(insns[2])}", + ) + + if b[:3] == b"\x31\xc0\xc3": + return ("XOR_EAX_RET", "xor eax, eax; ret") + + # ── XOR ECX,ECX; RET ──────────────────────────────────────────────── + if b[:3] == b"\x31\xc9\xc3": + return ("XOR_ECX_RET", "xor ecx, ecx; ret") + + # ── MOV EAX, imm32; RET — AMSI bypass returning specific HRESULT ──── + # e.g. 
B8 57 00 07 80 C3 → return E_INVALIDARG (0x80070057) + if b[0] == 0xB8 and len(b) >= 6 and b[5] == 0xC3: + imm = int.from_bytes(b[1:5], "little") + return ("MOV_EAX_RET", f"mov eax, {imm:#010x}; ret") + + return None + + @staticmethod + def _format_instruction(insn) -> str: + return f"{insn.mnemonic} {insn.op_str}".strip() + + @staticmethod + def _is_ret_instruction(insn) -> bool: + return insn.mnemonic.lower() == "ret" + + @staticmethod + def _normalize_patch_register(reg_name: str) -> Optional[str]: + reg_name = reg_name.lower() + if reg_name in ("eax", "rax"): + return "EAX" + if reg_name in ("ecx", "rcx"): + return "ECX" + return None + + def _match_zeroing_patch(self, insn) -> Optional[str]: + try: + ops = insn.operands + except (AttributeError, capstone.CsError, IndexError, TypeError): + return None + + if len(ops) != 2 or ops[0].type != x86_const.X86_OP_REG: + return None + + reg_name = self._normalize_patch_register(insn.reg_name(ops[0].reg)) + if reg_name is None: + return None + + mnemonic = insn.mnemonic.lower() + if mnemonic in ("xor", "sub") and ops[1].type == x86_const.X86_OP_REG: + other_reg = self._normalize_patch_register(insn.reg_name(ops[1].reg)) + if other_reg == reg_name: + return f"{mnemonic.upper()}_{reg_name}_RET" + + if mnemonic == "mov" and ops[1].type == x86_const.X86_OP_IMM and ops[1].imm == 0: + return f"MOV_{reg_name}_RET" + + return None + + def _match_immediate_return_patch(self, insn) -> Optional[str]: + try: + ops = insn.operands + except (AttributeError, capstone.CsError, IndexError, TypeError): + return None + + if ( + insn.mnemonic.lower() not in ("mov", "movabs") + or len(ops) != 2 + or ops[0].type != x86_const.X86_OP_REG + or ops[1].type != x86_const.X86_OP_IMM + ): + return None + + reg_name = self._normalize_patch_register(insn.reg_name(ops[0].reg)) + if reg_name != "EAX": + return None + + return f"MOV_{reg_name}_RET" + + def _match_push_zero_pop_patch(self, push_insn, pop_insn) -> Optional[str]: + try: + push_ops = 
push_insn.operands + pop_ops = pop_insn.operands + except (AttributeError, capstone.CsError, IndexError, TypeError): + return None + + if ( + push_insn.mnemonic.lower() != "push" + or len(push_ops) != 1 + or push_ops[0].type != x86_const.X86_OP_IMM + or push_ops[0].imm != 0 + ): + return None + + if ( + pop_insn.mnemonic.lower() != "pop" + or len(pop_ops) != 1 + or pop_ops[0].type != x86_const.X86_OP_REG + ): + return None + + reg_name = self._normalize_patch_register(pop_insn.reg_name(pop_ops[0].reg)) + if reg_name is None: + return None + + return f"PUSH0_POP_{reg_name}_RET" + + +# --------------------------------------------------------------------------- +# IAT hook detection +# --------------------------------------------------------------------------- + + +class IATHookDetector: + """Detects Import Address Table hooks, handling API-set forwarding chains.""" + + # Hardcoded API-set prefix → real DLL mappings (Windows 10+) + _API_SET_MAP: Dict[str, List[str]] = { + "api-ms-win-core-processthreads": ["kernel32.dll", "kernelbase.dll"], + "api-ms-win-core-heap": ["kernelbase.dll"], + "api-ms-win-core-memory": ["kernelbase.dll"], + "api-ms-win-core-synch": ["kernel32.dll", "kernelbase.dll"], + "api-ms-win-core-file": ["kernelbase.dll"], + "api-ms-win-core-libraryloader": ["kernelbase.dll"], + "api-ms-win-core-errorhandling": ["kernelbase.dll"], + "api-ms-win-core-string": ["kernelbase.dll"], + "api-ms-win-core-sysinfo": ["kernelbase.dll"], + "api-ms-win-core-handle": ["kernelbase.dll"], + "api-ms-win-core-io": ["kernelbase.dll"], + "api-ms-win-core-rtlsupport": ["ntdll.dll"], + "api-ms-win-core-localization": ["kernelbase.dll"], + "api-ms-win-security-base": ["kernelbase.dll"], + "api-ms-win-core-debug": ["kernelbase.dll"], + "api-ms-win-core-console": ["kernel32.dll", "kernelbase.dll"], + "api-ms-win-core-datetime": ["kernelbase.dll"], + "api-ms-win-core-fibers": ["kernel32.dll", "kernelbase.dll"], + "api-ms-win-core-interlocked": ["kernelbase.dll"], + 
"api-ms-win-core-namedpipe": ["kernelbase.dll"], + "api-ms-win-core-profile": ["kernelbase.dll"], + "api-ms-win-core-registry": ["kernelbase.dll"], + "api-ms-win-core-comm": ["kernelbase.dll"], + "api-ms-win-core-timezone": ["kernelbase.dll"], + "api-ms-win-core-path": ["kernelbase.dll"], + "api-ms-win-core-processenv": ["kernelbase.dll"], + "api-ms-win-core-processenvironment": ["kernelbase.dll"], + "ext-ms-win-ntuser": ["user32.dll"], + "ext-ms-win-kernel32": ["kernel32.dll"], + } + + # Known forwarding chains: importing DLL → set of valid target DLLs + _KNOWN_CHAINS: Dict[str, Set[str]] = { + "kernel32.dll": {"kernel32.dll", "kernelbase.dll", "ntdll.dll"}, + "advapi32.dll": { + "advapi32.dll", + "kernelbase.dll", + "ntdll.dll", + "sechost.dll", + }, + "user32.dll": {"user32.dll", "win32u.dll", "ntdll.dll"}, + "gdi32.dll": {"gdi32.dll", "gdi32full.dll", "ntdll.dll"}, + "ws2_32.dll": {"ws2_32.dll", "mswsock.dll", "ntdll.dll"}, + "crypt32.dll": { + "crypt32.dll", + "bcrypt.dll", + "ncrypt.dll", + "kernelbase.dll", + }, + "secur32.dll": {"secur32.dll", "sspicli.dll", "kernelbase.dll"}, + "shell32.dll": {"shell32.dll", "kernelbase.dll", "shlwapi.dll"}, + "ole32.dll": {"ole32.dll", "combase.dll", "rpcrt4.dll"}, + "msvcrt.dll": {"msvcrt.dll", "ntdll.dll", "vcruntime140.dll"}, + "kernelbase.dll": {"kernelbase.dll", "ntdll.dll"}, + "ntdll.dll": {"ntdll.dll"}, + "sechost.dll": {"sechost.dll", "ntdll.dll", "kernelbase.dll"}, + "combase.dll": {"combase.dll", "rpcrt4.dll", "kernelbase.dll", "ntdll.dll"}, + "bcrypt.dll": {"bcrypt.dll", "bcryptprimitives.dll"}, + "ncrypt.dll": {"ncrypt.dll", "bcrypt.dll"}, + "rpcrt4.dll": {"rpcrt4.dll", "ntdll.dll"}, + } + + def __init__(self, module_map: ModuleMap, proc_layer, is_64bit: bool) -> None: + self.module_map = module_map + self.proc_layer = proc_layer + self._pointer_size = 8 if is_64bit else 4 + self._chain_cache: Dict[str, Set[str]] = {} + + def resolve_api_set(self, dll_name: str) -> List[str]: + """Map api-ms-win-* virtual DLL 
names to real implementation DLLs.""" + key = dll_name.lower().removesuffix(".dll") + for prefix, targets in self._API_SET_MAP.items(): + if key.startswith(prefix): + return targets + if key.startswith("api-ms-") or key.startswith("ext-ms-"): + return ["kernelbase.dll"] # Safe fallback + return [dll_name.lower()] + + def get_valid_targets(self, import_dll: str) -> Set[str]: + """Return all DLL names that an IAT entry could legitimately resolve to.""" + key = import_dll.lower() + if key in self._chain_cache: + return self._chain_cache[key] + + resolved: Set[str] = set() + for real_dll in self.resolve_api_set(import_dll): + rl = real_dll.lower() + resolved.add(rl) + if rl in self._KNOWN_CHAINS: + resolved.update(self._KNOWN_CHAINS[rl]) + + self._chain_cache[key] = resolved + return resolved + + def check_iat(self, pe_obj, module_base: int) -> List[dict]: + """Return a list of hook findings for all IAT entries in pe_obj.""" + hooks: List[dict] = [] + + for entry in self._iter_import_entries(pe_obj): + import_dll = self._decode_name(entry.dll) + valid_targets = self.get_valid_targets(import_dll) + + for imp in entry.imports: + if imp.address is None: + continue + resolved_addr = self._resolve_import_target(pe_obj, module_base, imp) + if resolved_addr is None: + continue + + # Validate: does the resolved address fall in any valid DLL? 
+ is_legit = False + for tgt_dll in valid_targets: + if tgt_dll in self.module_map: + base, end = self.module_map[tgt_dll] + if base <= resolved_addr < end: + is_legit = True + break + + if not is_legit: + owner = self.find_owner(resolved_addr) + func_name = ( + imp.name.decode("utf-8", errors="replace") + if imp.name + else f"Ordinal#{imp.ordinal}" + ) + hooks.append( + { + "type": "IAT", + "import_dll": import_dll, + "function": func_name, + "resolved_addr": resolved_addr, + "target_module": owner, + } + ) + return hooks + + @staticmethod + def _decode_name(value) -> str: + if isinstance(value, bytes): + return value.decode("utf-8", errors="replace") + return str(value) + + def _iter_import_entries(self, pe_obj): + for attr in ("DIRECTORY_ENTRY_IMPORT", "DIRECTORY_ENTRY_DELAY_IMPORT"): + entries = getattr(pe_obj, attr, None) + if not entries: + continue + for entry in entries: + if getattr(entry, "dll", None) and getattr(entry, "imports", None): + yield entry + + def _resolve_import_target(self, pe_obj, module_base: int, imp) -> Optional[int]: + """Read the live target pointer stored in the loaded import thunk.""" + try: + image_base = int(pe_obj.OPTIONAL_HEADER.ImageBase) + thunk_slot_rva = int(imp.address) - image_base + except (AttributeError, TypeError, ValueError): + return None + + if thunk_slot_rva < 0: + return None + + thunk_slot_va = module_base + thunk_slot_rva + try: + data = self.proc_layer.read(thunk_slot_va, self._pointer_size) + except ( + exceptions.InvalidAddressException, + KeyError, + TypeError, + ValueError, + ): + vollog.debug("Skipping unreadable import thunk at %#x", thunk_slot_va) + return None + + if len(data) != self._pointer_size: + return None + + return int.from_bytes(data, "little") + + def find_owner(self, addr: int) -> str: + """Return the DLL name that owns addr, or ''.""" + for name, (base, end) in self.module_map.items(): + if base <= addr < end: + return name + return "" + + +# 
class EATHookDetector:
    """Detects Export Address Table hooks by RVA bounds checking."""

    def check_eat(
        self, pe_obj, module_base: int, module_size: int
    ) -> List[dict]:
        """Return a list of EAT hook findings.

        A forwarded export (RVA inside the export directory) is legitimate.
        An export whose RVA falls within SizeOfImage is legitimate.
        An RVA outside the image is an EAT hook.
        """
        findings: List[dict] = []

        export_entry = getattr(pe_obj, "DIRECTORY_ENTRY_EXPORT", None)
        if not export_entry or getattr(export_entry, "symbols", None) is None:
            return findings

        # The export directory's own virtual-address range; forwarded
        # exports point back into it.
        export_dir = pe_obj.OPTIONAL_HEADER.DATA_DIRECTORY[0]
        forward_range = range(
            export_dir.VirtualAddress,
            export_dir.VirtualAddress + export_dir.Size,
        )

        for symbol in export_entry.symbols:
            rva = symbol.address
            if not rva:
                continue

            if symbol.name:
                export_name = symbol.name.decode("utf-8", errors="replace")
            else:
                export_name = f"Ordinal#{symbol.ordinal}"

            # Case 1: forwarded string (RVA inside export directory) → clean.
            if rva in forward_range:
                continue

            # Case 2: normal export within image bounds → clean.
            if rva < module_size:
                continue

            # Case 3: RVA outside the image → EAT hook.
            findings.append(
                {
                    "type": "EAT",
                    "function": export_name,
                    "rva": rva,
                    "absolute_addr": module_base + rva,
                }
            )
        return findings
    def check_ssdt(
        self,
        context: interfaces.context.ContextInterface,
        kernel_module_name: str,
        module_collection: contexts.ModuleCollection,
    ) -> List[dict]:
        """Walk the NT SSDT and return entries pointing outside ntoskrnl.

        Args:
            context: the active framework context.
            kernel_module_name: name of the kernel module in the context.
            module_collection: used to resolve a service address to its
                owning kernel module.

        Returns:
            One dict per suspicious entry: syscall index, resolved address,
            and the owning module name ('' when no module claims the address).
        """
        kernel = context.modules[kernel_module_name]
        # NOTE(review): layer_name is captured but not used in this method —
        # confirm whether it can be dropped.
        layer_name = kernel.layer_name
        is_64bit = symbols.symbol_table_is_64bit(
            context=context, symbol_table_name=kernel.symbol_table_name
        )

        # Without both the table symbol and its limit there is nothing to walk.
        try:
            svc_table_addr = kernel.get_symbol("KiServiceTable").address
            svc_limit_addr = kernel.get_symbol("KiServiceLimit").address
        except (AttributeError, exceptions.SymbolError, TypeError) as exc:
            vollog.warning("Cannot locate SSDT symbols: %s", exc)
            return []

        kvo = kernel.offset
        try:
            num_services = kernel.object(
                object_type="int", offset=svc_limit_addr
            )
        except exceptions.InvalidAddressException:
            vollog.warning("Cannot read KiServiceLimit")
            return []

        # Defensive cap in case KiServiceLimit reads back corrupt.
        num_services = min(int(num_services), 1024)

        if is_64bit:
            array_subtype = "long"

            # x64 entries are packed table-relative offsets; raw >> 4 is added
            # to the (rebased) table address — presumably matching the
            # KiServiceTable encoding where the low bits carry the argument
            # count. TODO confirm against a live image.
            def decode_addr(raw: int) -> int:
                return kvo + svc_table_addr + (raw >> 4)

        else:
            array_subtype = "unsigned long"

            # x86 entries are absolute function pointers already.
            def decode_addr(raw: int) -> int:  # type: ignore[misc]
                return raw

        try:
            functions = kernel.object(
                object_type="array",
                offset=svc_table_addr,
                subtype=kernel.get_type(array_subtype),
                count=num_services,
            )
        except exceptions.InvalidAddressException:
            vollog.warning("Cannot read KiServiceTable")
            return []

        hooks: List[dict] = []
        for idx, func_obj in enumerate(functions):
            try:
                func_addr = decode_addr(int(func_obj))
            except (TypeError, ValueError):
                continue

            # Resolve the owning module; any owner that is not an ntoskrnl
            # flavor is reported as a hook.
            mod_syms = module_collection.get_module_symbols_by_absolute_location(
                func_addr
            )
            for mod_name, _sym_gen in mod_syms:
                base_name = os.path.splitext(mod_name.lower())[0]
                if not any(
                    base_name.startswith(p) for p in self._NTOS_PREFIXES
                ):
                    hooks.append(
                        {
                            "type": "SSDT",
                            "syscall_index": idx,
                            "func_addr": func_addr,
                            "owner": mod_name,
                        }
                    )
                break  # Only need the first module match
            else:
                # Loop completed without a break: the address resolves to no
                # known module at all — report it with an empty owner.
                hooks.append(
                    {
                        "type": "SSDT",
                        "syscall_index": idx,
                        "func_addr": func_addr,
                        "owner": "",
                    }
                )

        return hooks
+ _SYSTEM_DLLS: Set[str] = { + # Core runtime + "ntdll.dll", "kernel32.dll", "kernelbase.dll", + "msvcrt.dll", "ucrtbase.dll", + # UI / graphics + "user32.dll", "gdi32.dll", "gdi32full.dll", "win32u.dll", + "uxtheme.dll", "dwmapi.dll", "imm32.dll", "comctl32.dll", + "comdlg32.dll", "winmm.dll", "mmsystem.dll", + # DirectX / OpenGL + "dxgi.dll", "d3d11.dll", "d3d12.dll", "opengl32.dll", + "d3d9.dll", + # COM / OLE + "advapi32.dll", "ole32.dll", "oleaut32.dll", "combase.dll", + "rpcrt4.dll", "shlwapi.dll", "shell32.dll", + # Security / crypto + "sechost.dll", "sspicli.dll", "cryptbase.dll", + "bcrypt.dll", "bcryptprimitives.dll", "ncrypt.dll", + "crypt32.dll", "wintrust.dll", + # Network + "ws2_32.dll", "mswsock.dll", "winhttp.dll", "wininet.dll", + "iphlpapi.dll", "dnsapi.dll", "wldap32.dll", + # Misc system + "cfgmgr32.dll", "setupapi.dll", "secur32.dll", + "psapi.dll", "version.dll", "userenv.dll", + "imagehlp.dll", + } + + # WoW64 translation layer — 32-bit ntdll stubs legitimately branch here. + # Needs a stronger penalty than _SYSTEM_DLLS because HIGH_VALUE_TARGET (+25) + # would otherwise survive the -35 to reach MEDIUM in every WoW64 process. 
+ _WOW64_DLLS: Set[str] = {"wow64cpu.dll", "wow64.dll", "wow64win.dll"} + + # AppCompat / Shim DLLs + _SHIM_DLLS: Set[str] = { + "acgenral.dll", "aclayers.dll", "acspecfc.dll", + "apphelp.dll", "shimeng.dll", "acwow64.dll", + } + + # C-runtime DLL name fragments + _CRT_FRAGMENTS: Set[str] = { + "msvcr", "msvcp", "vcruntime", "ucrtbase", "msvcp_win", + } + + # .NET CLR fragments + _CLR_FRAGMENTS: Set[str] = {"clr.dll", "clrjit", "mscorjit", "coreclr"} + + # High-value targets: hooks here strongly suggest defense evasion + HIGH_VALUE_TARGETS: Set[str] = { + "AmsiScanBuffer", "AmsiScanString", "AmsiInitialize", + "EtwEventWrite", "EtwEventWriteFull", "NtTraceEvent", + "NtProtectVirtualMemory", "NtWriteVirtualMemory", + "NtAllocateVirtualMemory", "NtMapViewOfSection", + "NtCreateThreadEx", "NtQueueApcThread", + "NtOpenProcess", "NtSetContextThread", + "LdrLoadDll", "LdrGetProcedureAddress", + "RtlDispatchException", "KiUserExceptionDispatcher", + } + + def score( + self, + hook: dict, + module_map: ModuleMap, + cross_process_count: int = 1, + ) -> int: + """Return a 0–100 suspicion score (higher = more suspicious). + + Args: + hook: dict of hook metadata (type, function, target_module, etc.) + module_map: address→DLL map for the process + cross_process_count: how many distinct processes have the same + (dll, func, target) triple. Hooks seen in >5 processes are + almost certainly legitimate (shared mapping, not a private patch). 
+ """ + s = 50 # Neutral baseline + + target = hook.get("target_module", "").lower() + hook_type = hook.get("type", "") + function = hook.get("function", "") + source = hook.get("source_module", "").lower() + process = hook.get("process", "").lower() + benign_pair_match = bool(hook.get("benign_pair_match", False)) + target_vad_evidence = str(hook.get("target_vad_evidence", "") or "") + backed_exec_image = bool( + hook.get("backed_exec_image", False) + or target_vad_evidence == "mapped_exec_image" + ) + private_exec_vad = bool( + hook.get("private_exec_vad", False) + or target_vad_evidence == "private_exec_vad" + ) + + if benign_pair_match and backed_exec_image: + return 0 + + # ── Strong suspicious indicators ─────────────────────────────────── + if target in ("", "", ""): + s += 40 # Target in anonymous/unbacked memory + if function in self.HIGH_VALUE_TARGETS: + s += 25 # High-value defensive function + if hook_type == "SSDT": + s += 20 # PatchGuard-protected on x64 + if private_exec_vad: + s += 30 # Target backed by private executable memory + if ( + hook_type == "INLINE" + and function in self.HIGH_VALUE_TARGETS + and hook.get("suspicious_backed_target", False) + ): + s += 30 # High-value target redirected into a suspicious mapped region + + # ── Strong benign indicators ─────────────────────────────────────── + for sec in self._SECURITY_MODULES: + if sec in target: + s -= 45 # Known security product hook + break + + if target in self._WOW64_DLLS: + s -= 50 # WoW64 translation layer — all ntdll stubs branch here + + if target in self._SYSTEM_DLLS: + s -= 35 # Legitimate forwarded import + + if target in self._SHIM_DLLS: + s -= 40 # AppCompat shim + + for frag in self._CRT_FRAGMENTS: + if frag in target: + s -= 35 # C-runtime self-patching + break + + if source and source == target: + s -= 50 # Module hooking itself — Windows 7 ETW IAT stubs and + # similar loader trampolines resolve back into the importing + # DLL/EXE itself. 
Strong penalty to push HIGH_VALUE+self + # (50+25-50=25) below the LOW/MEDIUM boundary. + + # ── Moderate indicators ──────────────────────────────────────────── + for frag in self._CLR_FRAGMENTS: + if frag in target: + s -= 25 # .NET JIT self-patching + break + + if "chrome_elf" in target or "nss3" in target or "xul" in target: + s -= 40 # Chrome/Firefox sandbox hook + + if "pythonservice" in process or "python" in process: + s -= 15 # Python instrumenting itself + + if "mscorsvw" in process or "ngen" in process: + s -= 20 # .NET NGEN pre-compilation + + # ── Cross-process deduplication ──────────────────────────────────── + # A hook seen in the same (dll, func, target) in many processes is a + # shared-mapped page, not a private patch. This directly addresses + # the "ntdll!LdrLoadDll same disasm across all processes" FP class. + if backed_exec_image: + if cross_process_count >= 10: + s -= 40 # Seen in 10+ processes — almost certainly benign + elif cross_process_count >= 5: + s -= 25 # Seen in 5-9 processes — probably benign + elif cross_process_count >= 3 and function not in self.HIGH_VALUE_TARGETS: + s -= 20 # Shared backed image mapping + + if ( + target in ("", "", "") + and function not in self.HIGH_VALUE_TARGETS + and not private_exec_vad + ): + s = min( + s, 39 + ) # Suppress weak unknown targets unless stronger evidence exists + + return max(0, min(100, s)) + + @staticmethod + def label(score: int) -> str: + if score >= 70: + return "HIGH" + if score >= 40: + return "MEDIUM" + return "LOW" + + +# --------------------------------------------------------------------------- +# Module PE cache (avoid re-parsing the same DLL across 100+ processes) +# --------------------------------------------------------------------------- + + +class ModuleCache: + """Cache parsed PE export/section data keyed by (name, TimeDateStamp).""" + + def __init__(self, max_entries: int = 256) -> None: + self._cache: "OrderedDict[Tuple[str, int], dict]" = OrderedDict() + 
        self._max_entries = max(1, int(max_entries))

    def get_or_parse(self, dll_name: str, pe_obj) -> dict:
        """Return cached metadata for this PE, parsing it on a cache miss.

        The cache key is (lowercased name, PE TimeDateStamp), so two
        different builds of the same DLL are cached independently.  The
        OrderedDict acts as an LRU: a hit is re-inserted at the MRU end,
        and the oldest entry is evicted once ``_max_entries`` is exceeded.
        """
        key = (dll_name.lower(), int(pe_obj.FILE_HEADER.TimeDateStamp))
        if key in self._cache:
            # LRU touch: pop + re-insert moves the entry to the MRU end.
            data = self._cache.pop(key)
            self._cache[key] = data
            return data

        data: dict = {
            "exports": {},  # {func_name: rva}
            "export_dir_range": None,
            "sections": [],
            "image_base": int(pe_obj.OPTIONAL_HEADER.ImageBase),
            "size_of_image": int(pe_obj.OPTIONAL_HEADER.SizeOfImage),
        }

        export_entry = getattr(pe_obj, "DIRECTORY_ENTRY_EXPORT", None)
        if export_entry and getattr(export_entry, "symbols", None) is not None:
            try:
                # DATA_DIRECTORY[0] is the export directory.
                edir = pe_obj.OPTIONAL_HEADER.DATA_DIRECTORY[0]
                export_start = int(edir.VirtualAddress)
                export_size = int(edir.Size)
            except (AttributeError, IndexError, TypeError, ValueError):
                export_start = 0
                export_size = 0

            # Record the export-directory RVA range only when it lies fully
            # inside the mapped image; a bogus range is logged and dropped.
            if (
                export_size > 0
                and export_start >= 0
                and export_start + export_size <= data["size_of_image"]
            ):
                data["export_dir_range"] = (
                    export_start,
                    export_start + export_size,
                )
            elif export_size:
                vollog.debug("Skipping unreasonable export directory for %s", dll_name)

            for exp in export_entry.symbols:
                # Keep only named exports with a non-zero address; decode
                # failures or malformed entries skip just that export.
                if exp.name and exp.address:
                    try:
                        name = exp.name.decode("utf-8", errors="replace")
                        data["exports"][name] = int(exp.address)
                    except (
                        AttributeError,
                        TypeError,
                        UnicodeDecodeError,
                        ValueError,
                        OverflowError,
                    ):
                        continue

        for sec in pe_obj.sections:
            try:
                # 0x20000000 == IMAGE_SCN_MEM_EXECUTE: only executable
                # sections are retained (used later to separate code
                # exports from data exports).
                if not (int(sec.Characteristics) & 0x20000000):
                    continue
                data["sections"].append(
                    {
                        "name": sec.Name.decode("utf-8", errors="replace").rstrip("\x00"),
                        "rva": int(sec.VirtualAddress),
                        "vsize": int(sec.Misc_VirtualSize),
                    }
                )
            except (
                AttributeError,
                TypeError,
                UnicodeDecodeError,
                ValueError,
                OverflowError,
            ):
                continue

        self._cache[key] = data
        if len(self._cache) > self._max_entries:
            # Evict the least-recently-used entry (front of the OrderedDict).
            evicted_key, _evicted_value = self._cache.popitem(last=False)
            vollog.debug("Evicted module cache entry for %s", evicted_key[0])
        return data

    def __len__(self) -> int:
        """Number of (name, timestamp) entries currently cached."""
        return len(self._cache)


# ---------------------------------------------------------------------------
# Batch page-aligned prologue reader
# ---------------------------------------------------------------------------


def batch_read_prologues(
    proc_layer,
    exports: Dict[str, int],
    module_base: int,
    prologue_size: int = 32,
) -> Dict[str, bytes]:
    """Read the first ``prologue_size`` bytes of each exported function.

    Uses page-aligned batch reads to minimise layer round-trips.

    Args:
        proc_layer: process translation layer providing ``read``
        exports: {function_name: rva} map for one module
        module_base: base VA the export RVAs are relative to
        prologue_size: bytes to capture per function (default 32)

    Returns:
        {function_name: prologue_bytes}.  Functions whose first page
        cannot be read are omitted; a function whose *second* page cannot
        be read is returned truncated.
    """
    page_cache: Dict[int, bytes] = {}
    results: Dict[str, bytes] = {}

    for name, rva in exports.items():
        va = module_base + rva
        page_addr = va & ~0xFFF  # 4 KiB page containing the prologue start

        if page_addr not in page_cache:
            try:
                page_cache[page_addr] = proc_layer.read(page_addr, 0x1000, pad=True)
            except (
                exceptions.InvalidAddressException,
                KeyError,
                TypeError,
                ValueError,
            ):
                continue

        page_data = page_cache[page_addr]
        page_offset = va - page_addr
        end_offset = page_offset + prologue_size

        if end_offset <= len(page_data):
            results[name] = page_data[page_offset:end_offset]
        elif page_offset < len(page_data):
            # Function straddles a page boundary — fetch the next page too
            next_page_addr = page_addr + 0x1000
            if next_page_addr not in page_cache:
                try:
                    page_cache[next_page_addr] = proc_layer.read(
                        next_page_addr, 0x1000, pad=True
                    )
                except (
                    exceptions.InvalidAddressException,
                    KeyError,
                    TypeError,
                    ValueError,
                ):
                    # Second page unreadable: keep the truncated prologue.
                    results[name] = page_data[page_offset:]
                    continue
            combined = page_data + page_cache[next_page_addr]
            results[name] = combined[page_offset:end_offset]

    return results


# ---------------------------------------------------------------------------
# Main plugin
# ---------------------------------------------------------------------------


class ApiHooks(interfaces.plugins.PluginInterface):
    """Detects inline, IAT, EAT, and SSDT API hooks in a Windows memory
image. + + Requires capstone and pefile: + pip install capstone pefile + + ``--quick`` skips inline disassembly and keeps the cheaper IAT/EAT/SSDT + detectors enabled for faster triage. + Inline findings are buffered until the scan completes so cross-process + confidence is deterministic rather than traversal-order dependent. + ``--low`` is an analyst exhaust mode and intentionally includes benign + helper/forwarder patterns that default output suppresses. + + Confidence levels: + HIGH — strong malware indicator (unbacked memory, AMSI/ETW bypass) + MEDIUM — requires analyst review + LOW — almost certainly benign (security product, forwarded import) + """ + + _required_framework_version = (2, 0, 0) + _version = (1, 1, 0) + _MAX_RECONSTRUCTED_PE_SIZE = 0x10000000 + _PE_DIRECTORY_NAMES = ( + "IMAGE_DIRECTORY_ENTRY_EXPORT", + "IMAGE_DIRECTORY_ENTRY_IMPORT", + "IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT", + ) + _BENIGN_IAT_PAIRS: Dict[Tuple[str, str], Set[str]] = { + ("umpo.dll", "pcwum.dll"): { + "PerfDeleteInstance", + "PerfCreateInstance", + "PerfSetULongCounterValue", + "PerfStartProviderEx", + "PerfSetCounterSetInfo", + "PerfStopProvider", + }, + ("netcfgx.dll", "devrtl.dll"): { + "SetupGetThreadLogToken", + "SetupWriteTextLog", + }, + ("nci.dll", "devrtl.dll"): { + "SetupGetThreadLogToken", + "SetupWriteTextLog", + }, + ("gpsvc.dll", "logoncli.dll"): {"DsEnumerateDomainTrustsW"}, + ("gpsvc.dll", "srvcli.dll"): {"NetFileEnum", "NetFileClose"}, + ("schedsvc.dll", "wkscli.dll"): {"NetGetJoinInformation"}, + ("schedsvc.dll", "netutils.dll"): {"NetApiBufferFree"}, + ("fveapi.dll", "logoncli.dll"): {"DsGetDcNameW"}, + ("fveapi.dll", "netutils.dll"): {"NetApiBufferFree"}, + } + _BENIGN_INLINE_PAIRS: Dict[Tuple[str, str], Set[str]] = { + ("d3d10_1.dll", "d3d10_1core.dll"): { + "D3D10GetVersion", + "D3D10RegisterLayers", + }, + ("firewallapi.dll", "wfapigp.dll"): {"FWGPLock"}, + } + _UNKNOWN_OWNERS = {"", "", ""} + + @classmethod + def get_requirements(cls) -> 
List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", + component=pslist.PsList, + version=(3, 0, 0), + ), + requirements.VersionRequirement( + name="modules", + component=modules.Modules, + version=(3, 0, 0), + ), + requirements.ListRequirement( + name="pid", + element_type=int, + description="Filter to these process IDs (default: all)", + optional=True, + ), + requirements.BooleanRequirement( + name="ssdt", + description="Include SSDT hook scan (default: true)", + optional=True, + default=True, + ), + requirements.BooleanRequirement( + name="skip-kernel", + description="Skip kernel-mode (SSDT) analysis", + optional=True, + default=False, + ), + requirements.BooleanRequirement( + name="low", + description="Also emit LOW-confidence findings (noisy analyst exhaust mode)", + optional=True, + default=False, + ), + requirements.BooleanRequirement( + name="quick", + description="Skip inline disassembly for faster triage", + optional=True, + default=False, + ), + ] + + # ── Internal helpers ─────────────────────────────────────────────────── + + @staticmethod + def _build_module_map(proc) -> Tuple[ModuleMap, List[Tuple[str, int, int]]]: + """Return (module_map, module_list) for a process. + + module_map: {dll_name_lower: (base, end)} + module_list: [(dll_name_lower, base, size), ...] 
+ """ + mod_map: ModuleMap = {} + mod_list: List[Tuple[str, int, int]] = [] + for entry in proc.load_order_modules(): + try: + base = int(entry.DllBase) + size = int(entry.SizeOfImage) + name = entry.BaseDllName.get_string().lower() + if name and size: + mod_map[name] = (base, base + size) + mod_list.append((name, base, size)) + except exceptions.InvalidAddressException: + continue + return mod_map, mod_list + + @staticmethod + def _matches_function_pattern(function_name: str, patterns: Set[str]) -> bool: + for pattern in patterns: + if fnmatch.fnmatchcase(function_name, pattern): + return True + return False + + @classmethod + def _is_benign_pair_match( + cls, hook_type: str, source_module: str, target_module: str, function_name: str + ) -> bool: + if not source_module or not target_module or not function_name: + return False + + normalized_target = target_module.lower() + if normalized_target in cls._UNKNOWN_OWNERS: + return False + + if hook_type == "IAT": + baseline = cls._BENIGN_IAT_PAIRS + elif hook_type == "INLINE": + baseline = cls._BENIGN_INLINE_PAIRS + else: + return False + + patterns = baseline.get((source_module.lower(), normalized_target)) + if not patterns: + return False + + return cls._matches_function_pattern(function_name, patterns) + + @classmethod + def _get_vad_protect_values(cls, context, kernel) -> Tuple[int, ...]: + try: + return tuple( + vadinfo.VadInfo.protect_values( + context, + kernel.layer_name, + kernel.symbol_table_name, + ) + ) + except ( + AttributeError, + KeyError, + TypeError, + ValueError, + exceptions.InvalidAddressException, + exceptions.SymbolError, + ) as exc: + vollog.debug("Falling back to empty VAD protection table: %s", exc) + return tuple() + + @classmethod + def _build_vad_cache(cls, proc, protect_values: Tuple[int, ...]) -> List[dict]: + if not hasattr(proc, "get_vad_root"): + return [] + + vad_cache: List[dict] = [] + try: + vads = vadinfo.VadInfo.list_vads(proc) + except ( + AttributeError, + TypeError, + 
            ValueError,
            exceptions.InvalidAddressException,
        ):
            # VAD enumeration itself failed — return the (empty) cache.
            return vad_cache

        for vad in vads:
            # Each field below is extracted best-effort: a failure on one
            # VAD skips (or defaults) only that field/entry rather than
            # aborting the whole walk over a partially-smeared VAD tree.
            try:
                start = int(vad.get_start())
                end = int(vad.get_end())
            except (
                AttributeError,
                TypeError,
                ValueError,
                exceptions.InvalidAddressException,
            ):
                continue

            try:
                private_memory = bool(vad.get_private_memory())
            except (
                AttributeError,
                TypeError,
                ValueError,
                exceptions.InvalidAddressException,
            ):
                private_memory = False

            protection = ""
            if protect_values:
                try:
                    protection = vad.get_protection(
                        protect_values, vadinfo.winnt_protections
                    )
                except (
                    AttributeError,
                    IndexError,
                    TypeError,
                    ValueError,
                    exceptions.InvalidAddressException,
                ):
                    protection = ""

            try:
                file_name_obj = vad.get_file_name()
                if isinstance(file_name_obj, str):
                    file_name = file_name_obj
                elif isinstance(file_name_obj, interfaces.renderers.BaseAbsentValue):
                    # BaseAbsentValue placeholders normalise to an empty string.
                    file_name = ""
                else:
                    file_name = str(file_name_obj)
            except (
                AttributeError,
                TypeError,
                ValueError,
                exceptions.InvalidAddressException,
            ):
                file_name = ""

            vad_cache.append(
                {
                    "start": start,
                    "end": end,
                    "private_memory": private_memory,
                    "protection": protection,
                    # Substring test covers all PAGE_EXECUTE* variants.
                    "execute": "PAGE_EXECUTE" in protection,
                    "file_name": file_name,
                }
            )

        return vad_cache

    @staticmethod
    def _classify_target_vad_evidence(target: int, vad_cache: List[dict]) -> str:
        """Classify ``target`` against the cached VAD entries.

        Returns one of "private_exec_vad", "private_nonexec",
        "mapped_exec_image", "mapped_nonexec", or "missing_vad" when no
        VAD contains the address.  The containment test treats the VAD
        end address as inclusive.
        """
        for vad in vad_cache:
            if int(vad["start"]) <= target <= int(vad["end"]):
                if bool(vad.get("private_memory")):
                    return "private_exec_vad" if vad.get("execute") else "private_nonexec"
                return "mapped_exec_image" if vad.get("execute") else "mapped_nonexec"
        return "missing_vad"

    @staticmethod
    def _is_reasonable_directory_range(
        virtual_address: int, size: int, size_of_image: int
    ) -> bool:
        """True when [VA, VA+size) lies entirely inside the mapped image.

        Zero or negative VA/size, or any range reaching past SizeOfImage,
        is rejected — this guards against smeared in-memory PE headers.
        """
        if (
            size_of_image <= 0
            or virtual_address <= 0
            or size <= 0
            or virtual_address >= size_of_image
            or size > size_of_image
        ):
            return False
        return virtual_address + size <= size_of_image
    @classmethod
    def _iter_reasonable_directory_indexes(cls, pe_obj) -> List[int]:
        """Return the pefile data-directory indexes that are safe to parse.

        Only the directories named in ``_PE_DIRECTORY_NAMES`` (export,
        import, delay-import) are considered, and each is validated against
        SizeOfImage via ``_is_reasonable_directory_range`` so a corrupted
        in-memory header cannot drive pefile into an unreasonable parse.
        """
        try:
            size_of_image = int(pe_obj.OPTIONAL_HEADER.SizeOfImage)
            directories = pe_obj.OPTIONAL_HEADER.DATA_DIRECTORY
        except (AttributeError, IndexError, TypeError, ValueError):
            return []

        safe_indexes: List[int] = []
        image_base = int(getattr(pe_obj.OPTIONAL_HEADER, "ImageBase", 0))
        for directory_name in cls._PE_DIRECTORY_NAMES:
            try:
                directory_index = pefile.DIRECTORY_ENTRY[directory_name]
                directory = directories[directory_index]
                virtual_address = int(directory.VirtualAddress)
                size = int(directory.Size)
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                continue

            if virtual_address == 0 or size == 0:
                # Directory not present in this PE.
                continue

            if cls._is_reasonable_directory_range(
                virtual_address, size, size_of_image
            ):
                safe_indexes.append(directory_index)
                continue

            vollog.debug(
                "Skipping unreasonable %s directory for PE at %#x: VA=%#x Size=%#x Image=%#x",
                directory_name,
                image_base,
                virtual_address,
                size,
                size_of_image,
            )

        return safe_indexes

    @staticmethod
    def _is_known_benign_target_module(module_name: str) -> bool:
        """True when the hook target module matches a known-benign class.

        Checks, in order: system DLLs, WoW64 translation DLLs, AppCompat
        shims, security-product fragments, C-runtime fragments, .NET CLR
        fragments, and finally browser sandbox modules (chrome_elf, nss3,
        xul).  Set lookups are exact; fragment checks are substring matches.
        """
        normalized = module_name.lower()
        if normalized in HookScorer._SYSTEM_DLLS:
            return True
        if normalized in HookScorer._WOW64_DLLS:
            return True
        if normalized in HookScorer._SHIM_DLLS:
            return True
        if any(fragment in normalized for fragment in HookScorer._SECURITY_MODULES):
            return True
        if any(fragment in normalized for fragment in HookScorer._CRT_FRAGMENTS):
            return True
        if any(fragment in normalized for fragment in HookScorer._CLR_FRAGMENTS):
            return True
        return (
            "chrome_elf" in normalized
            or "nss3" in normalized
            or "xul" in normalized
        )

    @staticmethod
    def _target_in_executable_section(
        target: int, module_base: int, module_metadata: Optional[dict]
    ) -> bool:
        """True when ``target`` falls inside a cached executable section.

        ``module_metadata`` is a ModuleCache record (its "sections" list
        holds executable sections only).  With no metadata the answer is
        False — callers treat that as "cannot confirm".
        """
        if not module_metadata:
            return False

        for section in module_metadata.get("sections", []):
            try:
                start = module_base + int(section["rva"])
                # A zero-sized section still covers at least one byte.
                size = max(1, int(section["vsize"]))
            except (KeyError, TypeError, ValueError):
                continue
            if start <= target < start + size:
                return True
        return False

    @classmethod
    def _is_suspicious_backed_inline_target(
        cls,
        source_module: str,
        function_name: str,
        target_owner: str,
        target: int,
        module_map: ModuleMap,
        target_metadata: Optional[dict],
        target_vad_evidence: str = "",
        benign_pair_match: bool = False,
    ) -> bool:
        """Decide whether a *module-backed* inline-hook target is suspicious.

        Only high-value functions redirected into a third-party mapped
        module qualify; unknown owners, self-hooks, known-benign modules,
        and baselined benign pairs are rejected up front.
        """
        normalized_owner = target_owner.lower()
        if (
            function_name not in HookScorer.HIGH_VALUE_TARGETS
            or normalized_owner in cls._UNKNOWN_OWNERS
            or normalized_owner == source_module.lower()
            or normalized_owner not in module_map
            or cls._is_known_benign_target_module(normalized_owner)
            or benign_pair_match
        ):
            return False

        if not target_vad_evidence:
            # Preserve the legacy helper contract for call sites and tests that
            # only provide module metadata. The main runtime path supplies
            # explicit VAD evidence and does not rely on this fallback.
            return True

        if target_vad_evidence == "private_exec_vad":
            return True

        module_base = module_map[normalized_owner][0]
        # A backed target landing *outside* every executable section of its
        # owning module is still suspicious.
        if target_metadata and not cls._target_in_executable_section(
            target, module_base, target_metadata
        ):
            return True

        return False

    @classmethod
    def _build_inline_followup_note(
        cls, target_owner: str, suspicious_backed_target: bool
    ) -> str:
        """Return an analyst follow-up hint for an inline-hook finding."""
        normalized_owner = target_owner.lower()
        if normalized_owner in cls._UNKNOWN_OWNERS:
            return "follow-up: inspect target with malfind"
        if suspicious_backed_target:
            return "follow-up: inspect target module with malfind"
        return ""

    @staticmethod
    def _build_target_evidence_note(target_vad_evidence: str) -> str:
        """Map a VAD-evidence class to a short human-readable note."""
        evidence_notes = {
            "mapped_exec_image": "mapped image",
            "private_exec_vad": "private exec VAD",
            "mapped_nonexec": "mapped non-exec region",
            "private_nonexec": "private non-exec VAD",
            "missing_vad": "unmapped target",
        }
        return evidence_notes.get(target_vad_evidence, "")

    @staticmethod
    def _combine_analysis_notes(*notes: str) -> str:
        """Join the non-empty notes with "; " into one analysis string."""
        return "; ".join(note for note in notes if note)

    @staticmethod
    def _append_analysis_note(disasm: str, note: str) -> str:
        """Append ``note`` (bracketed) to a disassembly string, if any."""
        if not note:
            return disasm
        if not disasm:
            return note
        return f"{disasm} [{note}]"

    @staticmethod
    def _extract_process_identity(proc) -> Optional[Tuple[int, int, str]]:
        """Return (pid, ppid, name) for a process.

        Returns None only when the PID itself is unreadable; PPID defaults
        to 0 and the name to "" when those fields cannot be decoded.
        """
        try:
            pid = int(proc.UniqueProcessId)
        except (
            AttributeError,
            TypeError,
            ValueError,
            exceptions.InvalidAddressException,
        ):
            return None

        try:
            ppid = int(getattr(proc, "InheritedFromUniqueProcessId", 0) or 0)
        except (
            AttributeError,
            TypeError,
            ValueError,
            exceptions.InvalidAddressException,
        ):
            ppid = 0

        try:
            process_name = utility.array_to_string(proc.ImageFileName)
        except (
            AttributeError,
            TypeError,
            ValueError,
            exceptions.InvalidAddressException,
        ):
            # Fallback: decode the raw field by hand when the helper fails.
            try:
                raw_name = getattr(proc, "ImageFileName", "")
            except (AttributeError,
exceptions.InvalidAddressException): + raw_name = "" + + if isinstance(raw_name, (bytes, bytearray)): + process_name = raw_name.decode("utf-8", errors="replace") + elif raw_name: + process_name = str(raw_name) + else: + process_name = "" + + return pid, ppid, process_name + + @staticmethod + def _lookup_parent_process_name( + ppid: int, process_catalog: Dict[int, str] + ) -> str: + if ppid <= 0: + return "" + return process_catalog.get(ppid, "") + + @staticmethod + def _row_sort_key(item: Tuple[int, tuple]) -> Tuple[int, int, str, str, str, str, int, int]: + row = item[1] + return ( + int(row[1]), + int(row[0]), + str(row[2]).lower(), + str(row[4]).lower(), + str(row[5]).lower(), + str(row[6]).lower(), + int(row[7]), + int(row[8]), + ) + + @classmethod + def _reconstruct_pe(cls, context, pe_table_name, dll_base, layer_name): + """Reconstruct a PE from memory and parse it with pefile.""" + dos_hdr = context.object( + pe_table_name + constants.BANG + "_IMAGE_DOS_HEADER", + offset=dll_base, + layer_name=layer_name, + ) + buf = io.BytesIO() + for off, data in dos_hdr.reconstruct(): + if off < 0: + raise ValueError(f"Negative reconstructed PE offset {off}") + if off + len(data) > cls._MAX_RECONSTRUCTED_PE_SIZE: + raise ValueError( + f"Reconstructed PE exceeded {cls._MAX_RECONSTRUCTED_PE_SIZE:#x} bytes" + ) + buf.seek(off) + buf.write(data) + reconstructed = buf.getvalue() + if len(reconstructed) < 0x40: + raise ValueError("Truncated reconstructed PE") + + pe_obj = pefile.PE(data=reconstructed, fast_load=True) + directory_indexes = cls._iter_reasonable_directory_indexes(pe_obj) + if directory_indexes: + pe_obj.parse_data_directories(directories=directory_indexes) + return pe_obj + + # ── Generator ────────────────────────────────────────────────────────── + + def _generator(self) -> Iterator[Tuple[int, tuple]]: + quick_mode = self.config.get("quick", False) + + if not HAS_CAPSTONE and not quick_mode: + vollog.error( + "capstone not installed — inline hook detection 
disabled. " + "Run: pip install capstone" + ) + if not HAS_PEFILE: + vollog.error( + "pefile not installed — IAT/EAT detection disabled. " + "Run: pip install pefile" + ) + + kernel = self.context.modules[self.config["kernel"]] + layer_name = kernel.layer_name + show_low = self.config.get("low", False) + vad_protect_values = self._get_vad_protect_values(self.context, kernel) + + # ── PE symbol table for reconstruction ──────────────────────────── + pe_table_name: Optional[str] = None + if HAS_PEFILE: + pe_table_name = intermed.IntermediateSymbolTable.create( + self.context, + self.config_path, + "windows", + "pe", + class_types=pe.class_types, + ) + + scorer = HookScorer() + module_cache = ModuleCache() + process_module_metadata: Dict[Tuple[int, str], dict] = {} + # Cross-process dedup: (dll_name, func_name, hook_target) → process count + # Hooks consistent across many processes are shared-mapped pages (benign). + hook_seen_count: Dict[Tuple[str, str, int], int] = {} + # Inline results are buffered so cross-process suppression is based on + # final occurrence counts rather than process traversal order. + buffered_inline_findings: List[dict] = [] + + # ── SSDT (kernel-mode) ───────────────────────────────────────────── + if ( + self.config.get("ssdt", True) + and not self.config.get("skip-kernel", False) + ): + mod_collection = _build_module_collection( + self.context, self.config["kernel"] + ) + ssdt_detector = SSDTHookDetector() + for hook in ssdt_detector.check_ssdt( + self.context, self.config["kernel"], mod_collection + ): + # Route through scorer so known security-product SSDT filters + # (klflt, wdfilter, etc.) are suppressed instead of reported HIGH. 
+ hook["function"] = f"Syscall#{hook['syscall_index']}" + hook["target_module"] = hook["owner"] + hook["source_module"] = "ntoskrnl.exe" + hook["process"] = "System" + s = scorer.score(hook, {}) + conf = scorer.label(s) + if show_low or conf != "LOW": + yield ( + 0, + ( + 0, + 0, + "System", + "", + "SSDT", + "ntoskrnl.exe", + hook["function"], + format_hints.Hex(hook["func_addr"]), + format_hints.Hex(hook["func_addr"]), + hook["owner"], + conf, + "", + format_hints.HexBytes(b""), + ), + ) + + # ── Per-process checks ───────────────────────────────────────────── + filter_func = pslist.PsList.create_pid_filter( + self.config.get("pid", None) + ) + all_processes = list( + pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=lambda _proc: False, + ) + ) + process_catalog: Dict[int, str] = {} + for proc in all_processes: + identity = self._extract_process_identity(proc) + if identity is None: + continue + process_catalog[identity[0]] = identity[2] + + for proc in all_processes: + try: + if filter_func(proc): + continue + except exceptions.InvalidAddressException: + continue + + identity = self._extract_process_identity(proc) + if identity is None: + continue + + pid, ppid, proc_name = identity + parent_process = self._lookup_parent_process_name(ppid, process_catalog) + try: + proc_layer_name = proc.add_process_layer() + proc_layer = self.context.layers[proc_layer_name] + is_64 = proc_layer.bits_per_register == 64 + except exceptions.InvalidAddressException: + continue + + vad_cache = self._build_vad_cache(proc, vad_protect_values) + + try: + mod_map, mod_list = self._build_module_map(proc) + except exceptions.InvalidAddressException: + continue + + iat_detector = IATHookDetector(mod_map, proc_layer, is_64) + eat_detector = EATHookDetector() + inline_detector: Optional[InlineHookDetector] = None + if HAS_CAPSTONE and not quick_mode: + try: + inline_detector = InlineHookDetector(is_64, proc_layer) + except 
RuntimeError: + pass + + for dll_name, dll_base, dll_size in mod_list: + # ── Reconstruct PE ───────────────────────────────────── + pe_obj = None + if HAS_PEFILE and pe_table_name: + try: + pe_obj = self._reconstruct_pe( + self.context, + pe_table_name, + dll_base, + proc_layer_name, + ) + except ( + exceptions.InvalidAddressException, + exceptions.SymbolError, + AttributeError, + KeyError, + TypeError, + ValueError, + OverflowError, + MemoryError, + RecursionError, + pefile.PEFormatError, + ) as exc: + vollog.debug( + "Skipping PE reconstruction for %s at %#x in PID %d: %s", + dll_name, + dll_base, + pid, + exc, + ) + + if pe_obj is None: + continue + + try: + cached = module_cache.get_or_parse(dll_name, pe_obj) + process_module_metadata[(pid, dll_name)] = cached + except ( + AttributeError, + IndexError, + KeyError, + TypeError, + ValueError, + OverflowError, + ) as exc: + vollog.debug( + "Skipping cached metadata for %s in PID %d: %s", + dll_name, + pid, + exc, + ) + continue + + # ── Inline hook check ────────────────────────────────── + if inline_detector and cached["exports"]: + # Only disassemble exports whose RVA falls inside an + # executable section. Data exports (msvcrt._environ, + # msvcrt._acmdln, C++ static members, etc.) have their + # bytes interpreted as code by Capstone, producing false + # positives. If there are no section records (unusual), + # fall back to scanning all exports. + exec_sections = cached["sections"] + if exec_sections: + code_exports = { + name: rva + for name, rva in cached["exports"].items() + if any( + sec["rva"] <= rva < sec["rva"] + sec["vsize"] + for sec in exec_sections + ) + } + else: + code_exports = cached["exports"] + + prologues = batch_read_prologues( + proc_layer, code_exports, dll_base + ) + for func_name, func_bytes in prologues.items(): + func_va = dll_base + cached["exports"][func_name] + + # ── Static patch detection (early RET / XOR+RET) ── + # Only meaningful for high-value defensive functions. 
+ if func_name in HookScorer.HIGH_VALUE_TARGETS: + try: + patch = inline_detector.check_patch(func_bytes) + except ( + AttributeError, + TypeError, + ValueError, + capstone.CsError, + ) as exc: + vollog.debug( + "Skipping patch analysis for %s!%s at %#x: %s", + dll_name, + func_name, + func_va, + exc, + ) + patch = None + if patch: + patch_type, disasm = patch + yield ( + 0, + ( + pid, + ppid, + proc_name, + parent_process, + f"PATCH/{patch_type}", + dll_name, + func_name, + format_hints.Hex(func_va), + format_hints.Hex(func_va), + dll_name, + "HIGH", + disasm, + format_hints.HexBytes(func_bytes[:16]), + ), + ) + continue # No need for check_inline too + + # ── Redirecting inline hook ─────────────────────── + try: + result = inline_detector.check_inline( + func_bytes, + func_va, + dll_base, + dll_base + dll_size, + ) + except ( + AttributeError, + TypeError, + ValueError, + capstone.CsError, + ) as exc: + vollog.debug( + "Skipping inline analysis for %s!%s at %#x: %s", + dll_name, + func_name, + func_va, + exc, + ) + continue + if result: + target, hook_type_str, disasm = result + owner = iat_detector.find_owner(target) + + # Cross-process dedup tracking + dedup_key = (dll_name, func_name, target) + hook_seen_count[dedup_key] = ( + hook_seen_count.get(dedup_key, 0) + 1 + ) + buffered_inline_findings.append( + { + "pid": pid, + "ppid": ppid, + "proc_name": proc_name, + "parent_process": parent_process, + "hook_type": hook_type_str, + "dll_name": dll_name, + "func_name": func_name, + "func_va": func_va, + "target": target, + "owner": owner, + "disasm": disasm, + "hook_bytes": func_bytes[:16], + "dedup_key": dedup_key, + "mod_map": mod_map, + "vad_cache": vad_cache, + } + ) + + # ── IAT hook check ───────────────────────────────────── + for hook in iat_detector.check_iat(pe_obj, dll_base): + hook["source_module"] = dll_name + hook["process"] = proc_name + target_vad_evidence = self._classify_target_vad_evidence( + hook["resolved_addr"], vad_cache + ) + 
hook["target_vad_evidence"] = target_vad_evidence + hook["backed_exec_image"] = ( + target_vad_evidence == "mapped_exec_image" + ) + hook["private_exec_vad"] = ( + target_vad_evidence == "private_exec_vad" + ) + hook["benign_pair_match"] = self._is_benign_pair_match( + "IAT", + dll_name, + hook["target_module"], + hook["function"], + ) + s = scorer.score(hook, mod_map) + conf = scorer.label(s) + if show_low or conf != "LOW": + analysis_note = self._build_target_evidence_note( + target_vad_evidence + ) + yield ( + 0, + ( + pid, + ppid, + proc_name, + parent_process, + "IAT", + dll_name, + hook["function"], + format_hints.Hex(0), + format_hints.Hex(hook["resolved_addr"]), + hook["target_module"], + conf, + analysis_note, + format_hints.HexBytes(b""), + ), + ) + + # ── EAT hook check ───────────────────────────────────── + for hook in eat_detector.check_eat(pe_obj, dll_base, dll_size): + hook["source_module"] = dll_name + hook["process"] = proc_name + hook["target_module"] = iat_detector.find_owner( + hook["absolute_addr"] + ) + target_vad_evidence = self._classify_target_vad_evidence( + hook["absolute_addr"], vad_cache + ) + hook["target_vad_evidence"] = target_vad_evidence + hook["backed_exec_image"] = ( + target_vad_evidence == "mapped_exec_image" + ) + hook["private_exec_vad"] = ( + target_vad_evidence == "private_exec_vad" + ) + s = scorer.score(hook, mod_map) + conf = scorer.label(s) + if show_low or conf != "LOW": + analysis_note = self._build_target_evidence_note( + target_vad_evidence + ) + yield ( + 0, + ( + pid, + ppid, + proc_name, + parent_process, + "EAT", + dll_name, + hook["function"], + format_hints.Hex(dll_base + hook["rva"]), + format_hints.Hex(hook["absolute_addr"]), + hook["target_module"], + conf, + analysis_note, + format_hints.HexBytes(b""), + ), + ) + + for finding in buffered_inline_findings: + target_vad_evidence = self._classify_target_vad_evidence( + finding["target"], finding["vad_cache"] + ) + target_metadata = 
process_module_metadata.get(
                (finding["pid"], finding["owner"])
            )
            # NOTE(review): presumably consults a curated allow-list of benign
            # (source module, target module, function) hook pairs — confirm
            # against _is_benign_pair_match.
            benign_pair_match = self._is_benign_pair_match(
                "INLINE",
                finding["dll_name"],
                finding["owner"],
                finding["func_name"],
            )
            suspicious_backed_target = self._is_suspicious_backed_inline_target(
                finding["dll_name"],
                finding["func_name"],
                finding["owner"],
                finding["target"],
                finding["mod_map"],
                target_metadata,
                target_vad_evidence,
                benign_pair_match,
            )
            # Human-readable context that is appended to the disassembly
            # column when the finding is emitted below.
            analysis_note = self._combine_analysis_notes(
                self._build_inline_followup_note(
                    finding["owner"], suspicious_backed_target
                ),
                self._build_target_evidence_note(target_vad_evidence),
            )
            # Flattened view of this buffered finding handed to the scorer.
            hook_info = {
                "type": "INLINE",
                "function": finding["func_name"],
                "target_module": finding["owner"],
                "source_module": finding["dll_name"],
                "process": finding["proc_name"],
                "suspicious_backed_target": suspicious_backed_target,
                "benign_pair_match": benign_pair_match,
                "target_vad_evidence": target_vad_evidence,
                "backed_exec_image": target_vad_evidence == "mapped_exec_image",
                "private_exec_vad": target_vad_evidence == "private_exec_vad",
            }
            # cross_process_count is the number of times the identical
            # (dll, function, target) hook was seen across processes
            # (tracked in hook_seen_count), so the scorer can weigh
            # widespread hooks differently from one-off ones.
            s = scorer.score(
                hook_info,
                finding["mod_map"],
                cross_process_count=hook_seen_count[finding["dedup_key"]],
            )
            conf = scorer.label(s)
            # LOW-confidence rows are suppressed unless explicitly requested.
            if show_low or conf != "LOW":
                yield (
                    0,
                    (
                        finding["pid"],
                        finding["ppid"],
                        finding["proc_name"],
                        finding["parent_process"],
                        f"INLINE/{finding['hook_type']}",
                        finding["dll_name"],
                        finding["func_name"],
                        format_hints.Hex(finding["func_va"]),
                        format_hints.Hex(finding["target"]),
                        finding["owner"],
                        conf,
                        self._append_analysis_note(
                            finding["disasm"], analysis_note
                        ),
                        format_hints.HexBytes(finding["hook_bytes"]),
                    ),
                )

    def run(self) -> renderers.TreeGrid:
        """Collect every finding and return it as a renderable TreeGrid."""
        # Materialize results before returning the TreeGrid so the CLI does not
        # print a header and partial output until the scan is fully complete.
+ rows = list(self._generator()) + rows.sort(key=self._row_sort_key) + return renderers.TreeGrid( + [ + ("PID", int), + ("PPID", int), + ("Process", str), + ("ParentProcess", str), + ("HookType", str), + ("HookedModule", str), + ("Function", str), + ("HookAddress", format_hints.Hex), + ("HookTarget", format_hints.Hex), + ("TargetModule", str), + ("Confidence", str), + ("Disassembly", str), + ("HookBytes", format_hints.HexBytes), + ], + iter(rows), + ) + + +# --------------------------------------------------------------------------- +# Module collection helper (used by SSDT detector) +# --------------------------------------------------------------------------- + + +def _build_module_collection( + context: interfaces.context.ContextInterface, + kernel_module_name: str, +) -> contexts.ModuleCollection: + """Build a ModuleCollection from the kernel module list.""" + kernel = context.modules[kernel_module_name] + ctx_modules = [] + for mod in modules.Modules.list_modules(context, kernel_module_name): + try: + name_with_ext = mod.BaseDllName.get_string() + except exceptions.InvalidAddressException: + continue + name = os.path.splitext(name_with_ext)[0] + ctx_mod = contexts.SizedModule.create( + context=context, + module_name=name, + layer_name=kernel.layer_name, + offset=mod.DllBase, + size=mod.SizeOfImage, + symbol_table_name=kernel.symbol_table_name, + ) + ctx_modules.append(ctx_mod) + return contexts.ModuleCollection(ctx_modules)