diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index afbe22f..5c485f8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,10 +15,11 @@ jobs: - uses: actions/checkout@v7 - uses: astral-sh/setup-uv@v7 with: - python-version: "3.12" + python-version: "3.10" - run: uv sync --all-extras --dev - run: uv run ruff check - run: uv run ruff format --check + - run: uv run pyrefly check docs: runs-on: ubuntu-latest @@ -26,7 +27,7 @@ jobs: - uses: actions/checkout@v7 - uses: astral-sh/setup-uv@v7 with: - python-version: "3.12" + python-version: "3.10" - run: uv sync --all-extras --dev - run: uv run sphinx-build -W -b html docs public - uses: actions/upload-pages-artifact@v5 @@ -75,7 +76,7 @@ jobs: - uses: actions/checkout@v7 - uses: astral-sh/setup-uv@v7 with: - python-version: "3.12" + python-version: "3.10" - run: uv sync --all-extras --dev - run: uv run coveralls --finish --service=github env: @@ -96,7 +97,7 @@ jobs: - uses: actions/checkout@v7 - uses: astral-sh/setup-uv@v7 with: - python-version: "3.12" + python-version: "3.10" - run: uv sync --all-extras --dev - run: uv build - run: uv publish --trusted-publishing always diff --git a/monitorcontrol/monitorcontrol.py b/monitorcontrol/monitorcontrol.py index 8166a76..d06d184 100644 --- a/monitorcontrol/monitorcontrol.py +++ b/monitorcontrol/monitorcontrol.py @@ -705,6 +705,7 @@ def _convert_to_dict(caps_str: str) -> dict: else: d = result_dict for g in group: + assert g is not None d = d[g] d[val] = {} prev_val = val @@ -716,7 +717,8 @@ def _parse_capabilities(caps_str: str) -> dict: """ Converts the capabilities string into a nice dict """ - caps_dict = { + + caps_dict: dict[str, str | list | dict[int, dict]] = { # Used to specify the protocol class "prot": "", # Identifies the type of display @@ -724,10 +726,10 @@ def _parse_capabilities(caps_str: str) -> dict: # The display model number "model": "", # A list of supported VCP codes. Somehow not the same as "vcp" - "cmds": "", + "cmds": {}, # A list of supported VCP codes with a list of supported values # for each nc code - "vcp": "", + "vcp": {}, # undocumented "mswhql": "", # undocumented @@ -741,12 +743,13 @@ def _parse_capabilities(caps_str: str) -> dict: # Alternate name to be used for control "vcpname": "", # Parsed input sources into text. Not part of capabilities string. - "inputs": "", + "inputs": [], # Parsed color presets into text. Not part of capabilities string. "color_presets": "", } for key in caps_dict: + # The "cmds" and "vcp" caps can be a mapping if key in ["cmds", "vcp"]: caps_dict[key] = _convert_to_dict(_extract_a_cap(caps_str, key)) else: @@ -754,6 +757,10 @@ def _parse_capabilities(caps_str: str) -> dict: # Parse the input sources into a text list for readability input_source_cap = vcp_codes.input_select.value + + # Put this check here to appease the type checker + if isinstance(caps_dict["vcp"], str): + raise ValueError("VCP capabilities dictionary is the wrong type!") if input_source_cap in caps_dict["vcp"]: caps_dict["inputs"] = [] input_val_list = list(caps_dict["vcp"][input_source_cap].keys()) diff --git a/monitorcontrol/vcp/vcp_abc.py b/monitorcontrol/vcp/vcp_abc.py index 7b0a1c5..485b17a 100644 --- a/monitorcontrol/vcp/vcp_abc.py +++ b/monitorcontrol/vcp/vcp_abc.py @@ -1,6 +1,8 @@ import abc from types import TracebackType -from typing import Optional, Tuple, Type +from typing import Generic, Optional, Tuple, Type, TypeVar + +T = TypeVar("T") class VCPError(Exception): @@ -21,9 +23,9 @@ class VCPPermissionError(VCPError): pass -class VCP(abc.ABC): +class VCP(Generic[T], abc.ABC): @abc.abstractmethod - def __enter__(self): + def __enter__(self) -> T: pass @abc.abstractmethod @@ -64,3 +66,7 @@ def get_vcp_feature(self, code: int) -> Tuple[int, int]: VCPError: Failed to get VCP feature. """ pass + + @abc.abstractmethod + def get_vcp_capabilities(self) -> str: + pass diff --git a/monitorcontrol/vcp/vcp_linux.py b/monitorcontrol/vcp/vcp_linux.py index 983bd37..9f82f0a 100644 --- a/monitorcontrol/vcp/vcp_linux.py +++ b/monitorcontrol/vcp/vcp_linux.py @@ -55,7 +55,7 @@ def __init__(self, bus_number: int): """ self.logger = logging.getLogger(__name__) self.bus_number = bus_number - self.fd: Optional[str] = None + self.fd: Optional[int] = None self.fp: str = f"/dev/i2c-{self.bus_number}" # time of last feature set call self.last_set: Optional[float] = None @@ -64,7 +64,7 @@ def __enter__(self): def cleanup(fd: Optional[int]): if fd is not None: try: - os.close(self.fd) + os.close(fd) except OSError: pass @@ -90,7 +90,8 @@ def __exit__( exception_traceback: Optional[TracebackType], ) -> Optional[bool]: try: - os.close(self.fd) + if self.fd: + os.close(self.fd) except OSError as e: raise VCPIOError("unable to close descriptor") from e self.fd = None @@ -344,7 +345,7 @@ def rate_limt(self): if self.last_set is None: return - rate_delay = self.CMD_RATE - time.time() - self.last_set + rate_delay = self.CMD_RATE - (time.time() - self.last_set) if rate_delay > 0: time.sleep(rate_delay) @@ -359,11 +360,14 @@ def read_bytes(self, num_bytes: int) -> bytes: VCPIOError: unable to read data """ try: - return os.read(self.fd, num_bytes) + if self.fd: + return os.read(self.fd, num_bytes) + else: + raise VCPIOError("unable read from I2C bus: no open file descriptor") except OSError as e: raise VCPIOError("unable to read from I2C bus") from e - def write_bytes(self, data: bytes): + def write_bytes(self, data: bytes | bytearray): """ Writes bytes to the I2C bus. @@ -374,22 +378,29 @@ def write_bytes(self, data: bytes): VCPIOError: unable to write data """ try: - os.write(self.fd, data) + if self.fd: + os.write(self.fd, data) + else: + raise VCPIOError( + "unable write to I2C bus: no open file descriptor found" + ) except OSError as e: raise VCPIOError("unable write to I2C bus") from e -def get_vcps() -> List[LinuxVCP]: +def get_vcps() -> List[VCP]: """ Interrogates I2C buses to determine if they are DDC-CI capable. Returns: List of all VCPs detected. """ - vcps = [] + vcps: List[VCP] = [] # iterate I2C devices for device in pyudev.Context().list_devices(subsystem="i2c"): + if device.sys_number is None: + continue vcp = LinuxVCP(device.sys_number) try: with vcp: diff --git a/monitorcontrol/vcp/vcp_windows.py b/monitorcontrol/vcp/vcp_windows.py index c15abf1..d03f944 100644 --- a/monitorcontrol/vcp/vcp_windows.py +++ b/monitorcontrol/vcp/vcp_windows.py @@ -16,6 +16,14 @@ WCHAR, ) +# Move some aliases here so we can have our type +# ignoring in one place + +# pyrefly: ignore[missing-attribute] +c_windll = ctypes.windll +# pyrefly: ignore[missing-attribute] +c_formaterror = ctypes.FormatError + # structure type for a physical monitor class PhysicalMonitor(ctypes.Structure): @@ -37,11 +45,12 @@ def __init__(self, handle: HANDLE, description: str): description: Text description of the physical monitor. """ self.logger = logging.getLogger(__name__) - self.handle = handle + self.handle_p = handle + self.handle = int(handle) self.description = description def __del__(self): - WindowsVCP._destroy_physical_monitor(self.handle) + WindowsVCP._destroy_physical_monitor(self.handle_p) def __enter__(self): pass @@ -70,10 +79,10 @@ def set_vcp_feature(self, code: int, value: int): extra=dict(code=code, value=value), ) try: - if not ctypes.windll.dxva2.SetVCPFeature( - HANDLE(self.handle), BYTE(code), DWORD(value) + if not c_windll.dxva2.SetVCPFeature( + HANDLE(int(self.handle)), BYTE(code), DWORD(value) ): - raise VCPError("failed to set VCP feature: " + ctypes.FormatError()) + raise VCPError("failed to set VCP feature: " + c_formaterror()) except OSError as e: raise VCPError("failed to close handle") from e @@ -97,14 +106,14 @@ def get_vcp_feature(self, code: int) -> Tuple[int, int]: extra=dict(code=code), ) try: - if not ctypes.windll.dxva2.GetVCPFeatureAndVCPFeatureReply( + if not c_windll.dxva2.GetVCPFeatureAndVCPFeatureReply( HANDLE(self.handle), BYTE(code), None, ctypes.byref(feature_current), ctypes.byref(feature_max), ): - raise VCPError("failed to get VCP feature: " + ctypes.FormatError()) + raise VCPError("failed to get VCP feature: " + c_formaterror()) except OSError as e: raise VCPError("failed to get VCP feature") from e self.logger.debug( @@ -131,20 +140,16 @@ def get_vcp_capabilities(self): cap_length = DWORD() self.logger.debug("GetCapabilitiesStringLength") try: - if not ctypes.windll.dxva2.GetCapabilitiesStringLength( + if not c_windll.dxva2.GetCapabilitiesStringLength( HANDLE(self.handle), ctypes.byref(cap_length) ): - raise VCPError( - "failed to get VCP capabilities: " + ctypes.FormatError() - ) + raise VCPError("failed to get VCP capabilities: " + c_formaterror()) cap_string = (ctypes.c_char * cap_length.value)() self.logger.debug("CapabilitiesRequestAndCapabilitiesReply") - if not ctypes.windll.dxva2.CapabilitiesRequestAndCapabilitiesReply( + if not c_windll.dxva2.CapabilitiesRequestAndCapabilitiesReply( HANDLE(self.handle), cap_string, cap_length ): - raise VCPError( - "failed to get VCP capabilities: " + ctypes.FormatError() - ) + raise VCPError("failed to get VCP capabilities: " + c_formaterror()) except OSError as e: raise VCPError("failed to get VCP capabilities") from e return cap_string.value.decode("ascii") @@ -167,7 +172,7 @@ def _get_hmonitors() -> List[HMONITOR]: """ Calls the Windows `EnumDisplayMonitors` API in Python-friendly form. """ - hmonitors = [] # type: List[HMONITOR] + hmonitors: List[HMONITOR] = [] try: def _callback(hmonitor, hdc, lprect, lparam): @@ -175,11 +180,12 @@ def _callback(hmonitor, hdc, lprect, lparam): del hmonitor, hdc, lprect, lparam return True # continue enumeration + # pyrefly: ignore[missing-attribute] MONITORENUMPROC = ctypes.WINFUNCTYPE( # noqa: N806 BOOL, HMONITOR, HDC, ctypes.POINTER(RECT), LPARAM ) callback = MONITORENUMPROC(_callback) - if not ctypes.windll.user32.EnumDisplayMonitors(0, 0, callback, 0): + if not c_windll.user32.EnumDisplayMonitors(0, 0, callback, 0): raise VCPError("Call to EnumDisplayMonitors failed") except OSError as e: raise VCPError("failed to enumerate VCPs") from e @@ -194,12 +200,13 @@ def _physical_monitors_from_hmonitor( """ num_physical = DWORD() try: - if not ctypes.windll.dxva2.GetNumberOfPhysicalMonitorsFromHMONITOR( + # pyrefly: ignore[missing-attribute] + if not c_windll.dxva2.GetNumberOfPhysicalMonitorsFromHMONITOR( hmonitor, ctypes.byref(num_physical) ): raise VCPError( "Call to GetNumberOfPhysicalMonitorsFromHMONITOR failed: " - + ctypes.FormatError() + + c_formaterror() ) except OSError as e: raise VCPError( @@ -208,17 +215,16 @@ def _physical_monitors_from_hmonitor( physical_monitors = (PhysicalMonitor * num_physical.value)() try: - if not ctypes.windll.dxva2.GetPhysicalMonitorsFromHMONITOR( + if not c_windll.dxva2.GetPhysicalMonitorsFromHMONITOR( hmonitor, num_physical.value, physical_monitors ): raise VCPError( - "Call to GetPhysicalMonitorsFromHMONITOR failed: " - + ctypes.FormatError() + "Call to GetPhysicalMonitorsFromHMONITOR failed: " + c_formaterror() ) except OSError as e: raise VCPError("failed to open physical monitor handle") from e return ( - [physical_monitor.handle, physical_monitor.description] + (physical_monitor.handle, physical_monitor.description) for physical_monitor in physical_monitors ) @@ -228,9 +234,9 @@ def _destroy_physical_monitor(handle: HANDLE) -> None: Calls the Windows `DestroyPhysicalMonitor` API in Python-friendly form. """ try: - if not ctypes.windll.dxva2.DestroyPhysicalMonitor(handle): + if not c_windll.dxva2.DestroyPhysicalMonitor(handle): raise VCPError( - "Call to DestroyPhysicalMonitor failed: " + ctypes.FormatError() + "Call to DestroyPhysicalMonitor failed: " + c_formaterror() ) except OSError as e: raise VCPError("failed to close handle") from e diff --git a/pyproject.toml b/pyproject.toml index 06245c6..b46004f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,9 +10,17 @@ dependencies = [ "pyudev>=0.23.3 ; sys_platform != 'win32'", ] +[tool.pyrefly] +project-includes = ["**/*"] +project-excludes = [ + "**/__pycache__", + "**/*venv/**/*", +] + [dependency-groups] dev = [ "coveralls>=4.0.1", + "pyrefly>=v0.37.0", "pytest>=8.3.4", "pytest-cov>=6.0.0", "ruff>=0.11.11", @@ -34,6 +42,12 @@ monitorcontrol = "monitorcontrol.__main__:main" requires = ["setuptools>=77"] build-backend = "setuptools.build_meta" +[tool.coverage.report] +exclude_also = [ + # abstract method bodies are never executed + "@(abc\\.)?abstractmethod", +] + [tool.ruff.lint] extend-select = [ # flake8-builtins diff --git a/tests/test_cli.py b/tests/test_cli.py index 327a100..6cd5c97 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,6 @@ from .test_monitorcontrol import UnitTestVCP -from monitorcontrol import Monitor import monitorcontrol.__main__ +from monitorcontrol import Monitor from monitorcontrol.__main__ import main, count_to_level from unittest import mock diff --git a/tests/test_linux_vcp.py b/tests/test_linux_vcp.py index 0cdfa8f..abf5be2 100644 --- a/tests/test_linux_vcp.py +++ b/tests/test_linux_vcp.py @@ -1,7 +1,391 @@ +import itertools +import os +import struct +import sys +from unittest import mock + import pytest + +from monitorcontrol.vcp import vcp_linux +from monitorcontrol.vcp.vcp_abc import VCPIOError, VCPPermissionError from monitorcontrol.vcp.vcp_linux import LinuxVCP +linux_only = pytest.mark.skipif( + not sys.platform.startswith("linux"), + reason="Linux-only VCP implementation", +) + + +def open_vcp() -> LinuxVCP: + """A LinuxVCP with a fake open file descriptor and stubbed bus I/O.""" + vcp = LinuxVCP(0) + vcp.fd = 10 + vcp.write_bytes = mock.Mock() + return vcp + + +def build_get_vcp_reply( + code: int, + current: int, + maximum: int, + reply_code: int = LinuxVCP.GET_VCP_REPLY, + result_code: int = 0, + source: int = 0x6E, +) -> tuple[bytes, bytes]: + """Builds a (header, payload+checksum) DDC-CI get-VCP-feature reply.""" + payload = struct.pack( + ">BBBBHH", reply_code, result_code, code, 0x00, maximum, current + ) + length = len(payload) + header = bytes([source, length | LinuxVCP.PROTOCOL_FLAG]) + checksum = LinuxVCP.get_checksum(bytearray(header + payload)) + return header, payload + bytes([checksum]) + + +def build_caps_packet( + text: bytes, + offset: int, + reply_code: int = LinuxVCP.GET_VCP_CAPS_REPLY, + source: int = 0x6E, +) -> tuple[bytes, bytes]: + """Builds a (header, payload+checksum) capabilities reply packet.""" + payload = struct.pack(">B", reply_code) + struct.pack(">H", offset) + text + length = len(payload) + header = bytes([source, length | LinuxVCP.PROTOCOL_FLAG]) + checksum = LinuxVCP.get_checksum(bytearray(header + payload)) + return header, payload + bytes([checksum]) + + +# --------------------------------------------------------------------------- +# get_vcps +# --------------------------------------------------------------------------- + + +@linux_only +def test_get_vcps_skips_devices_without_sys_number(): + # i2c devices without a sysfs number cannot map to a /dev/i2c-* bus and + # must be skipped rather than passed to LinuxVCP. + device = mock.Mock(sys_number=None) + context = mock.Mock() + context.list_devices.return_value = [device] + with mock.patch.object(vcp_linux, "pyudev") as pyudev_mock: + pyudev_mock.Context.return_value = context + assert vcp_linux.get_vcps() == [] + context.list_devices.assert_called_once_with(subsystem="i2c") + + +# --------------------------------------------------------------------------- +# __enter__ / __exit__ +# --------------------------------------------------------------------------- + + +@linux_only +def test_enter_success(): + vcp = LinuxVCP(0) + with ( + mock.patch.object(vcp_linux.os, "open", return_value=7) as m_open, + mock.patch.object(vcp_linux.fcntl, "ioctl"), + mock.patch.object(vcp_linux.os, "read", return_value=b"\x00"), + ): + assert vcp.__enter__() is vcp + assert vcp.fd == 7 + m_open.assert_called_once_with(vcp.fp, os.O_RDWR) + + +@linux_only +def test_enter_permission_error(): + vcp = LinuxVCP(0) + with ( + mock.patch.object(vcp_linux.os, "open", side_effect=PermissionError()), + mock.patch.object(vcp_linux.os, "close") as m_close, + ): + with pytest.raises(VCPPermissionError): + vcp.__enter__() + # open failed before assigning fd, so there is nothing to close + m_close.assert_not_called() + + +@linux_only +def test_enter_io_error_closes_fd(): + vcp = LinuxVCP(0) + with ( + mock.patch.object(vcp_linux.os, "open", return_value=7), + mock.patch.object(vcp_linux.fcntl, "ioctl", side_effect=OSError()), + mock.patch.object(vcp_linux.os, "close") as m_close, + ): + with pytest.raises(VCPIOError): + vcp.__enter__() + m_close.assert_called_once_with(7) + + +@linux_only +def test_exit_closes_fd(): + vcp = open_vcp() + with mock.patch.object(vcp_linux.os, "close") as m_close: + assert vcp.__exit__(None, None, None) is False + assert vcp.fd is None + m_close.assert_called_once_with(10) + + +@linux_only +def test_exit_close_error(): + vcp = open_vcp() + with mock.patch.object(vcp_linux.os, "close", side_effect=OSError()): + with pytest.raises(VCPIOError): + vcp.__exit__(None, None, None) + + +# --------------------------------------------------------------------------- +# read_bytes / write_bytes +# --------------------------------------------------------------------------- + + +@linux_only +def test_read_bytes_success(): + vcp = open_vcp() + with mock.patch.object(vcp_linux.os, "read", return_value=b"ab") as m_read: + assert vcp.read_bytes(2) == b"ab" + m_read.assert_called_once_with(10, 2) + + +@linux_only +def test_read_bytes_no_fd(): + vcp = LinuxVCP(0) + with pytest.raises(VCPIOError, match="no open file descriptor"): + vcp.read_bytes(1) + + +@linux_only +def test_read_bytes_os_error(): + vcp = open_vcp() + with mock.patch.object(vcp_linux.os, "read", side_effect=OSError()): + with pytest.raises(VCPIOError, match="unable to read"): + vcp.read_bytes(1) + + +@linux_only +def test_write_bytes_success(): + vcp = LinuxVCP(0) + vcp.fd = 10 + with mock.patch.object(vcp_linux.os, "write") as m_write: + vcp.write_bytes(b"xy") + m_write.assert_called_once_with(10, b"xy") + + +@linux_only +def test_write_bytes_no_fd(): + vcp = LinuxVCP(0) + with pytest.raises(VCPIOError, match="no open file descriptor"): + vcp.write_bytes(b"x") + + +@linux_only +def test_write_bytes_os_error(): + vcp = LinuxVCP(0) + vcp.fd = 10 + with mock.patch.object(vcp_linux.os, "write", side_effect=OSError()): + with pytest.raises(VCPIOError, match="unable write"): + vcp.write_bytes(b"x") + + +# --------------------------------------------------------------------------- +# rate_limt +# --------------------------------------------------------------------------- + + +@linux_only +def test_rate_limt_no_last_set(): + vcp = open_vcp() + with mock.patch.object(vcp_linux.time, "sleep") as m_sleep: + vcp.rate_limt() + m_sleep.assert_not_called() + + +@linux_only +def test_rate_limt_sleeps_when_recent(): + vcp = open_vcp() + vcp.last_set = 100.0 + with ( + mock.patch.object(vcp_linux.time, "time", return_value=100.01), + mock.patch.object(vcp_linux.time, "sleep") as m_sleep, + ): + vcp.rate_limt() + m_sleep.assert_called_once() + assert m_sleep.call_args[0][0] == pytest.approx(LinuxVCP.CMD_RATE - 0.01) + + +@linux_only +def test_rate_limt_no_sleep_when_elapsed(): + vcp = open_vcp() + vcp.last_set = 100.0 + with ( + mock.patch.object(vcp_linux.time, "time", return_value=200.0), + mock.patch.object(vcp_linux.time, "sleep") as m_sleep, + ): + vcp.rate_limt() + m_sleep.assert_not_called() + + +# --------------------------------------------------------------------------- +# set_vcp_feature +# --------------------------------------------------------------------------- + + +@linux_only +def test_set_vcp_feature(): + vcp = open_vcp() + writes = mock.Mock() + vcp.write_bytes = writes + with mock.patch.object(vcp_linux.time, "time", return_value=123.0): + vcp.set_vcp_feature(0x10, 50) + + data = writes.call_args[0][0] + assert data[0] == LinuxVCP.HOST_ADDRESS + assert data[1] == (4 | LinuxVCP.PROTOCOL_FLAG) + assert data[2] == LinuxVCP.SET_VCP_CMD + assert data[3] == 0x10 + assert (data[4] << 8) | data[5] == 50 + expected_checksum = LinuxVCP.get_checksum( + bytearray([LinuxVCP.DDCCI_ADDR << 1]) + bytearray(data[:-1]) + ) + assert data[-1] == expected_checksum + assert vcp.last_set == 123.0 + + +# --------------------------------------------------------------------------- +# get_vcp_feature +# --------------------------------------------------------------------------- + + +@linux_only +def test_get_vcp_feature(): + vcp = open_vcp() + writes = mock.Mock() + vcp.write_bytes = writes + header, body = build_get_vcp_reply(0x10, current=50, maximum=100) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + assert vcp.get_vcp_feature(0x10) == (50, 100) + writes.assert_called_once() + + +@linux_only +def test_get_vcp_feature_unexpected_reply_code(): + vcp = open_vcp() + header, body = build_get_vcp_reply(0x10, 50, 100, reply_code=0x00) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="unexpected response code"): + vcp.get_vcp_feature(0x10) + + +@linux_only +def test_get_vcp_feature_unexpected_opcode(): + vcp = open_vcp() + header, body = build_get_vcp_reply(0x10, 50, 100) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="unexpected opcode"): + vcp.get_vcp_feature(0x20) + + +@linux_only +def test_get_vcp_feature_result_code_known(): + vcp = open_vcp() + header, body = build_get_vcp_reply(0x10, 50, 100, result_code=1) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="Unsupported VCP code"): + vcp.get_vcp_feature(0x10) + + +@linux_only +def test_get_vcp_feature_result_code_unknown(): + vcp = open_vcp() + header, body = build_get_vcp_reply(0x10, 50, 100, result_code=5) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="unknown code"): + vcp.get_vcp_feature(0x10) + + +@linux_only +def test_get_vcp_feature_checksum_strict_raises(): + vcp = open_vcp() + vcp.CHECKSUM_ERRORS = "strict" + header, body = build_get_vcp_reply(0x10, 50, 100) + body = body[:-1] + bytes([body[-1] ^ 0xFF]) # corrupt checksum + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="checksum"): + vcp.get_vcp_feature(0x10) + + +@linux_only +def test_get_vcp_feature_checksum_warning_logs(): + vcp = open_vcp() + vcp.CHECKSUM_ERRORS = "warning" + vcp.logger = mock.Mock() + header, body = build_get_vcp_reply(0x10, 50, 100) + body = body[:-1] + bytes([body[-1] ^ 0xFF]) # corrupt checksum + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + assert vcp.get_vcp_feature(0x10) == (50, 100) + vcp.logger.warning.assert_called_once() + + +# --------------------------------------------------------------------------- +# get_vcp_capabilities +# --------------------------------------------------------------------------- + + +@linux_only +def test_get_vcp_capabilities(): + vcp = open_vcp() + header1, body1 = build_caps_packet(b"cap", offset=0) + header2, body2 = build_caps_packet(b"", offset=3) # empty payload -> stop + vcp.read_bytes = mock.Mock(side_effect=[header1, body1, header2, body2]) + with mock.patch.object(vcp_linux.time, "sleep"): + assert vcp.get_vcp_capabilities() == "cap" + + +@linux_only +def test_get_vcp_capabilities_bad_length(): + vcp = open_vcp() + header = bytes([0x6E, 2 | LinuxVCP.PROTOCOL_FLAG]) # length 2 < 3 + body = bytes([0x00, 0x00, 0x00]) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="unexpected response length"): + vcp.get_vcp_capabilities() + + +@linux_only +def test_get_vcp_capabilities_unexpected_reply_code(): + vcp = open_vcp() + header, body = build_caps_packet(b"x", offset=0, reply_code=0x00) + vcp.read_bytes = mock.Mock(side_effect=[header, body]) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="unexpected response code"): + vcp.get_vcp_capabilities() + + +@linux_only +def test_get_vcp_capabilities_loop_limit(): + vcp = open_vcp() + header, body = build_caps_packet(b"x", offset=0) # never terminates + vcp.read_bytes = mock.Mock(side_effect=itertools.cycle([header, body])) + with mock.patch.object(vcp_linux.time, "sleep"): + with pytest.raises(VCPIOError, match="incomplete or too long"): + vcp.get_vcp_capabilities() + + +# --------------------------------------------------------------------------- +# get_checksum +# --------------------------------------------------------------------------- + + @pytest.mark.parametrize( "data, checksum", [ diff --git a/tests/test_monitorcontrol.py b/tests/test_monitorcontrol.py index 32fb698..796a8f4 100644 --- a/tests/test_monitorcontrol.py +++ b/tests/test_monitorcontrol.py @@ -53,7 +53,8 @@ def __exit__( def test_context_manager_assert(): - m = Monitor(None) + vcps = get_test_vcps() + m = Monitor(vcps[0]) with pytest.raises(AssertionError): m.get_power_mode() @@ -75,7 +76,7 @@ def test_get_monitors(): get_monitors() -def get_test_vcps() -> List[Type[vcp.VCP]]: +def get_test_vcps() -> List[vcp.VCP]: if USE_ATTACHED_MONITORS: return get_vcps() else: @@ -276,6 +277,7 @@ def test_input_source_issue_59(monitor: Monitor): def test_input_source_type_error(monitor: Monitor): with pytest.raises(TypeError): + # pyrefly: ignore[bad-argument-type] monitor.set_input_source([]) @@ -371,3 +373,8 @@ def test_convert_to_dict_nested(): caps_str = "DC(00(00 12 13 14))" expected = {0xDC: {0: {0: {}, 0x12: {}, 0x13: {}, 0x14: {}}}} assert _convert_to_dict(caps_str) == expected + + +def test_convert_to_dict_empty(): + # capabilities extraction can return an empty string + assert _convert_to_dict("") == {} diff --git a/tests/test_windows_vcp.py b/tests/test_windows_vcp.py index d74c779..f163335 100644 --- a/tests/test_windows_vcp.py +++ b/tests/test_windows_vcp.py @@ -1,6 +1,9 @@ +import sys import pytest from unittest.mock import patch +if not sys.platform.startswith("win"): + pytest.skip("skipping windows-only tests", allow_module_level=True) from monitorcontrol.vcp.vcp_windows import WindowsVCP diff --git a/uv.lock b/uv.lock index be88d54..6d9311d 100644 --- a/uv.lock +++ b/uv.lock @@ -494,6 +494,7 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "coveralls" }, + { name = "pyrefly" }, { name = "pytest" }, { name = "pytest-cov" }, { name = "ruff" }, @@ -510,6 +511,7 @@ requires-dist = [{ name = "pyudev", marker = "sys_platform != 'win32'", specifie [package.metadata.requires-dev] dev = [ { name = "coveralls", specifier = ">=4.0.1" }, + { name = "pyrefly", specifier = ">=0.37.0" }, { name = "pytest", specifier = ">=8.3.4" }, { name = "pytest-cov", specifier = ">=6.0.0" }, { name = "ruff", specifier = ">=0.11.11" }, @@ -545,6 +547,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" }, ] +[[package]] +name = "pyrefly" +version = "1.1.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/8e/20/976165fa4b1517a1a92f393b3f4d4badabfff1165eff09d4cd4908428183/pyrefly-1.1.1.tar.gz", hash = "sha256:6deda959f8603a7dbdf112c48983e2275b2903cf33c8c739ed65d7e71a4fd520", size = 5880491, upload-time = "2026-06-18T23:45:43.785Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b5/d6/02ba666018c6a1cb4ddfa2db98ada721adddd374db5c29ba47a0bf2637fa/pyrefly-1.1.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:f4b8595f91885bc8b5e3c282ab68d1df21201668a84e6508b1e15f2feec0bb8d", size = 13631867, upload-time = "2026-06-18T23:45:13.923Z" }, + { url = "https://files.pythonhosted.org/packages/71/47/7a3457dbbddb513a83cf4fe527d5d5ebda5201a1010ad2a6034030e3e358/pyrefly-1.1.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d6b238e1362622d47a6eb5af704fd8b613c94e8c303386efd6350e3da59fecc8", size = 13075304, upload-time = "2026-06-18T23:45:16.865Z" }, + { url = "https://files.pythonhosted.org/packages/84/df/70f4b3f42d58ed686a80df31e04eca54d88036cea4f9b96195c64ad0b2b5/pyrefly-1.1.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b50d4510e4f8aaea79e2c4b343a4d7a060c9451c0b2aa9bfe10d7ca1ef33d68d", size = 13446966, upload-time = "2026-06-18T23:45:19.644Z" }, + { url = "https://files.pythonhosted.org/packages/3c/53/12a19bd6c7af985bcbc13c6910d0f9f6684069ead2282a5c08c2bfbb5d03/pyrefly-1.1.1-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f330cf039ef3da3b910c84f3a7e431f0cf8d0c1d2dad26491d6cadf3c7cd4759", size = 14449222, upload-time = "2026-06-18T23:45:22.252Z" }, + { url = "https://files.pythonhosted.org/packages/93/f0/e55c48a50076fc0f9ecf4bdedec50456db383e01162f5e2121f8468be071/pyrefly-1.1.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a6342d87c52b04f72156da04f554c4d57f3616f2b32d1763969efb22d05a1407", size = 14472947, upload-time = "2026-06-18T23:45:24.858Z" }, + { url = "https://files.pythonhosted.org/packages/b6/e7/30e085b31fed978ecb675bdbb54df566673ab550469e5af2d350f6af0be6/pyrefly-1.1.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c08b814ad03175e9cf47111390537161828b472044c39ab3320252b3ac6b2edd", size = 13975252, upload-time = "2026-06-18T23:45:27.247Z" }, + { url = "https://files.pythonhosted.org/packages/47/58/49c3e67641133d3fe5d8d9a660dc0826c6c37ca197d86cad05fa7dd8bfd6/pyrefly-1.1.1-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:d50cad97f19fc893b04deff7239626cffff5dd27ffb29b7d303a1b770247b208", size = 13471780, upload-time = "2026-06-18T23:45:29.775Z" }, + { url = "https://files.pythonhosted.org/packages/71/1e/65a7ba8355e2c39d8331832905fb74dcc85fc122a3f1dfd6dbf2a88907ad/pyrefly-1.1.1-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:2150b450ee6a6bcbe69b2d45d9a4ebc934a609e1abcf65e490433f38eb873d84", size = 13989306, upload-time = "2026-06-18T23:45:32.576Z" }, + { url = "https://files.pythonhosted.org/packages/37/de/b7ee1ab2392c36945738246fba7524439810befa3cfcc03cb6157567fc10/pyrefly-1.1.1-py3-none-win32.whl", hash = "sha256:5ffd8a8ed62fe4e6bf0afe1837d1bad149bb3b9f80e928ef248c96b836db3742", size = 12608469, upload-time = "2026-06-18T23:45:35.419Z" }, + { url = "https://files.pythonhosted.org/packages/a6/9c/a0f5b52934bf80e9c7eff08222e7caf318287b9aef76acb8d9ac5740581b/pyrefly-1.1.1-py3-none-win_amd64.whl", hash = "sha256:4e0430f3ef69c8ac73505fd6584db70ed504665a9f0816fef7f723de510f26cb", size = 13502172, upload-time = "2026-06-18T23:45:38.375Z" }, + { url = "https://files.pythonhosted.org/packages/42/3d/4c6bcb3d456835f51445d3662a428f56c3ea5643ec798c577030ae34298c/pyrefly-1.1.1-py3-none-win_arm64.whl", hash = "sha256:83baf0db71e172665db1fca0ced50b8f7773f5192ca57e8ac6773a772b6d2fc5", size = 12895979, upload-time = "2026-06-18T23:45:41.026Z" }, +] + [[package]] name = "pytest" version = "9.0.3"