diff --git a/dissect/database/sqlite3/encryption/__init__.py b/dissect/database/sqlite3/encryption/__init__.py new file mode 100644 index 0000000..551bebd --- /dev/null +++ b/dissect/database/sqlite3/encryption/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 + +__all__ = [ + "SQLCipher1", + "SQLCipher2", + "SQLCipher3", + "SQLCipher4", +] diff --git a/dissect/database/sqlite3/encryption/sqlcipher/__init__.py b/dissect/database/sqlite3/encryption/sqlcipher/__init__.py new file mode 100644 index 0000000..551bebd --- /dev/null +++ b/dissect/database/sqlite3/encryption/sqlcipher/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 + +__all__ = [ + "SQLCipher1", + "SQLCipher2", + "SQLCipher3", + "SQLCipher4", +] diff --git a/dissect/database/sqlite3/encryption/sqlcipher/exception.py b/dissect/database/sqlite3/encryption/sqlcipher/exception.py new file mode 100644 index 0000000..428da91 --- /dev/null +++ b/dissect/database/sqlite3/encryption/sqlcipher/exception.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from dissect.database.exception import Error + + +class SQLCipherError(Error): + pass diff --git a/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py new file mode 100644 index 0000000..4aee06a --- /dev/null +++ b/dissect/database/sqlite3/encryption/sqlcipher/sqlcipher.py @@ -0,0 +1,301 @@ +from __future__ import annotations + +import hashlib +import hmac +from functools import lru_cache +from pathlib import Path +from typing import BinaryIO + +from dissect.util.stream import AlignedStream + +from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError +from dissect.database.sqlite3.exception import InvalidDatabase +from dissect.database.sqlite3.sqlite3 import SQLITE3_HEADER_MAGIC, SQLite3 + +try: + from Crypto.Cipher import AES + + HAS_CRYPTO = True + +except ImportError: + HAS_CRYPTO = False + + +class SQLCipher(SQLite3): + """SQLCipher Community Edition implementation. + + Instantiate with a subclass from :class:`SQLCipher4`, :class:`SQLCipher3`, :class:`SQLCipher2` + or :class:`SQLCipher1`. + + Decrypts a SQLCipher database from the given path or file-like oject. + + Example usage: + >>> from dissect.database.sqlite3.encryption import SQLCipher4 + >>> db = SQLCipher4(Path("file.db"), "passphrase") + >>> row = db.table("MyTable").row(0) + + Args: + fh (Path | BinaryIO): The path or file-like object to open. + passphrase (str | bytes): String or bytes passphrase. + salt (bytes): Optionally provide the 16-byte salt directly. + plaintext_header_size (int): Size of plaintext header to use. + page_size (int): Override size of each page. + kdf_iter (int): Override amount of KDF iterations. + kdf_algo (str | hashlib._Hash): Override KDF digest alrorithm. + hmac_algo (str | hashlib._Hash): Override HMAC digest algorithm. + no_kdf (bool): Disable KDF from passphrase, use as raw key. + verify_hmac (bool): Optionally verify digest of every page. + + Raises: + SQLCipherError: If decryption failed using the provided arguments. + + References: + - https://www.zetetic.net/sqlcipher/design/ + - https://github.com/sqlcipher/sqlcipher + """ + + DEFAULT_PAGE_SIZE: int + DEFAULT_KDF_ITER: int + DEFAULT_KDF_ALGO: str + DEFAULT_HMAC_ALGO: str | None + + def __init__( + self, + fh: Path | BinaryIO, + passphrase: str | bytes, + *, + salt: bytes | None = None, + plaintext_header_size: int | None = None, + page_size: int | None = None, + kdf_iter: int | None = None, + kdf_algo: str | None = None, + hmac_algo: str | None = None, + no_kdf: bool = False, + verify_hmac: bool = False, + ): + if not HAS_CRYPTO: + raise RuntimeError("Missing dependency pycryptodome") + + if isinstance(fh, Path): + cipher_fh = fh.open("rb") + cipher_path = fh + else: + cipher_fh = fh + cipher_path = None + + self.cipher_fh = cipher_fh + self.cipher_path = cipher_path + self.cipher_page_size = page_size or self.DEFAULT_PAGE_SIZE + self.kdf_iter = kdf_iter or self.DEFAULT_KDF_ITER + self.kdf_algo = kdf_algo or self.DEFAULT_KDF_ALGO + self.hmac_algo = hmac_algo or self.DEFAULT_HMAC_ALGO + self.verify_hmac = verify_hmac + + if not hasattr(self.cipher_fh, "read"): + raise ValueError("Provided file handle cannot be read from") + + if isinstance(passphrase, str): + passphrase = passphrase.encode() + + if not passphrase: + raise SQLCipherError("No passphrase provided") + + if isinstance(self.hmac_algo, str): + self.hmac_algo = hashlib.new(self.hmac_algo) + + if isinstance(self.kdf_algo, str): + self.kdf_algo = hashlib.new(self.kdf_algo) + + # Part of the header can be plaintext. We can infer that or it can be passed upon initialization. + # https://www.zetetic.net/sqlcipher/sqlcipher-api/#cipher_plaintext_header_size + if plaintext_header_size: + self.plaintext_header_size = plaintext_header_size + + # The default and recommended plaintext header size is 32 bytes. + elif (header_or_salt := self.cipher_fh.read(16)) == SQLITE3_HEADER_MAGIC: + self.plaintext_header_size = 32 + else: + self.plaintext_header_size = None + + if self.plaintext_header_size and not salt: + raise SQLCipherError("Plaintext header has no salt, please provide salt manually") + + self.salt = salt or header_or_salt + self.passphrase = passphrase + + if no_kdf: + self.key = self.passphrase + else: + self.key = derive_key( + self.passphrase, self.salt, self.kdf_iter, self.kdf_algo.name if self.kdf_algo else None + ) + + # The hmac key is derived using the raw or derived database key with it's own salt and two kdf iterations. + self.hmac_salt = bytes(i ^ 0x3A for i in self.salt) + self.hmac_key = derive_key(self.key, self.hmac_salt, 2, self.hmac_algo.name if self.hmac_algo else None) + + # Initialize the decrypted SQLite3 stream as a file-like object and see if that works. + try: + super().__init__(self.stream(), wal=None, checkpoint=None) + except (InvalidDatabase, SQLCipherError) as e: + raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") from e + + # Sanity check to prevent further issues down the line. + if self.header.page_size != self.cipher_page_size or self.header.schema_format_number not in (1, 2, 3, 4): + raise SQLCipherError("Decryption of SQLCipher database failed or is not a database") + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__} " + f"fh={self.cipher_path or self.cipher_fh} " + f"wal={self.wal} " + f"checkpoint={bool(self.checkpoint)} " + f"pages={self.header.page_count}>" + ) + + def close(self) -> None: + """Close the database.""" + super().close() + # Only close DB handle if we opened it using a path + if self.cipher_path is not None: + self.cipher_fh.close() + + def stream(self) -> SQLCipherStream: + """Create a transparent decryption stream.""" + return SQLCipherStream(self) + + +class SQLCipherStream(AlignedStream): + """Implements a transparent decryption stream for SQLCipher databases.""" + + def __init__(self, sqlcipher: SQLCipher): + super().__init__(None, sqlcipher.cipher_page_size) + + self.fh = sqlcipher.cipher_fh + self.sqlcipher = sqlcipher + + self._read_page = lru_cache(4096)(self._read_page) + + def _read(self, offset: int, length: int) -> bytes: + """Calculates which pages to read from based on the given offset and length. Returns decrypted bytes.""" + + start_page = offset // self.align + num_pages = length // self.align + return b"".join( + self._read_page(num + 1, self.sqlcipher.verify_hmac) for num in range(start_page, start_page + num_pages) + ) + + def _read_page(self, page_num: int, verify_hmac: bool = False) -> bytes: + """Decrypt and read from the given SQLCipher page number. + + References: + - https://github.com/sqlcipher/sqlcipher-tools/blob/master/decrypt.c + """ + + if page_num < 1: + raise ValueError("The first page number is 1") + + fh = self.sqlcipher.cipher_fh + page_size = self.sqlcipher.cipher_page_size + + # Calculate the absolute offset in the SQLCipher file handle by multiplying the page number with + # the SQLCipher page size. + offset = (page_num - 1) * page_size + + # Calculate size of the page iv (always 16 bytes) plus the hmac digest size. + hmac_algo = self.sqlcipher.hmac_algo + digest_size = hmac_algo.digest_size if hmac_algo else 0 + align = 16 + digest_size + + # Calculate the size of the encrypted data by substracting the iv and hmac size from the page size. + # The sum of the iv and hmac size needs to be adjusted to 16 byte blocks. + if align % 16 != 0: + align = (align + 15) & ~15 + enc_size = page_size - align + + # By default, the first page 'contains' the database salt (in place of SQLITE_HEAER_MAGIC) so we substract those + # first 16 bytes from the page size and update the ciphertext offset and size accordingly. + header_offset = 0 + header = b"" + if page_num == 1: + header_offset = self.sqlcipher.plaintext_header_size or 16 + enc_size -= header_offset + offset += header_offset + + # Prepare the plaintext header of the SQLite3 database if this is the first page, or read the plaintext + # header according to the plaintext_header_size variable. + if header_offset == 16: + header = SQLITE3_HEADER_MAGIC + elif header_offset: + fh.seek(0) + header = fh.read(header_offset) + + # The last part of the page contains the iv and optionally a hmac digest. + fh.seek(offset + enc_size) + iv = fh.read(16) + page_hmac = fh.read(digest_size) if digest_size else None + + fh.seek(offset) + ciphertext = fh.read(enc_size) + + if len(iv) != 16 or not ciphertext: + raise EOFError + + # Optionally verify the hmac signature with the page's ciphertext. Assumes default CIPHER_FLAG_LE_PGNO. + # https://github.com/sqlcipher/sqlcipher-tools/blob/master/verify.c + # https://github.com/sqlcipher/sqlcipher/blob/master/src/sqlcipher.c @ sqlcipher_page_hmac + if verify_hmac: + if not hmac_algo: + raise ValueError("verify_hmac is set to True but no HMAC algorithm is selected") + + hmac_msg = ciphertext + iv + page_num.to_bytes(4, "little") + calc_hmac = hmac.digest(self.sqlcipher.hmac_key, hmac_msg, hmac_algo.name) + + if calc_hmac != page_hmac: + raise SQLCipherError( + f"HMAC digest mismatch for page {page_num} (expected {page_hmac.hex()}, got {calc_hmac.hex()})" + ) + + # Decrypt the ciphertext using AES CBC and append null bytes so the plaintext aligns with the page size. + cipher = AES.new(self.sqlcipher.key, AES.MODE_CBC, iv) + plaintext = cipher.decrypt(ciphertext) + (align * b"\x00") + + # Return the plaintext prepended by the optional plaintext header. + return header + plaintext + + +class SQLCipher4(SQLCipher): + DEFAULT_PAGE_SIZE = 4096 + DEFAULT_KDF_ITER = 256_000 + DEFAULT_KDF_ALGO = "SHA512" + DEFAULT_HMAC_ALGO = "SHA512" + + +class SQLCipher3(SQLCipher): + DEFAULT_PAGE_SIZE = 1024 + DEFAULT_KDF_ITER = 64_000 + DEFAULT_KDF_ALGO = "SHA1" + DEFAULT_HMAC_ALGO = "SHA1" + + +class SQLCipher2(SQLCipher): + DEFAULT_PAGE_SIZE = 1024 + DEFAULT_KDF_ITER = 4000 + DEFAULT_KDF_ALGO = "SHA1" + DEFAULT_HMAC_ALGO = "SHA1" + + +class SQLCipher1(SQLCipher): + DEFAULT_PAGE_SIZE = 1024 + DEFAULT_KDF_ITER = 4000 + DEFAULT_KDF_ALGO = "SHA1" + DEFAULT_HMAC_ALGO = None + + +def derive_key(passphrase: bytes, salt: bytes, kdf_iter: int, kdf_algo: str | None) -> bytes: + """Derive the database key as SQLCipher would using PBKDF2.""" + + if not kdf_iter or not kdf_algo: + return passphrase + + return hashlib.pbkdf2_hmac(kdf_algo, passphrase, salt, kdf_iter, 32) diff --git a/dissect/database/sqlite3/sqlite3.py b/dissect/database/sqlite3/sqlite3.py index ccdb0e4..6ded7a2 100644 --- a/dissect/database/sqlite3/sqlite3.py +++ b/dissect/database/sqlite3/sqlite3.py @@ -131,6 +131,9 @@ def __init__( self.page = lru_cache(256)(self.page) + def __repr__(self) -> str: + return f"" # noqa: E501 + def __enter__(self) -> Self: """Return ``self`` upon entering the runtime context.""" return self diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite new file mode 100755 index 0000000..c156d06 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_none_kdf_4000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:967db750c86483aa6a3b8a8341a4b526dddf382bb65c257d568f0f5f70c44eef +size 3072 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite new file mode 100755 index 0000000..b5d29a8 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_4000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:82d350da85ecc2e616177f3cec2c617ca07428d8eb5f81c377d24f45a8012655 +size 3072 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite new file mode 100755 index 0000000..586053d --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha1_kdf_64000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7445a296be286a6f80b3c6aeced3ce6c97a56b0b139086771b3a2300e99b635c +size 3072 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite new file mode 100755 index 0000000..fd06b23 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:da9a1bf147755fdb52bcde5e015c3595600476261676187b9001ddf28ae4c30e +size 24576 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite new file mode 100755 index 0000000..3f48e58 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9a069e4540a63f41620dc4c1448d88e1cc7862f17e86bc27db0d7c772fa1f3e1 +size 12288 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite new file mode 100755 index 0000000..5036724 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ed4415bfb01407ffaf09f38bbbdb820171e2db31218687c50cb5f56e67e554ce +size 12288 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/create.sql b/tests/_data/sqlite3/encryption/sqlcipher/create.sql new file mode 100755 index 0000000..f5b7467 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/create.sql @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6aba30f71b43c6a36461a80d8f95204c64add1a0f395d63ca3e55ab80c38901f +size 1088 diff --git a/tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite b/tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite new file mode 100755 index 0000000..ab579b1 --- /dev/null +++ b/tests/_data/sqlite3/encryption/sqlcipher/plaintext.sqlite @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:aceca4caa08647907e1a3abeeef3ae1d8cccaad30e4046e88b43510e582b9b5e +size 12288 diff --git a/tests/sqlite3/test_sqlcipher.py b/tests/sqlite3/test_sqlcipher.py new file mode 100644 index 0000000..707763d --- /dev/null +++ b/tests/sqlite3/test_sqlcipher.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from dissect.database.sqlite3.encryption.sqlcipher.exception import SQLCipherError +from dissect.database.sqlite3.encryption.sqlcipher.sqlcipher import SQLCipher1, SQLCipher2, SQLCipher3, SQLCipher4 +from tests._util import absolute_path + +if TYPE_CHECKING: + from collections.abc import Callable + + from dissect.database.sqlite3.sqlite3 import SQLite3 + + +def _assert_sqlite_db(sqlite: SQLite3) -> None: + table = sqlite.table("Movies") + assert table.sql == ( + 'CREATE TABLE "Movies" (\n' + '\t"ID"\tINTEGER,\n' + '\t"Title"\tTEXT NOT NULL,\n' + '\t"Year"\tINTEGER NOT NULL,\n' + '\t"Director"\tTEXT NOT NULL,\n' + '\t"Rating"\tINTEGER,\n' + '\tPRIMARY KEY("ID" AUTOINCREMENT)\n' + ")" + ) + + movies = list(table.rows()) + assert len(movies) == 11 + + assert movies[-1].ID == 11 + assert movies[-1].Title == "The Good, the Bad and the Ugly" + assert movies[-1].Year == 1966 + assert movies[-1].Director == "Sergio Leone" + assert movies[-1].Rating == 8.8 + + +@pytest.mark.parametrize( + ("cipher", "kwargs", "path_str"), + [ + # Defaults per major version + pytest.param(SQLCipher4, {"verify_hmac": True}, "aes256_hmac_sha512_kdf_256000.sqlite", id="version-4-default"), + pytest.param(SQLCipher3, {"verify_hmac": True}, "aes256_hmac_sha1_kdf_64000.sqlite", id="version-3-default"), + pytest.param(SQLCipher2, {"verify_hmac": True}, "aes256_hmac_sha1_kdf_4000.sqlite", id="version-2-default"), + pytest.param(SQLCipher1, {}, "aes256_hmac_none_kdf_4000.sqlite", id="version-1-default"), + # Custom parameters + pytest.param( + SQLCipher4, + { + "page_size": 8192, + "hmac_algo": "sha256", + "kdf_algo": "sha1", + "kdf_iter": 1337, + }, + "aes256_hmac_sha256_kdf_sha1_1337_page_8kb.sqlite", + id="version-4-custom-hmac-sha256-kdf-sha1-1337-page-8kb", + ), + ], +) +def test_decrypt_community(cipher: Callable, path_str: str, kwargs: dict) -> None: + """Test if we can parse a SQLCipher (4.5.6 community) encrypted database.""" + + path = absolute_path("_data/sqlite3/encryption/sqlcipher/" + path_str) + + with pytest.raises(SQLCipherError, match="Decryption of SQLCipher database failed"): + cipher(path, "invalid passphrase", **kwargs) + + # Test context manager + with cipher(path, "passphrase", **kwargs) as sqlcipher: + assert sqlcipher.stream().read(20) in ( + b"SQLite format 3\x00\x04\x00\x01\x01", # 1024 + b"SQLite format 3\x00\x10\x00\x01\x01", # 4096 + b"SQLite format 3\x00\x20\x00\x01\x01", # 8192 + ) + _assert_sqlite_db(sqlcipher) + + +def test_decrypt_community_plaintext_header() -> None: + """Test if we can parse and decrypt a SQLCipher 4.5.6 database with a 32-byte plaintext header.""" + + path = absolute_path("_data/sqlite3/encryption/sqlcipher/aes256_hmac_sha512_kdf_256000_plain_header.sqlite") + salt = bytes.fromhex("01010101010101010101010101010101") + + with pytest.raises(SQLCipherError, match="No passphrase provided"): + SQLCipher4(path, "") + + with pytest.raises(SQLCipherError, match="Plaintext header has no salt, please provide salt manually"): + SQLCipher4(path, "invalid passphrase") + + with pytest.raises(SQLCipherError, match="Decryption of SQLCipher database failed"): + SQLCipher4(path, "invalid passphrase", salt=salt) + + sqlcipher = SQLCipher4(path, "passphrase", salt=salt, verify_hmac=True) + _assert_sqlite_db(sqlcipher)