diff --git a/Makefile b/Makefile index 07ee3aec..93ca7e4c 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,32 @@ -lint: - python -m pip install --quiet --upgrade pycln isort black - python -m pycln . - python -m isort . - python -m black . +SRC_DIR := mabel +PYTHON := python +UV := $(PYTHON) -m uv +PIP := $(UV) pip -update: - python -m pip install --upgrade pip - python -m pip install --upgrade -r requirements.txt - python -m pip install --upgrade -r tests/requirements.txt +define print_green + @echo "\033[0;32m$(1)\033[0m" +endef + +define print_blue + @echo "\033[0;34m$(1)\033[0m" +endef + +lint: ## Run all linting tools + $(call print_blue,"Installing linting tools...") + @$(PIP) install --quiet --upgrade pycln isort ruff + $(call print_blue,"Cleaning unused imports...") + @$(PYTHON) -m pycln . + $(call print_blue,"Sorting imports...") + @$(PYTHON) -m isort . + $(call print_blue,"Formatting code...") + @$(PYTHON) -m ruff format $(SRC_DIR) + $(call print_green,"Linting complete!") + +update: ## Update all dependencies + $(call print_blue,"Updating dependencies...") + @$(PYTHON) -m pip install --upgrade pip uv + @$(UV) pip install --upgrade -r tests/requirements.txt + @$(UV) pip install --upgrade -r requirements.txt test: clear diff --git a/examples/Reading Data.ipynb b/examples/Reading Data.ipynb index 3fe1ef1c..7ae062ea 100644 --- a/examples/Reading Data.ipynb +++ b/examples/Reading Data.ipynb @@ -107,7 +107,7 @@ "metadata": {}, "outputs": [], "source": [ - "from mabel.data import Reader, STORAGE_CLASS, DictSet, SqlReader\n", + "from mabel.data import STORAGE_CLASS, DictSet, SqlReader\n", "from mabel.adapters.disk import DiskReader" ] }, diff --git a/mabel/adapters/database/null_writer.py b/mabel/adapters/database/null_writer.py index 32bbfd83..41f0d1da 100644 --- a/mabel/adapters/database/null_writer.py +++ b/mabel/adapters/database/null_writer.py @@ -6,7 +6,9 @@ class NullWriter(BaseDatabaseWriter): def commit(self, dataframe: DataFrame): # save the data to an interim table - naive_inserter = f"INSERT INTO `{self.dataset}` ( {', '.join(dataframe.column_names)} )" + naive_inserter = ( + f"INSERT INTO `{self.dataset}` ( {', '.join(dataframe.column_names)} )" + ) naive_inserter += ",\n".join(f"\t({row})" for row in dataframe) + ";" # row acts like a tuple when used diff --git a/mabel/adapters/disk/disk_reader.py b/mabel/adapters/disk/disk_reader.py index f064d639..db47344a 100644 --- a/mabel/adapters/disk/disk_reader.py +++ b/mabel/adapters/disk/disk_reader.py @@ -1,4 +1,3 @@ -from ...data.readers.internals.base_inner_reader import BUFFER_SIZE from ...data.readers.internals.base_inner_reader import BaseInnerReader diff --git a/mabel/adapters/google/bigquery_writer.py b/mabel/adapters/google/bigquery_writer.py index 2fdd431e..05e4a56e 100644 --- a/mabel/adapters/google/bigquery_writer.py +++ b/mabel/adapters/google/bigquery_writer.py @@ -47,7 +47,9 @@ def __init__(self, project: str, credentials=None, **kwargs): super().__init__(**kwargs) - def _get_table(self, dataset, table, schema, partition_expiration: Optional[int] = None): + def _get_table( + self, dataset, table, schema, partition_expiration: Optional[int] = None + ): bigquery_client = bigquery.Client() dataset = bigquery_client.dataset(dataset) @@ -62,8 +64,12 @@ def _get_table(self, dataset, table, schema, partition_expiration: Optional[int] bigquery.SchemaField( name="_system_date", field_type="TIMESTAMP", mode=FieldMode.REQUIRED ), - bigquery.SchemaField(name="name", field_type="STRING", mode=FieldMode.REQUIRED), - bigquery.SchemaField(name="age", field_type="INTEGER", mode=FieldMode.REQUIRED), + bigquery.SchemaField( + name="name", field_type="STRING", mode=FieldMode.REQUIRED + ), + bigquery.SchemaField( + name="age", field_type="INTEGER", mode=FieldMode.REQUIRED + ), ] table = bigquery.Table(table_ref, schema=schema) diff --git a/mabel/adapters/google/google_cloud_storage_reader.py b/mabel/adapters/google/google_cloud_storage_reader.py index 4f5d7355..d0acb8af 100644 --- a/mabel/adapters/google/google_cloud_storage_reader.py +++ b/mabel/adapters/google/google_cloud_storage_reader.py @@ -48,7 +48,9 @@ def get_blobs_at_path(self, path): gcs_bucket = client.get_bucket(bucket) blobs = list(client.list_blobs(bucket_or_name=gcs_bucket, prefix=object_path)) - yield from [bucket + "/" + blob.name for blob in blobs if not blob.name.endswith("/")] + yield from [ + bucket + "/" + blob.name for blob in blobs if not blob.name.endswith("/") + ] def get_blob(bucket: str = None, blob_name: str = None): diff --git a/mabel/adapters/google/google_cloud_storage_writer.py b/mabel/adapters/google/google_cloud_storage_writer.py index dc41e0fe..570e42a5 100644 --- a/mabel/adapters/google/google_cloud_storage_writer.py +++ b/mabel/adapters/google/google_cloud_storage_writer.py @@ -53,7 +53,9 @@ def commit(self, byte_data, override_blob_name=None): try: blob = self.gcs_bucket.blob(blob_name) - self.retry(blob.upload_from_string)(byte_data, content_type="application/octet-stream") + self.retry(blob.upload_from_string)( + byte_data, content_type="application/octet-stream" + ) return blob_name except Exception as err: # pragma: no cover import traceback diff --git a/mabel/adapters/minio/minio_reader.py b/mabel/adapters/minio/minio_reader.py index 5d3fffc4..d7449ba1 100644 --- a/mabel/adapters/minio/minio_reader.py +++ b/mabel/adapters/minio/minio_reader.py @@ -37,7 +37,9 @@ def get_blobs_at_path(self, path): bucket, object_path, _, _ = paths.get_parts(path) for cycle_date in dates.date_range(self.start_date, self.end_date): cycle_path = paths.build_path(path=object_path, date=cycle_date) - blobs = self.minio.list_objects(bucket_name=bucket, prefix=cycle_path, recursive=True) + blobs = self.minio.list_objects( + bucket_name=bucket, prefix=cycle_path, recursive=True + ) yield from [ bucket + "/" + blob.object_name diff --git a/mabel/adapters/minio/minio_writer.py b/mabel/adapters/minio/minio_writer.py index 7225f04e..5c532b7f 100644 --- a/mabel/adapters/minio/minio_writer.py +++ b/mabel/adapters/minio/minio_writer.py @@ -13,7 +13,13 @@ class MinIoWriter(BaseInnerWriter): def __init__( - self, *, end_point: str, access_key: str, secret_key: str, secure: bool = False, **kwargs + self, + *, + end_point: str, + access_key: str, + secret_key: str, + secure: bool = False, + **kwargs, ): if not minio_installed: # pragma: no cover raise MissingDependencyError( @@ -32,6 +38,8 @@ def commit(self, byte_data, override_blob_name=None): else: blob_name = self._build_path() - self.client.put_object(self.bucket, blob_name, io.BytesIO(byte_data), len(byte_data)) + self.client.put_object( + self.bucket, blob_name, io.BytesIO(byte_data), len(byte_data) + ) return blob_name diff --git a/mabel/data/internals/collected_set.py b/mabel/data/internals/collected_set.py index 99361c0d..9122fe0e 100644 --- a/mabel/data/internals/collected_set.py +++ b/mabel/data/internals/collected_set.py @@ -33,7 +33,8 @@ def __init__(self, dictset: DictSet, column: str, dedupe: bool = False): collections.setdefault(key, []).append(my_item) if dedupe: collections = { - k: {frozenset(i.items()): i for i in v}.values() for k, v in collections.items() + k: {frozenset(i.items()): i for i in v}.values() + for k, v in collections.items() } self._collections = collections @@ -52,7 +53,9 @@ def count(self, collection=None): return {x: len(y) for x, y in self._collections.items()} else: try: - return [len(y) for x, y in self._collections.items() if x == collection].pop() + return [ + len(y) for x, y in self._collections.items() if x == collection + ].pop() except IndexError: return 0 @@ -72,7 +75,9 @@ def aggregate(self, column, method): """ response = {} for key, items in self._collections.items(): - values = [item.get(column) for item in items if item.get(column) is not None] + values = [ + item.get(column) for item in items if item.get(column) is not None + ] response[key] = method(values) return response diff --git a/mabel/data/internals/dictset.py b/mabel/data/internals/dictset.py index 9536f78b..9bcad060 100644 --- a/mabel/data/internals/dictset.py +++ b/mabel/data/internals/dictset.py @@ -144,7 +144,9 @@ def inner_sampler(dictset): if random_value % selector == 0: yield row - return DictSet(inner_sampler(iter(self._iterator)), storage_class=self.storage_class) + return DictSet( + inner_sampler(iter(self._iterator)), storage_class=self.storage_class + ) def collect_list(self, key: str = None) -> Union[list, map]: """ @@ -173,7 +175,9 @@ def keys(self, number_of_rows: int = 0): """ if number_of_rows > 0: rows = self.itake(number_of_rows) - return reduce(lambda x, y: x + [a for a in y.keys() if a not in x], rows, []) + return reduce( + lambda x, y: x + [a for a in y.keys() if a not in x], rows, [] + ) return reduce( lambda x, y: x + [a for a in y.keys() if a not in x], iter(self._iterator), @@ -184,7 +188,9 @@ def types(self, number_of_rows: int = 100): top = self.take(number_of_rows) response = {} for key in top.keys(): - key_type = {type(val).__name__ for val in top.collect_list(key) if val != None} + key_type = { + type(val).__name__ for val in top.collect_list(key) if val != None + } if len(key_type) == 0: # pragma: no cover response[key] = "empty" if len(key_type) == 1: @@ -295,7 +301,9 @@ def distinct(self, *columns): def do_dedupe(data): for item in data: if columns: - hashed_item = hash("".join([str(item.get(c, "$$")) for c in columns])) + hashed_item = hash( + "".join([str(item.get(c, "$$")) for c in columns]) + ) else: hashed_item = reduce( lambda x, y: x ^ y, @@ -306,7 +314,9 @@ def do_dedupe(data): yield item hash_list[hashed_item] = True - return DictSet(do_dedupe(iter(self._iterator)), storage_class=self.storage_class) + return DictSet( + do_dedupe(iter(self._iterator)), storage_class=self.storage_class + ) def group_by(self, group_by_columns): """ @@ -482,7 +492,9 @@ def inner_select(it): for record in it: yield {k: record.get(k, None) for k in columns} - return DictSet(inner_select(iter(self._iterator)), storage_class=self.storage_class) + return DictSet( + inner_select(iter(self._iterator)), storage_class=self.storage_class + ) def sort_and_take(self, column, take: int = 5000, descending: bool = False): def safety_key(column): @@ -494,7 +506,9 @@ def safety_key(column): ) if self.storage_class == STORAGE_CLASS.MEMORY: - yield from sorted(self._iterator, key=safety_key(column), reverse=descending)[:take] + yield from sorted( + self._iterator, key=safety_key(column), reverse=descending + )[:take] else: # In a low-memory environment we probably can't store all of the records diff --git a/mabel/data/internals/display.py b/mabel/data/internals/display.py index a999a1cb..edc4b754 100644 --- a/mabel/data/internals/display.py +++ b/mabel/data/internals/display.py @@ -25,7 +25,9 @@ def sanitize(htmlstring): if isinstance(htmlstring, (list, tuple, set)) or hasattr(htmlstring, "as_list"): return "[ " + ", ".join([sanitize(i) for i in htmlstring]) + " ]" if hasattr(htmlstring, "items"): - return sanitize("{ " + ", ".join([f'"{k}": {v}' for k, v in htmlstring.items()]) + " }") + return sanitize( + "{ " + ", ".join([f'"{k}": {v}' for k, v in htmlstring.items()]) + " }" + ) if not isinstance(htmlstring, str): return htmlstring escapes = {'"': """, "'": "'", "<": "<", ">": ">", "$": "$"} @@ -66,7 +68,7 @@ def _to_html_table(data, columns): footer = "" if isinstance(dictset, types.GeneratorType): - footer = f"\n

