Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
lint:
python -m pip install --quiet --upgrade pycln isort black
python -m pycln .
python -m isort .
python -m black .
SRC_DIR := mabel
PYTHON := python
UV := $(PYTHON) -m uv
PIP := $(UV) pip

update:
python -m pip install --upgrade pip
python -m pip install --upgrade -r requirements.txt
python -m pip install --upgrade -r tests/requirements.txt
define print_green
@echo "\033[0;32m$(1)\033[0m"
endef

define print_blue
@echo "\033[0;34m$(1)\033[0m"
endef

lint: ## Run all linting tools
$(call print_blue,"Installing linting tools...")
@$(PIP) install --quiet --upgrade pycln isort ruff
$(call print_blue,"Cleaning unused imports...")
@$(PYTHON) -m pycln .
$(call print_blue,"Sorting imports...")
@$(PYTHON) -m isort .
$(call print_blue,"Formatting code...")
@$(PYTHON) -m ruff format $(SRC_DIR)
$(call print_green,"Linting complete!")

update: ## Update all dependencies
$(call print_blue,"Updating dependencies...")
@$(PYTHON) -m pip install --upgrade pip uv
@$(UV) pip install --upgrade -r tests/requirements.txt
@$(UV) pip install --upgrade -r requirements.txt

test:
clear
Expand Down
2 changes: 1 addition & 1 deletion examples/Reading Data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
"metadata": {},
"outputs": [],
"source": [
"from mabel.data import Reader, STORAGE_CLASS, DictSet, SqlReader\n",
"from mabel.data import STORAGE_CLASS, DictSet, SqlReader\n",
"from mabel.adapters.disk import DiskReader"
]
},
Expand Down
4 changes: 3 additions & 1 deletion mabel/adapters/database/null_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
class NullWriter(BaseDatabaseWriter):
def commit(self, dataframe: DataFrame):
# save the data to an interim table
naive_inserter = f"INSERT INTO `{self.dataset}` ( {', '.join(dataframe.column_names)} )"
naive_inserter = (
f"INSERT INTO `{self.dataset}` ( {', '.join(dataframe.column_names)} )"
)
naive_inserter += ",\n".join(f"\t({row})" for row in dataframe) + ";"

# row acts like a tuple when used
Expand Down
1 change: 0 additions & 1 deletion mabel/adapters/disk/disk_reader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from ...data.readers.internals.base_inner_reader import BUFFER_SIZE
from ...data.readers.internals.base_inner_reader import BaseInnerReader


Expand Down
12 changes: 9 additions & 3 deletions mabel/adapters/google/bigquery_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def __init__(self, project: str, credentials=None, **kwargs):

super().__init__(**kwargs)

def _get_table(self, dataset, table, schema, partition_expiration: Optional[int] = None):
def _get_table(
self, dataset, table, schema, partition_expiration: Optional[int] = None
):
bigquery_client = bigquery.Client()
dataset = bigquery_client.dataset(dataset)

Expand All @@ -62,8 +64,12 @@ def _get_table(self, dataset, table, schema, partition_expiration: Optional[int]
bigquery.SchemaField(
name="_system_date", field_type="TIMESTAMP", mode=FieldMode.REQUIRED
),
bigquery.SchemaField(name="name", field_type="STRING", mode=FieldMode.REQUIRED),
bigquery.SchemaField(name="age", field_type="INTEGER", mode=FieldMode.REQUIRED),
bigquery.SchemaField(
name="name", field_type="STRING", mode=FieldMode.REQUIRED
),
bigquery.SchemaField(
name="age", field_type="INTEGER", mode=FieldMode.REQUIRED
),
]
table = bigquery.Table(table_ref, schema=schema)

Expand Down
4 changes: 3 additions & 1 deletion mabel/adapters/google/google_cloud_storage_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def get_blobs_at_path(self, path):
gcs_bucket = client.get_bucket(bucket)
blobs = list(client.list_blobs(bucket_or_name=gcs_bucket, prefix=object_path))

