Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fd7d652
test: add integrator drift integration tests
mrosseel Mar 5, 2026
ace665b
feat: add telemetry recording and replay
mrosseel Mar 5, 2026
aff51b9
refactor: extract pointing math and telemetry into separate modules
mrosseel Mar 8, 2026
23a9e3d
test: add telemetry unit tests and fix drift test import
mrosseel Mar 8, 2026
ae540a6
feat: enhance telemetry with raw IMU, target tracking, replay fixes, …
mrosseel Mar 10, 2026
9b80fa5
chore: remove analysis tools from tracking, keep local only
mrosseel Mar 10, 2026
4b7def5
Merge remote-tracking branch 'upstream/main' into telemetry
mrosseel Mar 26, 2026
9dd628c
Merge remote-tracking branch 'upstream/main' into telemetry
mrosseel Apr 23, 2026
bfc349e
Merge upstream/main into telemetry
mrosseel May 15, 2026
6c706c4
Merge upstream/main into telemetry
mrosseel May 16, 2026
a201aca
fix: clear camera_center/camera_solve on failed solves
mrosseel May 22, 2026
7d8f6e6
Merge fix-stale-camera-center into telemetry
mrosseel May 22, 2026
7b3cc2b
test: fix E741 and mypy errors in telemetry test files
mrosseel May 22, 2026
d527597
refactor: move get_initialized_solved_dict to its own module
mrosseel May 22, 2026
3ceadad
Merge upstream/main into telemetry
mrosseel May 24, 2026
6612aba
Merge upstream/main into telemetry
mrosseel May 25, 2026
9c4b1f6
refactor(telemetry): drop dead imu_pos/pos fields, record screen_dire…
mrosseel May 25, 2026
0a756ab
Merge upstream/main into telemetry
mrosseel Jun 1, 2026
058c269
fix(telemetry): force-apply replay datetime, stop-replay UI, survive …
mrosseel Jun 9, 2026
32fabe5
fix(telemetry): replay state lifecycle, full IMU capture, recorder hy…
mrosseel Jun 9, 2026
3147e68
refactor(imu): read raw gyro/accel inside Imu.update(), next to the q…
mrosseel Jun 9, 2026
b1e9fed
feat(telemetry): make raw gyro/accel capture opt-in via telemetry_raw…
mrosseel Jun 9, 2026
e70835c
Merge branch 'main' of github.com:brickbots/PiFinder into telemetry
mrosseel Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,4 @@ astro_data/comets.txt
# users GPS data
test_ubx
test_ubx/*
python/telemetry_analysis/
5 changes: 4 additions & 1 deletion default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,8 @@
"active_telescope_index": 0,
"active_eyepiece_index": 0
},
"imu_threshold_scale": 1
"imu_threshold_scale": 1,
"telemetry_record": false,
"telemetry_images": false,
"telemetry_raw_imu": false
}
12 changes: 11 additions & 1 deletion python/PiFinder/camera_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,17 @@ def get_image_loop(
f"Exposure saved and auto-exposure disabled: {self.exposure_time}µs"
)

if command.startswith("save"):
if command.startswith("save_image:"):
# Save current camera frame to specified path
save_path = command.split(":", 1)[1]
try:
img = camera_image.copy()
img.save(save_path, "PNG", compress_level=6)
logger.debug("Telemetry image saved: %s", save_path)
except Exception as e:
logger.error("Failed to save telemetry image: %s", e)

if command.startswith("save:"):
# Set flag to save next capture to this file
self._save_next_to = command.split(":")[1]
console_queue.put("CAM: Save flag set")
Expand Down
8 changes: 7 additions & 1 deletion python/PiFinder/imu_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ class Imu:
def __init__(self):
self._moving = False
self._flip = False
pass
# Attributes imu_pi.imu_monitor reads when this class is used as
# the hardware-fallback: never calibrated, never any readings.
self.calibration = 0
self.avg_quat = (1.0, 0.0, 0.0, 0.0)
self.gyro = None
self.accel = None
self.last_read_time = 0.0

def moving(self):
"""
Expand Down
34 changes: 30 additions & 4 deletions python/PiFinder/imu_pi.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ def __init__(self):
self._flip_count = 0
self.calibration = 0
self.avg_quat = (0, 0, 0, 0) # Scalar-first quaternion as float: (w, x, y, z)
# Raw sensor readings taken alongside the quaternion, for telemetry
self.gyro = None
self.accel = None
# Epoch of the last successful sensor read
self.last_read_time = 0.0
self.__moving = False
self.__reading_diff = 0.0

Expand All @@ -50,6 +55,10 @@ def __init__(self):
# to stop moving.

cfg = config.Config()
# Raw gyro/accel capture is opt-in: two extra I2C transactions per
# sample on a bus the BNO055 is sensitive about, and only useful
# for telemetry analysis.
self._raw_telemetry = bool(cfg.get_option("telemetry_raw_imu", False))
imu_threshold_scale = cfg.get_option("imu_threshold_scale", 1)
self.__moving_threshold = (
0.0005 * imu_threshold_scale,
Expand Down Expand Up @@ -81,6 +90,17 @@ def update(self):
logger.warning("IMU: Failed to get sensor values")
return

# When enabled, read raw sensor data alongside the quaternion so
# the telemetry sample is coherent (same instant, same I2C burst).
if self._raw_telemetry:
try:
self.gyro = self.sensor.gyro
self.accel = self.sensor.linear_acceleration
except (OSError, RuntimeError):
self.gyro = None
self.accel = None
self.last_read_time = time.time()

_quat_diff = []
for i in range(4):
_quat_diff.append(abs(quat[i] - self.quat_history[-1][i]))
Expand Down Expand Up @@ -161,7 +181,6 @@ def imu_monitor(shared_state, console_queue, log_queue):

imu = ImuFake()

imu = Imu()
imu_calibrated = False
imu_sample = ImuSample(
# Scalar-first numpy quaternion(w, x, y, z) - init to invalid quaternion
Expand All @@ -175,20 +194,27 @@ def imu_monitor(shared_state, console_queue, log_queue):
imu.update()
imu_sample.status = imu.calibration

# Raw data + read epoch are captured by imu.update() in the same
# I2C burst as the quaternion; copy them onto the published sample.
# The fresh timestamp per read keeps the telemetry recorder's
# dedup-by-sample-epoch working while stationary.
imu_sample.gyro = imu.gyro
imu_sample.accel = imu.accel
if imu.last_read_time:
imu_sample.timestamp = imu.last_read_time

if imu.moving():
if not imu_sample.moving:
logger.debug("IMU: move start")
imu_sample.moving = True
# Scalar-first (w, x, y, z); stamp the sample epoch alongside it
# Scalar-first (w, x, y, z)
imu_sample.quat = quaternion.from_float_array(imu.avg_quat)
imu_sample.timestamp = time.time()
else:
if imu_sample.moving:
# If we were moving and we now stopped
logger.debug("IMU: move end")
imu_sample.moving = False
imu_sample.quat = quaternion.from_float_array(imu.avg_quat)
imu_sample.timestamp = time.time()

if not imu_calibrated:
if imu_sample.status == 3:
Expand Down
101 changes: 88 additions & 13 deletions python/PiFinder/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
prediction during dead-reckoning. The IDR remains a math primitive
(``RaDecRoll`` in, ``RaDecRoll`` out); this module bridges between it
and :class:`PointingEstimate`.

Telemetry record/replay is handled by :class:`TelemetryManager`
(``telemetry.py``). Replayed sessions are converted back into
:class:`SolveResult` / :class:`ImuSample` messages and fed through the
same ``_apply_*`` / ``_advance_with_imu`` paths as live data.
"""

from __future__ import annotations
Expand All @@ -45,6 +50,7 @@
from PiFinder.multiproclogging import MultiprocLogging
from PiFinder.pointing_model.imu_dead_reckoning import ImuDeadReckoning
import PiFinder.pointing_model.quaternion_transforms as qt
from PiFinder.telemetry import TelemetryManager
from PiFinder.types.positioning import (
FailedSolve,
ImuSample,
Expand All @@ -62,12 +68,21 @@
IMU_MOVED_ANG_THRESHOLD = np.deg2rad(0.06)


def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=False):
def integrator(
shared_state,
solver_queue,
console_queue,
log_queue,
is_debug=False,
command_queue=None,
camera_command_queue=None,
):
MultiprocLogging.configurer(log_queue)
if is_debug:
logger.setLevel(logging.DEBUG)
logger.debug("Starting Integrator")

telemetry = None
try:
cfg = config.Config()
screen_direction = cfg.get_option("screen_direction")
Expand All @@ -82,22 +97,59 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
# Epoch of the last estimate we published; gate re-publishing on it.
last_published_time = time.time()

was_replaying = False
telemetry = TelemetryManager(
cfg, shared_state, console_queue, camera_command_queue
)

while True:
state_utils.sleep_for_framerate(shared_state)

telemetry.poll_commands(command_queue)

pointing_updated = False

# 1. Pull any pending solve result from the queue.
# 1. Pull the next message — from the replay stream when
# replaying, otherwise from the solver queue.
solve_result: Optional[SolveResult] = None
try:
solve_result = solver_queue.get(block=False)
except queue.Empty:
pass
replay_imu: Optional[ImuSample] = None

if telemetry.replaying:
if not was_replaying:
was_replaying = True
# Recorded epochs are in the past; rewind the publish
# gate so replayed estimates pass the newer-than check.
last_published_time = 0.0
# The solver keeps running during replay; discard its output.
_drain_queue(solver_queue)
message = telemetry.next_replay_message()
if isinstance(message, (SuccessfulSolve, FailedSolve)):
solve_result = message
elif isinstance(message, ImuSample):
replay_imu = message
else:
if was_replaying:
# Replay ended — reset to a clean unanchored state.
was_replaying = False
estimate = PointingEstimate()
idr.reset()
last_published_time = time.time()
logger.info("Replay ended, integrator state reset")
try:
solve_result = solver_queue.get(block=False)
except queue.Empty:
pass

if isinstance(solve_result, SuccessfulSolve):
telemetry.record_solve(
solve_result, predicted=estimate.pointing.aligned.estimate
)
estimate = _apply_successful_solve(estimate, solve_result, idr)
pointing_updated = True
elif isinstance(solve_result, FailedSolve):
telemetry.record_solve(
solve_result, predicted=estimate.pointing.aligned.estimate
)
estimate = _apply_failed_solve(estimate, solve_result)
# Publish unconditionally so auto-exposure sees the failed
# attempt (Matches=0, fresh last_solve_attempt). The estimate
Expand All @@ -106,17 +158,26 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
# progresses it when motion exceeds the deadband.
shared_state.set_solution(copy.deepcopy(estimate))

# 2. If we have an anchor and didn't just do a fresh plate-solve,
# try to advance the estimate via IMU dead-reckoning.
# 2. Pull the current IMU sample — from the replay stream when
# replaying — and record it. Recording happens before the
# anchor gate so sessions capture IMU data from the start,
# not only once the first solve has anchored dead-reckoning.
# (record_imu dedupes on sample timestamp and is a no-op
# while replaying or when recording is off.)
imu = replay_imu if telemetry.replaying else shared_state.imu()
if imu:
telemetry.record_imu(imu)

# If we have an anchor and didn't just do a fresh plate-solve,
# try to advance the estimate via IMU dead-reckoning.
if (
not pointing_updated
imu
and not pointing_updated
and idr.is_initialized()
and estimate.imu_anchor is not None
):
imu = shared_state.imu()
if imu:
if _advance_with_imu(estimate, idr, imu):
pointing_updated = True
if _advance_with_imu(estimate, idr, imu):
pointing_updated = True

# 3. Publish if we updated something newer than what we last sent.
if (
Expand All @@ -137,8 +198,13 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
shared_state.set_solution(copy.deepcopy(estimate))
last_published_time = estimate.estimate_time

telemetry.flush()

except EOFError:
logger.error("Main no longer running for integrator")
finally:
if telemetry is not None:
telemetry.stop()


def _apply_successful_solve(
Expand Down Expand Up @@ -263,3 +329,12 @@ def _get_alt_az(ra_deg, dec_deg, location, dt) -> tuple[float | None, float | No
return None, None
calc_utils.sf_utils.set_location(location.lat, location.lon, location.altitude)
return calc_utils.sf_utils.radec_to_altaz(ra_deg, dec_deg, dt)


def _drain_queue(q):
"""Discard all pending items from a queue."""
try:
while True:
q.get(block=False)
except queue.Empty:
pass
11 changes: 11 additions & 0 deletions python/PiFinder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def setup_dirs():
utils.create_path(Path(utils.data_dir, "screenshots"))
utils.create_path(Path(utils.data_dir, "solver_debug_dumps"))
utils.create_path(Path(utils.data_dir, "logs"))
utils.create_path(Path(utils.data_dir, "telemetry"))
os.chmod(Path(utils.data_dir), 0o777)


Expand Down Expand Up @@ -384,13 +385,16 @@ def main(
logger.info("PiFinder running on %s, %s, %s", os_detail, platform, arch)

# init UI Modes
integrator_command_queue: Queue = Queue()

command_queues = {
"camera": camera_command_queue,
"console": console_queue,
"ui_queue": ui_queue,
"align_command": alignment_command_queue,
"align_response": alignment_response_queue,
"gps": gps_queue,
"integrator": integrator_command_queue,
}
cfg = config.Config()

Expand Down Expand Up @@ -545,6 +549,10 @@ def main(
integrator_logqueque,
verbose,
),
kwargs={
"command_queue": integrator_command_queue,
"camera_command_queue": camera_command_queue,
},
)
integrator_process.start()

Expand Down Expand Up @@ -649,10 +657,13 @@ def main(
location = shared_state.location()

# Only update GPS fixes, as soon as it's loaded or comes from the WEB it's untouchable
# "replay" is protected too: a telemetry replay owns the
# location until it ends and restores the original.
if (
not location.source == "WEB"
and not location.source.startswith("CONFIG:")
and not location.source == "MANUAL"
and not location.source == "replay"
and (
location.error_in_m == 0
or float(gps_content["error_in_m"])
Expand Down
Loading
Loading