top {i+1} rows x {len(columns)} columns

" + footer = f"\n

top {i + 1} rows x {len(columns)} columns

" footer += "\nNOTE: the displayed records may have been spent" elif hasattr(dictset, "__len__"): footer = f"\n

{len(dictset)} rows x {len(columns)} columns

" # type:ignore @@ -95,7 +97,9 @@ def format_value(val): if isinstance(val, (list, tuple, set)) or hasattr(val, "as_list"): return "[ " + ", ".join([format_value(i) for i in val]) + " ]" if hasattr(val, "items"): - return format_value("{ " + ", ".join([f'"{k}": {v}' for k, v in val.items()]) + " }") + return format_value( + "{ " + ", ".join([f'"{k}": {v}' for k, v in val.items()]) + " }" + ) return str(val) result = [] @@ -121,14 +125,18 @@ def format_value(val): # display headers result.append("┌" + "┬".join(bars) + "┐") - result.append("│" + "│".join([str(k).center(v + 2) for k, v in columns.items()]) + "│") + result.append( + "│" + "│".join([str(k).center(v + 2) for k, v in columns.items()]) + "│" + ) result.append("├" + "┼".join(bars) + "┤") # display values for row in cache: result.append( "│" - + "│".join([str(format_value(v)).center(columns[k] + 2) for k, v in row.items()]) + + "│".join( + [str(format_value(v)).center(columns[k] + 2) for k, v in row.items()] + ) + "│" ) diff --git a/mabel/data/internals/dnf_filters.py b/mabel/data/internals/dnf_filters.py index a5a75032..953f263d 100644 --- a/mabel/data/internals/dnf_filters.py +++ b/mabel/data/internals/dnf_filters.py @@ -119,7 +119,9 @@ def filter_dictset(self, dictset: Iterable[dict]) -> Iterable: if self.empty_filter: yield from dictset else: - yield from (record for record in dictset if evaluate(self.predicates, record)) + yield from ( + record for record in dictset if evaluate(self.predicates, record) + ) def __call__(self, record) -> bool: return evaluate(self.predicates, record) diff --git a/mabel/data/internals/expression.py b/mabel/data/internals/expression.py index 6c270150..d128f146 100644 --- a/mabel/data/internals/expression.py +++ b/mabel/data/internals/expression.py @@ -15,6 +15,7 @@ Derived from: https://gist.github.com/leehsueh/1290686 """ + from mabel.data.readers.internals.inline_evaluator import * from mabel.utils.dates import parse_iso from mabel.utils.token_labeler import OPERATORS @@ -50,7 +51,9 @@ def parse(self): def parse_expression(self): andTerm1 = self.parse_and_term() - while self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.OR: + while ( + self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.OR + ): self.tokenizer.next() andTermX = self.parse_and_term() andTerm = TreeNode(TOKENS.OR, None) @@ -61,7 +64,9 @@ def parse_expression(self): def parse_and_term(self): condition1 = self.parse_condition() - while self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.AND: + while ( + self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.AND + ): self.tokenizer.next() conditionX = self.parse_condition() condition = TreeNode(TOKENS.AND, None) @@ -115,7 +120,10 @@ def parse_condition(self): terminal1 = TreeNode(TOKENS.VARIABLE, "".join(collector)) - if self.tokenizer.has_next() and self.tokenizer.next_token_type() == TOKENS.LEFTPARENTHESES: + if ( + self.tokenizer.has_next() + and self.tokenizer.next_token_type() == TOKENS.LEFTPARENTHESES + ): # If we have a ( then go looking for the matching ) self.tokenizer.next() @@ -141,7 +149,9 @@ def parse_condition(self): condition.left = terminal1 condition.right = terminal2 return condition - raise InvalidExpression(f"Operator expected, but got `{self.tokenizer.next()}`") + raise InvalidExpression( + f"Operator expected, but got `{self.tokenizer.next()}`" + ) raise InvalidExpression("Operator expected, but got nothing") def parse_terminal(self): @@ -249,7 +259,9 @@ def evaluate_recursive(self, treeNode, variable_dict): if treeNode.token_type == TOKENS.OR: return left or right - raise InvalidExpression(f"Unexpected value of type `{str(treeNode.token_type)}`") + raise InvalidExpression( + f"Unexpected value of type `{str(treeNode.token_type)}`" + ) def to_dnf(self): """ diff --git a/mabel/data/internals/group_by.py b/mabel/data/internals/group_by.py index c93667d7..5230b4c2 100644 --- a/mabel/data/internals/group_by.py +++ b/mabel/data/internals/group_by.py @@ -5,6 +5,7 @@ python setup.py build_ext --inplace """ + from collections import defaultdict from xxhash import xxh3_64_intdigest @@ -73,7 +74,8 @@ def _map(self, collect_columns): for record in self._dictset: try: group_key: int = xxh3_64_intdigest( - "".join([str(record[column]) for column in self._columns]), HASH_SEED + "".join([str(record[column]) for column in self._columns]), + HASH_SEED, ) except KeyError: group_key: int = xxh3_64_intdigest( @@ -163,10 +165,13 @@ def aggregate(self, aggregations): for group, results in collector.items(): for func, col in requested_aggs: if func == "AVG": - results[f"AVG({col})"] = results[f"SUM({col})"] / results[f"COUNT({col})"] + results[f"AVG({col})"] = ( + results[f"SUM({col})"] / results[f"COUNT({col})"] + ) results = { - f"{func}({col})": results.get(f"{func}({col})") for func, col in requested_aggs + f"{func}({col})": results.get(f"{func}({col})") + for func, col in requested_aggs } keys = self._group_keys[group] diff --git a/mabel/data/internals/records.py b/mabel/data/internals/records.py index fc48701d..e8d927a0 100644 --- a/mabel/data/internals/records.py +++ b/mabel/data/internals/records.py @@ -59,7 +59,9 @@ def set_value(record: dict, field_name: str, setter: Callable) -> dict: return copy -def flatten(dictionary: MutableMapping[Any, Any], separator: str = ".", parent_key=False): +def flatten( + dictionary: MutableMapping[Any, Any], separator: str = ".", parent_key=False +): """ Turn a nested dictionary into a flattened dictionary @@ -78,7 +80,11 @@ def flatten(dictionary: MutableMapping[Any, Any], separator: str = ".", parent_k for key, value in dictionary.items(): new_key = str(parent_key) + separator + key if parent_key else key if hasattr(value, "items"): - items.extend(flatten(dictionary=value, separator=separator, parent_key=new_key).items()) + items.extend( + flatten( + dictionary=value, separator=separator, parent_key=new_key + ).items() + ) elif isinstance(value, list): for k, v in enumerate(value): items.extend(flatten({str(k): v}, new_key).items()) diff --git a/mabel/data/internals/storage_classes/storage_class_disk.py b/mabel/data/internals/storage_classes/storage_class_disk.py index 21a56319..34558e42 100644 --- a/mabel/data/internals/storage_classes/storage_class_disk.py +++ b/mabel/data/internals/storage_classes/storage_class_disk.py @@ -53,7 +53,9 @@ def _read_file(self): MMAP is by far the fastest way to read files in Python. """ with open(self.file, mode="rb") as file_obj: - with mmap.mmap(file_obj.fileno(), length=0, access=mmap.ACCESS_READ) as mmap_obj: + with mmap.mmap( + file_obj.fileno(), length=0, access=mmap.ACCESS_READ + ) as mmap_obj: line = mmap_obj.readline() while line: yield line diff --git a/mabel/data/readers/internals/base_inner_reader.py b/mabel/data/readers/internals/base_inner_reader.py index 65bb7e8a..008674ea 100644 --- a/mabel/data/readers/internals/base_inner_reader.py +++ b/mabel/data/readers/internals/base_inner_reader.py @@ -92,8 +92,10 @@ def get_list_of_blobs(self): # For each day in the range, get the blobs for us to read for cycle_date in dates.date_range(self.start_date, self.end_date): # Build the path name - cycle_path = pathlib.Path(paths.build_path(path=self.dataset, date=cycle_date)) - if not cycle_path in visited: + cycle_path = pathlib.Path( + paths.build_path(path=self.dataset, date=cycle_date) + ) + if cycle_path not in visited: visited[cycle_path] = True cycle_blobs = list(self.get_blobs_at_path(path=cycle_path)) @@ -130,12 +132,21 @@ def get_list_of_blobs(self): partition_filter = f"/by_{partition_filter_field}/{partition_filter_field}={partition_filter_value}/" # If we can find the partition in the folder set, then prune to it - if any([f"by_{partition_filter_field}" in by for by in list_of_partitions]): + if any( + [ + f"by_{partition_filter_field}" in by + for by in list_of_partitions + ] + ): # Do the pruning - cycle_blobs = [blob for blob in cycle_blobs if partition_filter in blob] + cycle_blobs = [ + blob for blob in cycle_blobs if partition_filter in blob + ] # We only have one partition now list_of_partitions = [f"by_{partition_filter_field}"] - get_logger().debug(f"Applied partition filter by: `{partition_filter}`") + get_logger().debug( + f"Applied partition filter by: `{partition_filter}`" + ) else: get_logger().debug( f"Wasn't able to find partition to filter by: `{partition_filter}`" @@ -150,7 +161,9 @@ def get_list_of_blobs(self): f"Ignoring {len(list_of_partitions)} 'by' partitionings, reading from '{chosen_partition}'" ) # Do the pruning - cycle_blobs = [blob for blob in cycle_blobs if f"/{chosen_partition}/" in blob] + cycle_blobs = [ + blob for blob in cycle_blobs if f"/{chosen_partition}/" in blob + ] def safe_get_next(lst, item): try: @@ -165,7 +178,8 @@ def safe_get_next(lst, item): partitioned_folders = {""} else: partitioned_folders = { - safe_get_next(blob.split("/"), chosen_partition) for blob in cycle_blobs + safe_get_next(blob.split("/"), chosen_partition) + for blob in cycle_blobs } for partitioned_folder in partitioned_folders: @@ -177,20 +191,32 @@ def safe_get_next(lst, item): # Work out if there's an as_at part as_ats = { - self._extract_as_at(blob) for blob in partitioned_blobs if "as_at_" in blob + self._extract_as_at(blob) + for blob in partitioned_blobs + if "as_at_" in blob } if as_ats: as_ats = sorted(as_ats) as_at = as_ats.pop() is_complete = lambda blobs: any( - [blob for blob in blobs if as_at + "/frame.complete" in blob] + [ + blob + for blob in blobs + if as_at + "/frame.complete" in blob + ] ) is_invalid = lambda blobs: any( - [blob for blob in blobs if (as_at + "/frame.ignore" in blob)] + [ + blob + for blob in blobs + if (as_at + "/frame.ignore" in blob) + ] ) - while not is_complete(partitioned_blobs) or is_invalid(partitioned_blobs): + while not is_complete(partitioned_blobs) or is_invalid( + partitioned_blobs + ): if not is_complete(partitioned_blobs): get_logger().debug( f"Frame `{partitioned_folder}/{as_at}` is not complete - `frame.complete` file is not present - skipping this frame." diff --git a/mabel/data/readers/internals/cursor.py b/mabel/data/readers/internals/cursor.py index 8d6ab80b..26840fd9 100644 --- a/mabel/data/readers/internals/cursor.py +++ b/mabel/data/readers/internals/cursor.py @@ -39,9 +39,9 @@ def load_cursor(self, cursor): cursor = orjson.loads(cursor) if ( - not "location" in cursor.keys() - or not "map" in cursor.keys() - or not "partition" in cursor.keys() + "location" not in cursor.keys() + or "map" not in cursor.keys() + or "partition" not in cursor.keys() ): raise InvalidCursor(f"Cursor is malformed or corrupted {cursor}") @@ -57,7 +57,9 @@ def load_cursor(self, cursor): blob_map = bitarray() blob_map.frombytes(map_bytes) self.read_blobs = [ - self.readable_blobs[i] for i in range(len(self.readable_blobs)) if blob_map[i] + self.readable_blobs[i] + for i in range(len(self.readable_blobs)) + if blob_map[i] ] def next_blob(self, previous_blob=None): @@ -69,10 +71,14 @@ def next_blob(self, previous_blob=None): if self.partition in self.readable_blobs: return self.partition partition_finder = [ - blob for blob in self.readable_blobs if xxh3_64_intdigest(blob, 0) == self.partition + blob + for blob in self.readable_blobs + if xxh3_64_intdigest(blob, 0) == self.partition ] if len(partition_finder) != 1: - raise ValueError(f"Unable to determine current partition ({self.partition})") + raise ValueError( + f"Unable to determine current partition ({self.partition})" + ) return partition_finder[0] unread = [blob for blob in self.readable_blobs if blob not in self.read_blobs] if len(unread) > 0: @@ -101,7 +107,12 @@ def __getitem__(self, item): if item == "map": blob_map = bitarray( - "".join(["1" if blob in self.read_blobs else "0" for blob in self.readable_blobs]) + "".join( + [ + "1" if blob in self.read_blobs else "0" + for blob in self.readable_blobs + ] + ) ) return blob_map.tobytes().hex() if item == "partition": diff --git a/mabel/data/readers/internals/inline_evaluator.py b/mabel/data/readers/internals/inline_evaluator.py index 07319baf..d302084b 100644 --- a/mabel/data/readers/internals/inline_evaluator.py +++ b/mabel/data/readers/internals/inline_evaluator.py @@ -20,6 +20,7 @@ will perform the function LEFT on the NAME field from the dict and return AGE from the dict """ + import re from mabel.data.readers.internals.inline_functions import FUNCTIONS diff --git a/mabel/data/readers/internals/multiprocess_wrapper.py b/mabel/data/readers/internals/multiprocess_wrapper.py index 94de6bab..2ff7373b 100644 --- a/mabel/data/readers/internals/multiprocess_wrapper.py +++ b/mabel/data/readers/internals/multiprocess_wrapper.py @@ -80,7 +80,9 @@ def _inner_process(func, source_queue, reply_queue): # pragma: no cover def processed_reader(func, items_to_read, support_files): # pragma: no cover if os.name == "nt": # pragma: no cover - raise NotImplementedError("Reader Multi Processing not available on Windows platforms") + raise NotImplementedError( + "Reader Multi Processing not available on Windows platforms" + ) process_pool = [] @@ -116,7 +118,9 @@ def processed_reader(func, items_to_read, support_files): # pragma: no cover ): try: records = reply_queue.get(timeout=1) - yield from map(json, [r for r in lz4.frame.decompress(records).split(b"\n") if r]) + yield from map( + json, [r for r in lz4.frame.decompress(records).split(b"\n") if r] + ) if item_index < len(items_to_read): send_queue.put_nowait(items_to_read[item_index]) item_index += 1 diff --git a/mabel/data/readers/internals/parallel_reader.py b/mabel/data/readers/internals/parallel_reader.py index 186e79e2..4d986d3a 100644 --- a/mabel/data/readers/internals/parallel_reader.py +++ b/mabel/data/readers/internals/parallel_reader.py @@ -21,6 +21,7 @@ │ Reduce │ Aggregate │ └────────────┴────────────────────────────────────────────────────────────┘ """ + from enum import Enum from orso import logging diff --git a/mabel/data/readers/internals/sql_reader.py b/mabel/data/readers/internals/sql_reader.py index 2199ea50..5334e6a2 100644 --- a/mabel/data/readers/internals/sql_reader.py +++ b/mabel/data/readers/internals/sql_reader.py @@ -122,7 +122,9 @@ def validate_dataset(self, dataset): raise InvalidSqlError("Malformed FROM clause - must start with a letter.") # can't be attempting path traversal if ".." in dataset or "//" in dataset or "--" in dataset: - raise InvalidSqlError("Malformed FROM clause - invalid repeated characters.") + raise InvalidSqlError( + "Malformed FROM clause - invalid repeated characters." + ) # can only contain limited character set (alpha num . / - _) if ( not dataset.replace(".", "") @@ -163,7 +165,9 @@ def parse(self, statement): if labeler.next_token_value().upper() == "DISTINCT": labeler.next() self.distinct = True - while labeler.has_next() and labeler.next_token_type() == TOKENS.EMPTY: + while ( + labeler.has_next() and labeler.next_token_type() == TOKENS.EMPTY + ): labeler.next() self.select_expression = self.collector(labeler) elif labeler.peek().upper() == "FROM": @@ -193,7 +197,9 @@ def parse(self, statement): labeler.next() if open_parentheses != 0: - raise InvalidSqlError("Malformed FROM clause - mismatched parenthesis.") + raise InvalidSqlError( + "Malformed FROM clause - mismatched parenthesis." + ) self.dataset = collector else: @@ -288,7 +294,11 @@ def SqlReader(sql_statement: str, **kwargs): # convert the clause into something we can pass to GroupBy if sql.group_by: - groups = [group.strip() for group in sql.group_by.split(",") if group.strip() != ""] + groups = [ + group.strip() + for group in sql.group_by.split(",") + if group.strip() != "" + ] else: groups = ["*"] # we're not really grouping @@ -344,7 +354,9 @@ def _perform_renames(row): if sql.limit: take = int(sql.limit) reader = DictSet( - reader.sort_and_take(column=sql.order_by, take=take, descending=sql.order_descending) + reader.sort_and_take( + column=sql.order_by, take=take, descending=sql.order_descending + ) ) # LIMIT clause diff --git a/mabel/data/readers/reader.py b/mabel/data/readers/reader.py index c61ef60a..a4f07134 100644 --- a/mabel/data/readers/reader.py +++ b/mabel/data/readers/reader.py @@ -147,7 +147,13 @@ class has a decorator which helps to ensure it is called correctly. # - this doesn't replace a proper ACL and permissions model, but can provide # some control if other options are limited or unavailable. if valid_dataset_prefixes: - if not any([True for prefix in valid_dataset_prefixes if str(dataset).startswith(prefix)]): + if not any( + [ + True + for prefix in valid_dataset_prefixes + if str(dataset).startswith(prefix) + ] + ): raise AccessDenied("Access has been denied to this Dataset (prefix).") # lazy loading of dependency - in this case the Google GCS Reader @@ -180,7 +186,9 @@ class has a decorator which helps to ensure it is called correctly. # number of days to walk backwards to find records freshness_limit = parse_delta(kwargs.get("freshness_limit", "")) - if freshness_limit and reader_class.start_date != reader_class.end_date: # pragma: no cover + if ( + freshness_limit and reader_class.start_date != reader_class.end_date + ): # pragma: no cover raise InvalidCombinationError( "freshness_limit can only be used when the start and end dates are the same" ) @@ -241,7 +249,9 @@ def _create_line_reader(self): ): self.reader_class.step_back_a_day() blob_list = self.reader_class.get_list_of_blobs() - if self.freshness_limit < datetime.timedelta(days=self.reader_class.days_stepped_back): + if self.freshness_limit < datetime.timedelta( + days=self.reader_class.days_stepped_back + ): message = f"No data found in last {self.freshness_limit} - aborting ({self.reader_class.dataset})" get_logger().warning(message) raise DataNotFoundError(message) @@ -251,7 +261,9 @@ def _create_line_reader(self): ) # Build lists of blobs we have handlers for, based on the file extensions - supported_blobs = [b for b in blob_list if f".{ b.split('.')[-1] }" in KNOWN_EXTENSIONS] + supported_blobs = [ + b for b in blob_list if f".{b.split('.')[-1]}" in KNOWN_EXTENSIONS + ] readable_blobs = [ b for b in supported_blobs @@ -298,7 +310,9 @@ def _create_line_reader(self): ], ) location = self.cursor.skip_to_cursor(blob_reader) - for self.cursor.location, record in enumerate(blob_reader, start=location): + for self.cursor.location, record in enumerate( + blob_reader, start=location + ): yield record blob_to_read = self.cursor.next_blob(blob_to_read) diff --git a/mabel/data/writers/batch_writer.py b/mabel/data/writers/batch_writer.py index 6f3a3516..b07e9dae 100644 --- a/mabel/data/writers/batch_writer.py +++ b/mabel/data/writers/batch_writer.py @@ -103,25 +103,26 @@ def __init__( self.always_complete = always_complete def finalize(self, **kwargs): - final = super().finalize() has_failure = bool(kwargs.get("has_failure", False)) if has_failure: self.seen_failures = True - get_logger().debug(f"Error found in the stack, not marking frame as complete.") + get_logger().debug( + "Error found in the stack, not marking frame as complete." + ) return -1 if self.seen_failures: get_logger().debug( - f"Error previously seen in the stack, not marking frame as complete." + "Error previously seen in the stack, not marking frame as complete." ) return -1 if self.records == 0 and not self.always_complete: get_logger().warning( - f"No records written, and 'always_complete' not set, so not marking frame as complete." + "No records written, and 'always_complete' not set, so not marking frame as complete." ) return -1 diff --git a/mabel/data/writers/internals/blob_writer.py b/mabel/data/writers/internals/blob_writer.py index 0871a9c4..8d7c6a7a 100644 --- a/mabel/data/writers/internals/blob_writer.py +++ b/mabel/data/writers/internals/blob_writer.py @@ -129,7 +129,9 @@ def _normalize_arrow_schema(self, table, mabel_schema: RelationSchema): if mabel_column and mabel_column.type in type_map: index = table.column_names.index(column) # update the schema - schema = schema.set(index, pyarrow.field(column, type_map[mabel_column.type])) + schema = schema.set( + index, pyarrow.field(column, type_map[mabel_column.type]) + ) # apply the updated schema table = table.cast(target_schema=schema) return table @@ -147,7 +149,7 @@ def commit(self): try: import pyarrow import pyarrow.parquet - except ImportError as err: # pragma: no cover + except ImportError: # pragma: no cover raise MissingDependencyError( "`pyarrow` is missing, please install or include in requirements.txt" ) diff --git a/mabel/data/writers/internals/writer_pool.py b/mabel/data/writers/internals/writer_pool.py index 16dbefb7..71d7fbba 100644 --- a/mabel/data/writers/internals/writer_pool.py +++ b/mabel/data/writers/internals/writer_pool.py @@ -63,7 +63,9 @@ def remove_writer(self, identity): ) else: writer = writers[0] - self.writers = [w for w in self.writers if w.get("identity") != identity] + self.writers = [ + w for w in self.writers if w.get("identity") != identity + ] writer.get("writer").commit() finally: lock.release() diff --git a/mabel/data/writers/stream_writer.py b/mabel/data/writers/stream_writer.py index e158d7f7..02a083b1 100644 --- a/mabel/data/writers/stream_writer.py +++ b/mabel/data/writers/stream_writer.py @@ -147,7 +147,9 @@ def append(self, record: dict): this_identity = identity # do the actual replacing of the placeholders for k, v in zip(placeholders, values): - this_identity = this_identity.replace("{" + k + "}", text.sanitize(str(v))) + this_identity = this_identity.replace( + "{" + k + "}", text.sanitize(str(v)) + ) # get the writer and save the record blob_writer = self.writer_pool.get_writer(this_identity) @@ -186,7 +188,9 @@ def pool_attendant(self): ) self.writer_pool.remove_writer(blob_writer_identity) # if we're over capacity, evict the LRU writers - for blob_writer_identity in self.writer_pool.nominate_writers_to_evict(): + for ( + blob_writer_identity + ) in self.writer_pool.nominate_writers_to_evict(): get_logger().debug( f"Evicting {blob_writer_identity} from the writer pool due the pool being over its {self.writer_pool_capacity} capacity, poolsize={len(self.writer_pool.writers)}" ) diff --git a/mabel/data/writers/writer.py b/mabel/data/writers/writer.py index 57ff9171..3fadf9c8 100644 --- a/mabel/data/writers/writer.py +++ b/mabel/data/writers/writer.py @@ -45,7 +45,9 @@ def __init__( dataset = kwargs.get("dataset", "") if "BACKOUT" in dataset: - InvalidDataSetError("BACKOUT is a reserved word and cannot be used in Dataset names") + InvalidDataSetError( + "BACKOUT is a reserved word and cannot be used in Dataset names" + ) if dataset.endswith("/"): InvalidDataSetError("Dataset names cannot end with /") if "{" in dataset or "}" in dataset: @@ -55,7 +57,9 @@ def __init__( # handle transitional states - use the new features to override the legacy features if kwargs.get("raw_path") is not None: - logger.warning("`raw_path` is being deprecated, set `partitions` to `None` instead.") + logger.warning( + "`raw_path` is being deprecated, set `partitions` to `None` instead." + ) if str(kwargs.get("raw_path", "")).upper() == "TRUE": partitions = None if "{date" in dataset: @@ -104,8 +108,8 @@ def __init__( arg_dict = kwargs.copy() arg_dict["dataset"] = f"{self.dataset}" arg_dict["inner_writer"] = ( - f"{arg_dict.get('inner_writer', type(None)).__name__}" - ) # type:ignore + f"{arg_dict.get('inner_writer', type(None)).__name__}" # type:ignore + ) logger.debug(orjson.dumps(arg_dict)) # add the schema to the writer - pyarrow uses this @@ -156,5 +160,7 @@ def finalize(self, **kwargs): try: return self.blob_writer.commit() except Exception as e: - logger.error(f"{type(self).__name__} failed to close pool: {type(e).__name__} - {e}") + logger.error( + f"{type(self).__name__} failed to close pool: {type(e).__name__} - {e}" + ) raise e diff --git a/mabel/errors/render_error_stack.py b/mabel/errors/render_error_stack.py index 2b3aa5b9..48956fa1 100644 --- a/mabel/errors/render_error_stack.py +++ b/mabel/errors/render_error_stack.py @@ -71,7 +71,9 @@ def _build_error_stack(): is_cause = False while True: - stack = Stack(exc_type=str(exc_type.__name__), exc_value=str(exc_value), is_cause=is_cause) + stack = Stack( + exc_type=str(exc_type.__name__), exc_value=str(exc_value), is_cause=is_cause + ) stacks.append(stack) append = stack.frames.append @@ -97,7 +99,11 @@ def _build_error_stack(): continue cause = exc_value.__context__ - if cause and cause.__traceback__ and not getattr(exc_value, "__suppress_context__", False): + if ( + cause + and cause.__traceback__ + and not getattr(exc_value, "__suppress_context__", False) + ): exc_type = cause.__class__ exc_value = cause traceback = cause.__traceback__ @@ -131,7 +137,7 @@ def _read_from_code(filename: str, line: int, extend_by: int) -> Generator: yield bar_label(path.stem + path.suffix) for line_number in range(start_line, end_line): prefix = ">" if line_number == line else " " - yield f"{prefix}{line_number:4d} | {lines[line_number-1]}" + yield f"{prefix}{line_number:4d} | {lines[line_number - 1]}" line_number += 1 except: return "" @@ -145,7 +151,9 @@ def _render_error_stack(): if frame.filename.startswith("<"): continue yield from _render_locals(frame.locals) - yield from _read_from_code(filename=frame.filename, line=frame.lineno, extend_by=3) + yield from _read_from_code( + filename=frame.filename, line=frame.lineno, extend_by=3 + ) yield bar_label("") yield "" diff --git a/mabel/utils/common.py b/mabel/utils/common.py index 3112b24b..06876212 100644 --- a/mabel/utils/common.py +++ b/mabel/utils/common.py @@ -19,7 +19,9 @@ def read_config(config_file): config = orjson.loads(f.read()) return config except IndexError as e: - raise IndexError(f"Error: {e}, Likely Cause: Config file `{config_file}` not found") + raise IndexError( + f"Error: {e}, Likely Cause: Config file `{config_file}` not found" + ) except ValueError as e: raise ValueError( f"Error: {e}, Likely Cause: Config file `{config_file}` incorrectly formatted" diff --git a/mabel/utils/dates.py b/mabel/utils/dates.py index 61ffe07d..cca278af 100644 --- a/mabel/utils/dates.py +++ b/mabel/utils/dates.py @@ -92,11 +92,13 @@ def parse_iso( if not 10 <= len(value) <= 28: return None val_len = len(value) - if not value[4] in DATE_SEPARATORS or not value[7] in DATE_SEPARATORS: + if value[4] not in DATE_SEPARATORS or value[7] not in DATE_SEPARATORS: return None if val_len == 10: # YYYY-MM-DD - return datetime.datetime(*map(int, [value[:4], value[5:7], value[8:10]])) + return datetime.datetime( + *map(int, [value[:4], value[5:7], value[8:10]]) + ) if val_len >= 16: if not (value[10] in ("T", " ") and value[13] in DATE_SEPARATORS): return None @@ -160,7 +162,9 @@ def date_range( start_date = start_date.replace(minute=0, second=0, microsecond=0) # type:ignore if end_date < start_date: # type:ignore - raise ValueError("date_range: end_date must be the same or later than the start_date ") + raise ValueError( + "date_range: end_date must be the same or later than the start_date " + ) for n in range( int((end_date - start_date).total_seconds() // SECONDS_PER_HOUR) # type:ignore diff --git a/mabel/utils/parameter_validator.py b/mabel/utils/parameter_validator.py index f6425771..7d74ab6f 100644 --- a/mabel/utils/parameter_validator.py +++ b/mabel/utils/parameter_validator.py @@ -25,7 +25,9 @@ def get_levenshtein_distance(word1, word2): for x in range(1, len(word1) + 1): for y in range(1, len(word2) + 1): if word1[x - 1] == word2[y - 1]: - matrix[x][y] = min(matrix[x - 1][y] + 1, matrix[x - 1][y - 1], matrix[x][y - 1] + 1) + matrix[x][y] = min( + matrix[x - 1][y] + 1, matrix[x - 1][y - 1], matrix[x][y - 1] + 1 + ) else: matrix[x][y] = min( matrix[x - 1][y] + 1, matrix[x - 1][y - 1] + 1, matrix[x][y - 1] + 1 @@ -67,7 +69,9 @@ def wrapper(*args, **kwargs): ]: suggestion = [] for valid in valid_parameters: - if get_levenshtein_distance(not_on_list, valid) <= (len(valid) // 2): + if get_levenshtein_distance(not_on_list, valid) <= ( + len(valid) // 2 + ): suggestion.append(valid) if len(suggestion): get_logger().error( @@ -90,9 +94,13 @@ def wrapper(*args, **kwargs): has_errors = True # check for missing required paramters - required_parameters = {item.get("name") for item in my_rules if item.get("required")} + required_parameters = { + item.get("name") for item in my_rules if item.get("required") + } missing_required_parameters = [ - param for param in required_parameters if param not in entered_parameters + param + for param in required_parameters + if param not in entered_parameters ] if len(missing_required_parameters): get_logger().error( @@ -106,7 +114,9 @@ def wrapper(*args, **kwargs): # warnings warninged_parameters = { - item.get("name"): item.get("warning") for item in my_rules if item.get("warning") + item.get("name"): item.get("warning") + for item in my_rules + if item.get("warning") } for parameter, warning in warninged_parameters.items(): if parameter in entered_parameters: @@ -119,7 +129,9 @@ def wrapper(*args, **kwargs): and entered_parameters[param.get("name")] ): toxic = [ - t for t in param["incompatible_with"] if t in entered_parameters.keys() + t + for t in param["incompatible_with"] + if t in entered_parameters.keys() ] if toxic: has_errors = True diff --git a/mabel/utils/paths.py b/mabel/utils/paths.py index 4e923c97..f1ed16cf 100644 --- a/mabel/utils/paths.py +++ b/mabel/utils/paths.py @@ -37,7 +37,7 @@ def build_path(path: str, date: datetime.date = None): if not path: raise ValueError("build_path: path must have a value") - if not path[-1] in ["/"]: + if path[-1] not in ["/"]: # process the path bucket, path_string, filename, extension = get_parts(path) if path_string != "/": diff --git a/mabel/utils/timer.py b/mabel/utils/timer.py index e4384d56..08265f26 100644 --- a/mabel/utils/timer.py +++ b/mabel/utils/timer.py @@ -18,4 +18,6 @@ def __enter__(self): self.start = time.time_ns() def __exit__(self, type, value, traceback): - print("{} took {} seconds".format(self.name, (time.time_ns() - self.start) / 1e9)) + print( + "{} took {} seconds".format(self.name, (time.time_ns() - self.start) / 1e9) + ) diff --git a/mabel/version.py b/mabel/version.py index d0b73bf8..a1d0e704 100644 --- a/mabel/version.py +++ b/mabel/version.py @@ -1,6 +1,6 @@ # Store the version here so: # 1) we don't load dependencies by storing it in __init__.py # 2) we can import it in setup.py for the same reason -__version__ = "0.6.27" +__version__ = "0.6.28" # nodoc - don't add to the documentation wiki diff --git a/tests/NFT/currency.py b/tests/NFT/currency.py index a080c4a1..a00e0210 100644 --- a/tests/NFT/currency.py +++ b/tests/NFT/currency.py @@ -67,7 +67,7 @@ def search_osv(library, version): data = {"version": version, "package": {"name": library, "ecosystem": "PyPI"}} resp = requests.post(url=url, data=json.dumps(data)) return resp.content - except Exception as e: + except Exception: return b"{}" diff --git a/tests/helpers/runner.py b/tests/helpers/runner.py index 1a373c25..a12b65c9 100644 --- a/tests/helpers/runner.py +++ b/tests/helpers/runner.py @@ -40,7 +40,7 @@ def run_tests(): status = "\033[38;2;26;185;67m pass" else: failed += 1 - status = f"\033[38;2;255;121;198m fail" + status = "\033[38;2;255;121;198m fail" time_taken = int((time.monotonic_ns() - start_time) / 1e6) print(f"\033[0;32m{str(time_taken).rjust(8)}ms {status}\033[0m") if error: @@ -51,7 +51,7 @@ def run_tests(): f" \033[38;2;255;121;198m{error.__class__.__name__}\033[0m" + f" {error}\n" + f" \033[38;2;241;250;140m{file_name}\033[0m" - + f"\033[38;2;98;114;164m:\033[0m" + + "\033[38;2;98;114;164m:\033[0m" + f"\033[38;2;26;185;67m{line_number}\033[0m" + f" \033[38;2;98;114;164m{code_line}\033[0m" ) diff --git a/tests/performance/arrow.py b/tests/performance/arrow.py index 3bf4c813..3de6a08c 100644 --- a/tests/performance/arrow.py +++ b/tests/performance/arrow.py @@ -1,6 +1,5 @@ from pyarrow import json import pyarrow.parquet as pq -import pyarrow as pa import time diff --git a/tests/performance/avro_performance.py b/tests/performance/avro_performance.py index 4cef066a..7f9ea841 100644 --- a/tests/performance/avro_performance.py +++ b/tests/performance/avro_performance.py @@ -9,14 +9,13 @@ outcome: stay with the simple JSONL format. """ -import avro.schema -from avro.datafile import DataFileReader, DataFileWriter -from avro.io import DatumReader, DatumWriter +from avro.datafile import DataFileReader +from avro.io import DatumReader import os import sys sys.path.insert(1, os.path.join(sys.path[0], "../..")) -from mabel.data.readers import Reader, FileReader +from mabel.data.readers import FileReader from mabel.data.formats import dictset try: diff --git a/tests/performance/index_hashing_performance.py b/tests/performance/index_hashing_performance.py index 7c9ad0bf..a079c604 100644 --- a/tests/performance/index_hashing_performance.py +++ b/tests/performance/index_hashing_performance.py @@ -1,7 +1,7 @@ -import os, sys +import os +import sys sys.path.insert(1, os.path.join(sys.path[0], "../..")) -from mabel.data import Reader from mabel.data.internals.index import IndexBuilder from orso.logging import get_logger from timer import Timer diff --git a/tests/performance/index_performance.py b/tests/performance/index_performance.py index 442cb6af..e74e0e8e 100644 --- a/tests/performance/index_performance.py +++ b/tests/performance/index_performance.py @@ -10,10 +10,11 @@ ------------------------------- """ -import os, sys +import os +import sys sys.path.insert(1, os.path.join(sys.path[0], "../..")) -from mabel.data.internals.index import Index, IndexBuilder +from mabel.data.internals.index import Index import time @@ -30,7 +31,8 @@ def time_it(dataset, username): return (time.perf_counter_ns() - start) / 1e9 -import os, sys +import os +import sys sys.path.insert(1, os.path.join(sys.path[0], "../..")) from mabel.data import Reader diff --git a/tests/performance/indexing.py b/tests/performance/indexing.py index a4cc644c..ff3011a6 100644 --- a/tests/performance/indexing.py +++ b/tests/performance/indexing.py @@ -4,7 +4,6 @@ import os sys.path.insert(1, os.path.join(sys.path[0], "../..")) -from mabel.adapters.null import NullWriter from orso.logging import get_logger from mabel.data.validator import Schema from mabel.data import BatchWriter, Reader diff --git a/tests/performance/reader_performance.py b/tests/performance/reader_performance.py index 4b5f2bf6..c231f207 100644 --- a/tests/performance/reader_performance.py +++ b/tests/performance/reader_performance.py @@ -7,7 +7,7 @@ def do_read(): sys.path.insert(1, os.path.join(sys.path[0], "../..")) from mabel.adapters.disk import DiskReader - from mabel.data import Reader, SqlReader + from mabel.data import SqlReader # d = Reader(inner_reader=DiskReader, dataset="tests/data/nvd/", raw_path=True) SQL = "SELECT COUNT(*) FROM (SELECT * FROM tests/data/huge GROUP BY cve.CVE_data_meta.ASSIGNER)" diff --git a/tests/performance/validator_performance.py b/tests/performance/validator_performance.py index b6f8d5c8..0042bbf8 100644 --- a/tests/performance/validator_performance.py +++ b/tests/performance/validator_performance.py @@ -8,7 +8,6 @@ """ import time -import pydantic import datetime import statistics from typing import Optional @@ -18,10 +17,8 @@ sys.path.insert(1, os.path.join(sys.path[0], "../..")) from mabel.data.validator import Schema -from mabel.data.formats import dictset from mabel.data.formats.dictset import display -import orjson as json py_tweet = { diff --git a/tests/performance/writer_performance.py b/tests/performance/writer_performance.py index c9400c6b..b9978891 100644 --- a/tests/performance/writer_performance.py +++ b/tests/performance/writer_performance.py @@ -24,10 +24,9 @@ sys.path.insert(1, os.path.join(sys.path[0], "../..")) from mabel.data import BatchWriter from mabel.adapters.null import NullWriter -from mabel.adapters.disk import DiskWriter from orso.logging import get_logger from mabel.data.validator import Schema -from mabel.data.internals import display, dictset +from mabel.data.internals import display import ujson as json diff --git a/tests/storage_tests/disk/test_reader_disk_reader.py b/tests/storage_tests/disk/test_reader_disk_reader.py index 4b0dfbd8..9fcafd21 100644 --- a/tests/storage_tests/disk/test_reader_disk_reader.py +++ b/tests/storage_tests/disk/test_reader_disk_reader.py @@ -87,7 +87,7 @@ def test_disk_binary(): w = BatchWriter( inner_writer=DiskWriter, blob_size=1024, - dataset=f"_temp/test/disk/dataset/binary", + dataset="_temp/test/disk/dataset/binary", schema=["index"], ) for i in range(200): @@ -98,7 +98,7 @@ def test_disk_binary(): # read over both paritions. r = Reader( inner_reader=DiskReader, - dataset=f"_temp/test/disk/dataset/binary", + dataset="_temp/test/disk/dataset/binary", ) l = list(r) @@ -113,7 +113,7 @@ def test_disk_text(): inner_writer=DiskWriter, blob_size=1024, format="jsonl", - dataset=f"_temp/test/gcs/dataset/text", + dataset="_temp/test/gcs/dataset/text", schema=["index"], ) for i in range(250): @@ -124,7 +124,7 @@ def test_disk_text(): # read over both paritions. r = Reader( inner_reader=DiskReader, - dataset=f"_temp/test/gcs/dataset/text", + dataset="_temp/test/gcs/dataset/text", ) l = list(r) diff --git a/tests/storage_tests/disk/test_writer_disk_writer.py b/tests/storage_tests/disk/test_writer_disk_writer.py index 8e7d5596..8bf9a18f 100644 --- a/tests/storage_tests/disk/test_writer_disk_writer.py +++ b/tests/storage_tests/disk/test_writer_disk_writer.py @@ -8,7 +8,6 @@ sys.path.insert(1, os.path.join(sys.path[0], "../../..")) from mabel.adapters.disk import DiskReader, DiskWriter -from mabel.adapters.null import NullWriter from mabel.data import BatchWriter from mabel.data import Reader from mabel.data.internals.dictset import STORAGE_CLASS diff --git a/tests/storage_tests/minio_s3/test_rw_minio.py b/tests/storage_tests/minio_s3/test_rw_minio.py index 3499f214..209ce523 100644 --- a/tests/storage_tests/minio_s3/test_rw_minio.py +++ b/tests/storage_tests/minio_s3/test_rw_minio.py @@ -2,9 +2,8 @@ import sys sys.path.insert(1, os.path.join(sys.path[0], "../../..")) -from mabel.adapters.minio import MinIoWriter, MinIoReader +from mabel.adapters.minio import MinIoWriter from mabel.data import BatchWriter -from mabel.data import Reader from rich import traceback traceback.install() diff --git a/tests/test_data_display.py b/tests/test_data_display.py index 698a3373..68d314bf 100644 --- a/tests/test_data_display.py +++ b/tests/test_data_display.py @@ -2,7 +2,7 @@ import sys sys.path.insert(1, os.path.join(sys.path[0], "..")) -from mabel import Reader, DictSet +from mabel import Reader from mabel.data import STORAGE_CLASS from mabel.data.internals.display import html_table from mabel.adapters.disk import DiskReader diff --git a/tests/test_error_stack.py b/tests/test_error_stack.py index f6b9f439..f027ce17 100644 --- a/tests/test_error_stack.py +++ b/tests/test_error_stack.py @@ -2,7 +2,7 @@ import sys sys.path.insert(1, os.path.join(sys.path[0], "..")) -from mabel.errors.render_error_stack import _build_error_stack, render_error_stack +from mabel.errors.render_error_stack import render_error_stack from rich import traceback traceback.install() diff --git a/tests/test_reader.py b/tests/test_reader.py index df1b2b91..bf0da36a 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -1,10 +1,8 @@ -import pytest import os import sys sys.path.insert(1, os.path.join(sys.path[0], "..")) from mabel import Reader -from mabel.data import STORAGE_CLASS from mabel.adapters.disk import DiskReader from rich import traceback diff --git a/tests/test_reader_inline_functions.py b/tests/test_reader_inline_functions.py index 927552b6..6d4e9704 100644 --- a/tests/test_reader_inline_functions.py +++ b/tests/test_reader_inline_functions.py @@ -3,7 +3,6 @@ import sys sys.path.insert(1, os.path.join(sys.path[0], "..")) -from mabel.data import Reader from rich import traceback from mabel.data.readers.internals import inline_functions diff --git a/tests/test_writer_batch_schema_error.py b/tests/test_writer_batch_schema_error.py index a724fa4e..7341106b 100644 --- a/tests/test_writer_batch_schema_error.py +++ b/tests/test_writer_batch_schema_error.py @@ -5,10 +5,9 @@ import pytest sys.path.insert(1, os.path.join(sys.path[0], "..")) -from mabel.adapters.disk import DiskReader, DiskWriter +from mabel.adapters.disk import DiskWriter from mabel.adapters.null import NullWriter from mabel.data import BatchWriter -from mabel.data import Reader from orso.exceptions import DataValidationError, ExcessColumnsInDataError dataset = { diff --git a/tests/test_writer_data_expectations.py b/tests/test_writer_data_expectations.py index b2cc9a93..15715b37 100644 --- a/tests/test_writer_data_expectations.py +++ b/tests/test_writer_data_expectations.py @@ -3,11 +3,9 @@ import pytest sys.path.insert(1, os.path.join(sys.path[0], "..")) -from mabel.adapters.disk import DiskReader, DiskWriter from mabel.adapters.null import NullWriter -from mabel.data import Reader, Writer +from mabel.data import Writer from rich import traceback -from data_expectations import Expectations from data_expectations.errors import ExpectationNotMetError traceback.install() diff --git a/tests/test_writer_database_writer.py b/tests/test_writer_database_writer.py index d3333cf6..46e22d88 100644 --- a/tests/test_writer_database_writer.py +++ b/tests/test_writer_database_writer.py @@ -3,7 +3,6 @@ sys.path.insert(1, os.path.join(sys.path[0], "..")) from mabel.data import DatabaseWriter -from mabel.data import Reader from rich import traceback from mabel.adapters.database import NullWriter diff --git a/tests/test_writer_stream_writer_substitutions.py b/tests/test_writer_stream_writer_substitutions.py index 1e1f35d2..e18bd3e4 100644 --- a/tests/test_writer_stream_writer_substitutions.py +++ b/tests/test_writer_stream_writer_substitutions.py @@ -1,7 +1,6 @@ import os import sys -import orjson sys.path.insert(1, os.path.join(sys.path[0], "..")) from mabel.adapters.null import NullWriter