diff --git a/src/stratis_cli/_actions/_formatting.py b/src/stratis_cli/_actions/_formatting.py index a40b17cb9..ffd780e66 100644 --- a/src/stratis_cli/_actions/_formatting.py +++ b/src/stratis_cli/_actions/_formatting.py @@ -17,6 +17,7 @@ # isort: STDLIB import sys +from functools import wraps from typing import Any, Callable, List, Optional from uuid import UUID @@ -24,6 +25,9 @@ from dbus import Struct from wcwidth import wcswidth +# isort: FIRSTPARTY +from dbus_client_gen import DbusClientMissingPropertyError + # placeholder for tables where a desired value was not obtained from stratisd # when the value should be supported. TABLE_FAILURE_STRING = "FAILURE" @@ -160,3 +164,23 @@ def get_uuid_formatter(unhyphenated: bool) -> Callable: return ( (lambda u: UUID(str(u)).hex) if unhyphenated else (lambda u: str(UUID(str(u)))) ) + + +def catch_missing_property( + prop_to_str: Callable[[Any], Any], default: Any +) -> Callable[[Any], Any]: + """ + Return a function to just return a default if a property is missing. + """ + + @wraps(prop_to_str) + def inner(mo: Any) -> str: + """ + Catch the exception and return a default + """ + try: + return prop_to_str(mo) + except DbusClientMissingPropertyError: # pragma: no cover + return default + + return inner diff --git a/src/stratis_cli/_actions/_list_filesystem.py b/src/stratis_cli/_actions/_list_filesystem.py index 972256d37..c9b5a6261 100644 --- a/src/stratis_cli/_actions/_list_filesystem.py +++ b/src/stratis_cli/_actions/_list_filesystem.py @@ -28,7 +28,9 @@ from ._constants import TOP_OBJECT from ._formatting import ( TABLE_FAILURE_STRING, + TABLE_UNKNOWN_STRING, TOTAL_USED_FREE, + catch_missing_property, get_property, print_table, ) @@ -152,17 +154,37 @@ def filesystem_size_quartet( ) return f"{triple_str} / {limit}" + pool_name_func = catch_missing_property( + lambda mo: self.pool_object_path_to_pool_name.get( + mo.Pool(), TABLE_UNKNOWN_STRING + ), + TABLE_UNKNOWN_STRING, + ) + name_func = catch_missing_property( + lambda mofs: mofs.Name(), TABLE_UNKNOWN_STRING + ) + size_func = catch_missing_property( + lambda mofs: filesystem_size_quartet( + Range(mofs.Size()), + get_property(mofs.Used(), Range, None), + get_property(mofs.SizeLimit(), Range, None), + ), + TABLE_UNKNOWN_STRING, + ) + devnode_func = catch_missing_property( + lambda mofs: mofs.Devnode(), TABLE_UNKNOWN_STRING + ) + uuid_func = catch_missing_property( + lambda mofs: self.uuid_formatter(mofs.Uuid()), TABLE_UNKNOWN_STRING + ) + tables = [ ( - self.pool_object_path_to_pool_name[mofilesystem.Pool()], - mofilesystem.Name(), - filesystem_size_quartet( - Range(mofilesystem.Size()), - get_property(mofilesystem.Used(), Range, None), - get_property(mofilesystem.SizeLimit(), Range, None), - ), - mofilesystem.Devnode(), - self.uuid_formatter(mofilesystem.Uuid()), + pool_name_func(mofilesystem), + name_func(mofilesystem), + size_func(mofilesystem), + devnode_func(mofilesystem), + uuid_func(mofilesystem), ) for mofilesystem in self.filesystems_with_props ] diff --git a/src/stratis_cli/_actions/_list_pool.py b/src/stratis_cli/_actions/_list_pool.py index 2d129d93a..510cd2ba0 100644 --- a/src/stratis_cli/_actions/_list_pool.py +++ b/src/stratis_cli/_actions/_list_pool.py @@ -36,6 +36,7 @@ PoolAllocSpaceAlert, PoolDeviceSizeChangeAlert, PoolEncryptionAlert, + PoolMaintenanceAlert, ) from .._constants import PoolId from .._errors import StratisCliResourceNotFoundError @@ -44,7 +45,9 @@ from ._constants import TOP_OBJECT from ._formatting import ( TABLE_FAILURE_STRING, + TABLE_UNKNOWN_STRING, TOTAL_USED_FREE, + catch_missing_property, get_property, print_table, ) @@ -59,24 +62,6 @@ ) -def _metadata_version(mopool: Any) -> MetadataVersion | None: - try: - return MetadataVersion(int(mopool.MetadataVersion())) - except ValueError: # pragma: no cover - return None - - -def _volume_key_loaded(mopool: Any) -> tuple[bool, bool] | tuple[bool, str]: - """ - The string result is an error message indicating that the volume key - state is unknown. - """ - result = mopool.VolumeKeyLoaded() - if isinstance(result, int): - return (True, bool(result)) - return (False, str(result)) # pragma: no cover - - # This method is only used with legacy pools def _non_existent_or_inconsistent_to_str( value: EncryptionInfo | None, @@ -150,79 +135,16 @@ def __str__(self) -> str: ) -class DefaultAlerts: # pylint: disable=too-few-public-methods +class DeviceSizeChangedAlerts: # pylint: disable=too-few-public-methods """ - Alerts to display for a started pool. + Calculate alerts for changed devices; requires searching among devices. """ - def __init__(self, devs: Iterable[tuple[Any, Mapping[str, Mapping[str, Any]]]]): - """ - The initializer. - - :param devs: result of GetManagedObjects - """ - (self.increased, self.decreased) = DefaultAlerts._pools_with_changed_devs(devs) - - def alert_codes(self, pool_object_path: str, mopool: Any) -> List[PoolAlertType]: - """ - Return alert code objects for a pool. - - :param mopool: object to access pool properties - - :returns: list of PoolAlertType - """ - action_availability = PoolActionAvailability[str(mopool.AvailableActions())] - availability_alerts = action_availability.pool_maintenance_alerts() - - no_alloc_space_alerts = ( - [PoolAllocSpaceAlert.NO_ALLOC_SPACE] if mopool.NoAllocSpace() else [] - ) - - device_size_changed_alerts = DefaultAlerts._from_sets( - pool_object_path, self.increased, self.decreased - ) - - metadata_version = _metadata_version(mopool) - - (vkl_is_bool, volume_key_loaded) = _volume_key_loaded(mopool) - - pool_encryption_alerts = ( - [PoolEncryptionAlert.VOLUME_KEY_NOT_LOADED] - if metadata_version is MetadataVersion.V2 - and mopool.Encrypted() - and vkl_is_bool - and not volume_key_loaded - else [] - ) + ( - [PoolEncryptionAlert.VOLUME_KEY_STATUS_UNKNOWN] - if metadata_version is MetadataVersion.V2 - and mopool.Encrypted() - and not vkl_is_bool - else [] - ) - - return ( - availability_alerts - + no_alloc_space_alerts - + device_size_changed_alerts - + pool_encryption_alerts - ) - - @staticmethod - def _pools_with_changed_devs( - devs_to_search: Iterable[tuple[Any, Mapping[str, Mapping[str, Any]]]] - ) -> tuple[set[str], set[str]]: + def __init__( + self, devs_to_search: Iterable[tuple[Any, Mapping[str, Mapping[str, Any]]]] + ): """ - Returns a tuple of sets containing (1) pools that have a device that - has increased in size and (2) pools that have a device that has - decreased in size. - - A pool may occupy both sets if one device has increased and one has - decreased. - - :param devs_to_search: an iterable of device objects - :returns: a pair of sets - :rtype: tuple of (set of ObjectPath) + Initializer. """ # pylint: disable=import-outside-toplevel from ._data import MODev @@ -237,33 +159,25 @@ def _pools_with_changed_devs( if observed_size < size: # pragma: no cover decreased.add(modev.Pool()) - return (increased, decreased) + (self.increased, self.decreased) = (increased, decreased) - @staticmethod - def _from_sets( - pool_object_path: str, increased: set[str], decreased: set[str] - ) -> List[PoolDeviceSizeChangeAlert]: + def alert_codes(self, pool_object_path: str) -> List[PoolDeviceSizeChangeAlert]: """ Get the code from sets and one pool object path. :param pool_object_path: the pool object path - :param increased: pools that have devices that have increased in size - :type increased: set of object path - :param decreased: pools that have devices that have decrease in size - :type increased: set of object path - :returns: the codes """ if ( - pool_object_path in increased and pool_object_path in decreased + pool_object_path in self.increased and pool_object_path in self.decreased ): # pragma: no cover return [ PoolDeviceSizeChangeAlert.DEVICE_SIZE_INCREASED, PoolDeviceSizeChangeAlert.DEVICE_SIZE_DECREASED, ] - if pool_object_path in increased: # pragma: no cover + if pool_object_path in self.increased: # pragma: no cover return [PoolDeviceSizeChangeAlert.DEVICE_SIZE_INCREASED] - if pool_object_path in decreased: # pragma: no cover + if pool_object_path in self.decreased: # pragma: no cover return [PoolDeviceSizeChangeAlert.DEVICE_SIZE_DECREASED] return [] @@ -320,11 +234,82 @@ def display(self): """ -class Default(ListPool): # pylint: disable=too-few-public-methods +class Default(ListPool): """ Handle listing the pools that are listed by default. """ + @staticmethod + def metadata_version(mopool: Any) -> MetadataVersion | None: + """ + Return the metadata version, dealing with the possibility that it + might be an error string. + """ + try: + return MetadataVersion(int(mopool.MetadataVersion())) + except ValueError: # pragma: no cover + return None + + @staticmethod + def _volume_key_loaded(mopool: Any) -> tuple[bool, bool] | tuple[bool, str]: + """ + The string result is an error message indicating that the volume key + state is unknown. + """ + result = mopool.VolumeKeyLoaded() + if isinstance(result, int): + return (True, bool(result)) + return (False, str(result)) # pragma: no cover + + @staticmethod + def alert_codes( + mopool: Any, + ) -> List[PoolEncryptionAlert | PoolAllocSpaceAlert | PoolMaintenanceAlert]: + """ + Return alert code objects for a pool. + + :param mopool: object to access pool properties + + :returns: list of alerts obtainable from GetManagedObjects properties + """ + action_availability = PoolActionAvailability[str(mopool.AvailableActions())] + availability_alerts = action_availability.pool_maintenance_alerts() + + no_alloc_space_alerts = ( + [PoolAllocSpaceAlert.NO_ALLOC_SPACE] if mopool.NoAllocSpace() else [] + ) + + metadata_version = Default.metadata_version(mopool) + + (vkl_is_bool, volume_key_loaded) = Default._volume_key_loaded(mopool) + + pool_encryption_alerts = ( + [PoolEncryptionAlert.VOLUME_KEY_NOT_LOADED] + if metadata_version is MetadataVersion.V2 + and mopool.Encrypted() + and vkl_is_bool + and not volume_key_loaded + else [] + ) + ( + [PoolEncryptionAlert.VOLUME_KEY_STATUS_UNKNOWN] + if metadata_version is MetadataVersion.V2 + and mopool.Encrypted() + and not vkl_is_bool + else [] + ) + + return availability_alerts + no_alloc_space_alerts + pool_encryption_alerts + + @staticmethod + def size_triple(mopool: Any) -> SizeTriple: + """ + Calculate SizeTriple from size information. + """ + return SizeTriple( + Range(mopool.TotalPhysicalSize()), + get_property(mopool.TotalPhysicalUsed(), Range, None), + ) + class DefaultDetail(Default): # pylint: disable=too-few-public-methods """ @@ -342,7 +327,7 @@ def __init__(self, uuid_formatter: Callable[[str | UUID], str], selection: PoolI self.selection = selection def _print_detail_view( - self, pool_object_path: str, mopool: Any, alerts: DefaultAlerts + self, pool_object_path: str, mopool: Any, alerts: DeviceSizeChangedAlerts ): # pylint: disable=too-many-locals """ Print the detailed view for a single pool. @@ -350,22 +335,23 @@ def _print_detail_view( :param UUID uuid: the pool uuid :param pool_object_path: object path of the pool :param MOPool mopool: properties of the pool - :param DefaultAlerts alerts: pool alerts + :param DeviceSizeChangedAlerts alerts: pool alerts """ encrypted = mopool.Encrypted() print(f"UUID: {self.uuid_formatter(mopool.Uuid())}") print(f"Name: {mopool.Name()}") - alert_summary = [ + alert_summary = sorted( f"{code}: {code.summarize()}" - for code in alerts.alert_codes(pool_object_path, mopool) - ] + for code in alerts.alert_codes(pool_object_path) + + Default.alert_codes(mopool) + ) print(f"Alerts: {len(alert_summary)}") for line in alert_summary: # pragma: no cover print(f" {line}") - metadata_version = _metadata_version(mopool) + metadata_version = Default.metadata_version(mopool) print(f"Metadata Version: {metadata_version}") @@ -422,10 +408,7 @@ def _print_detail_view( else: print("Encryption Enabled: No") - size_triple = SizeTriple( - Range(mopool.TotalPhysicalSize()), - get_property(mopool.TotalPhysicalUsed(), Range, None), - ) + size_triple = Default.size_triple(mopool) print(f"Fully Allocated: {'Yes' if mopool.NoAllocSpace() else 'No'}") print(f" Size: {size_triple.total()}") @@ -452,7 +435,7 @@ def display(self): .search(managed_objects) ) - alerts = DefaultAlerts( + alerts = DeviceSizeChangedAlerts( devs(props={"Pool": pool_object_path}).search(managed_objects) ) @@ -472,7 +455,7 @@ def __init__(self, uuid_formatter: Callable[[str | UUID], str]): """ self.uuid_formatter = uuid_formatter - def display(self): + def display(self): # pylint: disable=too-many-locals """ List pools in table view. """ @@ -494,10 +477,7 @@ def physical_size_triple(mopool: Any) -> str: :returns: a string to display in the resulting list output :rtype: str """ - size_triple = SizeTriple( - Range(mopool.TotalPhysicalSize()), - get_property(mopool.TotalPhysicalUsed(), Range, None), - ) + size_triple = Default.size_triple(mopool) return " / ".join( ( @@ -531,7 +511,7 @@ def gen_string(has_property: bool, code: str) -> str: """ return (" " if has_property else "~") + code - metadata_version = _metadata_version(mopool) + metadata_version = Default.metadata_version(mopool) props_list = [ (metadata_version in (MetadataVersion.V1, None), "Le"), @@ -543,23 +523,37 @@ def gen_string(has_property: bool, code: str) -> str: managed_objects = ObjectManager.Methods.GetManagedObjects(proxy, {}) - alerts = DefaultAlerts(devs().search(managed_objects)) + alerts = DeviceSizeChangedAlerts(devs().search(managed_objects)) pools_with_props = [ (objpath, MOPool(info)) for objpath, info in pools().search(managed_objects) ] + name_func = catch_missing_property(lambda mo: mo.Name(), TABLE_UNKNOWN_STRING) + size_func = catch_missing_property(physical_size_triple, TABLE_UNKNOWN_STRING) + properties_func = catch_missing_property( + properties_string, TABLE_UNKNOWN_STRING + ) + uuid_func = catch_missing_property( + lambda mo: self.uuid_formatter(mo.Uuid()), TABLE_UNKNOWN_STRING + ) + + def alert_func(pool_object_path: str, mopool: Any) -> List[PoolAlertType]: + """ + Combined alert codes. + """ + return catch_missing_property( + Default.alert_codes, default=[TABLE_UNKNOWN_STRING] + )(mopool) + alerts.alert_codes(pool_object_path) + tables = [ ( - mopool.Name(), - physical_size_triple(mopool), - properties_string(mopool), - self.uuid_formatter(mopool.Uuid()), + name_func(mopool), + size_func(mopool), + properties_func(mopool), + uuid_func(mopool), ", ".join( - sorted( - str(code) - for code in alerts.alert_codes(pool_object_path, mopool) - ) + sorted(str(code) for code in alert_func(pool_object_path, mopool)) ), ) for (pool_object_path, mopool) in pools_with_props diff --git a/src/stratis_cli/_actions/_physical.py b/src/stratis_cli/_actions/_physical.py index 86a7ec2a5..473af01a4 100644 --- a/src/stratis_cli/_actions/_physical.py +++ b/src/stratis_cli/_actions/_physical.py @@ -17,6 +17,7 @@ # isort: STDLIB from argparse import Namespace +from typing import Any # isort: THIRDPARTY from justbytes import Range @@ -26,6 +27,7 @@ from ._constants import TOP_OBJECT from ._formatting import ( TABLE_UNKNOWN_STRING, + catch_missing_property, get_property, get_uuid_formatter, print_table, @@ -79,7 +81,7 @@ def list_devices(namespace: Namespace): # pylint: disable=too-many-locals ).search(managed_objects) ) - def paths(modev): + def paths(modev: Any) -> str: """ Return () if they are different, otherwise, just . @@ -100,7 +102,7 @@ def paths(modev): else f"{physical_path} ({metadata_path})" ) - def size(modev): + def size_str(modev: Any) -> str: """ Return in-use size (observed size) if they are different, otherwise just in-use size. @@ -113,24 +115,35 @@ def size(modev): else f"{in_use_size} ({observed_size})" ) - def tier_str(value): + def tier_str(modev: Any) -> str: """ String representation of a tier. """ try: - return str(BlockDevTiers(value)) + return str(BlockDevTiers(modev.Tier())) except ValueError: # pragma: no cover return TABLE_UNKNOWN_STRING format_uuid = get_uuid_formatter(namespace.unhyphenated_uuids) + pool_name_func = catch_missing_property( + lambda mo: path_to_name.get(mo.Pool(), TABLE_UNKNOWN_STRING), + TABLE_UNKNOWN_STRING, + ) + paths_func = catch_missing_property(paths, TABLE_UNKNOWN_STRING) + size_func = catch_missing_property(size_str, TABLE_UNKNOWN_STRING) + tier_func = catch_missing_property(tier_str, TABLE_UNKNOWN_STRING) + uuid_func = catch_missing_property( + lambda modev: format_uuid(modev.Uuid()), TABLE_UNKNOWN_STRING + ) + tables = [ [ - path_to_name.get(modev.Pool(), TABLE_UNKNOWN_STRING), - paths(modev), - size(modev), - tier_str(modev.Tier()), - format_uuid(modev.Uuid()), + pool_name_func(modev), + paths_func(modev), + size_func(modev), + tier_func(modev), + uuid_func(modev), ] for modev in modevs ]