Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions mabel/data/internals/group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""
from collections import defaultdict

from siphashc import siphash
from xxhash import xxh3_64_intdigest


def summer(x, y):
Expand All @@ -25,7 +25,7 @@ def summer(x, y):
"AVG": lambda x, y: 1,
}

HASH_SEED = b"Anakin Skywalker"
HASH_SEED = 42


class TooManyGroups(Exception):
Expand Down Expand Up @@ -72,14 +72,14 @@ def _map(self, collect_columns):

for record in self._dictset:
try:
group_key: int = siphash(
HASH_SEED,
group_key: int = xxh3_64_intdigest(
"".join([str(record[column]) for column in self._columns]),
HASH_SEED
)
except KeyError:
group_key: int = siphash(
HASH_SEED,
group_key: int = xxh3_64_intdigest(
"".join([f"{record.get(column, '')}" for column in self._columns]),
HASH_SEED,
Comment on lines +80 to +82

Copilot AI Sep 4, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The xxh3_64_intdigest function call has an extra comma after HASH_SEED. This should be removed to match the function signature and the usage pattern on lines 75-77.

Suggested change
group_key: int = xxh3_64_intdigest(
"".join([f"{record.get(column, '')}" for column in self._columns]),
HASH_SEED,
HASH_SEED

Copilot uses AI. Check for mistakes.
)
if group_key not in self._group_keys.keys():
self._group_keys[group_key] = [
Expand Down
12 changes: 7 additions & 5 deletions mabel/data/readers/internals/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

Cursor is made of three parts:
- map : a bit array representing all of the blobs in the set - unread blobs
are 0s and read blobs are 1s. This allows for blobs to be read in
are 0s and read blobs are 1s. This allows for blobs to be read in
an arbitrary order - although currently only implemented linearly.
- partition: the active parition (blob) that is being read
- location : the record in the active partition (blob), so we can resume reading
midway through the blob if required.
"""

import orjson
from orso.cityhash import CityHash64
from xxhash import xxh3_64_intdigest


class InvalidCursor(Exception):
Expand Down Expand Up @@ -47,7 +47,9 @@ def load_cursor(self, cursor):

self.location = cursor["location"]
find_partition = [
blob for blob in self.readable_blobs if CityHash64(blob) == cursor["partition"]
blob
for blob in self.readable_blobs
if xxh3_64_intdigest(blob, 0) == cursor["partition"]
]
if len(find_partition) == 1:
self.partition = find_partition[0]
Expand All @@ -67,7 +69,7 @@ def next_blob(self, previous_blob=None):
if self.partition in self.readable_blobs:
return self.partition
partition_finder = [
blob for blob in self.readable_blobs if CityHash64(blob) == self.partition
blob for blob in self.readable_blobs if xxh3_64_intdigest(blob, 0) == self.partition
]
if len(partition_finder) != 1:
raise ValueError(f"Unable to determine current partition ({self.partition})")
Expand Down Expand Up @@ -103,7 +105,7 @@ def __getitem__(self, item):
)
return blob_map.tobytes().hex()
if item == "partition":
return CityHash64(self.partition)
return xxh3_64_intdigest(self.partition, 0)
if item == "location":
return self.location
return None
Expand Down
4 changes: 1 addition & 3 deletions mabel/data/readers/internals/threaded_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""

"""
""" """

import logging
import threading
Expand Down
2 changes: 1 addition & 1 deletion mabel/data/validator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


def schema_loader(
definition: Union[str, List[Dict[str, Any]], dict, RelationSchema, bool]
definition: Union[str, List[Dict[str, Any]], dict, RelationSchema, bool],
) -> Union[RelationSchema, bool]:
if definition is None:
raise ValueError(
Expand Down
2 changes: 1 addition & 1 deletion mabel/data/writers/internals/blob_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def commit(self):
pytable = self._normalize_arrow_schema(pytable, self.schema)

tempfile = io.BytesIO()
pyarrow.parquet.write_table(pytable, where=tempfile, compression="zstd")
pyarrow.parquet.write_table(pytable, where=tempfile)

tempfile.seek(0)
write_buffer = tempfile.read()
Expand Down
2 changes: 1 addition & 1 deletion mabel/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def parse_delta(delta: str) -> datetime.timedelta:


def parse_iso(
value: Union[str, int, float, datetime.datetime, datetime.date]
value: Union[str, int, float, datetime.datetime, datetime.date],
) -> Optional[datetime.datetime]:
"""
Parses an ISO date string into a datetime object, with an emphasis on speed.
Expand Down
2 changes: 1 addition & 1 deletion mabel/version.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Store the version here so:
# 1) we don't load dependencies by storing it in __init__.py
# 2) we can import it in setup.py for the same reason
__version__ = "0.6.24"
__version__ = "0.6.25"

# nodoc - don't add to the documentation wiki
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ bitarray
lz4
orjson
orso>=0.0.147
siphashc
xxhash
zstandard
4 changes: 2 additions & 2 deletions tests/performance/index_performance.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""
Results (seconds to search for a username in 65,500 rows):

indexed | row exists | time
indexed | row exists | time
-------------------------------
yes | yes | 0.094 <- about 3.5x faster when is match
yes | no | 0.006 <- over 50x faster when no match
no | yes | 0.357
no | yes | 0.357
no | no | 0.332
-------------------------------
"""
Expand Down
4 changes: 1 addition & 3 deletions tests/performance/indexing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""

"""
""" """

import sys
import os
Expand Down
6 changes: 3 additions & 3 deletions tests/performance/json_performance.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""
JSON parsing and serialization performance tests so a decision on
JSON parsing and serialization performance tests so a decision on
which library(s) to use can be made - previously the selection was
inconsistent.

Results (seconds to process 10m rows):

library | parsing | serialize
library | parsing | serialize
-------------------------------
json | 36.6 | 1.74
json | 36.6 | 1.74
ujson | 16.5 | 0.86
orjson | 10.4 | 0.66 <- lower is better
simd | 2.8 | N/A <- lower is better
Expand Down
5 changes: 3 additions & 2 deletions tests/test_data_dictset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys

sys.path.insert(1, os.path.join(sys.path[0], ".."))

from mabel import Reader, DictSet
from mabel.data import STORAGE_CLASS
from mabel.adapters.disk import DiskReader
Expand Down Expand Up @@ -189,7 +190,7 @@ def test_hash():
]
ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
hashval = hash(ds)
assert hashval == 8826660322425604498, hashval
assert hashval == 386528107484878589, hashval


def test_sort():
Expand All @@ -210,6 +211,6 @@ def test_sort():


if __name__ == "__main__": # pragma: no cover
from tests.helpers.runner import run_tests
from helpers.runner import run_tests

run_tests()
44 changes: 30 additions & 14 deletions tests/test_data_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,24 @@ def test_group_by():
gb = GroupBy(ds, "user")
cn = gb.count()
ls = list(cn)
assert ls == [{'COUNT(*)': 6, 'user': 'alice'}, {'COUNT(*)': 5, 'user': 'bob'}, {'COUNT(*)': 2, 'user': 'eve'}], ls
expected = [{'COUNT(*)': 6, 'user': 'alice'}, {'COUNT(*)': 5, 'user': 'bob'}, {'COUNT(*)': 2, 'user': 'eve'}]
assert set(tuple(sorted(d.items())) for d in ls) == set(tuple(sorted(d.items())) for d in expected)
Comment on lines +41 to +42

Copilot AI Sep 4, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The set comparison pattern using tuple(sorted(d.items())) is repeated throughout this file. Consider extracting this into a helper function like assert_dict_sets_equal(actual, expected) to reduce duplication and improve readability.

Copilot uses AI. Check for mistakes.

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gb = list(GroupBy(ds, "user").average("value"))
assert gb == [{'AVG(value)': Decimal("4.0"), 'user': 'alice'}, {'AVG(value)': Decimal("1.4"), 'user': 'bob'}, {'AVG(value)': Decimal("6.5"), 'user': 'eve'}], gb
expected = [{'AVG(value)': Decimal("4.0"), 'user': 'alice'}, {'AVG(value)': Decimal("1.4"), 'user': 'bob'}, {'AVG(value)': Decimal("6.5"), 'user': 'eve'}]
assert set(tuple(sorted(d.items())) for d in gb) == set(tuple(sorted(d.items())) for d in expected)

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
assert list(GroupBy(ds, "user").max("value")) == [{'MAX(value)': 5, 'user': 'alice'}, {'MAX(value)': 2, 'user': 'bob'}, {'MAX(value)': 7, 'user': 'eve'}]
gs = list(GroupBy(ds, "user").max("value"))
expected = [{'MAX(value)': 5, 'user': 'alice'}, {'MAX(value)': 2, 'user': 'bob'}, {'MAX(value)': 7, 'user': 'eve'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
assert list(GroupBy(ds, "user").min("value")) == [{'MIN(value)': 3, 'user': 'alice'}, {'MIN(value)': 1, 'user': 'bob'}, {'MIN(value)': 6, 'user': 'eve'}]
gs = list(GroupBy(ds, "user").min("value"))
expected = [{'MIN(value)': 3, 'user': 'alice'}, {'MIN(value)': 1, 'user': 'bob'}, {'MIN(value)': 6, 'user': 'eve'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

# fmt:on


Expand All @@ -75,27 +82,34 @@ def test_combined_group_by():
# fmt:off
ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gs = list(GroupBy(ds, ("fname", "sname")).groups())
assert gs == [{'fname': 'bob', 'sname': 'smith'}, {'fname': 'bob', 'sname': 'jones'}, {'fname': 'alice', 'sname': 'jones'}, {'fname': 'alice', 'sname': 'smith'}, {'fname': 'eve', 'sname': 'jones'}, {'fname': 'eve', 'sname': 'smith'}], gs
expected = [{'fname': 'bob', 'sname': 'smith'}, {'fname': 'bob', 'sname': 'jones'}, {'fname': 'alice', 'sname': 'jones'}, {'fname': 'alice', 'sname': 'smith'}, {'fname': 'eve', 'sname': 'jones'}, {'fname': 'eve', 'sname': 'smith'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gs = list(GroupBy(ds, ("fname", "sname")).count())
assert gs == [{'COUNT(*)': 2, 'fname': 'bob', 'sname': 'jones'}, {'COUNT(*)': 3, 'fname': 'bob', 'sname': 'smith'}, {'COUNT(*)': 3, 'fname': 'alice', 'sname': 'smith'}, {'COUNT(*)': 1, 'fname': 'eve', 'sname': 'smith'}, {'COUNT(*)': 1, 'fname': 'eve', 'sname': 'jones'}, {'COUNT(*)': 3, 'fname': 'alice', 'sname': 'jones'}], gs
expected = [{'COUNT(*)': 2, 'fname': 'bob', 'sname': 'jones'}, {'COUNT(*)': 3, 'fname': 'bob', 'sname': 'smith'}, {'COUNT(*)': 3, 'fname': 'alice', 'sname': 'smith'}, {'COUNT(*)': 1, 'fname': 'eve', 'sname': 'smith'}, {'COUNT(*)': 1, 'fname': 'eve', 'sname': 'jones'}, {'COUNT(*)': 3, 'fname': 'alice', 'sname': 'jones'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gs = list(GroupBy(ds, ("fname", "sname")).average('value'))
assert gs == [{'AVG(value)': Decimal('2.0'), 'fname': 'bob', 'sname': 'jones'}, {'AVG(value)': Decimal('1.0'), 'fname': 'bob', 'sname': 'smith'}, {'AVG(value)': Decimal('4.333333333333333333333333333'), 'fname': 'alice', 'sname': 'smith'}, {'AVG(value)': Decimal('7.0'), 'fname': 'eve', 'sname': 'smith'}, {'AVG(value)': Decimal('6.0'), 'fname': 'eve', 'sname': 'jones'}, {'AVG(value)': Decimal('3.666666666666666666666666667'), 'fname': 'alice', 'sname': 'jones'}], gs
expected = [{'AVG(value)': Decimal('2.0'), 'fname': 'bob', 'sname': 'jones'}, {'AVG(value)': Decimal('1.0'), 'fname': 'bob', 'sname': 'smith'}, {'AVG(value)': Decimal('4.333333333333333333333333333'), 'fname': 'alice', 'sname': 'smith'}, {'AVG(value)': Decimal('7.0'), 'fname': 'eve', 'sname': 'smith'}, {'AVG(value)': Decimal('6.0'), 'fname': 'eve', 'sname': 'jones'}, {'AVG(value)': Decimal('3.666666666666666666666666667'), 'fname': 'alice', 'sname': 'jones'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)


ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gs = list(GroupBy(ds, ("fname", "sname")).average('cost'))
assert gs == [{'AVG(cost)': Decimal('3.0'), 'fname': 'bob', 'sname': 'jones'}, {'AVG(cost)': Decimal('1.666666666666666666666666667'), 'fname': 'bob', 'sname': 'smith'}, {'AVG(cost)': Decimal('2.333333333333333333333333333'), 'fname': 'alice', 'sname': 'smith'}, {'AVG(cost)': Decimal('1.0'), 'fname': 'eve', 'sname': 'smith'}, {'AVG(cost)': Decimal('2.0'), 'fname': 'eve', 'sname': 'jones'}, {'AVG(cost)': Decimal('3.333333333333333333333333333'), 'fname': 'alice', 'sname': 'jones'}], gs
expected = [{'AVG(cost)': Decimal('3.0'), 'fname': 'bob', 'sname': 'jones'}, {'AVG(cost)': Decimal('1.666666666666666666666666667'), 'fname': 'bob', 'sname': 'smith'}, {'AVG(cost)': Decimal('2.333333333333333333333333333'), 'fname': 'alice', 'sname': 'smith'}, {'AVG(cost)': Decimal('1.0'), 'fname': 'eve', 'sname': 'smith'}, {'AVG(cost)': Decimal('2.0'), 'fname': 'eve', 'sname': 'jones'}, {'AVG(cost)': Decimal('3.333333333333333333333333333'), 'fname': 'alice', 'sname': 'jones'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gs = list(GroupBy(ds, ("fname", "sname")).average(('cost', 'value',)))
assert gs == [{'AVG(cost)': Decimal('3'), 'AVG(value)': Decimal('2'), 'fname': 'bob', 'sname': 'jones'}, {'AVG(cost)': Decimal('1.666666666666666666666666667'), 'AVG(value)': Decimal('1'), 'fname': 'bob', 'sname': 'smith'}, {'AVG(cost)': Decimal('2.333333333333333333333333333'), 'AVG(value)': Decimal('4.333333333333333333333333333'), 'fname': 'alice', 'sname': 'smith'}, {'AVG(cost)': 1.0, 'AVG(value)': 7.0, 'fname': 'eve', 'sname': 'smith'}, {'AVG(cost)': 2.0, 'AVG(value)': 6.0, 'fname': 'eve', 'sname': 'jones'}, {'AVG(cost)': Decimal('3.333333333333333333333333333'), 'AVG(value)': Decimal('3.666666666666666666666666667'), 'fname': 'alice', 'sname': 'jones'}]
expected = [{'AVG(cost)': Decimal('3'), 'AVG(value)': Decimal('2'), 'fname': 'bob', 'sname': 'jones'}, {'AVG(cost)': Decimal('1.666666666666666666666666667'), 'AVG(value)': Decimal('1'), 'fname': 'bob', 'sname': 'smith'}, {'AVG(cost)': Decimal('2.333333333333333333333333333'), 'AVG(value)': Decimal('4.333333333333333333333333333'), 'fname': 'alice', 'sname': 'smith'}, {'AVG(cost)': 1.0, 'AVG(value)': 7.0, 'fname': 'eve', 'sname': 'smith'}, {'AVG(cost)': 2.0, 'AVG(value)': 6.0, 'fname': 'eve', 'sname': 'jones'}, {'AVG(cost)': Decimal('3.333333333333333333333333333'), 'AVG(value)': Decimal('3.666666666666666666666666667'), 'fname': 'alice', 'sname': 'jones'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
gs = list(GroupBy(ds, ("fname", "sname")).aggregate([('MAX', 'value'),('MIN', 'cost')]))
assert gs == [{'MAX(value)': 2, 'MIN(cost)': 2, 'fname': 'bob', 'sname': 'jones'}, {'MAX(value)': 1, 'MIN(cost)': 1, 'fname': 'bob', 'sname': 'smith'}, {'MAX(value)': 5, 'MIN(cost)': 1, 'fname': 'alice', 'sname': 'smith'}, {'MAX(value)': 7, 'MIN(cost)': 1, 'fname': 'eve', 'sname': 'smith'}, {'MAX(value)': 6, 'MIN(cost)': 2, 'fname': 'eve', 'sname': 'jones'}, {'MAX(value)': 5, 'MIN(cost)': 2, 'fname': 'alice', 'sname': 'jones'}], gs
expected = [{'MAX(value)': 2, 'MIN(cost)': 2, 'fname': 'bob', 'sname': 'jones'}, {'MAX(value)': 1, 'MIN(cost)': 1, 'fname': 'bob', 'sname': 'smith'}, {'MAX(value)': 5, 'MIN(cost)': 1, 'fname': 'alice', 'sname': 'smith'}, {'MAX(value)': 7, 'MIN(cost)': 1, 'fname': 'eve', 'sname': 'smith'}, {'MAX(value)': 6, 'MIN(cost)': 2, 'fname': 'eve', 'sname': 'jones'}, {'MAX(value)': 5, 'MIN(cost)': 2, 'fname': 'alice', 'sname': 'jones'}]
assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)

# fmt:on

Expand All @@ -109,15 +123,17 @@ def test_gappy_set():
{"key": 4, "value": None, "plus1": 5},
]
ds = DictSet(data, storage_class=STORAGE_CLASS.MEMORY)
g = list(ds.group_by("value").average("key"))
assert g == [
gs = list(ds.group_by("value").average("key"))
expected = [
{"AVG(key)": 4.0, "value": None},
{"AVG(key)": 3.0, "value": "two"},
{"AVG(key)": 1.0, "value": "one"},
], g
]

assert set(tuple(sorted(d.items())) for d in gs) == set(tuple(sorted(d.items())) for d in expected)


if __name__ == "__main__": # pragma: no cover
from tests.helpers.runner import run_tests
from helpers.runner import run_tests

run_tests()
8 changes: 3 additions & 5 deletions tests/test_reader_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ def test_cursor():
for i in range(number_of_records):
data.append({"one": 1, "index": i})

reader = Reader(inner_reader=NullReader, dataset="none", partitions=None, data=data)

# create random offsets for testing - it's illogical to have a 0 cursor
offsets = (entropy.random_range(1, number_of_records) for i in range(20))

Expand Down Expand Up @@ -135,7 +133,7 @@ def test_cursor_as_text():
offsets = (entropy.random_range(1, number_of_records - 1) for i in range(5))

for offset in offsets:
cursor = {"location": offset, "map": "00", "partition": 1983839293359648136}
cursor = {"location": offset, "map": "00", "partition": 13429097919052166063}
reader = Reader(
inner_reader=DiskReader,
dataset="tests/data/formats/jsonl",
Expand Down Expand Up @@ -164,7 +162,7 @@ def test_move_to_cursor():
inner_reader=DiskReader,
dataset="tests/data/formats/jsonl",
partitions=None,
cursor={"location": offset, "map": "00", "partition": 1983839293359648136},
cursor={"location": offset, "map": "00", "partition": 13429097919052166063},
persistence=STORAGE_CLASS.NO_PERSISTANCE,
)

Expand Down Expand Up @@ -229,6 +227,6 @@ def test_multiple_files():


if __name__ == "__main__": # pragma: no cover
from tests.helpers.runner import run_tests
from helpers.runner import run_tests

run_tests()
4 changes: 1 addition & 3 deletions tests/test_utils_common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""

"""
""" """

import os
import sys
Expand Down
Loading