From 015e47f34df4b9d6bee059730f15c3198e0b3c12 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 10:45:56 +0100 Subject: [PATCH 01/11] Add SQLCipher implementation --- .../database/sqlite3/encryption/__init__.py | 10 + .../sqlite3/encryption/sqlcipher/__init__.py | 10 + .../sqlite3/encryption/sqlcipher/exception.py | 7 + .../sqlite3/encryption/sqlcipher/sqlcipher.py | 291 ++++++++++++++++++ dissect/database/sqlite3/sqlite3.py | 7 +- .../aes256_hmac_none_kdf_4000.sqlite | 3 + .../aes256_hmac_sha1_kdf_4000.sqlite | 3 + .../aes256_hmac_sha1_kdf_64000.sqlite | 3 + ..._hmac_sha256_kdf_sha1_1337_page_8kb.sqlite | 3 + .../aes256_hmac_sha512_kdf_256000.sqlite | 3 + ...hmac_sha512_kdf_256000_plain_header.sqlite | 3 + .../sqlite3/encryption/sqlcipher/create.sql | 3 + .../encryption/sqlcipher/plaintext.sqlite | 3 + tests/sqlite3/test_sqlcipher.py | 96 ++++++ 14 files changed, 443 insertions(+), 2 deletions(-) create mode 100644 dissect/database/sqlite3/encryption/__init__.py create mode 100644 dissect/database/sqlite3/encryption/sqlcipher/__init__.py create mode 100644 dissect/database/sqlite3/encryption/sqlcipher/exception.py create mode 100644 dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/create.sql create mode 100755 tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite create mode 100644 tests/sqlite3/test_sqlcipher.py diff --git a/dissect/database/sqlite3/encryption/__init__.py b/dissect/database/sqlite3/encryption/__init__.py new file mode 100644 index 0000000..551bebd --- /dev/null +++ b/dissect/database/sqlite3/encryption/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 + +__all__ = [ + "SQLCipher1", + "SQLCipher2", + "SQLCipher3", + "SQLCipher4", +] diff --git a/dissect/database/sqlite3/encryption/sqlcipher/__init__.py b/dissect/database/sqlite3/encryption/sqlcipher/__init__.py new file mode 100644 index 0000000..551bebd --- /dev/null +++ b/dissect/database/sqlite3/encryption/sqlcipher/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 + +__all__ = [ + "SQLCipher1", + "SQLCipher2", + "SQLCipher3", + "SQLCipher4", +] diff --git a/dissect/database/sqlite3/encryption/sqlcipher/exception.py b/dissect/database/sqlite3/encryption/sqlcipher/exception.py new file mode 100644 index 0000000..428da91 --- /dev/null +++ b/dissect/database/sqlite3/encryption/sqlcipher/exception.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from dissect.database.exception import Error + + +class SQLCipherError(Error): + pass diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py new file mode 100644 index 0000000..da6003b --- /dev/null +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -0,0 +1,291 @@ +from __future__ import annotations + +from io import BytesIO +from pathlib import Path +from typing import BinaryIO + +from Crypto.Cipher import AES +from Crypto.Hash import SHA1, SHA256, SHA512 +from Crypto.Hash import new as new_hash +from Crypto.Protocol.KDF import PBKDF2 +from dissect.util.stream import MappingStream + +from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError +from dissect.database.sqlite3.exception import InvalidDatabase +from dissect.database.sqlite3.sqlite3 import SQLite3 + + +class SQLCipher(SQLite3): + """Abstract SQLCipher Community Edition implementation. Not intended for direct use. + Invoke :class:`SQLCipher4` or :class:`SQLCipher3` instead. + + HMAC key derivation and tag verification is currently not implemented. + + References: + - https://www.zetetic.net/sqlcipher/design/ + - https://github.com/sqlcipher/sqlcipher + """ + + DEFAULT_PAGE_SIZE: int + DEFAULT_KDF_ITER: int + DEFAULT_KDF_ALGO: object + DEFAULT_HMAC_ALGO: object + + def __init__( + self, + fh: Path | BinaryIO, + passphrase: str | bytes, + *, + salt: bytes | None = None, + plaintext_header: int | None = None, + page_size: int | None = None, + kdf_iter: int | None = None, + kdf_algo: object | None = None, + hmac_algo: object | None = None, + no_kdf: bool = False, + ): + self.cipher_fh = fh + self.cipher_path = None + self.cipher_page_size = page_size or self.DEFAULT_PAGE_SIZE + self.kdf_iter = kdf_iter or self.DEFAULT_KDF_ITER + self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO + self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO + + if isinstance(fh, Path): + self.cipher_path = fh + self.cipher_fh = fh.open("rb") + + if not hasattr(self.cipher_fh, "read"): + raise ValueError("Provided file handle cannot be read from") + + if isinstance(passphrase, str): + passphrase = passphrase.encode() + + if not passphrase: + raise SQLCipherError("No passphrase provided") + + if isinstance(self.hmac_algo, str): + self.hmac_algo = new_hash(self.hmac_algo) + + if isinstance(self.kdf_algo, str): + self.kdf_algo = new_hash(self.kdf_algo) + + # Part of the header can be plaintext. We can infer that or it can be passed upon initialization. + # https://www.zetetic.net/sqlcipher/sqlcipher-api/#cipher_plaintext_header_size + if plaintext_header: + self.plaintext_header = plaintext_header + + # The default and recommended plaintext header size is 32 bytes. + elif (header_or_salt := self.cipher_fh.read(16)) == b"SQLite format 3\x00": + self.plaintext_header = 32 + else: + self.plaintext_header = None + + if self.plaintext_header and not salt: + raise SQLCipherError("Plaintext header has no salt, please provide salt manually") + + self.salt = salt or header_or_salt + self.passphrase = passphrase + self.key = self.passphrase if no_kdf else derive_key(self.passphrase, self.salt, self.kdf_iter, self.kdf_algo) + + # Initialize the decrypted SQLite3 stream as a file-like object and see if that works. + try: + super().__init__(self.stream(), wal=None, checkpoint=None) + except InvalidDatabase as e: + raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") from e + + # Sanity check to prevent further issues down the line. + if self.header.page_size != self.cipher_page_size or self.header.schema_format_number not in (1, 2, 3, 4): + raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") + self.unlocked = True + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__} " + f"fh='{self.cipher_path or self.cipher_fh!s}' " + f"wal='{self.wal!s}' " + f"checkpoint={bool(self.checkpoint)!r} " + f"pages={self.header.page_count!r} " + f"unlocked={self.unlocked!r}>" + ) + + def close(self) -> None: + """Close the database.""" + super().close() + # Only close DB handle if we opened it using a path + if self.cipher_path is not None: + self.cipher_fh.close() + + def stream(self) -> MappingStream: + """Create a mapped stream of ``SQLCipherPage`` instances.""" + stream = MappingStream() + + # Add an appropriate plaintext SQLite3 header. + self.cipher_fh.seek(0) + offset = self.plaintext_header or 16 + header = BytesIO(self.cipher_fh.read(offset) if self.plaintext_header else b"SQLite format 3\x00") + stream.add(0, offset, header) + + # Creates SQLCipherPage objects which can be lazily read from. No page reading or decrypting happens + # until a specific page is accessed by the reader of the MappingStream. + page_num = 1 + while True: + try: + page = SQLCipherPage(self, page_num) + size = self.cipher_page_size - ((self.plaintext_header or 16) if page_num == 1 else 0) + stream.add(offset, size, page) + offset += size + page_num += 1 + except EOFError: # noqa: PERF203 + break + + return stream + + +class SQLCipher4(SQLCipher): + """SQLCipher Community edition version 4. + + Decrypts a SQLCipher database from the given path or file-like oject. + + Example usage: + >>> from dissect.database.sqlite3.encryption import SQLCipher4 + >>> db = SQLCipher4(Path("file.db"), "passphrase") + >>> row = db.table("MyTable").row(0) + + Args: + fh: The path or file-like object to open. + passphrase: String or bytes passphrase. + salt: Optionally provide the 16-byte salt directly. + plaintext_header: Size of plaintext header to use. + page_size: Override size of each page. + kdf_iter: Override amount of KDF iterations. + kdf_algo: Override KDF digest alrorithm. + hmac_algo: Override HMAC digest algorithm. + + Raises: + SQLCipherError: If decryption failed using the provided arguments. + + References: + - https://www.zetetic.net/sqlcipher/design/ + - https://github.com/sqlcipher/sqlcipher + """ + + DEFAULT_PAGE_SIZE = 4096 + DEFAULT_KDF_ITER = 256_000 + DEFAULT_KDF_ALGO = SHA512 + DEFAULT_HMAC_ALGO = SHA512 + + +class SQLCipher3(SQLCipher): + """SQLCipher version 3.""" + + DEFAULT_PAGE_SIZE = 1024 + DEFAULT_KDF_ITER = 64_000 + DEFAULT_KDF_ALGO = SHA1 + DEFAULT_HMAC_ALGO = SHA1 + + +class SQLCipher2(SQLCipher): + """SQLCipher version 2.""" + + DEFAULT_PAGE_SIZE = 1024 + DEFAULT_KDF_ITER = 4000 + DEFAULT_KDF_ALGO = SHA1 + DEFAULT_HMAC_ALGO = SHA1 + + +class SQLCipher1(SQLCipher): + """SQLCipher version 1.""" + + DEFAULT_PAGE_SIZE = 1024 + DEFAULT_KDF_ITER = 4000 + DEFAULT_KDF_ALGO = SHA1 + DEFAULT_HMAC_ALGO = None + + +class SQLCipherPage: + """Represents a single SQLCipher page. Acts as if it is a BytesIO object to read from.""" + + def __init__(self, sqlcipher: SQLCipher, page_num: int) -> None: + self.sqlcipher = sqlcipher + self.page_num = page_num + self.offset = (page_num - 1) * sqlcipher.cipher_page_size + + # Calculate size of page iv (always 16 bytes) plus hmac digest size + self.align = 16 + (sqlcipher.hmac_algo.digest_size if sqlcipher.hmac_algo else 0) + + # Calculate the size of the encrypted data by substracting the iv+hmac size + # from the page size. The iv+hmac size needs to be adjusted to 16 byte blocks. + if self.align % 16 != 0: + self.align = (self.align + 15) & ~15 + self.enc_size = sqlcipher.cipher_page_size - self.align + + # The first page 'contains' the database salt so substract those first 16 bytes + # from the page size and set the file handle forward accordingly. + if page_num == 1: + header_offset = sqlcipher.plaintext_header or 16 + self.enc_size -= header_offset + self.offset += header_offset + + # Data is only read from the cipher file handle when ``.read()`` is called. + self.plaintext = None + self.encrypted = None + + # The last part of the page contains the iv and optionally hmac. + sqlcipher.cipher_fh.seek(self.offset + self.enc_size) + self.iv = sqlcipher.cipher_fh.read(16) + self.mac = sqlcipher.cipher_fh.read(sqlcipher.hmac_algo.digest_size) if sqlcipher.hmac_algo else None + + if len(self.iv) != 16: + raise EOFError + + self._pos = 0 + + def __repr__(self) -> str: + return ( + f"" + ) + + @property + def decrypted(self) -> bool: + return bool(self.plaintext) + + def seek(self, pos: int, whence: int = 0) -> None: + self._pos = pos + + def tell(self) -> int: + return self._pos + + def read(self, size: int | None = None) -> bytes: + """Cached plaintext reader of this page.""" + + if size == -1: + size = None + + if self.plaintext: + return self.plaintext[self._pos : size] + + self.sqlcipher.cipher_fh.seek(self.offset) + self.encrypted = self.sqlcipher.cipher_fh.read(self.enc_size) + + # We could have reached the end of the database if no more pages are left to read. + if not self.encrypted: + raise EOFError + + cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, self.iv) + + # Append null bytes so the plaintext aligns with the page size. + # https://github.com/sqlcipher/sqlcipher-tools/blob/master/decrypt.c + self.plaintext = cipher.decrypt(self.encrypted) + (self.align * b"\x00") + return self.plaintext[self._pos : size] + + +def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: SHA1 | SHA256 | SHA512) -> bytes: + """Derive the database key as SQLCipher would using PBKDF2.""" + + if not kdf_iter and not kdf_algo: + return passphrase + + return PBKDF2(passphrase, salt, 32, count=kdf_iter, hmac_hash_module=kdf_algo) diff --git a/dissect/database/sqlite3/sqlite3.py b/dissect/database/sqlite3/sqlite3.py index ccdb0e4..945318a 100644 --- a/dissect/database/sqlite3/sqlite3.py +++ b/dissect/database/sqlite3/sqlite3.py @@ -3,7 +3,7 @@ import itertools import re from functools import lru_cache -from io import BytesIO +from io import BytesIO, RawIOBase from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO @@ -76,7 +76,7 @@ class SQLite3: def __init__( self, - fh: Path | BinaryIO, + fh: Path | BinaryIO | RawIOBase, wal: WAL | Path | BinaryIO | None = None, checkpoint: Checkpoint | int | None = None, ): @@ -131,6 +131,9 @@ def __init__( self.page = lru_cache(256)(self.page) + def __repr__(self) -> str: + return f"" # noqa: E501 + def __enter__(self) -> Self: """Return ``self`` upon entering the runtime context.""" return self diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite new file mode 100755 index 0000000..c156d06 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:967db750c86483aa6a3b8a8341a4b526dddf382bb65c257d568f0f5f70c44eef +size 3072 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite new file mode 100755 index 0000000..b5d29a8 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:82d350da85ecc2e616177f3cec2c617ca07428d8eb5f81c377d24f45a8012655 +size 3072 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite new file mode 100755 index 0000000..586053d --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7445a296be286a6f80b3c6aeced3ce6c97a56b0b139086771b3a2300e99b635c +size 3072 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite new file mode 100755 index 0000000..fd06b23 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:da9a1bf147755fdb52bcde5e015c3595600476261676187b9001ddf28ae4c30e +size 24576 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite new file mode 100755 index 0000000..3f48e58 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9a069e4540a63f41620dc4c1448d88e1cc7862f17e86bc27db0d7c772fa1f3e1 +size 12288 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite new file mode 100755 index 0000000..5036724 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ed4415bfb01407ffaf09f38bbbdb820171e2db31218687c50cb5f56e67e554ce +size 12288 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/create.sql b/tests/_data/sqlite3/encryption/sqlcipher/create.sql new file mode 100755 index 0000000..f5b7467 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/create.sql @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6aba30f71b43c6a36461a80d8f95204c64add1a0f395d63ca3e55ab80c38901f +size 1088 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite new file mode 100755 index 0000000..ab579b1 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:aceca4caa08647907e1a3abeeef3ae1d8cccaad30e4046e88b43510e582b9b5e +size 12288 diff --git a/tests/sqlite3/test_sqlcipher.py b/tests/sqlite3/test_sqlcipher.py new file mode 100644 index 0000000..f48ad69 --- /dev/null +++ b/tests/sqlite3/test_sqlcipher.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError +from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 +from tests._util import absolute_path + +if TYPE_CHECKING: + from collections.abc import Callable + + from dissect.database.sqlite3.sqlite3 import SQLite3 + + +def _assert_sqlite_db(sqlite: SQLite3) -> None: + table = sqlite.table("Movies") + assert table.sql == ( + 'CREATE TABLE "Movies" (\n' + '\t"ID"\tINTEGER,\n' + '\t"Title"\tTEXT NOT NULL,\n' + '\t"Year"\tINTEGER NOT NULL,\n' + '\t"Director"\tTEXT NOT NULL,\n' + '\t"Rating"\tINTEGER,\n' + '\tPRIMARY KEY("ID" AUTOINCREMENT)\n' + ")" + ) + + movies = list(table.rows()) + assert len(movies) == 11 + + assert movies[-1].ID == 11 + assert movies[-1].Title == "The Good, the Bad and the Ugly" + assert movies[-1].Year == 1966 + assert movies[-1].Director == "Sergio Leone" + assert movies[-1].Rating == 8.8 + + +@pytest.mark.parametrize( + ("cipher", "kwargs", "path_str"), + [ + # Defaults per major version + pytest.param(SQLCipher4, {}, "aes256_hmac_sha512_kdf_256000.sqlite", id="version-4-default"), + pytest.param(SQLCipher3, {}, "aes256_hmac_sha1_kdf_64000.sqlite", id="version-3-default"), + pytest.param(SQLCipher2, {}, "aes256_hmac_sha1_kdf_4000.sqlite", id="version-2-default"), + pytest.param(SQLCipher1, {}, "aes256_hmac_none_kdf_4000.sqlite", id="version-1-default"), + # Custom parameters + pytest.param( + SQLCipher4, + { + "page_size": 8192, + "hmac_algo": "sha256", + "kdf_algo": "sha1", + "kdf_iter": 1337, + }, + "aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite", + id="version-4-custom-hmac-sha256-kdf-sha1-1337-page-8kb", + ), + ], +) +def test_decrypt_community(cipher: Callable, path_str: str, kwargs: dict) -> None: + """Test if we can parse a SQLCipher (4.5.6 community) encrypted database.""" + + path = absolute_path("_data/sqlite3/encryption/sqlcipher/" + path_str) + + with pytest.raises(SQLCipherError, match="Decryption of SQLCipher database failed"): + cipher(path, "invalid passphrase", **kwargs) + + # Test context manager + with cipher(path, "passphrase", **kwargs) as sqlcipher: + assert sqlcipher.stream().read(20) in ( + b"SQLite format 3\x00\x04\x00\x01\x01", # 1024 + b"SQLite format 3\x00\x10\x00\x01\x01", # 4096 + b"SQLite format 3\x00\x20\x00\x01\x01", # 8192 + ) + _assert_sqlite_db(sqlcipher) + + +def test_decrypt_community_plaintext_header() -> None: + """Test if we can parse and decrypt a SQLCipher 4.5.6 database with a 32-byte plaintext header.""" + + path = absolute_path("_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite") + salt = bytes.fromhex("01010101010101010101010101010101") + + with pytest.raises(SQLCipherError, match="No passphrase provided"): + SQLCipher4(path, "") + + with pytest.raises(SQLCipherError, match="Plaintext header has no salt, please provide salt manually"): + SQLCipher4(path, "invalid passphrase") + + with pytest.raises(SQLCipherError, match="Decryption of SQLCipher database failed"): + SQLCipher4(path, "invalid passphrase", salt=salt) + + sqlcipher = SQLCipher4(path, "passphrase", salt=salt) + _assert_sqlite_db(sqlcipher) From 9eb8332228f860e8b489b458c3bbfd1b518e7152 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 10:49:36 +0100 Subject: [PATCH 02/11] Add RuntimeError for pycryptodome import --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index da6003b..f52d6dc 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -4,16 +4,23 @@ from pathlib import Path from typing import BinaryIO -from Crypto.Cipher import AES -from Crypto.Hash import SHA1, SHA256, SHA512 -from Crypto.Hash import new as new_hash -from Crypto.Protocol.KDF import PBKDF2 from dissect.util.stream import MappingStream from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError from dissect.database.sqlite3.exception import InvalidDatabase from dissect.database.sqlite3.sqlite3 import SQLite3 +try: + from Crypto.Cipher import AES + from Crypto.Hash import SHA1, SHA256, SHA512 + from Crypto.Hash import new as new_hash + from Crypto.Protocol.KDF import PBKDF2 + + HAS_CRYPTO = True + +except ImportError: + HAS_CRYPTO = False + class SQLCipher(SQLite3): """Abstract SQLCipher Community Edition implementation. Not intended for direct use. @@ -51,6 +58,9 @@ def __init__( self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO + if not HAS_CRYPTO: + raise RuntimeError("Missing dependency pycryptodome") + if isinstance(fh, Path): self.cipher_path = fh self.cipher_fh = fh.open("rb") From 8eb70e19a0b1862b6e856fe5ff58fae2948b3d7b Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 13:34:13 +0100 Subject: [PATCH 03/11] implement review feedback --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 81 ++++++++----------- dissect/database/sqlite3/sqlite3.py | 2 +- 2 files changed, 35 insertions(+), 48 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index f52d6dc..b18e0d7 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -23,11 +23,33 @@ class SQLCipher(SQLite3): - """Abstract SQLCipher Community Edition implementation. Not intended for direct use. - Invoke :class:`SQLCipher4` or :class:`SQLCipher3` instead. + """SQLCipher Community Edition implementation. + Instantiate with a subclass from :class:`SQLCipher4`, :class:`SQLCipher3`, :class:`SQLCipher2` + or :class:`SQLCipher1`. + + Decrypts a SQLCipher database from the given path or file-like oject. HMAC key derivation and tag verification is currently not implemented. + Example usage: + >>> from dissect.database.sqlite3.encryption import SQLCipher4 + >>> db = SQLCipher4(Path("file.db"), "passphrase") + >>> row = db.table("MyTable").row(0) + + Args: + fh (Path | BinaryIO): The path or file-like object to open. + passphrase (str | bytes): String or bytes passphrase. + salt (bytes): Optionally provide the 16-byte salt directly. + plaintext_header_size (int): Size of plaintext header to use. + page_size (int): Override size of each page. + kdf_iter (int): Override amount of KDF iterations. + kdf_algo (str | Crypto.Hash): Override KDF digest alrorithm. + hmac_algo (str | Crypto.Hash): Override HMAC digest algorithm. + no_kdf (bool): Disable KDF from passphrase, use as raw key. + + Raises: + SQLCipherError: If decryption failed using the provided arguments. + References: - https://www.zetetic.net/sqlcipher/design/ - https://github.com/sqlcipher/sqlcipher @@ -44,7 +66,7 @@ def __init__( passphrase: str | bytes, *, salt: bytes | None = None, - plaintext_header: int | None = None, + plaintext_header_size: int | None = None, page_size: int | None = None, kdf_iter: int | None = None, kdf_algo: object | None = None, @@ -82,16 +104,16 @@ def __init__( # Part of the header can be plaintext. We can infer that or it can be passed upon initialization. # https://www.zetetic.net/sqlcipher/sqlcipher-api/#cipher_plaintext_header_size - if plaintext_header: - self.plaintext_header = plaintext_header + if plaintext_header_size: + self.plaintext_header_size = plaintext_header_size # The default and recommended plaintext header size is 32 bytes. elif (header_or_salt := self.cipher_fh.read(16)) == b"SQLite format 3\x00": - self.plaintext_header = 32 + self.plaintext_header_size = 32 else: - self.plaintext_header = None + self.plaintext_header_size = None - if self.plaintext_header and not salt: + if self.plaintext_header_size and not salt: raise SQLCipherError("Plaintext header has no salt, please provide salt manually") self.salt = salt or header_or_salt @@ -107,7 +129,6 @@ def __init__( # Sanity check to prevent further issues down the line. if self.header.page_size != self.cipher_page_size or self.header.schema_format_number not in (1, 2, 3, 4): raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") - self.unlocked = True def __repr__(self) -> str: return ( @@ -115,8 +136,7 @@ def __repr__(self) -> str: f"fh='{self.cipher_path or self.cipher_fh!s}' " f"wal='{self.wal!s}' " f"checkpoint={bool(self.checkpoint)!r} " - f"pages={self.header.page_count!r} " - f"unlocked={self.unlocked!r}>" + f"pages={self.header.page_count!r}>" ) def close(self) -> None: @@ -132,8 +152,8 @@ def stream(self) -> MappingStream: # Add an appropriate plaintext SQLite3 header. self.cipher_fh.seek(0) - offset = self.plaintext_header or 16 - header = BytesIO(self.cipher_fh.read(offset) if self.plaintext_header else b"SQLite format 3\x00") + offset = self.plaintext_header_size or 16 + header = BytesIO(self.cipher_fh.read(offset) if self.plaintext_header_size else b"SQLite format 3\x00") stream.add(0, offset, header) # Creates SQLCipherPage objects which can be lazily read from. No page reading or decrypting happens @@ -142,7 +162,7 @@ def stream(self) -> MappingStream: while True: try: page = SQLCipherPage(self, page_num) - size = self.cipher_page_size - ((self.plaintext_header or 16) if page_num == 1 else 0) + size = self.cipher_page_size - ((self.plaintext_header_size or 16) if page_num == 1 else 0) stream.add(offset, size, page) offset += size page_num += 1 @@ -153,33 +173,6 @@ def stream(self) -> MappingStream: class SQLCipher4(SQLCipher): - """SQLCipher Community edition version 4. - - Decrypts a SQLCipher database from the given path or file-like oject. - - Example usage: - >>> from dissect.database.sqlite3.encryption import SQLCipher4 - >>> db = SQLCipher4(Path("file.db"), "passphrase") - >>> row = db.table("MyTable").row(0) - - Args: - fh: The path or file-like object to open. - passphrase: String or bytes passphrase. - salt: Optionally provide the 16-byte salt directly. - plaintext_header: Size of plaintext header to use. - page_size: Override size of each page. - kdf_iter: Override amount of KDF iterations. - kdf_algo: Override KDF digest alrorithm. - hmac_algo: Override HMAC digest algorithm. - - Raises: - SQLCipherError: If decryption failed using the provided arguments. - - References: - - https://www.zetetic.net/sqlcipher/design/ - - https://github.com/sqlcipher/sqlcipher - """ - DEFAULT_PAGE_SIZE = 4096 DEFAULT_KDF_ITER = 256_000 DEFAULT_KDF_ALGO = SHA512 @@ -187,8 +180,6 @@ class SQLCipher4(SQLCipher): class SQLCipher3(SQLCipher): - """SQLCipher version 3.""" - DEFAULT_PAGE_SIZE = 1024 DEFAULT_KDF_ITER = 64_000 DEFAULT_KDF_ALGO = SHA1 @@ -196,8 +187,6 @@ class SQLCipher3(SQLCipher): class SQLCipher2(SQLCipher): - """SQLCipher version 2.""" - DEFAULT_PAGE_SIZE = 1024 DEFAULT_KDF_ITER = 4000 DEFAULT_KDF_ALGO = SHA1 @@ -205,8 +194,6 @@ class SQLCipher2(SQLCipher): class SQLCipher1(SQLCipher): - """SQLCipher version 1.""" - DEFAULT_PAGE_SIZE = 1024 DEFAULT_KDF_ITER = 4000 DEFAULT_KDF_ALGO = SHA1 diff --git a/dissect/database/sqlite3/sqlite3.py b/dissect/database/sqlite3/sqlite3.py index 945318a..9319684 100644 --- a/dissect/database/sqlite3/sqlite3.py +++ b/dissect/database/sqlite3/sqlite3.py @@ -132,7 +132,7 @@ def __init__( self.page = lru_cache(256)(self.page) def __repr__(self) -> str: - return f"" # noqa: E501 + return f"" # noqa: E501 def __enter__(self) -> Self: """Return ``self`` upon entering the runtime context.""" From 86d04c3dda7ca9b7a1866aaef930d1f7623c4bf2 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 14:55:40 +0100 Subject: [PATCH 04/11] Implement AlignedStream --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 89 +++++++++---------- tests/sqlite3/test_sqlcipher.py | 2 +- 2 files changed, 42 insertions(+), 49 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index b18e0d7..9b19a67 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -1,14 +1,13 @@ from __future__ import annotations -from io import BytesIO +from functools import lru_cache from pathlib import Path from typing import BinaryIO -from dissect.util.stream import MappingStream - from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError from dissect.database.sqlite3.exception import InvalidDatabase from dissect.database.sqlite3.sqlite3 import SQLite3 +from dissect.util.stream import AlignedStream try: from Crypto.Cipher import AES @@ -90,6 +89,8 @@ def __init__( if not hasattr(self.cipher_fh, "read"): raise ValueError("Provided file handle cannot be read from") + self.cipher_fh: BinaryIO + if isinstance(passphrase, str): passphrase = passphrase.encode() @@ -146,30 +147,25 @@ def close(self) -> None: if self.cipher_path is not None: self.cipher_fh.close() - def stream(self) -> MappingStream: - """Create a mapped stream of ``SQLCipherPage`` instances.""" - stream = MappingStream() + def stream(self) -> SQLCipherStream: + """Create an aligned stream of :class:`SQLCipherPage` instances.""" + return SQLCipherStream(self, self.cipher_fh, size=None, align=self.cipher_page_size) + - # Add an appropriate plaintext SQLite3 header. - self.cipher_fh.seek(0) - offset = self.plaintext_header_size or 16 - header = BytesIO(self.cipher_fh.read(offset) if self.plaintext_header_size else b"SQLite format 3\x00") - stream.add(0, offset, header) +class SQLCipherStream(AlignedStream): + def __init__(self, sqlcipher: SQLCipher, fh: BinaryIO, size: int | None = None, align: int = 4096): + super().__init__(size, align) + self.sqlcipher = sqlcipher + self.fh = fh + self._read_page = lru_cache(4096)(self._read_page) - # Creates SQLCipherPage objects which can be lazily read from. No page reading or decrypting happens - # until a specific page is accessed by the reader of the MappingStream. - page_num = 1 - while True: - try: - page = SQLCipherPage(self, page_num) - size = self.cipher_page_size - ((self.plaintext_header_size or 16) if page_num == 1 else 0) - stream.add(offset, size, page) - offset += size - page_num += 1 - except EOFError: # noqa: PERF203 - break + def _read(self, offset: int, length: int) -> bytes: + pages_offset = offset // self.align + num_pages = length // self.align + return b"".join(self._read_page(num + 1) for num in range(pages_offset, pages_offset + num_pages)) - return stream + def _read_page(self, page_num: int) -> bytes: + return SQLCipherPage(self.sqlcipher, page_num).read() class SQLCipher4(SQLCipher): @@ -220,13 +216,11 @@ def __init__(self, sqlcipher: SQLCipher, page_num: int) -> None: # The first page 'contains' the database salt so substract those first 16 bytes # from the page size and set the file handle forward accordingly. if page_num == 1: - header_offset = sqlcipher.plaintext_header or 16 - self.enc_size -= header_offset - self.offset += header_offset - - # Data is only read from the cipher file handle when ``.read()`` is called. - self.plaintext = None - self.encrypted = None + self.header_offset = sqlcipher.plaintext_header_size or 16 + self.enc_size -= self.header_offset + self.offset += self.header_offset + else: + self.header_offset = 0 # The last part of the page contains the iv and optionally hmac. sqlcipher.cipher_fh.seek(self.offset + self.enc_size) @@ -239,15 +233,7 @@ def __init__(self, sqlcipher: SQLCipher, page_num: int) -> None: self._pos = 0 def __repr__(self) -> str: - return ( - f"" - ) - - @property - def decrypted(self) -> bool: - return bool(self.plaintext) + return f"" def seek(self, pos: int, whence: int = 0) -> None: self._pos = pos @@ -256,27 +242,34 @@ def tell(self) -> int: return self._pos def read(self, size: int | None = None) -> bytes: - """Cached plaintext reader of this page.""" + """Plaintext reader of this page.""" if size == -1: size = None - if self.plaintext: - return self.plaintext[self._pos : size] - self.sqlcipher.cipher_fh.seek(self.offset) - self.encrypted = self.sqlcipher.cipher_fh.read(self.enc_size) + encrypted = self.sqlcipher.cipher_fh.read(self.enc_size) # We could have reached the end of the database if no more pages are left to read. - if not self.encrypted: + if not encrypted: raise EOFError cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, self.iv) # Append null bytes so the plaintext aligns with the page size. # https://github.com/sqlcipher/sqlcipher-tools/blob/master/decrypt.c - self.plaintext = cipher.decrypt(self.encrypted) + (self.align * b"\x00") - return self.plaintext[self._pos : size] + plaintext = cipher.decrypt(encrypted) + (self.align * b"\x00") + + # Prepend the plaintext header of the SQLite3 database if this is the first page. + if self.header_offset == 16: + header = b"SQLite format 3\x00" + elif self.header_offset: + self.sqlcipher.cipher_fh.seek(0) + header = self.sqlcipher.cipher_fh.read(self.header_offset) + else: + header = b"" + + return (header + plaintext)[self._pos : size] def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: SHA1 | SHA256 | SHA512) -> bytes: diff --git a/tests/sqlite3/test_sqlcipher.py b/tests/sqlite3/test_sqlcipher.py index f48ad69..56747df 100644 --- a/tests/sqlite3/test_sqlcipher.py +++ b/tests/sqlite3/test_sqlcipher.py @@ -3,9 +3,9 @@ from typing import TYPE_CHECKING import pytest - from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 + from tests._util import absolute_path if TYPE_CHECKING: From d9a17149575fc030fab746780b043693d836c579 Mon Sep 17 00:00:00 2001 From: Computer Network Investigation <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 17:29:47 +0100 Subject: [PATCH 05/11] Apply suggestions from code review Co-authored-by: Erik Schamper <1254028+Schamper@users.noreply.github.com> --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index 9b19a67..b111eb9 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -72,25 +72,26 @@ def __init__( hmac_algo: object | None = None, no_kdf: bool = False, ): - self.cipher_fh = fh - self.cipher_path = None - self.cipher_page_size = page_size or self.DEFAULT_PAGE_SIZE - self.kdf_iter = kdf_iter or self.DEFAULT_KDF_ITER - self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO - self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO - if not HAS_CRYPTO: raise RuntimeError("Missing dependency pycryptodome") if isinstance(fh, Path): - self.cipher_path = fh - self.cipher_fh = fh.open("rb") + cipher_fh = fh.open("rb") + cipher_path = fh + else: + cipher_fh = fh + cipher_path = None + + self.cipher_fh = cipher_fh + self.cipher_path = cipher_path + self.cipher_page_size = page_size or self.DEFAULT_PAGE_SIZE + self.kdf_iter = kdf_iter or self.DEFAULT_KDF_ITER + self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO + self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO if not hasattr(self.cipher_fh, "read"): raise ValueError("Provided file handle cannot be read from") - self.cipher_fh: BinaryIO - if isinstance(passphrase, str): passphrase = passphrase.encode() @@ -134,10 +135,10 @@ def __init__( def __repr__(self) -> str: return ( f"<{self.__class__.__name__} " - f"fh='{self.cipher_path or self.cipher_fh!s}' " - f"wal='{self.wal!s}' " - f"checkpoint={bool(self.checkpoint)!r} " - f"pages={self.header.page_count!r}>" + f"fh={self.cipher_path or self.cipher_fh} " + f"wal={self.wal} " + f"checkpoint={bool(self.checkpoint)} " + f"pages={self.header.page_count}>" ) def close(self) -> None: @@ -149,14 +150,14 @@ def close(self) -> None: def stream(self) -> SQLCipherStream: """Create an aligned stream of :class:`SQLCipherPage` instances.""" - return SQLCipherStream(self, self.cipher_fh, size=None, align=self.cipher_page_size) + return SQLCipherStream(self) class SQLCipherStream(AlignedStream): - def __init__(self, sqlcipher: SQLCipher, fh: BinaryIO, size: int | None = None, align: int = 4096): - super().__init__(size, align) + def __init__(self, sqlcipher: SQLCipher): + super().__init__(None, self.cipher_page_size) self.sqlcipher = sqlcipher - self.fh = fh + self.fh = fh.cipher_fh self._read_page = lru_cache(4096)(self._read_page) def _read(self, offset: int, length: int) -> bytes: From 2ab28e1f29c5bbff69739f42a7e458d3515a7417 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 17:39:05 +0100 Subject: [PATCH 06/11] Fix suggestions --- .../database/sqlite3/encryption/sqlcipher/sqlcipher.py | 10 +++++----- dissect/database/sqlite3/sqlite3.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index b111eb9..11f1326 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -6,7 +6,7 @@ from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError from dissect.database.sqlite3.exception import InvalidDatabase -from dissect.database.sqlite3.sqlite3 import SQLite3 +from dissect.database.sqlite3.sqlite3 import SQLITE3_HEADER_MAGIC, SQLite3 from dissect.util.stream import AlignedStream try: @@ -110,7 +110,7 @@ def __init__( self.plaintext_header_size = plaintext_header_size # The default and recommended plaintext header size is 32 bytes. - elif (header_or_salt := self.cipher_fh.read(16)) == b"SQLite format 3\x00": + elif (header_or_salt := self.cipher_fh.read(16)) == SQLITE3_HEADER_MAGIC: self.plaintext_header_size = 32 else: self.plaintext_header_size = None @@ -155,9 +155,9 @@ def stream(self) -> SQLCipherStream: class SQLCipherStream(AlignedStream): def __init__(self, sqlcipher: SQLCipher): - super().__init__(None, self.cipher_page_size) + super().__init__(None, sqlcipher.cipher_page_size) + self.fh = sqlcipher.cipher_fh self.sqlcipher = sqlcipher - self.fh = fh.cipher_fh self._read_page = lru_cache(4096)(self._read_page) def _read(self, offset: int, length: int) -> bytes: @@ -263,7 +263,7 @@ def read(self, size: int | None = None) -> bytes: # Prepend the plaintext header of the SQLite3 database if this is the first page. if self.header_offset == 16: - header = b"SQLite format 3\x00" + header = SQLITE3_HEADER_MAGIC elif self.header_offset: self.sqlcipher.cipher_fh.seek(0) header = self.sqlcipher.cipher_fh.read(self.header_offset) diff --git a/dissect/database/sqlite3/sqlite3.py b/dissect/database/sqlite3/sqlite3.py index 9319684..6ded7a2 100644 --- a/dissect/database/sqlite3/sqlite3.py +++ b/dissect/database/sqlite3/sqlite3.py @@ -3,7 +3,7 @@ import itertools import re from functools import lru_cache -from io import BytesIO, RawIOBase +from io import BytesIO from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO @@ -76,7 +76,7 @@ class SQLite3: def __init__( self, - fh: Path | BinaryIO | RawIOBase, + fh: Path | BinaryIO, wal: WAL | Path | BinaryIO | None = None, checkpoint: Checkpoint | int | None = None, ): From cc7a5273919020a5cdb4b5c95c2ea9ff63c68ba7 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Tue, 10 Feb 2026 17:49:35 +0100 Subject: [PATCH 07/11] use hashlib --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index 11f1326..8ed15f6 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -1,5 +1,6 @@ from __future__ import annotations +import hashlib from functools import lru_cache from pathlib import Path from typing import BinaryIO @@ -11,9 +12,6 @@ try: from Crypto.Cipher import AES - from Crypto.Hash import SHA1, SHA256, SHA512 - from Crypto.Hash import new as new_hash - from Crypto.Protocol.KDF import PBKDF2 HAS_CRYPTO = True @@ -56,8 +54,8 @@ class SQLCipher(SQLite3): DEFAULT_PAGE_SIZE: int DEFAULT_KDF_ITER: int - DEFAULT_KDF_ALGO: object - DEFAULT_HMAC_ALGO: object + DEFAULT_KDF_ALGO: str + DEFAULT_HMAC_ALGO: str | None def __init__( self, @@ -68,8 +66,8 @@ def __init__( plaintext_header_size: int | None = None, page_size: int | None = None, kdf_iter: int | None = None, - kdf_algo: object | None = None, - hmac_algo: object | None = None, + kdf_algo: str | None = None, + hmac_algo: str | None = None, no_kdf: bool = False, ): if not HAS_CRYPTO: @@ -99,10 +97,10 @@ def __init__( raise SQLCipherError("No passphrase provided") if isinstance(self.hmac_algo, str): - self.hmac_algo = new_hash(self.hmac_algo) + self.hmac_algo = hashlib.new(self.hmac_algo) if isinstance(self.kdf_algo, str): - self.kdf_algo = new_hash(self.kdf_algo) + self.kdf_algo = hashlib.new(self.kdf_algo) # Part of the header can be plaintext. We can infer that or it can be passed upon initialization. # https://www.zetetic.net/sqlcipher/sqlcipher-api/#cipher_plaintext_header_size @@ -120,7 +118,13 @@ def __init__( self.salt = salt or header_or_salt self.passphrase = passphrase - self.key = self.passphrase if no_kdf else derive_key(self.passphrase, self.salt, self.kdf_iter, self.kdf_algo) + + if no_kdf: + self.key = self.passphrase + else: + self.key = derive_key( + self.passphrase, self.salt, self.kdf_iter, self.kdf_algo.name if self.kdf_algo else None + ) # Initialize the decrypted SQLite3 stream as a file-like object and see if that works. try: @@ -172,28 +176,28 @@ def _read_page(self, page_num: int) -> bytes: class SQLCipher4(SQLCipher): DEFAULT_PAGE_SIZE = 4096 DEFAULT_KDF_ITER = 256_000 - DEFAULT_KDF_ALGO = SHA512 - DEFAULT_HMAC_ALGO = SHA512 + DEFAULT_KDF_ALGO = "SHA512" + DEFAULT_HMAC_ALGO = "SHA512" class SQLCipher3(SQLCipher): DEFAULT_PAGE_SIZE = 1024 DEFAULT_KDF_ITER = 64_000 - DEFAULT_KDF_ALGO = SHA1 - DEFAULT_HMAC_ALGO = SHA1 + DEFAULT_KDF_ALGO = "SHA1" + DEFAULT_HMAC_ALGO = "SHA1" class SQLCipher2(SQLCipher): DEFAULT_PAGE_SIZE = 1024 DEFAULT_KDF_ITER = 4000 - DEFAULT_KDF_ALGO = SHA1 - DEFAULT_HMAC_ALGO = SHA1 + DEFAULT_KDF_ALGO = "SHA1" + DEFAULT_HMAC_ALGO = "SHA1" class SQLCipher1(SQLCipher): DEFAULT_PAGE_SIZE = 1024 DEFAULT_KDF_ITER = 4000 - DEFAULT_KDF_ALGO = SHA1 + DEFAULT_KDF_ALGO = "SHA1" DEFAULT_HMAC_ALGO = None @@ -273,10 +277,10 @@ def read(self, size: int | None = None) -> bytes: return (header + plaintext)[self._pos : size] -def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: SHA1 | SHA256 | SHA512) -> bytes: +def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: str | None) -> bytes: """Derive the database key as SQLCipher would using PBKDF2.""" - if not kdf_iter and not kdf_algo: + if not kdf_iter or not kdf_algo: return passphrase - return PBKDF2(passphrase, salt, 32, count=kdf_iter, hmac_hash_module=kdf_algo) + return hashlib.pbkdf2_hmac(kdf_algo, passphrase, salt, kdf_iter, 32) From ced997228919b172dcbc86e411f0983730009478 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 11 Feb 2026 10:40:49 +0100 Subject: [PATCH 08/11] Replace SQLCipherPage with SQLCipherStream._read_page --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 147 ++++++++---------- tests/sqlite3/test_sqlcipher.py | 2 +- 2 files changed, 70 insertions(+), 79 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index 8ed15f6..74f15c8 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -5,10 +5,11 @@ from pathlib import Path from typing import BinaryIO +from dissect.util.stream import AlignedStream + from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError from dissect.database.sqlite3.exception import InvalidDatabase from dissect.database.sqlite3.sqlite3 import SQLITE3_HEADER_MAGIC, SQLite3 -from dissect.util.stream import AlignedStream try: from Crypto.Cipher import AES @@ -158,19 +159,85 @@ def stream(self) -> SQLCipherStream: class SQLCipherStream(AlignedStream): + """Implements a transparent decryption stream in the form of an :class:`AlignedStream` based on + the ``page_size`` for SQLCipher databases.""" + def __init__(self, sqlcipher: SQLCipher): super().__init__(None, sqlcipher.cipher_page_size) + self.fh = sqlcipher.cipher_fh self.sqlcipher = sqlcipher + self._read_page = lru_cache(4096)(self._read_page) def _read(self, offset: int, length: int) -> bytes: + """Calculates which pages to read from based on the given offset and length. Returns decrypted bytes.""" + pages_offset = offset // self.align num_pages = length // self.align return b"".join(self._read_page(num + 1) for num in range(pages_offset, pages_offset + num_pages)) def _read_page(self, page_num: int) -> bytes: - return SQLCipherPage(self.sqlcipher, page_num).read() + """Decrypt and read from the given SQLCipher page number. + + References: + - https://github.com/sqlcipher/sqlcipher-tools/blob/master/decrypt.c + """ + + if page_num < 1: + raise ValueError("The first page number is 1") + + fh = self.sqlcipher.cipher_fh + page_size = self.sqlcipher.cipher_page_size + + # Calculate the absolute offset in the SQLCipher file handle by multiplying the page number with + # the SQLCipher page size. + offset = (page_num - 1) * page_size + + # Calculate size of the page iv (always 16 bytes) plus the hmac digest size. + digest_size = self.sqlcipher.hmac_algo.digest_size if self.sqlcipher.hmac_algo else 0 + align = 16 + digest_size + + # Calculate the size of the encrypted data by substracting the iv and hmac size from the page size. + # The sum of the iv and hmac size needs to be adjusted to 16 byte blocks. + if align % 16 != 0: + align = (align + 15) & ~15 + enc_size = page_size - align + + # By default, the first page 'contains' the database salt (in place of SQLITE_HEAER_MAGIC) so we substract those + # first 16 bytes from the page size and update the ciphertext offset and size accordingly. + header_offset = 0 + header = b"" + if page_num == 1: + header_offset = self.sqlcipher.plaintext_header_size or 16 + enc_size -= header_offset + offset += header_offset + + # Prepare the plaintext header of the SQLite3 database if this is the first page, or read the plaintext + # header according to the plaintext_header_size variable. + if header_offset == 16: + header = SQLITE3_HEADER_MAGIC + elif header_offset: + fh.seek(0) + header = fh.read(header_offset) + + # The last part of the page contains the iv and optionally a hmac digest. + fh.seek(offset + enc_size) + iv = fh.read(16) + _mac = fh.read(digest_size) if digest_size else None + + fh.seek(offset) + ciphertext = fh.read(enc_size) + + if len(iv) != 16 or not ciphertext: + raise EOFError + + # Decrypt the ciphertext using AES CBC and append null bytes so the plaintext aligns with the page size. + cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, iv) + plaintext = cipher.decrypt(ciphertext) + (align * b"\x00") + + # Return the plaintext prepended by the optional plaintext header. + return header + plaintext class SQLCipher4(SQLCipher): @@ -201,82 +268,6 @@ class SQLCipher1(SQLCipher): DEFAULT_HMAC_ALGO = None -class SQLCipherPage: - """Represents a single SQLCipher page. Acts as if it is a BytesIO object to read from.""" - - def __init__(self, sqlcipher: SQLCipher, page_num: int) -> None: - self.sqlcipher = sqlcipher - self.page_num = page_num - self.offset = (page_num - 1) * sqlcipher.cipher_page_size - - # Calculate size of page iv (always 16 bytes) plus hmac digest size - self.align = 16 + (sqlcipher.hmac_algo.digest_size if sqlcipher.hmac_algo else 0) - - # Calculate the size of the encrypted data by substracting the iv+hmac size - # from the page size. The iv+hmac size needs to be adjusted to 16 byte blocks. - if self.align % 16 != 0: - self.align = (self.align + 15) & ~15 - self.enc_size = sqlcipher.cipher_page_size - self.align - - # The first page 'contains' the database salt so substract those first 16 bytes - # from the page size and set the file handle forward accordingly. - if page_num == 1: - self.header_offset = sqlcipher.plaintext_header_size or 16 - self.enc_size -= self.header_offset - self.offset += self.header_offset - else: - self.header_offset = 0 - - # The last part of the page contains the iv and optionally hmac. - sqlcipher.cipher_fh.seek(self.offset + self.enc_size) - self.iv = sqlcipher.cipher_fh.read(16) - self.mac = sqlcipher.cipher_fh.read(sqlcipher.hmac_algo.digest_size) if sqlcipher.hmac_algo else None - - if len(self.iv) != 16: - raise EOFError - - self._pos = 0 - - def __repr__(self) -> str: - return f"" - - def seek(self, pos: int, whence: int = 0) -> None: - self._pos = pos - - def tell(self) -> int: - return self._pos - - def read(self, size: int | None = None) -> bytes: - """Plaintext reader of this page.""" - - if size == -1: - size = None - - self.sqlcipher.cipher_fh.seek(self.offset) - encrypted = self.sqlcipher.cipher_fh.read(self.enc_size) - - # We could have reached the end of the database if no more pages are left to read. - if not encrypted: - raise EOFError - - cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, self.iv) - - # Append null bytes so the plaintext aligns with the page size. - # https://github.com/sqlcipher/sqlcipher-tools/blob/master/decrypt.c - plaintext = cipher.decrypt(encrypted) + (self.align * b"\x00") - - # Prepend the plaintext header of the SQLite3 database if this is the first page. - if self.header_offset == 16: - header = SQLITE3_HEADER_MAGIC - elif self.header_offset: - self.sqlcipher.cipher_fh.seek(0) - header = self.sqlcipher.cipher_fh.read(self.header_offset) - else: - header = b"" - - return (header + plaintext)[self._pos : size] - - def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: str | None) -> bytes: """Derive the database key as SQLCipher would using PBKDF2.""" diff --git a/tests/sqlite3/test_sqlcipher.py b/tests/sqlite3/test_sqlcipher.py index 56747df..f48ad69 100644 --- a/tests/sqlite3/test_sqlcipher.py +++ b/tests/sqlite3/test_sqlcipher.py @@ -3,9 +3,9 @@ from typing import TYPE_CHECKING import pytest + from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 - from tests._util import absolute_path if TYPE_CHECKING: From cb7cefe7eb5e0475aef95ee2a1679a5250324d21 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 11 Feb 2026 10:47:37 +0100 Subject: [PATCH 09/11] fix docstring --- dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index 74f15c8..e8ac45b 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -41,8 +41,8 @@ class SQLCipher(SQLite3): plaintext_header_size (int): Size of plaintext header to use. page_size (int): Override size of each page. kdf_iter (int): Override amount of KDF iterations. - kdf_algo (str | Crypto.Hash): Override KDF digest alrorithm. - hmac_algo (str | Crypto.Hash): Override HMAC digest algorithm. + kdf_algo (str | hashlib._Hash): Override KDF digest alrorithm. + hmac_algo (str | hashlib._Hash): Override HMAC digest algorithm. no_kdf (bool): Disable KDF from passphrase, use as raw key. Raises: From f2c21c10901f4ced03cc5feb0d9b72bb1115077e Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 11 Feb 2026 13:04:38 +0100 Subject: [PATCH 10/11] Add HMAC verification --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 36 +++++++++++++++---- tests/sqlite3/test_sqlcipher.py | 8 ++--- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index e8ac45b..3e8b221 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -1,6 +1,7 @@ from __future__ import annotations import hashlib +import hmac from functools import lru_cache from pathlib import Path from typing import BinaryIO @@ -27,7 +28,6 @@ class SQLCipher(SQLite3): or :class:`SQLCipher1`. Decrypts a SQLCipher database from the given path or file-like oject. - HMAC key derivation and tag verification is currently not implemented. Example usage: >>> from dissect.database.sqlite3.encryption import SQLCipher4 @@ -44,6 +44,7 @@ class SQLCipher(SQLite3): kdf_algo (str | hashlib._Hash): Override KDF digest alrorithm. hmac_algo (str | hashlib._Hash): Override HMAC digest algorithm. no_kdf (bool): Disable KDF from passphrase, use as raw key. + verify_hmac (bool): Optionally verify digest of every page. Raises: SQLCipherError: If decryption failed using the provided arguments. @@ -70,6 +71,7 @@ def __init__( kdf_algo: str | None = None, hmac_algo: str | None = None, no_kdf: bool = False, + verify_hmac: bool = False, ): if not HAS_CRYPTO: raise RuntimeError("Missing dependency pycryptodome") @@ -87,6 +89,7 @@ def __init__( self.kdf_iter = kdf_iter or self.DEFAULT_KDF_ITER self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO + self.verify_hmac = verify_hmac if not hasattr(self.cipher_fh, "read"): raise ValueError("Provided file handle cannot be read from") @@ -127,10 +130,14 @@ def __init__( self.passphrase, self.salt, self.kdf_iter, self.kdf_algo.name if self.kdf_algo else None ) + # The hmac key is derived using the raw or derived database key with it's own salt and two kdf iterations. + self.hmac_salt = bytes(i ^ 0x3A for i in self.salt) + self.hmac_key = derive_key(self.key, self.hmac_salt, 2, self.hmac_algo.name if self.hmac_algo else None) + # Initialize the decrypted SQLite3 stream as a file-like object and see if that works. try: super().__init__(self.stream(), wal=None, checkpoint=None) - except InvalidDatabase as e: + except (InvalidDatabase, SQLCipherError) as e: raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") from e # Sanity check to prevent further issues down the line. @@ -175,9 +182,12 @@ def _read(self, offset: int, length: int) -> bytes: pages_offset = offset // self.align num_pages = length // self.align - return b"".join(self._read_page(num + 1) for num in range(pages_offset, pages_offset + num_pages)) + return b"".join( + self._read_page(num + 1, self.sqlcipher.verify_hmac) + for num in range(pages_offset, pages_offset + num_pages) + ) - def _read_page(self, page_num: int) -> bytes: + def _read_page(self, page_num: int, verify_hmac: bool = False) -> bytes: """Decrypt and read from the given SQLCipher page number. References: @@ -195,7 +205,8 @@ def _read_page(self, page_num: int) -> bytes: offset = (page_num - 1) * page_size # Calculate size of the page iv (always 16 bytes) plus the hmac digest size. - digest_size = self.sqlcipher.hmac_algo.digest_size if self.sqlcipher.hmac_algo else 0 + hmac_algo = self.sqlcipher.hmac_algo + digest_size = hmac_algo.digest_size if hmac_algo else 0 align = 16 + digest_size # Calculate the size of the encrypted data by substracting the iv and hmac size from the page size. @@ -224,7 +235,7 @@ def _read_page(self, page_num: int) -> bytes: # The last part of the page contains the iv and optionally a hmac digest. fh.seek(offset + enc_size) iv = fh.read(16) - _mac = fh.read(digest_size) if digest_size else None + page_hmac = fh.read(digest_size) if digest_size else None fh.seek(offset) ciphertext = fh.read(enc_size) @@ -232,6 +243,19 @@ def _read_page(self, page_num: int) -> bytes: if len(iv) != 16 or not ciphertext: raise EOFError + # Optionally verify the hmac signature with the page's ciphertext. Assumes default CIPHER_FLAG_LE_PGNO. + # https://github.com/sqlcipher/sqlcipher-tools/blob/master/verify.c + # https://github.com/sqlcipher/sqlcipher/blob/master/src/sqlcipher.c @ sqlcipher_page_hmac + if verify_hmac: + if not hmac_algo: + raise ValueError("verify_hmac is set to True but no HMAC algorithm is selected") + + hmac_msg = ciphertext + iv + page_num.to_bytes(4, "little") + calc_hmac = hmac.digest(self.sqlcipher.hmac_key, hmac_msg, hmac_algo.name) + + if calc_hmac != page_hmac: + raise SQLCipherError(f"HMAC digest mismatch for page {page_num}") + # Decrypt the ciphertext using AES CBC and append null bytes so the plaintext aligns with the page size. cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, iv) plaintext = cipher.decrypt(ciphertext) + (align * b"\x00") diff --git a/tests/sqlite3/test_sqlcipher.py b/tests/sqlite3/test_sqlcipher.py index f48ad69..707763d 100644 --- a/tests/sqlite3/test_sqlcipher.py +++ b/tests/sqlite3/test_sqlcipher.py @@ -41,9 +41,9 @@ def _assert_sqlite_db(sqlite: SQLite3) -> None: ("cipher", "kwargs", "path_str"), [ # Defaults per major version - pytest.param(SQLCipher4, {}, "aes256_hmac_sha512_kdf_256000.sqlite", id="version-4-default"), - pytest.param(SQLCipher3, {}, "aes256_hmac_sha1_kdf_64000.sqlite", id="version-3-default"), - pytest.param(SQLCipher2, {}, "aes256_hmac_sha1_kdf_4000.sqlite", id="version-2-default"), + pytest.param(SQLCipher4, {"verify_hmac": True}, "aes256_hmac_sha512_kdf_256000.sqlite", id="version-4-default"), + pytest.param(SQLCipher3, {"verify_hmac": True}, "aes256_hmac_sha1_kdf_64000.sqlite", id="version-3-default"), + pytest.param(SQLCipher2, {"verify_hmac": True}, "aes256_hmac_sha1_kdf_4000.sqlite", id="version-2-default"), pytest.param(SQLCipher1, {}, "aes256_hmac_none_kdf_4000.sqlite", id="version-1-default"), # Custom parameters pytest.param( @@ -92,5 +92,5 @@ def test_decrypt_community_plaintext_header() -> None: with pytest.raises(SQLCipherError, match="Decryption of SQLCipher database failed"): SQLCipher4(path, "invalid passphrase", salt=salt) - sqlcipher = SQLCipher4(path, "passphrase", salt=salt) + sqlcipher = SQLCipher4(path, "passphrase", salt=salt, verify_hmac=True) _assert_sqlite_db(sqlcipher) From 3ecb3562b374ae026f436c6564aee119a32a8099 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 13 Feb 2026 14:50:40 +0100 Subject: [PATCH 11/11] Small tweaks --- .../sqlite3/encryption/sqlcipher/sqlcipher.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py index 3e8b221..4aee06a 100644 --- a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -161,13 +161,12 @@ def close(self) -> None: self.cipher_fh.close() def stream(self) -> SQLCipherStream: - """Create an aligned stream of :class:`SQLCipherPage` instances.""" + """Create a transparent decryption stream.""" return SQLCipherStream(self) class SQLCipherStream(AlignedStream): - """Implements a transparent decryption stream in the form of an :class:`AlignedStream` based on - the ``page_size`` for SQLCipher databases.""" + """Implements a transparent decryption stream for SQLCipher databases.""" def __init__(self, sqlcipher: SQLCipher): super().__init__(None, sqlcipher.cipher_page_size) @@ -180,11 +179,10 @@ def __init__(self, sqlcipher: SQLCipher): def _read(self, offset: int, length: int) -> bytes: """Calculates which pages to read from based on the given offset and length. Returns decrypted bytes.""" - pages_offset = offset // self.align + start_page = offset // self.align num_pages = length // self.align return b"".join( - self._read_page(num + 1, self.sqlcipher.verify_hmac) - for num in range(pages_offset, pages_offset + num_pages) + self._read_page(num + 1, self.sqlcipher.verify_hmac) for num in range(start_page, start_page + num_pages) ) def _read_page(self, page_num: int, verify_hmac: bool = False) -> bytes: @@ -254,7 +252,9 @@ def _read_page(self, page_num: int, verify_hmac: bool = False) -> bytes: calc_hmac = hmac.digest(self.sqlcipher.hmac_key, hmac_msg, hmac_algo.name) if calc_hmac != page_hmac: - raise SQLCipherError(f"HMAC digest mismatch for page {page_num}") + raise SQLCipherError( + f"HMAC digest mismatch for page {page_num} (expected {page_hmac.hex()}, got {calc_hmac.hex()})" + ) # Decrypt the ciphertext using AES CBC and append null bytes so the plaintext aligns with the page size. cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, iv)