yield from [bucket + "/" + blob.name for blob in blobs if not blob.name.endswith("/")]
yield from [
bucket + "/" + blob.name for blob in blobs if not blob.name.endswith("/")
]


def get_blob(bucket: str = None, blob_name: str = None):
Expand Down
4 changes: 3 additions & 1 deletion mabel/adapters/google/google_cloud_storage_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def commit(self, byte_data, override_blob_name=None):

try:
blob = self.gcs_bucket.blob(blob_name)
self.retry(blob.upload_from_string)(byte_data, content_type="application/octet-stream")
self.retry(blob.upload_from_string)(
byte_data, content_type="application/octet-stream"
)
return blob_name
except Exception as err: # pragma: no cover
import traceback
Expand Down
4 changes: 3 additions & 1 deletion mabel/adapters/minio/minio_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def get_blobs_at_path(self, path):
bucket, object_path, _, _ = paths.get_parts(path)
for cycle_date in dates.date_range(self.start_date, self.end_date):
cycle_path = paths.build_path(path=object_path, date=cycle_date)
blobs = self.minio.list_objects(bucket_name=bucket, prefix=cycle_path, recursive=True)
blobs = self.minio.list_objects(
bucket_name=bucket, prefix=cycle_path, recursive=True
)

yield from [
bucket + "/" + blob.object_name
Expand Down
12 changes: 10 additions & 2 deletions mabel/adapters/minio/minio_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@

class MinIoWriter(BaseInnerWriter):
def __init__(
self, *, end_point: str, access_key: str, secret_key: str, secure: bool = False, **kwargs
self,
*,
end_point: str,
access_key: str,
secret_key: str,
secure: bool = False,
**kwargs,
):
if not minio_installed: # pragma: no cover
raise MissingDependencyError(
Expand All @@ -32,6 +38,8 @@ def commit(self, byte_data, override_blob_name=None):
else:
blob_name = self._build_path()

self.client.put_object(self.bucket, blob_name, io.BytesIO(byte_data), len(byte_data))
self.client.put_object(
self.bucket, blob_name, io.BytesIO(byte_data), len(byte_data)
)

return blob_name
11 changes: 8 additions & 3 deletions mabel/data/internals/collected_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def __init__(self, dictset: DictSet, column: str, dedupe: bool = False):
collections.setdefault(key, []).append(my_item)
if dedupe:
collections = {
k: {frozenset(i.items()): i for i in v}.values() for k, v in collections.items()
k: {frozenset(i.items()): i for i in v}.values()
for k, v in collections.items()
}

self._collections = collections
Expand All @@ -52,7 +53,9 @@ def count(self, collection=None):
return {x: len(y) for x, y in self._collections.items()}
else:
try:
return [len(y) for x, y in self._collections.items() if x == collection].pop()
return [
len(y) for x, y in self._collections.items() if x == collection
].pop()
except IndexError:
return 0

Expand All @@ -72,7 +75,9 @@ def aggregate(self, column, method):
"""
response = {}
for key, items in self._collections.items():
values = [item.get(column) for item in items if item.get(column) is not None]
values = [
item.get(column) for item in items if item.get(column) is not None
]
Comment on lines +78 to +80

Copilot AI Oct 8, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The item.get(column) call is duplicated for each item. Consider storing the result in a variable to avoid the double lookup: value = item.get(column); if value is not None.

Suggested change
values = [
item.get(column) for item in items if item.get(column) is not None
]
values = []
for item in items:
value = item.get(column)
if value is not None:
values.append(value)

Copilot uses AI. Check for mistakes.
response[key] = method(values)
return response

Expand Down
28 changes: 21 additions & 7 deletions mabel/data/internals/dictset.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ def inner_sampler(dictset):
if random_value % selector == 0:
yield row

return DictSet(inner_sampler(iter(self._iterator)), storage_class=self.storage_class)
return DictSet(
inner_sampler(iter(self._iterator)), storage_class=self.storage_class
)

def collect_list(self, key: str = None) -> Union[list, map]:
"""
Expand Down Expand Up @@ -173,7 +175,9 @@ def keys(self, number_of_rows: int = 0):
"""
if number_of_rows > 0:
rows = self.itake(number_of_rows)
return reduce(lambda x, y: x + [a for a in y.keys() if a not in x], rows, [])
return reduce(
lambda x, y: x + [a for a in y.keys() if a not in x], rows, []
)
return reduce(
lambda x, y: x + [a for a in y.keys() if a not in x],
iter(self._iterator),
Expand All @@ -184,7 +188,9 @@ def types(self, number_of_rows: int = 100):
top = self.take(number_of_rows)
response = {}
for key in top.keys():
key_type = {type(val).__name__ for val in top.collect_list(key) if val != None}
key_type = {
type(val).__name__ for val in top.collect_list(key) if val != None

Copilot AI Oct 8, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use is not None instead of != None for None comparisons to follow Python best practices.

Suggested change
type(val).__name__ for val in top.collect_list(key) if val != None
type(val).__name__ for val in top.collect_list(key) if val is not None

Copilot uses AI. Check for mistakes.
}
if len(key_type) == 0: # pragma: no cover
response[key] = "empty"
if len(key_type) == 1:
Expand Down Expand Up @@ -295,7 +301,9 @@ def distinct(self, *columns):
def do_dedupe(data):
for item in data:
if columns:
hashed_item = hash("".join([str(item.get(c, "$$")) for c in columns]))
hashed_item = hash(
"".join([str(item.get(c, "$$")) for c in columns])
)
else:
hashed_item = reduce(
lambda x, y: x ^ y,
Expand All @@ -306,7 +314,9 @@ def do_dedupe(data):
yield item
hash_list[hashed_item] = True

return DictSet(do_dedupe(iter(self._iterator)), storage_class=self.storage_class)
return DictSet(
do_dedupe(iter(self._iterator)), storage_class=self.storage_class
)

def group_by(self, group_by_columns):
"""
Expand Down Expand Up @@ -482,7 +492,9 @@ def inner_select(it):
for record in it:
yield {k: record.get(k, None) for k in columns}

return DictSet(inner_select(iter(self._iterator)), storage_class=self.storage_class)
return DictSet(
inner_select(iter(self._iterator)), storage_class=self.storage_class
)

def sort_and_take(self, column, take: int = 5000, descending: bool = False):
def safety_key(column):
Expand All @@ -494,7 +506,9 @@ def safety_key(column):
)

if self.storage_class == STORAGE_CLASS.MEMORY:
yield from sorted(self._iterator, key=safety_key(column), reverse=descending)[:take]
yield from sorted(
self._iterator, key=safety_key(column), reverse=descending
)[:take]

else:
# In a low-memory environment we probably can't store all of the records
Expand Down
18 changes: 13 additions & 5 deletions mabel/data/internals/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def sanitize(htmlstring):
if isinstance(htmlstring, (list, tuple, set)) or hasattr(htmlstring, "as_list"):
return "[ " + ", ".join([sanitize(i) for i in htmlstring]) + " ]"
if hasattr(htmlstring, "items"):
return sanitize("{ " + ", ".join([f'"{k}": {v}' for k, v in htmlstring.items()]) + " }")
return sanitize(
"{ " + ", ".join([f'"{k}": {v}' for k, v in htmlstring.items()]) + " }"
)
if not isinstance(htmlstring, str):
return htmlstring
escapes = {'"': "&quot;", "'": "&#39;", "<": "&lt;", ">": "&gt;", "$": "&#x24;"}
Expand Down Expand Up @@ -66,7 +68,7 @@ def _to_html_table(data, columns):

footer = ""
if isinstance(dictset, types.GeneratorType):
footer = f"\n<p>top {i+1} rows x {len(columns)} columns</p>"
footer = f"\n<p>top {i + 1} rows x {len(columns)} columns</p>"
footer += "\nNOTE: the displayed records may have been spent"
elif hasattr(dictset, "__len__"):
footer = f"\n<p>{len(dictset)} rows x {len(columns)} columns</p>" # type:ignore
Expand Down Expand Up @@ -95,7 +97,9 @@ def format_value(val):
if isinstance(val, (list, tuple, set)) or hasattr(val, "as_list"):
return "[ " + ", ".join([format_value(i) for i in val]) + " ]"
if hasattr(val, "items"):
return format_value("{ " + ", ".join([f'"{k}": {v}' for k, v in val.items()]) + " }")
return format_value(
"{ " + ", ".join([f'"{k}": {v}' for k, v in val.items()]) + " }"
)
return str(val)

result = []
Expand All @@ -121,14 +125,18 @@ def format_value(val):

# display headers
result.append("┌" + "┬".join(bars) + "┐")
result.append("│" + "│".join([str(k).center(v + 2) for k, v in columns.items()]) + "│")
result.append(
"│" + "│".join([str(k).center(v + 2) for k, v in columns.items()]) + "│"
)
result.append("├" + "┼".join(bars) + "┤")

# display values
for row in cache:
result.append(
"│"
+ "│".join([str(format_value(v)).center(columns[k] + 2) for k, v in row.items()])
+ "│".join(
[str(format_value(v)).center(columns[k] + 2) for k, v in row.items()]
)
+ "│"
)

Expand Down
4 changes: 3 additions & 1 deletion mabel/data/internals/dnf_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ def filter_dictset(self, dictset: Iterable[dict]) -> Iterable:
if self.empty_filter:
yield from dictset
else:
yield from (record for record in dictset if evaluate(self.predicates, record))
yield from (
record for record in dictset if evaluate(self.predicates, record)
)

def __call__(self, record) -> bool:
return evaluate(self.predicates, record)
22 changes: 17 additions & 5 deletions mabel/data/internals/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

Derived from: https://gist.github.com/leehsueh/1290686
"""

from mabel.data.readers.internals.inline_evaluator import *
from mabel.utils.dates import parse_iso
from mabel.utils.token_labeler import OPERATORS
Expand Down Expand Up @@ -50,7 +51,9 @@ def parse(self):

def parse_expression(self):
andTerm1 = self.parse_and_term()
while self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.OR:
while (
self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.OR
):
self.tokenizer.next()
andTermX = self.parse_and_term()
andTerm = TreeNode(TOKENS.OR, None)
Expand All @@ -61,7 +64,9 @@ def parse_expression(self):

def parse_and_term(self):
condition1 = self.parse_condition()
while self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.AND:
while (
self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.AND
):
self.tokenizer.next()
conditionX = self.parse_condition()
condition = TreeNode(TOKENS.AND, None)
Expand Down Expand Up @@ -115,7 +120,10 @@ def parse_condition(self):

terminal1 = TreeNode(TOKENS.VARIABLE, "".join(collector))

if self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.LEFTPARENTHESES:
if (
self.tokenizer.has_next()
and self.tokenizer.next_token_type() == TOKENS.LEFTPARENTHESES
):
# If we have a ( then go looking for the matching )

self.tokenizer.next()
Expand All @@ -141,7 +149,9 @@ def parse_condition(self):
condition.left = terminal1
condition.right = terminal2
return condition
raise InvalidExpression(f"Operator expected, but got `{self.tokenizer.next()}`")
raise InvalidExpression(
f"Operator expected, but got `{self.tokenizer.next()}`"
)
raise InvalidExpression("Operator expected, but got nothing")

def parse_terminal(self):
Expand Down Expand Up @@ -249,7 +259,9 @@ def evaluate_recursive(self, treeNode, variable_dict):
if treeNode.token_type == TOKENS.OR:
return left or right

raise InvalidExpression(f"Unexpected value of type `{str(treeNode.token_type)}`")
raise InvalidExpression(
f"Unexpected value of type `{str(treeNode.token_type)}`"
)

def to_dnf(self):
"""
Expand Down
Loading
Loading