4646import sys
4747import time
4848from collections import defaultdict
49- from typing import Set , List
49+ from typing import Set , List , Tuple
5050
5151import evdev
5252from evdev .ecodes import EV_KEY , EV_ABS , EV_REL , REL_HWHEEL , REL_WHEEL
8080MSG_STATUS = "status"
8181
8282
83- def get_pipe_paths ():
84- """Get the path where the pipe can be found."""
85- return (
86- f"/tmp/input-remapper-{ UserUtils .home } /reader-results" ,
87- f"/tmp/input-remapper-{ UserUtils .home } /reader-commands" ,
88- )
89-
90-
9183class ReaderService :
9284 """Service that only reads events and is supposed to run as root.
9385
@@ -114,13 +106,13 @@ class ReaderService:
114106 _maximum_lifetime : int = 60 * 15
115107 _timeout_tolerance : int = 60
116108
117- def __init__ (self , groups : _Groups , global_uinputs : GlobalUInputs ):
109+ def __init__ (self , groups : _Groups , global_uinputs : GlobalUInputs ) -> None :
118110 """Construct the reader-service and initialize its communication pipes."""
119111 self ._start_time = time .time ()
120112 self .groups = groups
121113 self .global_uinputs = global_uinputs
122- self ._results_pipe = Pipe (get_pipe_paths ()[0 ])
123- self ._commands_pipe = Pipe (get_pipe_paths ()[1 ])
114+ self ._results_pipe = Pipe (self . get_pipe_paths ()[0 ])
115+ self ._commands_pipe = Pipe (self . get_pipe_paths ()[1 ])
124116 self ._pipe = multiprocessing .Pipe ()
125117
126118 self ._tasks : Set [asyncio .Task ] = set ()
@@ -129,7 +121,24 @@ def __init__(self, groups: _Groups, global_uinputs: GlobalUInputs):
129121 self ._results_pipe .send ({"type" : MSG_STATUS , "message" : "ready" })
130122
131123 @staticmethod
132- def is_running ():
124+ def get_pipe_paths () -> Tuple [str , str ]:
125+ """Get the path where the pipe can be found."""
126+ return (
127+ f"/tmp/input-remapper-{ UserUtils .home } /reader-results" ,
128+ f"/tmp/input-remapper-{ UserUtils .home } /reader-commands" ,
129+ )
130+
131+ @staticmethod
132+ def pipes_exist () -> bool :
133+ # Just checking for one of the 4 files (results, commands both read and write)
134+ # should be enough I guess.
135+ path = f"{ ReaderService .get_pipe_paths ()[0 ]} r"
136+ # Use os.path.exists, not lexists or islink, because broken links are bad.
137+ # New pipes and symlinks need to be made.
138+ return os .path .exists (path )
139+
140+ @staticmethod
141+ def is_running () -> bool :
133142 """Check if the reader-service is running."""
134143 try :
135144 subprocess .check_output (["pgrep" , "-f" , "input-remapper-reader-service" ])
0 commit comments