diff --git a/default_config.json b/default_config.json index b7bf33ba..2858db40 100644 --- a/default_config.json +++ b/default_config.json @@ -16,6 +16,8 @@ "chart_dso": 128, "chart_reticle": 128, "chart_constellations": 64, + "image_nsew": true, + "image_bbox": true, "chart_coord_sys": "horiz", "solve_pixel": [256, 256], "gps_type": "ublox", diff --git a/python/PiFinder/cat_images.py b/python/PiFinder/cat_images.py index ae4a9b96..992e6190 100644 --- a/python/PiFinder/cat_images.py +++ b/python/PiFinder/cat_images.py @@ -5,6 +5,7 @@ to handle catalog image loading """ +import math import os from PIL import Image, ImageChops, ImageDraw from PiFinder import image_util @@ -19,6 +20,103 @@ logger = logging.getLogger("Catalog.Images") +def cardinal_vectors(image_rotate, fx=1, fy=1): + """Return (nx, ny), (ex, ey) unit vectors for North and East. + + image_rotate: degrees the POSS image was rotated (180 + roll). + fx, fy: -1 to mirror that axis (flip/flop), +1 otherwise. + """ + theta = math.radians(image_rotate) + n = (fx * math.sin(theta), fy * -math.cos(theta)) + e = (-fx * math.cos(theta), -fy * math.sin(theta)) + return n, e + + +def size_overlay_points(extents, pa, image_rotate, px_per_arcsec, cx, cy, fx=1, fy=1): + """Compute outline points for the size overlay. + + Returns a list of (x, y) tuples. + For 1 extent returns None (caller should use native ellipse). + """ + if not extents or len(extents) == 1: + return None + + theta = math.radians(image_rotate - pa - 90) + cos_t = math.cos(theta) + sin_t = math.sin(theta) + + points = [] + if len(extents) == 2: + rx = extents[0] * px_per_arcsec / 2 + ry = extents[1] * px_per_arcsec / 2 + for i in range(36): + t = 2 * math.pi * i / 36 + x = rx * math.cos(t) + y = ry * math.sin(t) + points.append( + (cx + fx * (x * cos_t - y * sin_t), cy + fy * (x * sin_t + y * cos_t)) + ) + else: + step = 2 * math.pi / len(extents) + for i, ext in enumerate(extents): + angle = i * step - math.pi / 2 + r = ext * px_per_arcsec / 2 + x = r * math.cos(angle) + y = r * math.sin(angle) + points.append( + (cx + fx * (x * cos_t - y * sin_t), cy + fy * (x * sin_t + y * cos_t)) + ) + return points + + +def vertex_overlay_points( + vertices, obj_ra, obj_dec, image_rotate, px_per_arcsec, cx, cy, fx=1, fy=1 +): + """Project RA/Dec vertex pairs to pixel coords via gnomonic projection. + + vertices: list of [ra, dec] pairs in degrees. + obj_ra, obj_dec: object center in degrees. + Returns list of (x, y) pixel tuples. + """ + theta = math.radians(image_rotate) + cos_t = math.cos(theta) + sin_t = math.sin(theta) + + ra0 = math.radians(obj_ra) + dec0 = math.radians(obj_dec) + cos_dec0 = math.cos(dec0) + sin_dec0 = math.sin(dec0) + + points = [] + for ra_deg, dec_deg in vertices: + ra = math.radians(ra_deg) + dec = math.radians(dec_deg) + cos_dec = math.cos(dec) + sin_dec = math.sin(dec) + dra = ra - ra0 + + cos_c = sin_dec0 * sin_dec + cos_dec0 * cos_dec * math.cos(dra) + if cos_c <= 0: + continue + # gnomonic: xi points East, eta points North (radians) + xi = (cos_dec * math.sin(dra)) / cos_c + eta = (cos_dec0 * sin_dec - sin_dec0 * cos_dec * math.cos(dra)) / cos_c + + # convert to arcsec offsets then pixels + dx_arcsec = -xi * 206264.806 # negate: East is left on POSS + dy_arcsec = -eta * 206264.806 # negate: North is up, pixel y is down + + dx_px = dx_arcsec * px_per_arcsec + dy_px = dy_arcsec * px_per_arcsec + + # apply image rotation + rx = dx_px * cos_t - dy_px * sin_t + ry = dx_px * sin_t + dy_px * cos_t + + points.append((cx + fx * rx, cy + fy * ry)) + return points + + def get_display_image( catalog_object, eyepiece_text, @@ -27,6 +125,9 @@ def get_display_image( display_class, burn_in=True, magnification=None, + telescope=None, + show_nsew=True, + show_bbox=True, ): """ Returns a 128x128 image buffer for @@ -37,6 +138,8 @@ def get_display_image( roll: degrees """ + flip = telescope.flip_image if telescope else False + flop = telescope.flop_image if telescope else False object_image_path = resolve_image_name(catalog_object, source="POSS") logger.debug("object_image_path = %s", object_image_path) @@ -59,6 +162,10 @@ def get_display_image( image_rotate += roll return_image = return_image.rotate(image_rotate) + if flip: + return_image = return_image.transpose(Image.FLIP_LEFT_RIGHT) + if flop: + return_image = return_image.transpose(Image.FLIP_TOP_BOTTOM) # FOV fov_size = int(1024 * fov / 2) @@ -98,6 +205,92 @@ def get_display_image( width=1, ) + cx = display_class.fov_res / 2 + cy = display_class.fov_res / 2 + fx = -1 if flip else 1 + fy = -1 if flop else 1 + + # NSEW cardinal labels — show only 2: topmost and leftmost + if show_nsew: + (nx, ny), (ex, ey) = cardinal_vectors(image_rotate, fx, fy) + label_font = display_class.fonts.base + label_color = display_class.colors.get(64) + r_label = display_class.fov_res / 2 - 2 + top_limit = display_class.titlebar_height + bottom_limit = display_class.fov_res - label_font.height * 2 + + candidates = [ + ("N", nx, ny), + ("S", -nx, -ny), + ("E", ex, ey), + ("W", -ex, -ey), + ] + by_top = sorted(candidates, key=lambda c: c[2]) + by_left = sorted(candidates, key=lambda c: c[1]) + chosen = {by_top[0][0]: by_top[0]} + # pick leftmost that isn't already chosen + for c in by_left: + if c[0] not in chosen: + chosen[c[0]] = c + break + + for label, dx, dy in chosen.values(): + lx = cx + dx * r_label - label_font.width / 2 + ly = cy + dy * r_label - label_font.height / 2 + lx = max(0, min(lx, display_class.fov_res - label_font.width)) + ly = max(top_limit, min(ly, bottom_limit)) + ui_utils.shadow_outline_text( + ri_draw, + (lx, ly), + label, + font=label_font, + align="left", + fill=label_color, + shadow_color=display_class.colors.get(0), + outline=1, + ) + + # Size overlay + extents = catalog_object.size.extents + if show_bbox and extents and fov > 0: + px_per_arcsec = display_class.fov_res / (fov * 3600) + overlay_color = display_class.colors.get(100) + + if catalog_object.size.is_vertices: + points = vertex_overlay_points( + extents, + catalog_object.ra, + catalog_object.dec, + image_rotate, + px_per_arcsec, + cx, + cy, + fx, + fy, + ) + if len(points) >= 2: + ri_draw.line(points, fill=overlay_color, width=1) + elif len(extents) == 1: + r = extents[0] * px_per_arcsec / 2 + ri_draw.ellipse( + [cx - r, cy - r, cx + r, cy + r], + outline=overlay_color, + width=1, + ) + else: + points = size_overlay_points( + extents, + catalog_object.size.position_angle, + image_rotate, + px_per_arcsec, + cx, + cy, + fx, + fy, + ) + if points: + ri_draw.polygon(points, outline=overlay_color) + # Pad out image if needed if display_class.fov_res != display_class.resX: pad_image = Image.new("RGB", display_class.resolution) diff --git a/python/PiFinder/catalog_imports/bright_stars_loader.py b/python/PiFinder/catalog_imports/bright_stars_loader.py index fa4dfc34..b202b2a9 100644 --- a/python/PiFinder/catalog_imports/bright_stars_loader.py +++ b/python/PiFinder/catalog_imports/bright_stars_loader.py @@ -9,7 +9,7 @@ from tqdm import tqdm import PiFinder.utils as utils -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.calc_utils import ra_to_deg, dec_to_deg from .catalog_import_utils import ( NewCatalogObject, @@ -45,8 +45,7 @@ def load_bright_stars(): sequence = int(dfs[0]) logging.debug(f"---------------> Bright Stars {sequence=} <---------------") - size = "" - # const = dfs[2].strip() + size = SizeObject([]) desc = "" ra_h = int(dfs[3]) @@ -58,7 +57,6 @@ def load_bright_stars(): dec_deg = dec_to_deg(dec_d, dec_m, 0) mag = MagnitudeObject([float(dfs[7].strip())]) - # const = dfs[8] new_object = NewCatalogObject( object_type=obj_type, diff --git a/python/PiFinder/catalog_imports/caldwell_loader.py b/python/PiFinder/catalog_imports/caldwell_loader.py index 0e29f513..ae25157e 100644 --- a/python/PiFinder/catalog_imports/caldwell_loader.py +++ b/python/PiFinder/catalog_imports/caldwell_loader.py @@ -17,6 +17,7 @@ insert_catalog, insert_catalog_max_sequence, add_space_after_prefix, + parse_arcmin_size, ) # Import shared database object @@ -46,7 +47,7 @@ def load_caldwell(): mag = MagnitudeObject([]) else: mag = MagnitudeObject([float(mag)]) - size = dfs[5][5:].strip() + size = parse_arcmin_size(dfs[5][5:].strip()) ra_h = int(dfs[6]) ra_m = float(dfs[7]) ra_deg = ra_to_deg(ra_h, ra_m, 0) diff --git a/python/PiFinder/catalog_imports/catalog_import_utils.py b/python/PiFinder/catalog_imports/catalog_import_utils.py index 279169df..a36dbb4e 100644 --- a/python/PiFinder/catalog_imports/catalog_import_utils.py +++ b/python/PiFinder/catalog_imports/catalog_import_utils.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, field from tqdm import tqdm -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.ui.ui_utils import normalize from PiFinder import calc_utils from PiFinder.db.objects_db import ObjectsDatabase @@ -30,7 +30,7 @@ class NewCatalogObject: dec: float mag: MagnitudeObject object_id: int = 0 - size: str = "" + size: SizeObject = field(default_factory=lambda: SizeObject([])) description: str = "" aka_names: list[str] = field(default_factory=list) surface_brightness: float = 0.0 @@ -76,7 +76,7 @@ def insert(self, find_object_id=True): self.ra, self.dec, self.constellation, - self.size, + self.size.to_json(), self.mag.to_json(), self.surface_brightness, ) @@ -158,6 +158,22 @@ def get_object_id(self, object_name: str): return result +def parse_arcmin_size(raw: str) -> SizeObject: + """Parse a size string assumed to be in arcminutes. Handles 'NxM' format.""" + if not raw: + return SizeObject([]) + parts = raw.lower().replace("x", " ").split() + values = [] + for p in parts: + try: + values.append(float(p)) + except ValueError: + logging.warning("Non-numeric size token %r in %r", p, raw) + if not values: + return SizeObject([]) + return SizeObject.from_arcmin(*values) + + def safe_convert_to_float(x): """Convert to float, filtering out non-numeric values""" try: diff --git a/python/PiFinder/catalog_imports/harris_loader.py b/python/PiFinder/catalog_imports/harris_loader.py index 72649f02..bf067992 100644 --- a/python/PiFinder/catalog_imports/harris_loader.py +++ b/python/PiFinder/catalog_imports/harris_loader.py @@ -14,7 +14,7 @@ import numpy as np import numpy.typing as npt import PiFinder.utils as utils -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.calc_utils import ra_to_deg, dec_to_deg from .catalog_import_utils import ( delete_catalog_from_database, @@ -286,15 +286,13 @@ def create_cluster_object(entry: npt.NDArray, seq: int) -> Dict[str, Any]: logging.debug(f" Magnitude: None (invalid value: {mag_value})") # Size - use half-mass radius (Rh) in arcminutes - # Format using utils.format_size_value to match other catalogs rh = entry["Rh"].item() if is_valid_value(rh): - # Convert to string, removing unnecessary decimals - result["size"] = utils.format_size_value(rh) + result["size"] = SizeObject.from_arcmin(float(rh)) if VERBOSE: - logging.debug(f" Size (half-mass radius): {result['size']} arcmin") + logging.debug(f" Size (half-mass radius): {rh} arcmin") else: - result["size"] = "" + result["size"] = SizeObject([]) if VERBOSE: logging.debug(f" Size: None (invalid Rh value: {rh})") diff --git a/python/PiFinder/catalog_imports/herschel_loader.py b/python/PiFinder/catalog_imports/herschel_loader.py index f6f3c904..4d2d7089 100644 --- a/python/PiFinder/catalog_imports/herschel_loader.py +++ b/python/PiFinder/catalog_imports/herschel_loader.py @@ -54,9 +54,13 @@ def load_herschel400(): f"---------------> Herschel 400 {sequence=} <---------------" ) - object_id = objects_db.get_catalog_object_by_sequence( + result = objects_db.get_catalog_object_by_sequence( "NGC", NGC_sequence - )["id"] + ) + if result is None: + logging.warning("NGC %s not found, skipping H%d", NGC_sequence, sequence) + continue + object_id = result["id"] objects_db.insert_name(object_id, h_name, catalog) objects_db.insert_catalog_object(object_id, catalog, sequence, h_desc) conn.commit() diff --git a/python/PiFinder/catalog_imports/lynga_loader.py b/python/PiFinder/catalog_imports/lynga_loader.py index 09f7df85..0797bc40 100644 --- a/python/PiFinder/catalog_imports/lynga_loader.py +++ b/python/PiFinder/catalog_imports/lynga_loader.py @@ -17,7 +17,7 @@ import numpy as np import numpy.typing as npt import PiFinder.utils as utils -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.calc_utils import ra_to_deg, dec_to_deg from .catalog_import_utils import ( delete_catalog_from_database, @@ -410,11 +410,11 @@ def create_cluster_object(entry: npt.NDArray, seq: int) -> Dict[str, Any]: # Angular diameter in arcminutes diam = entry["Diam"].item() if is_valid_value(diam): - result["size"] = utils.format_size_value(diam) + result["size"] = SizeObject.from_arcmin(float(diam)) if VERBOSE: - logging.debug(f" Size: {result['size']} arcmin") + logging.debug(f" Size: {result['size']}") else: - result["size"] = "" + result["size"] = SizeObject([]) # --- Description --- description_parts: List[str] = [] diff --git a/python/PiFinder/catalog_imports/main.py b/python/PiFinder/catalog_imports/main.py index 63df2f41..01abb457 100644 --- a/python/PiFinder/catalog_imports/main.py +++ b/python/PiFinder/catalog_imports/main.py @@ -86,6 +86,8 @@ def main(): objects_db, _ = init_shared_database() logging.info("creating catalog tables") + conn, _ = objects_db.get_conn_cursor() + conn.execute("PRAGMA journal_mode = WAL") objects_db.destroy_tables() objects_db.create_tables() @@ -122,6 +124,12 @@ def main(): resolve_object_images() print_database() + # Finalize: checkpoint WAL and switch to DELETE mode so the .db is + # self-contained (no -wal/-shm sidecars needed at runtime). + logging.info("Finalizing database...") + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + conn.execute("PRAGMA journal_mode = DELETE") + if __name__ == "__main__": main() diff --git a/python/PiFinder/catalog_imports/post_processing.py b/python/PiFinder/catalog_imports/post_processing.py index 22fa3a8c..d411547c 100644 --- a/python/PiFinder/catalog_imports/post_processing.py +++ b/python/PiFinder/catalog_imports/post_processing.py @@ -11,7 +11,7 @@ # Import shared database object from .database import objects_db from .catalog_import_utils import NewCatalogObject -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject import PiFinder.utils as utils @@ -123,7 +123,7 @@ def add_missing_messier_objects(): ra=185.552, # 12h 22m 12.5272s in degrees dec=58.083, # +58° 4′ 58.549″ in degrees mag=MagnitudeObject([9.9]), # Average of components A (9.64) and B (10.11) - size="0.1'", + size=SizeObject.from_arcmin(0.1), description="Winnecke 4 double star", aka_names=m40_aka_names, ) @@ -143,7 +143,7 @@ def add_missing_messier_objects(): ra=56.85, # 03h 47m 24s in degrees dec=24.117, # +24° 07′ 00″ in degrees mag=MagnitudeObject([1.6]), - size="120'", # 2° = 120 arcminutes + size=SizeObject.from_degrees(2.0), # 2° description="Pleiades open cluster", aka_names=m45_aka_names, ) @@ -163,7 +163,7 @@ def add_missing_messier_objects(): ra=274.6, # 18h 18m 24s in degrees dec=-18.4, # -18° 24′ 00″ in degrees mag=MagnitudeObject([4.6]), # Visual magnitude of the brightest part - size="90'", # About 1.5 degrees + size=SizeObject.from_degrees(1.5), # ~1.5° description="Sagittarius Star Cloud", aka_names=m24_aka_names, ) @@ -183,7 +183,7 @@ def add_missing_messier_objects(): ra=226.623, # 15h 06m 29.5s in degrees dec=55.763, # +55° 45′ 48″ in degrees mag=MagnitudeObject([10.7]), - size="5.2'x2.3'", + size=SizeObject.from_arcmin(5.2, 2.3), description="Spindle Galaxy (controversial Messier object)", aka_names=m102_aka_names, ) diff --git a/python/PiFinder/catalog_imports/sac_loaders.py b/python/PiFinder/catalog_imports/sac_loaders.py index 8fa58859..238746f5 100644 --- a/python/PiFinder/catalog_imports/sac_loaders.py +++ b/python/PiFinder/catalog_imports/sac_loaders.py @@ -9,7 +9,7 @@ from tqdm import tqdm import PiFinder.utils as utils -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.calc_utils import ra_to_deg, dec_to_deg from .catalog_import_utils import ( NewCatalogObject, @@ -23,6 +23,33 @@ from .database import objects_db +def _parse_sac_asterism_size(raw: str) -> SizeObject: + """Parse SAC asterism size strings like '3d X 2.4d', '10x5', '20deg x 15deg'. + + Values with 'd', 'deg', or '°' are degrees; plain numbers are arcminutes. + """ + cleaned = raw.strip().replace(" ", "").replace("X", "x") + if not cleaned: + return SizeObject([]) + parts = cleaned.split("x") + values = [] + is_degrees = False + for p in parts: + p = p.strip() + if "deg" in p or "°" in p or p.endswith("d"): + is_degrees = True + p = p.replace("deg", "").replace("°", "").rstrip("d") + try: + values.append(float(p)) + except ValueError: + return SizeObject([]) + if not values: + return SizeObject([]) + if is_degrees: + return SizeObject.from_degrees(*values) + return SizeObject.from_arcmin(*values) + + def load_sac_asterisms(): """Load the SAC Asterisms catalog""" logging.info("Loading SAC Asterisms") @@ -56,7 +83,6 @@ def load_sac_asterisms(): logging.debug( f"---------------> SAC Asterisms {sequence=} <---------------" ) - # const = dfs[2].strip() ra = dfs[3].strip() dec = dfs[4].strip() mag = dfs[5].strip() @@ -64,13 +90,7 @@ def load_sac_asterisms(): mag = MagnitudeObject([]) else: mag = MagnitudeObject([float(mag)]) - size = ( - dfs[6] - .replace(" ", "") - .replace("X", "x") - .replace("deg", "°") - .replace("d", "°") - ) + size = _parse_sac_asterism_size(dfs[6]) desc = dfs[9].strip() ra = ra.split() @@ -182,6 +202,11 @@ def load_sac_multistars(): dec_m = float(dec[1]) dec_deg = dec_to_deg(dec_d, dec_m, 0) + if sep and utils.is_number(sep): + size = SizeObject.from_arcsec(float(sep)) + else: + size = SizeObject([]) + new_object = NewCatalogObject( object_type=obj_type, catalog_code=catalog, @@ -189,7 +214,7 @@ def load_sac_multistars(): ra=ra_deg, dec=dec_deg, mag=mag, - size=sep, + size=size, description=desc, aka_names=name, ) @@ -249,10 +274,9 @@ def load_sac_redstars(): logging.debug( f"---------------> SAC Red Stars {sequence=} <---------------" ) - # const = dfs[3].strip() ra = dfs[4].strip() dec = dfs[5].strip() - size = "" + size = SizeObject([]) mag = dfs[6].strip() if mag == "none": mag = MagnitudeObject([]) diff --git a/python/PiFinder/catalog_imports/specialized_loaders.py b/python/PiFinder/catalog_imports/specialized_loaders.py index 5e29e770..e8d68aee 100644 --- a/python/PiFinder/catalog_imports/specialized_loaders.py +++ b/python/PiFinder/catalog_imports/specialized_loaders.py @@ -15,7 +15,7 @@ from collections import namedtuple, defaultdict import PiFinder.utils as utils -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.calc_utils import ra_to_deg, dec_to_deg, b1950_to_j2000 from .catalog_import_utils import ( NewCatalogObject, @@ -23,6 +23,7 @@ insert_catalog, insert_catalog_max_sequence, add_space_after_prefix, + parse_arcmin_size, ) # Import shared database object @@ -43,7 +44,7 @@ def load_egc(): delete_catalog_from_database(catalog) insert_catalog(catalog, Path(utils.astro_data_dir, "EGC.desc")) - egc = Path(utils.astro_data_dir, "egc.tsv") + egc = Path(utils.astro_data_dir, "EGC.tsv") # Create shared ObjectFinder to avoid recreating for each object from .catalog_import_utils import ObjectFinder @@ -72,7 +73,8 @@ def load_egc(): dec_s = int(dec[2]) dec_deg = dec_to_deg(dec_deg, dec_m, dec_s) - size = dfs[5] + raw_size = dfs[5].strip() + size = parse_arcmin_size(raw_size) mag = MagnitudeObject([float(dfs[4])]) desc = dfs[7] @@ -138,7 +140,7 @@ def load_collinder(): dec_s = int(dec[9:11]) dec_deg = dec_to_deg(dec_deg, dec_m, dec_s) - size = dfs[7] + size = parse_arcmin_size(dfs[7]) desc = f"{dfs[6]} stars, like {dfs[8]}" # Assuming all the parsing logic is done and all variables are available... @@ -284,7 +286,7 @@ def load_taas200(): mag = MagnitudeObject([]) else: mag = MagnitudeObject([float(mag)]) - size = row["Size"] + size = parse_arcmin_size(row["Size"]) desc = row["Description"] nr_stars = row["# Stars"] gc = row["GC Conc or Class"] @@ -363,10 +365,13 @@ def load_rasc_double_Stars(): alternate_ids = dfs[2].split(",") wds = dfs[3] obj_type = "D*" - # const = dfs[4] mags = json.loads(dfs[7]) mag = MagnitudeObject(mags) - size = dfs[8] + raw_sep = dfs[8].strip() + try: + size = SizeObject.from_arcsec(float(raw_sep)) + except (ValueError, TypeError): + size = SizeObject([]) # 03 31.1 +27 44 ra = dfs[5].split() ra_h = int(ra[0]) @@ -429,7 +434,7 @@ def load_barnard(): for row in tqdm(list(df), leave=False): Barn = row[1:5].strip() if Barn[-1] == "a": - print(f"Skipping {Barn=}") + logging.debug(f"Skipping {Barn=}") continue RA2000h = int(row[22:24]) RA2000m = int(row[25:27]) @@ -437,7 +442,7 @@ def load_barnard(): DE2000_sign = row[32] DE2000d = int(row[33:35]) DE2000m = int(row[36:38]) - Diam = float(row[39:44]) if row[39:44].strip() else "" + raw_diam = row[39:44].strip() sequence = Barn logging.debug(f"<------------- Barnard {sequence=} ------------->") obj_type = "Nb" @@ -451,6 +456,11 @@ def load_barnard(): dec_deg = dec_to_deg(dec_deg, dec_m, 0) desc = barn_dict[Barn].strip() + if raw_diam: + barn_size = SizeObject.from_arcmin(float(raw_diam)) + else: + barn_size = SizeObject([]) + new_object = NewCatalogObject( object_type=obj_type, catalog_code=catalog, @@ -458,7 +468,7 @@ def load_barnard(): ra=ra_deg, dec=dec_deg, mag=MagnitudeObject([]), - size=str(Diam), + size=barn_size, description=desc, aka_names=[], ) @@ -490,8 +500,8 @@ def load_sharpless(): # read description dictionary descriptions_dict = {} - with open(akas, mode="r", newline="", encoding="utf-8") as file: - reader = csv.reader(open(descriptions, "r")) + with open(descriptions, "r") as file: + reader = csv.reader(file) for row in reader: if len(row) == 2: k, v = row @@ -569,7 +579,7 @@ def load_sharpless(): sequence=record["Sh2"], ra=j_ra_deg, dec=dec_deg, - size=str(record["Diam"]), + size=SizeObject.from_arcmin(float(record["Diam"])), mag=MagnitudeObject([]), description=desc, aka_names=current_akas, @@ -738,7 +748,6 @@ def load_tlk_90_vars(): ra=ra_deg, dec=dec_deg, mag=mag_object, - size="", description=desc, aka_names=current_akas, ) @@ -778,6 +787,12 @@ def load_abell(): if other_name != "": aka_names.append(other_name) + raw_abell_size = split_line[6].strip() + try: + abell_size = SizeObject.from_arcmin(float(raw_abell_size)) + except (ValueError, TypeError): + abell_size = SizeObject([]) + new_object = NewCatalogObject( object_type=obj_type, catalog_code=catalog, @@ -785,7 +800,7 @@ def load_abell(): ra=float(split_line[3].strip()), dec=float(split_line[4].strip()), mag=MagnitudeObject([float(split_line[5].strip())]), - size=split_line[6].strip(), + size=abell_size, aka_names=aka_names, ) diff --git a/python/PiFinder/catalog_imports/steinicke_loader.py b/python/PiFinder/catalog_imports/steinicke_loader.py index 6d2c31a4..acf147d9 100644 --- a/python/PiFinder/catalog_imports/steinicke_loader.py +++ b/python/PiFinder/catalog_imports/steinicke_loader.py @@ -15,8 +15,7 @@ from collections import defaultdict import PiFinder.utils as utils -from PiFinder.utils import format_size_value -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from .catalog_import_utils import ( NewCatalogObject, delete_catalog_from_database, @@ -455,12 +454,18 @@ def get_priority(obj): # Get surface brightness surface_brightness = obj.get("surface_brightness") - # Format size information - size = "" + # Format size information (arcminutes from Steinicke) + pa = float(obj["position_angle"]) if obj.get("position_angle") else 0.0 if obj.get("diameter_larger"): - size = format_size_value(obj["diameter_larger"]) + larger = float(obj["diameter_larger"]) if obj.get("diameter_smaller"): - size += f"x{format_size_value(obj['diameter_smaller'])}" + size = SizeObject.from_arcmin( + larger, float(obj["diameter_smaller"]), position_angle=pa + ) + else: + size = SizeObject.from_arcmin(larger, position_angle=pa) + else: + size = SizeObject([]) desc = "" extra = "" diff --git a/python/PiFinder/catalog_imports/wds_loader.py b/python/PiFinder/catalog_imports/wds_loader.py index 96d79863..395f35ec 100644 --- a/python/PiFinder/catalog_imports/wds_loader.py +++ b/python/PiFinder/catalog_imports/wds_loader.py @@ -15,7 +15,7 @@ from collections import defaultdict import PiFinder.utils as utils -from PiFinder.composite_object import MagnitudeObject +from PiFinder.composite_object import MagnitudeObject, SizeObject from PiFinder.calc_utils import ra_to_deg, dec_to_deg from .catalog_import_utils import ( delete_catalog_from_database, @@ -206,8 +206,8 @@ def handle_multiples(key, values) -> dict: result["ra"] = value["ra"] result["dec"] = value["dec"] result["mag"] = MagnitudeObject([mag1, mag2]) - sizemax = np.max([value["Sep_First"], value["Sep_Last"]]) - result["size"] = str(round(sizemax, 1)) + sizemax = float(np.max([value["Sep_First"], value["Sep_Last"]])) + result["size"] = SizeObject.from_arcsec(round(sizemax, 1)) discoverers.add(value["Discoverer_Number"]) notes = value["Notes"].strip() notes_str = "" if len(notes) == 0 else f" Notes: {notes}" diff --git a/python/PiFinder/catalogs.py b/python/PiFinder/catalogs.py index 81cf55d4..c5bac3cf 100644 --- a/python/PiFinder/catalogs.py +++ b/python/PiFinder/catalogs.py @@ -14,7 +14,7 @@ from PiFinder.db.db import Database from PiFinder.db.objects_db import ObjectsDatabase from PiFinder.db.observations_db import ObservationsDatabase -from PiFinder.composite_object import CompositeObject, MagnitudeObject +from PiFinder.composite_object import CompositeObject, MagnitudeObject, SizeObject from PiFinder.utils import Timer from PiFinder.config import Config from PiFinder.catalog_base import ( @@ -223,9 +223,6 @@ def apply_filter(self, obj: CompositeObject): if obj.const not in self._constellations: obj.last_filtered_result = False return False - else: - obj.last_filtered_result = False - return False # check altitude if self._altitude != -1 and self.fast_aa: @@ -259,9 +256,6 @@ def apply_filter(self, obj: CompositeObject): if obj.obj_type not in self._object_types: obj.last_filtered_result = False return False - else: - obj.last_filtered_result = False - return False # check observed if self._observed is not None and self._observed != "Any": @@ -657,7 +651,7 @@ def add_planet(self, sequence: int, name: str, planet: Dict[str, Dict[str, float "ra": ra, "dec": dec, "const": constellation, - "size": "", + "size": SizeObject([]), "mag": MagnitudeObject([planet["mag"]]), "names": [name.capitalize()], "catalog_code": "PL", @@ -825,7 +819,6 @@ def _create_full_composite_object(self, catalog_obj: Dict) -> CompositeObject: "sequence": catalog_obj["sequence"], "description": catalog_obj.get("description", ""), "const": obj_data.get("const", ""), - "size": obj_data.get("size", ""), "surface_brightness": obj_data.get("surface_brightness", None), } @@ -842,6 +835,8 @@ def _create_full_composite_object(self, catalog_obj: Dict) -> CompositeObject: composite_instance.mag = MagnitudeObject([]) composite_instance.mag_str = "-" + composite_instance.size = SizeObject.from_json(obj_data.get("size", "")) + composite_instance._details_loaded = True return composite_instance @@ -975,7 +970,6 @@ def _create_full_composite_object( "sequence": catalog_obj["sequence"], "description": catalog_obj.get("description", ""), "const": obj_data.get("const", ""), - "size": obj_data.get("size", ""), "surface_brightness": obj_data.get("surface_brightness", None), } @@ -992,6 +986,8 @@ def _create_full_composite_object( composite_instance.mag = MagnitudeObject([]) composite_instance.mag_str = "-" + composite_instance.size = SizeObject.from_json(obj_data.get("size", "")) + composite_instance._details_loaded = True return composite_instance diff --git a/python/PiFinder/comet_catalog.py b/python/PiFinder/comet_catalog.py index dbe89858..e41603ee 100644 --- a/python/PiFinder/comet_catalog.py +++ b/python/PiFinder/comet_catalog.py @@ -13,7 +13,7 @@ ) from PiFinder.catalogs import Catalog from PiFinder.state import SharedStateObj -from PiFinder.composite_object import CompositeObject, MagnitudeObject +from PiFinder.composite_object import CompositeObject, MagnitudeObject, SizeObject import PiFinder.comets as comets from PiFinder.utils import Timer, comet_file from PiFinder.calc_utils import sf_utils @@ -280,7 +280,7 @@ def add_comet(self, sequence: int, name: str, comet: Dict[str, Dict[str, float]] "ra": ra, "dec": dec, "const": constellation, - "size": "", + "size": SizeObject([]), "mag": mag, "mag_str": mag.calc_two_mag_representation(), "names": [name], diff --git a/python/PiFinder/composite_object.py b/python/PiFinder/composite_object.py index bd87c636..eff3800f 100644 --- a/python/PiFinder/composite_object.py +++ b/python/PiFinder/composite_object.py @@ -2,10 +2,173 @@ from dataclasses import dataclass, field import numpy as np import json -from typing import List +import math +from typing import List, Union, cast from PiFinder.utils import is_number +class SizeObject: + """Structured angular size for astronomical objects. + + All extents are stored internally in arcseconds. + - [] -> unknown / point source + - [d] -> circular, diameter d + - [major, minor] -> elliptical (major x minor axes) + - [v1, v2, ...] -> polygon radial distances at equal angular intervals + - [[ra,dec], ...] -> RA/Dec polyline vertices (degrees) + - [[[ra,dec],[ra,dec]], ...] -> disconnected line segments (degrees) + + The geometry field disambiguates: "polyline" or "segments". + """ + + def __init__( + self, + extents: Union[List[float], List[List[float]]], + position_angle: float = 0.0, + geometry: str = "", + ): + self.extents: Union[List[float], List[List[float]]] = extents + self.position_angle: float = position_angle + self.geometry: str = geometry + + # --- mode detection --- + + @property + def is_vertices(self) -> bool: + """True for polyline vertices: [[ra,dec], ...]""" + if not self.extents: + return False + if self.geometry == "segments": + return False + if self.geometry == "polyline": + return True + return isinstance(self.extents[0], (list, tuple)) + + @property + def is_segments(self) -> bool: + """True for disconnected segments: [[[ra,dec],[ra,dec]], ...]""" + if self.geometry == "segments": + return True + return False + + def _all_vertices(self) -> List[List[float]]: + """Collect all RA/Dec vertices regardless of geometry type.""" + if self.is_segments: + verts: List[List[float]] = [] + # Segments-mode extents: list of [[ra,dec],[ra,dec]] segments. + for seg in cast(List[List[List[float]]], self.extents): + verts.extend(seg) + return verts + if self.is_vertices: + return cast(List[List[float]], self.extents) + return [] + + @property + def max_extent_arcsec(self) -> float: + if not self.extents: + return 0.0 + verts = self._all_vertices() + if verts: + max_sep = 0.0 + for i in range(len(verts)): + for j in range(i + 1, len(verts)): + ra1, dec1 = math.radians(verts[i][0]), math.radians(verts[i][1]) + ra2, dec2 = math.radians(verts[j][0]), math.radians(verts[j][1]) + dra = ra2 - ra1 + ddec = dec2 - dec1 + cos_dec = math.cos((dec1 + dec2) / 2) + sep = math.sqrt((dra * cos_dec) ** 2 + ddec**2) + max_sep = max(max_sep, sep) + return math.degrees(max_sep) * 3600.0 + # Numeric extents at this point — vertex/segment branches handled above. + return max(cast(List[float], self.extents)) + + # --- constructors --- + + @classmethod + def from_arcmin(cls, *values: float, position_angle: float = 0.0) -> "SizeObject": + return cls([v * 60.0 for v in values], position_angle=position_angle) + + @classmethod + def from_arcsec(cls, *values: float, position_angle: float = 0.0) -> "SizeObject": + return cls(list(values), position_angle=position_angle) + + @classmethod + def from_degrees(cls, *values: float, position_angle: float = 0.0) -> "SizeObject": + return cls([v * 3600.0 for v in values], position_angle=position_angle) + + @classmethod + def from_vertices(cls, vertices: List[List[float]]) -> "SizeObject": + return cls(vertices, position_angle=0.0) + + # --- serialization --- + + def to_json(self) -> str: + return json.dumps({"e": self.extents, "p": self.position_angle}) + + @classmethod + def from_json(cls, json_str: str) -> "SizeObject": + if not json_str: + return cls([]) + try: + parsed = json.loads(json_str) + except (json.JSONDecodeError, TypeError): + # Legacy DB rows store size as plain text (e.g. "5'", "17x8"), + # not JSON. Degrade to an empty SizeObject so the catalog + # still loads; a re-import populates proper extents. + return cls([]) + if not isinstance(parsed, dict) or "e" not in parsed: + return cls([]) + return cls(parsed["e"], position_angle=parsed.get("p", 0.0)) + + # --- display --- + + def _format_value(self, arcsec: float, unit_suffix: str) -> str: + """Format a single value, dropping .0 for whole numbers.""" + if unit_suffix == '"': + val = arcsec + elif unit_suffix == "'": + val = arcsec / 60.0 + else: + val = arcsec / 3600.0 + if val == int(val): + return f"{int(val)}{unit_suffix}" + return f"{val:.1f}{unit_suffix}" + + def _pick_unit(self, arcsec: float) -> str: + """Choose display unit for a value in arcseconds.""" + if arcsec >= 3600.0: + return "°" + if arcsec >= 60.0: + return "'" + return '"' + + def to_display_string(self) -> str: + if not self.extents: + return "" + if self.is_vertices or self.is_segments: + extent = self.max_extent_arcsec + return f"~{self._format_value(extent, self._pick_unit(extent))}" + # Numeric-extent path: extents is List[float] here. + extents = cast(List[float], self.extents) + unit = self._pick_unit(max(extents)) + if len(extents) == 1: + return self._format_value(extents[0], unit) + if len(extents) == 2: + a = self._format_value(extents[0], unit) + b = self._format_value(extents[1], unit) + # strip repeated unit suffix for compact display: 17'x8' + return f"{a}x{b}" + # 3+ extents: show max extent only with polygon marker + return f"~{self._format_value(max(extents), unit)}" + + def __repr__(self) -> str: + return f"SizeObject({self.extents})" + + def __str__(self) -> str: + return self.to_display_string() + + class MagnitudeObject: UNKNOWN_MAG: float = 99 mags: List = [] @@ -48,9 +211,17 @@ def __repr__(self): @classmethod def from_json(cls, json_str): - data = json.loads(json_str) - obj = cls(data["mags"]) - return obj + if not json_str: + return cls([]) + try: + data = json.loads(json_str) + except (json.JSONDecodeError, TypeError): + # Legacy DB rows store mag as plain text (e.g. "12.5", "12.5/13.5"), + # not JSON. Degrade to an empty MagnitudeObject. + return cls([]) + if not isinstance(data, dict) or "mags" not in data: + return cls([]) + return cls(data["mags"]) @dataclass @@ -67,7 +238,7 @@ class CompositeObject: # dec in degrees, J2000 dec: float = field(default=0.0) const: str = field(default="") - size: str = field(default="") + size: "SizeObject" = field(default_factory=lambda: SizeObject([])) mag: MagnitudeObject = field(default=MagnitudeObject([])) mag_str: str = field(default="") catalog_code: str = field(default="") diff --git a/python/PiFinder/db/objects_db.py b/python/PiFinder/db/objects_db.py index b8ad3b50..95eaa3c2 100644 --- a/python/PiFinder/db/objects_db.py +++ b/python/PiFinder/db/objects_db.py @@ -18,9 +18,6 @@ def __init__(self, db_path=utils.pifinder_db): self.cursor.execute("PRAGMA mmap_size = 268435456;") # 256MB memory mapping self.cursor.execute("PRAGMA cache_size = -64000;") # 64MB cache (negative = KB) self.cursor.execute("PRAGMA temp_store = MEMORY;") # Keep temporary data in RAM - self.cursor.execute( - "PRAGMA journal_mode = WAL;" - ) # Write-ahead logging for better concurrency self.cursor.execute( "PRAGMA synchronous = NORMAL;" ) # Balanced safety/performance @@ -40,7 +37,7 @@ def create_tables(self): dec NUMERIC, const TEXT, size TEXT, - mag NUMERIC, + mag TEXT, surface_brightness NUMERIC ); """ diff --git a/python/PiFinder/gen_images.py b/python/PiFinder/gen_images.py index 81865cb4..b4804c63 100644 --- a/python/PiFinder/gen_images.py +++ b/python/PiFinder/gen_images.py @@ -1,41 +1,61 @@ #!/usr/bin/python # -*- coding:utf-8 -*- """ -This module fetches images from sky survey sources on the internet -and prepares them for PiFinder use. +Fetch images from sky survey sources (NASA SkyView POSS, SDSS DR18) +and prepare them for PiFinder use. + +Usage: + python -m PiFinder.gen_images # Fetch missing images + python -m PiFinder.gen_images --force # Re-fetch ALL images + python -m PiFinder.gen_images --force --poss # Re-fetch POSS only + python -m PiFinder.gen_images --force --sdss # Re-fetch SDSS only + python -m PiFinder.gen_images --workers 20 # More concurrency """ -import requests +import argparse import os -from tqdm import tqdm +import sqlite3 +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from io import BytesIO +from typing import Dict, List, Tuple + +import requests from PIL import Image, ImageOps -from PiFinder.db.objects_db import ObjectsDatabase -from PiFinder.catalogs import CompositeObject +from tqdm import tqdm + +from PiFinder import utils + +BASE_IMAGE_PATH = f"{utils.data_dir}/catalog_images" + +# Catalogs that are excluded from image fetching (no meaningful survey images) +EXCLUDED_CATALOGS = {"WDS"} -BASE_IMAGE_PATH = "/Users/rich/Projects/Astronomy/PiFinder/astro_data/catalog_images" +SKYVIEW_URL = ( + "https://skyview.gsfc.nasa.gov/current/cgi/runquery.pl" + "?Survey=digitized+sky+survey&position={ra},{dec}" + "&Return=JPEG&size=1&pixels=1024" +) -CATALOG_PATH = "/Users/rich/Projects/Astronomy/PiFinder/astro_data/pifinder_objects.db" +SDSS_URL = ( + "https://skyserver.sdss.org/dr18/SkyServerWS/ImgCutout/getjpeg" + "?ra={ra}&dec={dec}&scale=3.515&width=1024&height=1024&opt=" +) -def resolve_image_name(catalog_object, source): - """ - returns the image path for this objects - """ - return f"{BASE_IMAGE_PATH}/{str(catalog_object.image_name)[-1]}/{catalog_object.image_name}_{source}.jpg" +def resolve_image_path(image_name: str, source: str) -> str: + last_char = str(image_name)[-1] + return f"{BASE_IMAGE_PATH}/{last_char}/{image_name}_{source}.jpg" -def check_image(image): - """ - Checks for defects.... - """ - # out of range message +def check_sdss_image(image: Image.Image) -> bool: + """Check SDSS image for defects (blank/out-of-range).""" blank = True for y in range(0, 24): if image.getpixel((0, y + 50)) > 0: blank = False break if blank: - print("\tSDSS Out of range") return False black_pixel_count = 0 @@ -43,73 +63,213 @@ def check_image(image): if pixel == 0: black_pixel_count += 1 if black_pixel_count > 120000: - print("\tToo many black pixels") return False - return True -def fetch_object_image(_obj, low_cut=10): - """ - Check if image exists - or fetch it. - - Returns image path - """ - catalog_object = CompositeObject.from_dict(dict(_obj)) - ra = catalog_object.ra - dec = catalog_object.dec - - object_image_path = resolve_image_name(catalog_object, "POSS") - if not os.path.exists(object_image_path): - print(f"Fetching {object_image_path}") - # POSS - # this url has less contrast and requires a low-cut on the autoconstrast - fetch_url = f"https://skyview.gsfc.nasa.gov/current/cgi/runquery.pl?Survey=digitized+sky+survey&position={ra},{dec}&Return=JPEG&size=1&pixels=1024" - - fetched_image = Image.open(requests.get(fetch_url, stream=True).raw) - fetched_image = fetched_image.convert("L") - fetched_image = ImageOps.autocontrast(fetched_image, cutoff=(low_cut, 0)) - fetched_image.save(object_image_path) - print("\tPOSS Good!") - - # SDSS DR18 - object_image_path = resolve_image_name(catalog_object, "SDSS") - fetch_url = f"https://skyserver.sdss.org/dr18/SkyServerWS/ImgCutout/getjpeg?ra={ra}&dec={dec}&scale=3.515&width=1024&height=1024&opt=" - fetched_image = Image.open(requests.get(fetch_url, stream=True).raw) - fetched_image = fetched_image.convert("L") - - # check to see if it's black (i.e. out of SDSS coverage area) - if check_image(fetched_image): - print("\tSDSS Good!") - fetched_image = ImageOps.autocontrast(fetched_image) - fetched_image.save(object_image_path) +def fetch_poss( + session: requests.Session, ra: float, dec: float, image_name: str, low_cut: int = 10 +) -> Tuple[bool, str]: + """Fetch POSS image from NASA SkyView.""" + path = resolve_image_path(image_name, "POSS") + url = SKYVIEW_URL.format(ra=ra, dec=dec) + try: + resp = session.get(url, timeout=60) + if resp.status_code != 200: + return False, f"HTTP {resp.status_code}" + img = Image.open(BytesIO(resp.content)) + img = img.convert("L") + img = ImageOps.autocontrast(img, cutoff=(low_cut, 0)) + os.makedirs(os.path.dirname(path), exist_ok=True) + img.save(path) + return True, "" + except Exception as e: + return False, str(e) + + +def fetch_sdss( + session: requests.Session, ra: float, dec: float, image_name: str +) -> Tuple[bool, str]: + """Fetch SDSS DR18 image.""" + path = resolve_image_path(image_name, "SDSS") + url = SDSS_URL.format(ra=ra, dec=dec) + try: + resp = session.get(url, timeout=60) + if resp.status_code != 200: + return False, f"HTTP {resp.status_code}" + img = Image.open(BytesIO(resp.content)) + img = img.convert("L") + if not check_sdss_image(img): + return False, "out of range" + img = ImageOps.autocontrast(img) + os.makedirs(os.path.dirname(path), exist_ok=True) + img.save(path) + return True, "" + except Exception as e: + return False, str(e) + + +def fetch_object( + session: requests.Session, + ra: float, + dec: float, + image_name: str, + do_poss: bool, + do_sdss: bool, + force: bool, +) -> Tuple[str, Dict[str, Tuple[bool, str]]]: + """Fetch survey images for one object.""" + results: Dict[str, Tuple[bool, str]] = {} + + if do_poss: + path = resolve_image_path(image_name, "POSS") + if force or not os.path.exists(path): + results["POSS"] = fetch_poss(session, ra, dec, image_name) else: - print("\tSDSS BAD!") - return False + results["POSS"] = (True, "exists") - return True + if do_sdss: + path = resolve_image_path(image_name, "SDSS") + if force or not os.path.exists(path): + results["SDSS"] = fetch_sdss(session, ra, dec, image_name) + else: + results["SDSS"] = (True, "exists") + + return image_name, results + + +def get_objects_to_fetch() -> List[Tuple[float, float, str]]: + """Get all non-WDS objects with their coordinates and image names.""" + db_path = utils.pifinder_db + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + excluded_placeholders = ",".join("?" for _ in EXCLUDED_CATALOGS) + cursor.execute( + f""" + SELECT DISTINCT o.ra, o.dec, oi.image_name + FROM objects o + JOIN object_images oi ON oi.object_id = o.id + JOIN catalog_objects co ON co.object_id = o.id + WHERE co.catalog_code NOT IN ({excluded_placeholders}) + AND oi.image_name != '' + """, + list(EXCLUDED_CATALOGS), + ) + rows = [(r["ra"], r["dec"], r["image_name"]) for r in cursor.fetchall()] + conn.close() + return rows def create_catalog_image_dirs(): - """ - Checks for and creates catalog_image dirs - """ if not os.path.exists(BASE_IMAGE_PATH): os.makedirs(BASE_IMAGE_PATH) - for i in range(0, 10): - _image_dir = f"{BASE_IMAGE_PATH}/{i}" - if not os.path.exists(_image_dir): - os.makedirs(_image_dir) + d = f"{BASE_IMAGE_PATH}/{i}" + if not os.path.exists(d): + os.makedirs(d) def main(): - objects_db = ObjectsDatabase() + parser = argparse.ArgumentParser( + description="Fetch survey images for PiFinder objects" + ) + parser.add_argument( + "--force", action="store_true", help="Re-fetch even if image exists" + ) + parser.add_argument("--poss", action="store_true", help="Fetch POSS only") + parser.add_argument("--sdss", action="store_true", help="Fetch SDSS only") + parser.add_argument( + "--workers", type=int, default=10, help="Concurrent workers (default: 10)" + ) + args = parser.parse_args() + + do_poss = True + do_sdss = True + if args.poss and not args.sdss: + do_sdss = False + elif args.sdss and not args.poss: + do_poss = False + create_catalog_image_dirs() - all_objects = objects_db.get_objects() - for catalog_object in tqdm(all_objects): - fetch_object_image(catalog_object) + + print("Querying objects from database...") + objects = get_objects_to_fetch() + print(f"Found {len(objects)} objects (excluding {', '.join(EXCLUDED_CATALOGS)})") + + if args.force: + to_fetch = objects + print(f"Force mode: will re-fetch all {len(to_fetch)} objects") + else: + to_fetch = [] + for ra, dec, name in objects: + poss_missing = do_poss and not os.path.exists( + resolve_image_path(name, "POSS") + ) + sdss_missing = do_sdss and not os.path.exists( + resolve_image_path(name, "SDSS") + ) + if poss_missing or sdss_missing: + to_fetch.append((ra, dec, name)) + print(f"Missing images: {len(to_fetch)} of {len(objects)}") + + if not to_fetch: + print("Nothing to fetch!") + return + + sources = [] + if do_poss: + sources.append("POSS") + if do_sdss: + sources.append("SDSS") + print(f"Fetching: {', '.join(sources)} with {args.workers} workers") + + session = requests.Session() + session.headers.update({"User-Agent": "PiFinder-ImageGenerator/2.0"}) + + failed: List[Tuple[str, str]] = [] + fetched = 0 + skipped = 0 + t_start = time.time() + + with ThreadPoolExecutor(max_workers=args.workers) as executor: + futures = { + executor.submit( + fetch_object, session, ra, dec, name, do_poss, do_sdss, args.force + ): name + for ra, dec, name in to_fetch + } + + for future in tqdm( + as_completed(futures), total=len(futures), desc="Downloading" + ): + name = futures[future] + try: + _, results = future.result() + for source, (ok, err) in results.items(): + if err == "exists": + skipped += 1 + elif ok: + fetched += 1 + else: + failed.append((f"{name}_{source}", err)) + except Exception as e: + failed.append((name, str(e))) + + elapsed = time.time() - t_start + print( + f"\nDone in {elapsed:.0f}s: {fetched} fetched, {skipped} skipped, {len(failed)} failed" + ) + + if failed: + print(f"\nFailed ({len(failed)}):") + for name, err in failed[:20]: + print(f" {name}: {err}") + if len(failed) > 20: + print(f" ... and {len(failed) - 20} more") + + session.close() if __name__ == "__main__": diff --git a/python/PiFinder/nearby.py b/python/PiFinder/nearby.py index f2ac7448..7c828a2c 100644 --- a/python/PiFinder/nearby.py +++ b/python/PiFinder/nearby.py @@ -71,6 +71,10 @@ def calculate_objects_balltree(self, objects: list[CompositeObject]) -> None: Calculates a flat list of objects and the balltree for those objects """ deduplicated_objects = deduplicate_objects(objects) + if not deduplicated_objects: + self._objects = np.array([]) + self._objects_balltree = None + return object_radecs = np.array( [[np.deg2rad(x.ra), np.deg2rad(x.dec)] for x in deduplicated_objects] ) diff --git a/python/PiFinder/obslist.py b/python/PiFinder/obslist.py index 1b391122..d577cab6 100644 --- a/python/PiFinder/obslist.py +++ b/python/PiFinder/obslist.py @@ -1,161 +1,214 @@ #!/usr/bin/python # -*- coding:utf-8 -*- """ -This module has functions -for reading / writing -observing lists in skylist -format used by SkySafari -but supported by other -tools +Observing list management for PiFinder. + +Reads observing lists in any supported format (via obslist_formats), +resolves entries against the PiFinder catalog database, and provides +the list as CompositeObject instances for the UI. + +Writing always produces SkySafari .skylist format. """ +from __future__ import annotations + import os import logging -from textwrap import dedent from PiFinder import utils from PiFinder.catalogs import Catalogs +from PiFinder.composite_object import CompositeObject +from PiFinder.obslist_formats import ( + ObsList, + ObsListEntry, + SKYSAFARI_CATALOG_NAMES_INV, + SUPPORTED_EXTENSIONS, + read_file as formats_read_file, + write_skylist, +) logger = logging.getLogger("Observation.List") OBSLIST_DIR = f"{utils.data_dir}/obslists/" -SKYSAFARI_CATALOG_NAMES = { - "CAL": "C", - "COL": "Cr", -} - -SKYSAFARI_CATALOG_NAMES_INV = {v: k for k, v in SKYSAFARI_CATALOG_NAMES.items()} - def write_list(catalog_objects, name): """ Writes the list of catalog objects - to a file. + to a .skylist file. """ - index_num = 0 - with open(OBSLIST_DIR + name + ".skylist", "w") as skylist: - skylist.write("SkySafariObservingListVersion=3.0\n") - for obj in catalog_objects: - catalog_name = SKYSAFARI_CATALOG_NAMES.get( - obj.catalog_code, obj.catalog_code - ) - catalog_number = f"{catalog_name} {obj.sequence}" - entry_text = dedent( - f""" - SkyObject=BeginObject - ObjectID=4,-1,-1 - CatalogNumber={catalog_number} - DefaultIndex={index_num} - EndObject=SkyObject - """ - ).strip() - skylist.write(entry_text + "\n") - index_num += 1 - - -def resolve_object(catalog_numbers, catalogs: Catalogs): + entries = [_entry_from_composite(obj) for obj in catalog_objects] + obs_list = ObsList(name=name, entries=entries) + content = write_skylist(obs_list) + with open(OBSLIST_DIR + name + ".skylist", "w") as f: + f.write(content) + + +def _entry_from_composite(obj: CompositeObject) -> ObsListEntry: + """Convert a CompositeObject to an ObsListEntry.""" + return ObsListEntry( + name=obj.display_name, + ra=obj.ra, + dec=obj.dec, + obj_type=obj.obj_type, + mag=obj.mag, + size=obj.size, + catalog_code=obj.catalog_code, + sequence=obj.sequence, + description=obj.description, + ) + + +CATALOG_ALIASES: dict = { + "Messier": "M", + "Caldwell": "C", + "Collinder": "Cr", +} + + +def resolve_object(catalog_numbers, catalogs: Catalogs, comment: str = ""): """ - Takes a list of SkySafari catalog - numbers and tries to find an object - in our DB which matches + Takes a list of catalog number strings + (e.g. ["M 31", "NGC 224"]) and tries to + find a matching object in the PiFinder DB. """ for catalog_number in catalog_numbers: - catalog = catalog_number.split(" ")[0] - catalog = SKYSAFARI_CATALOG_NAMES_INV.get(catalog, catalog) + parts = catalog_number.strip().split(" ", 1) + catalog = SKYSAFARI_CATALOG_NAMES_INV.get(parts[0], parts[0]) + catalog = CATALOG_ALIASES.get(catalog, catalog) try: - sequence = catalog_number.split(" ")[1].strip() - sequence = int(sequence) + sequence = int(parts[1].strip()) except (ValueError, IndexError): sequence = None if sequence is not None: _object = catalogs.get_object(catalog, sequence) if _object: + if comment and not _object.description: + _object.description = comment return _object return None +def _coordinate_object(entry: ObsListEntry, index: int) -> CompositeObject: + """ + Creates a CompositeObject from an ObsListEntry's coordinates + when catalog resolution fails. + """ + const = "" + if entry.ra is not None and entry.dec is not None: + try: + from PiFinder.calc_utils import sf_utils + + const = sf_utils.radec_to_constellation(entry.ra, entry.dec) + except Exception: + pass + return CompositeObject( + id=-(index + 1), + object_id=-(index + 1), + ra=entry.ra, + dec=entry.dec, + obj_type=entry.obj_type or "?", + const=const, + size=entry.size, + catalog_code=entry.catalog_code or "OBS", + sequence=entry.sequence or (index + 1), + description=entry.description or entry.name, + names=[entry.name], + mag=entry.mag, + ) + + def read_list(catalogs: Catalogs, name): """ - Reads a skylist style observing - list. Matches against catalogs - and returns a catalog list + Reads an observing list file in any supported format. + Resolves entries against catalogs and returns a catalog list. """ + filepath = os.path.join(OBSLIST_DIR, name) + + try: + obs_list = formats_read_file(filepath) + except Exception as e: + logger.critical("Failed to read observing list %s: %s", name, e) + return { + "result": "error", + "objects_parsed": 0, + "message": str(e), + "catalog_objects": [], + } list_catalog: list = [] - catalog_numbers: list = [] - objects_parsed = 0 - in_object = False - with open(OBSLIST_DIR + name + ".skylist", "r") as skylist: - for line in skylist: - line = line.strip() - if line == "SkyObject=BeginObject": - if in_object: - logger.critical( - "Encountered object start while in object. File is corrupt" - ) - return { - "result": "error", - "objects_parsed": objects_parsed, - "message": "Bad start tag", - "catalog_objects": list_catalog, - } - - catalog_numbers = [] - in_object = True - - elif line == "EndObject=SkyObject": - if not in_object: - logger.critical( - "Encountered object end while not in object. File is corrupt" - ) - return { - "result": "error", - "objects_parsed": objects_parsed, - "message": "Bad end tag", - "catalog_objects": list_catalog, - } - - # see if we can resolve an object - _object = resolve_object(catalog_numbers, catalogs) - - if _object: - list_catalog.append(_object) - - objects_parsed += 1 - in_object = False - - elif line.startswith("CatalogNumber"): - if not in_object: - logger.critical( - "Encountered catalog number while not in object. File is corrupt" - ) - return { - "result": "error", - "objects_parsed": objects_parsed, - "message": "Bad catalog tag", - "catalog_objects": list_catalog, - } - catalog_numbers.append(line.split("=")[1]) + for i, entry in enumerate(obs_list.entries): + _object = None + + # Try catalog resolution with catalog_names (skylist multi-name support) + if entry.catalog_names: + _object = resolve_object(entry.catalog_names, catalogs, entry.description) + elif entry.catalog_code and entry.sequence: + _object = resolve_object( + [f"{entry.catalog_code} {entry.sequence}"], + catalogs, + entry.description, + ) + + # Fall back to coordinate-based object + if not _object and (entry.ra or entry.dec): + _object = _coordinate_object(entry, i) - else: - pass + if _object: + list_catalog.append(_object) return { "result": "success", - "objects_parsed": objects_parsed, + "objects_parsed": len(obs_list.entries), "message": "Complete", "catalog_objects": list_catalog, } -def get_lists(): - """ - Returns a list of list names on disk +def get_lists(subdir=""): """ - obs_files = [] - for filename in os.listdir(OBSLIST_DIR): - if not filename.startswith(".") and filename.endswith(".skylist"): - obs_files.append(filename[:-8]) + Returns entries (folders and observing list files) under OBSLIST_DIR/subdir. + Each entry is a dict with 'name', 'type' ('folder' or 'file'), + and either 'subdir' (for folders) or 'path' (for files). - return obs_files + When multiple files share the same stem (e.g. CSOG.skylist and CSOG.csv), + an extension tag is appended to the display name: "CSOG [skylist]". + """ + target = os.path.join(OBSLIST_DIR, subdir) + if not os.path.isdir(target): + return [] + + folders = [] + files = [] + stem_counts: dict = {} + for name in sorted(os.listdir(target)): + if name.startswith("."): + continue + full = os.path.join(target, name) + if os.path.isdir(full): + folders.append( + {"name": name, "type": "folder", "subdir": os.path.join(subdir, name)} + ) + else: + for ext in SUPPORTED_EXTENSIONS: + if name.endswith(ext): + stem = name[: -len(ext)] + tag = ext[1:] # strip the dot + files.append( + { + "stem": stem, + "tag": tag, + "type": "file", + "path": os.path.join(subdir, name), + } + ) + stem_counts[stem] = stem_counts.get(stem, 0) + 1 + break + + entries = list(folders) + for f in files: + display = f["stem"] + if stem_counts.get(f["stem"], 1) > 1: + display = f"{f['stem']} [{f['tag']}]" + entries.append({"name": display, "type": "file", "path": f["path"]}) + return entries diff --git a/python/PiFinder/obslist_formats.py b/python/PiFinder/obslist_formats.py new file mode 100644 index 00000000..fca901d3 --- /dev/null +++ b/python/PiFinder/obslist_formats.py @@ -0,0 +1,1087 @@ +""" +Common observing list format library. + +Reads and writes astronomical observing lists in multiple formats: +- SkySafari (.skylist) +- CSV (.csv) +- Plain Text (.txt) +- Stellarium (.sol) +- Autostar Tour (.txt) +- Argo Navis (.txt) +- NexTour (.hct) +- EQMOD Tour (.lst) +""" + +from __future__ import annotations + +import csv +import io +import json +import os +import re +from dataclasses import dataclass, field +from typing import Optional + +from PiFinder.composite_object import MagnitudeObject, SizeObject + + +# ── Data model ────────────────────────────────────────────────────────── + + +@dataclass +class ObsListEntry: + """Common interchange format for a single observing list object.""" + + name: str + ra: float # RA in degrees (0-360), J2000 + dec: float # Dec in degrees (-90 to +90), J2000 + obj_type: str = "" + mag: MagnitudeObject = field(default_factory=lambda: MagnitudeObject([])) + catalog_code: str = "" + sequence: int = 0 + description: str = "" + size: SizeObject = field(default_factory=lambda: SizeObject([])) + catalog_names: list[str] = field(default_factory=list) + + +@dataclass +class ObsList: + """An observing list with a name and list of entries.""" + + name: str + entries: list[ObsListEntry] = field(default_factory=list) + + +# ── Supported extensions ──────────────────────────────────────────────── + +SUPPORTED_EXTENSIONS = ( + ".skylist", + ".csv", + ".sol", + ".hct", + ".lst", + ".txt", + ".mtf", + ".pifinder", +) + + +# ── Coordinate helpers ────────────────────────────────────────────────── + + +def ra_to_hms(ra_deg: float) -> tuple[int, int, float]: + """Convert RA in degrees to (hours, minutes, seconds).""" + hours = ra_deg / 15.0 + h = int(hours) + rem = (hours - h) * 60 + m = int(rem) + s = (rem - m) * 60 + return h, m, s + + +def dec_to_dms(dec_deg: float) -> tuple[str, int, int, float]: + """Convert Dec in degrees to (sign, degrees, arcminutes, arcseconds).""" + sign = "+" if dec_deg >= 0 else "-" + abs_dec = abs(dec_deg) + d = int(abs_dec) + rem = (abs_dec - d) * 60 + m = int(rem) + s = (rem - m) * 60 + return sign, d, m, s + + +def hms_to_ra(h: int, m: int, s: float) -> float: + """Convert (hours, minutes, seconds) to RA in degrees.""" + return (h + m / 60.0 + s / 3600.0) * 15.0 + + +def dms_to_dec(sign: str, d: int, m: int, s: float) -> float: + """Convert (sign, degrees, arcminutes, arcseconds) to Dec in degrees.""" + dec = d + m / 60.0 + s / 3600.0 + if sign == "-": + dec = -dec + return dec + + +def format_ra_string(ra: float) -> str: + h, m, s = ra_to_hms(ra) + return f"{h}h {m}m {s:.1f}s" + + +def format_dec_string(dec: float) -> str: + sign, d, m, s = dec_to_dms(dec) + return f"{sign}{d}\u00b0 {m}' {round(s)}\"" + + +def _parse_ra_string(s: str) -> float: + """Parse RA from 'Xh Xm X.Xs' format to degrees.""" + match = re.match(r"(\d+)h\s+(\d+)m\s+([\d.]+)s", s.strip()) + if match: + return hms_to_ra( + int(match.group(1)), int(match.group(2)), float(match.group(3)) + ) + return 0.0 + + +def _parse_dec_string(s: str) -> float: + """Parse Dec from '+/-X deg X' X"' format to degrees.""" + match = re.match(r"([+-]?)(\d+)[°]\s*(\d+)['']\s*(\d+)", s.strip()) + if match: + sign = match.group(1) or "+" + return dms_to_dec( + sign, int(match.group(2)), int(match.group(3)), float(match.group(4)) + ) + return 0.0 + + +_TOUR_MARKERS = {"end of tour", "end of list", "end tour"} + + +def _is_tour_marker(name: str) -> bool: + return name.strip().lower() in _TOUR_MARKERS + + +def _parse_hms_colon(s: str) -> float: + """Parse RA from 'HH:MM:SS' to degrees.""" + parts = s.strip().split(":") + if len(parts) == 3: + return hms_to_ra(int(parts[0]), int(parts[1]), float(parts[2])) + return 0.0 + + +def _parse_dms_colon(s: str) -> float: + """Parse Dec from '(+/-)DD:MM:SS' to degrees.""" + s = s.strip() + sign = "+" + if s.startswith("-"): + sign = "-" + s = s[1:] + elif s.startswith("+"): + s = s[1:] + parts = s.split(":") + if len(parts) == 3: + return dms_to_dec(sign, int(parts[0]), int(parts[1]), float(parts[2])) + return 0.0 + + +def _parse_catalog_name(name: str) -> tuple[str, int]: + """Extract catalog code and sequence from 'NGC 224' or 'NGC224' or 'Sh2 155'.""" + name = name.strip() + # With space: "NGC 7640", "Sh2 155", "Messier 31" + match = re.match(r"([A-Za-z]+\d*[A-Za-z]*)\s+(\d+)$", name) + if match: + return match.group(1), int(match.group(2)) + # No space: "NGC7640", "M31" — split at letter/digit boundary + match = re.match(r"([A-Za-z]+)(\d+)$", name) + if match: + return match.group(1), int(match.group(2)) + return "", 0 + + +# ── Type mapping tables ───────────────────────────────────────────────── + +ARGO_TYPE_MAP: dict[str, str] = { + "Gx": "GALAXY", + "OC": "OPEN", + "Gb": "GLOBULAR", + "PN": "PLANETARY", + "Nb": "NEBULA", + "DN": "DARK", + "*": "STAR", + "D*": "DOUBLE", + "***": "TRIPLE", + "C+N": "NEBULA", + "Kt": "NEBULA", + "Ast": "ASTERISM", + "Pla": "STAR", + "CM": "COMET", + "?": "USER", +} +ARGO_TYPE_MAP_INV: dict[str, str] = {} +for _k, _v in ARGO_TYPE_MAP.items(): + if _v not in ARGO_TYPE_MAP_INV: + ARGO_TYPE_MAP_INV[_v] = _k + +CELESTRON_TYPE_MAP: dict[str, str] = { + "Gx": "Galaxy", + "OC": "Open Cluster", + "Gb": "Globular Cluster", + "PN": "Planetary Nebula", + "Nb": "Nebula", + "DN": "Nebula", + "*": "Star", + "D*": "Double Star", + "***": "Triple Star", + "C+N": "Nebula", + "Kt": "Nebula", + "Ast": "Asterism", + "Pla": "Star", + "CM": "Star", + "?": "Star", +} +CELESTRON_TYPE_MAP_INV: dict[str, str] = {} +for _k, _v in CELESTRON_TYPE_MAP.items(): + if _v not in CELESTRON_TYPE_MAP_INV: + CELESTRON_TYPE_MAP_INV[_v] = _k + +SKYSAFARI_CATALOG_NAMES: dict[str, str] = { + "CAL": "C", + "COL": "Cr", +} +SKYSAFARI_CATALOG_NAMES_INV: dict[str, str] = { + v: k for k, v in SKYSAFARI_CATALOG_NAMES.items() +} + + +def _skylist_object_id(obj_type: str) -> str: + if obj_type in ("*", "D*", "***"): + return "2,-1,-1" + if obj_type == "Pla": + return "1,-1,-1" + return "4,-1,-1" + + +# ── SkySafari (.skylist) ──────────────────────────────────────────────── + + +def write_skylist(obs_list: ObsList) -> str: + lines = ["SkySafariObservingListVersion=3.0"] + for i, entry in enumerate(obs_list.entries): + cat = SKYSAFARI_CATALOG_NAMES.get(entry.catalog_code, entry.catalog_code) + catalog_num = ( + f"{cat} {entry.sequence}" if cat and entry.sequence else entry.name + ) + lines.extend( + [ + "SkyObject=BeginObject", + f" ObjectID={_skylist_object_id(entry.obj_type)}", + f" CatalogNumber={catalog_num}", + f" DefaultIndex={i}", + ] + ) + if entry.ra or entry.dec: + lines.append(f" EndObjectRA={entry.ra / 15.0}") + lines.append(f" EndObjectDec={entry.dec}") + if entry.description: + lines.append(f" Comment={entry.description}") + lines.append("EndObject=SkyObject") + return "\n".join(lines) + "\n" + + +def read_skylist(text: str) -> ObsList: + entries: list[ObsListEntry] = [] + catalog_numbers: list[str] = [] + comment = "" + end_ra: Optional[float] = None + end_dec: Optional[float] = None + in_object = False + + for line in text.splitlines(): + line = line.strip() + if line == "SkyObject=BeginObject": + catalog_numbers = [] + comment = "" + end_ra = None + end_dec = None + in_object = True + elif line == "EndObject=SkyObject" and in_object: + name = ( + catalog_numbers[0].strip() + if catalog_numbers + else f"OBJ {len(entries) + 1}" + ) + catalog_code, sequence = _parse_catalog_name(name) + ra = end_ra * 15.0 if end_ra is not None else 0.0 + dec = end_dec if end_dec is not None else 0.0 + entries.append( + ObsListEntry( + name=name, + ra=ra, + dec=dec, + catalog_code=catalog_code, + sequence=sequence, + description=comment, + catalog_names=list(catalog_numbers), + ) + ) + in_object = False + elif line.startswith("CatalogNumber=") and in_object: + catalog_numbers.append(line.split("=", 1)[1]) + elif line.startswith("Comment=") and in_object: + comment = line.split("=", 1)[1].strip() + elif line.startswith("EndObjectRA=") and in_object: + try: + end_ra = float(line.split("=", 1)[1]) + except ValueError: + pass + elif line.startswith("EndObjectDec=") and in_object: + try: + end_dec = float(line.split("=", 1)[1]) + except ValueError: + pass + + return ObsList(name="", entries=entries) + + +# ── CSV (.csv) ────────────────────────────────────────────────────────── + +_CSV_HEADER = "Name,RA,Dec,Magnitude,Type,CatalogCode,Sequence" + + +def write_csv(obs_list: ObsList) -> str: + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow( + ["Name", "RA", "Dec", "Magnitude", "Type", "CatalogCode", "Sequence"] + ) + for entry in obs_list.entries: + mag_str = ( + f"{entry.mag.filter_mag:.1f}" + if entry.mag.filter_mag != MagnitudeObject.UNKNOWN_MAG + else "" + ) + writer.writerow( + [ + entry.name, + format_ra_string(entry.ra), + format_dec_string(entry.dec), + mag_str, + entry.obj_type, + entry.catalog_code, + entry.sequence, + ] + ) + return buf.getvalue() + + +def read_csv(text: str) -> ObsList: + entries: list[ObsListEntry] = [] + reader = csv.DictReader(io.StringIO(text)) + for row in reader: + name = row.get("Name", "") + ra_str = row.get("RA", "") + dec_str = row.get("Dec", "") + mag_str = row.get("Magnitude", "") + obj_type = row.get("Type", "") + catalog_code = row.get("CatalogCode", "") + seq_str = row.get("Sequence", "0") + + ra = _parse_ra_string(ra_str) if ra_str else 0.0 + dec = _parse_dec_string(dec_str) if dec_str else 0.0 + mag: Optional[float] = None + if mag_str: + try: + mag = float(mag_str) + except ValueError: + pass + try: + sequence = int(seq_str) + except ValueError: + sequence = 0 + + entries.append( + ObsListEntry( + name=name, + ra=ra, + dec=dec, + obj_type=obj_type, + mag=MagnitudeObject([mag] if mag is not None else []), + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name="", entries=entries) + + +# ── Plain Text (.txt) ────────────────────────────────────────────────── + + +def write_text(obs_list: ObsList) -> str: + return "\n".join(entry.name for entry in obs_list.entries) + "\n" + + +def read_text(text: str) -> ObsList: + entries: list[ObsListEntry] = [] + for line in text.splitlines(): + name = line.strip() + if not name: + continue + catalog_code, sequence = _parse_catalog_name(name) + entries.append( + ObsListEntry( + name=name, + ra=0.0, + dec=0.0, + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name="", entries=entries) + + +# ── Stellarium (.sol) ────────────────────────────────────────────────── + + +def write_stellarium(obs_list: ObsList) -> str: + data = { + "version": "1.0", + "shortName": obs_list.name, + "description": "Exported from PiFinder", + "objects": [ + { + "designation": entry.name, + "objtype": entry.obj_type, + "ra": format_ra_string(entry.ra), + "dec": format_dec_string(entry.dec), + "magnitude": f"{entry.mag.filter_mag:.2f}" + if entry.mag.filter_mag != MagnitudeObject.UNKNOWN_MAG + else "", + } + for entry in obs_list.entries + ], + } + return json.dumps(data, indent=2) + + +def read_stellarium(text: str) -> ObsList: + data = json.loads(text) + name = data.get("shortName", "") + entries: list[ObsListEntry] = [] + for obj in data.get("objects", []): + designation = obj.get("designation", "") + obj_type = obj.get("objtype", "") + ra = _parse_ra_string(obj.get("ra", "")) + dec = _parse_dec_string(obj.get("dec", "")) + mag: Optional[float] = None + mag_str = obj.get("magnitude", "") + if mag_str: + try: + mag = float(mag_str) + except ValueError: + pass + catalog_code, sequence = _parse_catalog_name(designation) + entries.append( + ObsListEntry( + name=designation, + ra=ra, + dec=dec, + obj_type=obj_type, + mag=MagnitudeObject([mag] if mag is not None else []), + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name=name, entries=entries) + + +# ── Autostar Tour (.txt) ─────────────────────────────────────────────── + + +def write_autostar(obs_list: ObsList) -> str: + title = obs_list.name[:15] + lines = ["/ PiFinder export", f'TITLE "{title}"'] + for entry in obs_list.entries: + h, m, s = ra_to_hms(entry.ra) + ra_str = f"{h:02d}:{m:02d}:{round(s):02d}" + sign, d, dm, ds = dec_to_dms(entry.dec) + sign_char = "-" if sign == "-" else "" + dec_str = f"{sign_char}{d:02d}d{dm:02d}m{round(ds):02d}s" + obj_title = entry.name[:16] + mag_str = ( + f" mag {entry.mag:.1f}" + if entry.mag.filter_mag != MagnitudeObject.UNKNOWN_MAG + else "" + ) + lines.append( + f'USER {ra_str} {dec_str} "{obj_title}" "{entry.obj_type}{mag_str}"' + ) + return "\n".join(lines) + "\n" + + +def read_autostar(text: str) -> ObsList: + entries: list[ObsListEntry] = [] + name = "" + for line in text.splitlines(): + line = line.strip() + if line.startswith("TITLE "): + match = re.match(r'TITLE\s+"([^"]*)"', line) + if match: + name = match.group(1) + continue + if not line.startswith("USER "): + continue + # USER HH:MM:SS ±DDdMMmSSs "name" "type mag" + match = re.match( + r'USER\s+(\d{2}:\d{2}:\d{2})\s+([+-]?\d{1,2}d\d{2}m\d{2}s)\s+"([^"]*)"\s+"([^"]*)"', + line, + ) + if not match: + continue + ra = _parse_hms_colon(match.group(1)) + dec_str = match.group(2) + dec_match = re.match(r"([+-]?)(\d+)d(\d+)m(\d+)s", dec_str) + dec = 0.0 + if dec_match: + dec_sign = "-" if dec_match.group(1) == "-" else "+" + dec = dms_to_dec( + dec_sign, + int(dec_match.group(2)), + int(dec_match.group(3)), + float(dec_match.group(4)), + ) + obj_name = match.group(3) + type_mag = match.group(4) + obj_type = "" + mag: Optional[float] = None + mag_match = re.match(r"(\S+)\s+mag\s+([\d.]+)", type_mag) + if mag_match: + obj_type = mag_match.group(1) + try: + mag = float(mag_match.group(2)) + except ValueError: + pass + else: + obj_type = type_mag.strip() + + catalog_code, sequence = _parse_catalog_name(obj_name) + entries.append( + ObsListEntry( + name=obj_name, + ra=ra, + dec=dec, + obj_type=obj_type, + mag=MagnitudeObject([mag] if mag is not None else []), + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name=name, entries=entries) + + +# ── Argo Navis (.txt) ────────────────────────────────────────────────── + + +def write_argo(obs_list: ObsList) -> str: + lines: list[str] = [] + for entry in obs_list.entries: + h, m, s = ra_to_hms(entry.ra) + ra_str = f"{h:02d}:{m:02d}:{round(s):02d}" + sign, d, dm, ds = dec_to_dms(entry.dec) + dec_str = f"{sign}{d:02d}:{dm:02d}:{round(ds):02d}" + atype = ARGO_TYPE_MAP.get(entry.obj_type, "USER") + mag_str = ( + f"{entry.mag.filter_mag:.1f}" + if entry.mag.filter_mag != MagnitudeObject.UNKNOWN_MAG + else "ANY" + ) + lines.append(f"{entry.name}|{ra_str}|{dec_str}|{atype}|{mag_str}|") + return "\r\n".join(lines) + "\r\n" + + +def read_argo(text: str) -> ObsList: + entries: list[ObsListEntry] = [] + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("|") + if len(parts) < 5: + continue + obj_name = parts[0].strip() + ra = _parse_hms_colon(parts[1]) + dec = _parse_dms_colon(parts[2]) + obj_type = ARGO_TYPE_MAP_INV.get(parts[3].strip(), "") + mag: Optional[float] = None + mag_str = parts[4].strip() + if mag_str and mag_str != "ANY": + try: + mag = float(mag_str) + except ValueError: + pass + catalog_code, sequence = _parse_catalog_name(obj_name) + entries.append( + ObsListEntry( + name=obj_name, + ra=ra, + dec=dec, + obj_type=obj_type, + mag=MagnitudeObject([mag] if mag is not None else []), + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name="", entries=entries) + + +# ── NexTour (.hct) ───────────────────────────────────────────────────── + + +def write_nextour(obs_list: ObsList) -> str: + lines: list[str] = [] + for entry in obs_list.entries: + ra_hours = entry.ra / 15.0 + ra_h = int(ra_hours) + ra_m = (ra_hours - ra_h) * 60 + dec_sign = "+" if entry.dec >= 0 else "-" + dec_abs = abs(entry.dec) + dec_d = int(dec_abs) + dec_m = (dec_abs - dec_d) * 60 + category = entry.catalog_code or "User" + obj_num = str(entry.sequence) if entry.sequence else "" + ctype = CELESTRON_TYPE_MAP.get(entry.obj_type, "Star") + mag_str = ( + f"{entry.mag.filter_mag:.1f}" + if entry.mag.filter_mag != MagnitudeObject.UNKNOWN_MAG + else "" + ) + lines.append( + f"{category}#{obj_num}#{entry.name}#{ctype}#{mag_str}#" + f"#{ra_h}#{ra_m:.1f}#{dec_sign}#{dec_d}#{dec_m:.1f}#" + ) + return "\r\n".join(lines) + "\r\n" + + +def _detect_nextour_variant(text: str) -> str: + """Detect which NexTour variant: 'coord_first' or 'catalog_first'.""" + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + # CSOG variant starts with HH:MM:SS + if re.match(r"\d{2}:\d{2}:\d{2}", line): + return "coord_first" + return "catalog_first" + return "catalog_first" + + +def _read_nextour_coord_first(text: str) -> ObsList: + """Parse CSOG-style NexTour: RA#±Dec#Name#...""" + entries: list[ObsListEntry] = [] + for line in text.splitlines(): + line = line.strip() + if not line: + continue + parts = line.split("#") + if len(parts) < 3: + continue + ra_str = parts[0].strip() + dec_str = parts[1].strip() + obj_name = parts[2].strip() + if not obj_name or not re.match(r"\d{2}:\d{2}:", ra_str): + continue + ra = _parse_hms_colon(ra_str) + dec = _parse_dms_colon(dec_str) + if _is_tour_marker(obj_name): + continue + catalog_code, sequence = _parse_catalog_name(obj_name) + entries.append( + ObsListEntry( + name=obj_name, + ra=ra, + dec=dec, + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name="", entries=entries) + + +def _read_nextour_catalog_first(text: str) -> ObsList: + """Parse web-export NexTour: category#objNum#name#type#mag##raH#raM#sign#decD#decM#""" + entries: list[ObsListEntry] = [] + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split("#") + if len(parts) < 12: + continue + category = parts[0].strip() + obj_num_str = parts[1].strip() + obj_name = parts[2].strip() + ctype = parts[3].strip() + mag_str = parts[4].strip() + # parts[5] is empty (double hash) + ra_h_str = parts[6].strip() + ra_m_str = parts[7].strip() + dec_sign = parts[8].strip() + dec_d_str = parts[9].strip() + dec_m_str = parts[10].strip() + + try: + ra_h = int(ra_h_str) + ra_m = float(ra_m_str) + ra = (ra_h + ra_m / 60.0) * 15.0 + except ValueError: + ra = 0.0 + try: + dec_d = int(dec_d_str) + dec_m = float(dec_m_str) + dec = dec_d + dec_m / 60.0 + if dec_sign == "-": + dec = -dec + except ValueError: + dec = 0.0 + + obj_type = CELESTRON_TYPE_MAP_INV.get(ctype, "") + mag: Optional[float] = None + if mag_str: + try: + mag = float(mag_str) + except ValueError: + pass + try: + obj_num = int(obj_num_str) + except ValueError: + obj_num = 0 + + catalog_code = category if category != "User" else "" + entries.append( + ObsListEntry( + name=obj_name, + ra=ra, + dec=dec, + obj_type=obj_type, + mag=MagnitudeObject([mag] if mag is not None else []), + catalog_code=catalog_code, + sequence=obj_num, + ) + ) + return ObsList(name="", entries=entries) + + +def read_nextour(text: str) -> ObsList: + variant = _detect_nextour_variant(text) + if variant == "coord_first": + return _read_nextour_coord_first(text) + return _read_nextour_catalog_first(text) + + +# ── EQMOD Tour (.lst) ────────────────────────────────────────────────── + + +def write_eqmod(obs_list: ObsList) -> str: + lines = ["!J2000", f"# {obs_list.name} - exported from PiFinder"] + for entry in obs_list.entries: + ra_hours = entry.ra / 15.0 + lines.append(f"{ra_hours:.4f}; {entry.dec:.4f}; {entry.name}") + return "\n".join(lines) + "\n" + + +def read_eqmod(text: str) -> ObsList: + entries: list[ObsListEntry] = [] + name = "" + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("!"): + continue + if line.startswith("#"): + if not name: + name = line.lstrip("# ").split(" - ")[0].strip() + continue + parts = line.split(";") + if len(parts) < 3: + continue + try: + ra_hours = float(parts[0].strip()) + ra = ra_hours * 15.0 + except ValueError: + continue + try: + dec = float(parts[1].strip()) + except ValueError: + continue + obj_name = parts[2].strip() + if _is_tour_marker(obj_name): + continue + catalog_code, sequence = _parse_catalog_name(obj_name) + entries.append( + ObsListEntry( + name=obj_name, + ra=ra, + dec=dec, + catalog_code=catalog_code, + sequence=sequence, + ) + ) + return ObsList(name=name, entries=entries) + + +# ── PiFinder (.pifinder) ─────────────────────────────────────────────── + + +def write_pifinder(obs_list: ObsList) -> str: + objects = [] + for entry in obs_list.entries: + if entry.catalog_code and entry.sequence: + obj: dict = { + "catalog_code": entry.catalog_code, + "sequence": entry.sequence, + } + else: + obj = { + "name": entry.name, + "obj_type": entry.obj_type or "?", + "ra": entry.ra, + "dec": entry.dec, + } + if entry.mag.filter_mag != MagnitudeObject.UNKNOWN_MAG: + obj["mag"] = { + "mags": entry.mag.mags, + "filter_mag": entry.mag.filter_mag, + } + if entry.description: + obj["notes"] = entry.description + objects.append(obj) + data = { + "version": 1, + "name": obs_list.name, + "objects": objects, + } + return json.dumps(data, indent=2) + + +_EPOCH_JD = { + "J2000": 2451545.0, +} +_J2000_JD = 2451545.0 + + +def _epoch_to_jd(epoch_str: str) -> float: + """Convert epoch string like 'J2000' or 'J2016.0' to Julian Date.""" + if epoch_str in _EPOCH_JD: + return _EPOCH_JD[epoch_str] + if epoch_str.startswith("J"): + year = float(epoch_str[1:]) + return _J2000_JD + (year - 2000.0) * 365.25 + raise ValueError(f"Unsupported epoch: {epoch_str}") + + +def _precess_to_j2000(ra_deg: float, dec_deg: float, from_jd: float) -> tuple: + """Precess (ra_deg, dec_deg) from given epoch to J2000.""" + if from_jd == _J2000_JD: + return ra_deg, dec_deg + from PiFinder.calc_utils import epoch_to_epoch + + ra_h_from = ra_deg / 15.0 + ra_h, dec = epoch_to_epoch(from_jd, _J2000_JD, ra_h_from, dec_deg) + return ra_h._degrees, dec.degrees + + +class PiFinderFormatError(ValueError): + """Raised when a .pifinder file fails validation.""" + + +def _validate_pifinder(data: dict) -> None: + """Validate top-level .pifinder structure.""" + if not isinstance(data, dict): + raise PiFinderFormatError("Root must be a JSON object") + for key in ("version", "name", "objects"): + if key not in data: + raise PiFinderFormatError(f"Missing required field: {key}") + if data["version"] != 1: + raise PiFinderFormatError(f"Unsupported version: {data['version']}") + if not isinstance(data["objects"], list): + raise PiFinderFormatError("'objects' must be an array") + + +def _validate_pifinder_object(obj: dict, index: int) -> None: + """Validate a single object entry.""" + prefix = f"objects[{index}]" + if not isinstance(obj, dict): + raise PiFinderFormatError(f"{prefix}: must be a JSON object") + if "catalog_code" in obj: + if "sequence" not in obj: + raise PiFinderFormatError(f"{prefix}: catalog entry missing 'sequence'") + else: + for key in ("name", "obj_type", "ra", "dec"): + if key not in obj: + raise PiFinderFormatError(f"{prefix}: custom entry missing '{key}'") + extents = obj.get("extents") + if extents is not None: + if not isinstance(extents, dict): + raise PiFinderFormatError(f"{prefix}: 'extents' must be an object") + if "shape" not in extents: + raise PiFinderFormatError(f"{prefix}: extents missing 'shape'") + shape = extents["shape"] + if isinstance(shape, list) and shape and isinstance(shape[0], list): + if "geometry" not in extents: + raise PiFinderFormatError( + f"{prefix}: nested shape requires 'geometry' " + f"('polyline' or 'segments')" + ) + + +def _precess_size_to_j2000(size: SizeObject, from_jd: float) -> SizeObject: + """Precess all RA/Dec coordinates in a SizeObject to J2000.""" + if from_jd == _J2000_JD: + return size + if size.is_segments: + new_shape = [] + for seg in size.extents: + new_seg = [] + for pt in seg: + ra, dec = _precess_to_j2000(pt[0], pt[1], from_jd) + new_seg.append([ra, dec]) + new_shape.append(new_seg) + return SizeObject(new_shape, size.position_angle, geometry=size.geometry) + if size.is_vertices: + new_shape = [] + for pt in size.extents: + ra, dec = _precess_to_j2000(pt[0], pt[1], from_jd) + new_shape.append([ra, dec]) + return SizeObject(new_shape, size.position_angle, geometry=size.geometry) + return size + + +def read_pifinder(text: str) -> ObsList: + data = json.loads(text) + _validate_pifinder(data) + name = data["name"] + entries: list[ObsListEntry] = [] + for i, obj in enumerate(data["objects"]): + _validate_pifinder_object(obj, i) + notes = obj.get("notes", "") + if "catalog_code" in obj: + entries.append( + ObsListEntry( + name=f"{obj['catalog_code']} {obj['sequence']}", + ra=0.0, + dec=0.0, + catalog_code=obj["catalog_code"], + sequence=obj["sequence"], + description=notes, + ) + ) + else: + epoch_str = obj.get("epoch", "J2000") + from_jd = _epoch_to_jd(epoch_str) + + raw_mag = obj.get("mag") + if isinstance(raw_mag, dict): + mag_obj = MagnitudeObject(raw_mag.get("mags", [])) + elif raw_mag is not None: + mag_obj = MagnitudeObject([float(raw_mag)]) + else: + mag_obj = MagnitudeObject([]) + + ra, dec = _precess_to_j2000(float(obj["ra"]), float(obj["dec"]), from_jd) + + extents_data = obj.get("extents") + if extents_data: + shape = extents_data["shape"] + pa = extents_data.get("position_angle", 0.0) + geometry = extents_data.get("geometry", "") + size = _precess_size_to_j2000( + SizeObject(shape, pa, geometry=geometry), from_jd + ) + else: + size = SizeObject([]) + entries.append( + ObsListEntry( + name=obj["name"], + ra=ra, + dec=dec, + obj_type=obj["obj_type"], + mag=mag_obj, + description=notes, + size=size, + ) + ) + return ObsList(name=name, entries=entries) + + +# ── Format detection ─────────────────────────────────────────────────── + +_FORMAT_BY_EXT: dict[str, str] = { + ".skylist": "skylist", + ".sol": "stellarium", + ".hct": "nextour", + ".lst": "eqmod", + ".csv": "csv", + ".mtf": "autostar", + ".pifinder": "pifinder", +} + +_READERS: dict[str, object] = { + "skylist": read_skylist, + "csv": read_csv, + "text": read_text, + "stellarium": read_stellarium, + "autostar": read_autostar, + "argo": read_argo, + "nextour": read_nextour, + "eqmod": read_eqmod, + "pifinder": read_pifinder, +} + +_WRITERS: dict[str, object] = { + "skylist": write_skylist, + "csv": write_csv, + "text": write_text, + "stellarium": write_stellarium, + "autostar": write_autostar, + "argo": write_argo, + "nextour": write_nextour, + "eqmod": write_eqmod, + "pifinder": write_pifinder, +} + + +def detect_format(text: str, filename: str = "") -> str: + """Auto-detect observing list format by extension or content sniffing.""" + if filename: + _, ext = os.path.splitext(filename.lower()) + fmt = _FORMAT_BY_EXT.get(ext) + if fmt: + return fmt + + stripped = text.lstrip() + if "SkySafariObservingListVersion" in text: + return "skylist" + if stripped.startswith("{"): + try: + data = json.loads(text) + if "version" in data and "objects" in data: + return "pifinder" + except (json.JSONDecodeError, ValueError): + pass + return "stellarium" + if stripped.startswith("!J2000"): + return "eqmod" + # Check for pipe-delimited lines (Argo Navis) + for line in text.splitlines()[:10]: + line = line.strip() + if line and not line.startswith("#") and line.count("|") >= 4: + return "argo" + # Check for USER lines (Autostar) + for line in text.splitlines()[:20]: + if line.strip().startswith("USER "): + return "autostar" + # Check for CSV header + if stripped.startswith("Name,") or stripped.startswith('"Name"'): + return "csv" + + return "text" + + +def read_file(path: str) -> ObsList: + """Read an observing list file, auto-detecting the format.""" + with open(path, "r") as f: + text = f.read() + fmt = detect_format(text, os.path.basename(path)) + reader = _READERS.get(fmt, read_text) + obs_list = reader(text) + if not obs_list.name: + obs_list.name = os.path.splitext(os.path.basename(path))[0] + return obs_list + + +def write_file(obs_list: ObsList, path: str, fmt: str) -> None: + """Write an observing list to a file in the specified format.""" + writer = _WRITERS.get(fmt) + if not writer: + raise ValueError(f"Unknown format: {fmt}") + content = writer(obs_list) + with open(path, "w") as f: + f.write(content) diff --git a/python/PiFinder/plot.py b/python/PiFinder/plot.py index 9712b291..d956ba03 100644 --- a/python/PiFinder/plot.py +++ b/python/PiFinder/plot.py @@ -315,6 +315,32 @@ def plot_markers(self, marker_list): return ret_image + def project_vertices(self, vertices): + """Project RA/Dec vertex pairs to screen pixel coords. + + vertices: list of [ra_deg, dec_deg] pairs. + Returns list of (x, y) screen tuples. + """ + rows = [(Angle(degrees=ra)._hours, dec) for ra, dec in vertices] + df = pandas.DataFrame(rows, columns=["ra_hours", "dec_degrees"]) + df["epoch_year"] = 1991.25 + positions = self.earth.observe(Star.from_dataframe(df)) + df["x"], df["y"] = self.projection(positions) + + roll_rad = self.roll * (np.pi / 180) + roll_sin = np.sin(roll_rad) + roll_cos = np.cos(roll_rad) + + df = df.assign( + xr=df["x"] * roll_cos - df["y"] * roll_sin, + yr=df["y"] * roll_cos + df["x"] * roll_sin, + ) + df = df.assign( + x_pos=df["xr"] * self.pixel_scale + self.render_center[0], + y_pos=df["yr"] * -1 * self.pixel_scale + self.render_center[1], + ) + return list(zip(df["x_pos"], df["y_pos"])) + def update_projection(self, ra, dec): """ Updates the shared projection used for various plotting diff --git a/python/PiFinder/pos_server.py b/python/PiFinder/pos_server.py index 023a21eb..92a21852 100644 --- a/python/PiFinder/pos_server.py +++ b/python/PiFinder/pos_server.py @@ -16,7 +16,7 @@ from multiprocessing import Queue from typing import Tuple, Union from PiFinder.calc_utils import ra_to_deg, dec_to_deg, sf_utils -from PiFinder.composite_object import CompositeObject, MagnitudeObject +from PiFinder.composite_object import CompositeObject, MagnitudeObject, SizeObject from PiFinder.multiproclogging import MultiprocLogging from skyfield.positionlib import position_of_radec import sys @@ -209,7 +209,7 @@ def handle_goto_command(shared_state, ra_parsed, dec_parsed): "ra": comp_ra, "dec": comp_dec, "const": constellation, - "size": "", + "size": SizeObject([]), "mag": MagnitudeObject([]), "catalog_code": "PUSH", "sequence": sequence, diff --git a/python/PiFinder/ui/callbacks.py b/python/PiFinder/ui/callbacks.py index 71c5190b..bbf7b90a 100644 --- a/python/PiFinder/ui/callbacks.py +++ b/python/PiFinder/ui/callbacks.py @@ -22,7 +22,7 @@ from PiFinder.ui.base import UIModule from PiFinder.ui.textentry import UITextEntry from PiFinder.catalogs import CatalogFilter -from PiFinder.composite_object import CompositeObject, MagnitudeObject +from PiFinder.composite_object import CompositeObject, MagnitudeObject, SizeObject if TYPE_CHECKING: @@ -424,7 +424,7 @@ def create_custom_object_from_coords( "ra": ra_deg, "dec": dec_deg, "const": constellation, - "size": "", + "size": SizeObject([]), "mag": MagnitudeObject([]), "mag_str": "", "catalog_code": "USER", diff --git a/python/PiFinder/ui/chart.py b/python/PiFinder/ui/chart.py index 55349f68..f8dc0b9b 100644 --- a/python/PiFinder/ui/chart.py +++ b/python/PiFinder/ui/chart.py @@ -58,6 +58,7 @@ def plot_markers(self): return marker_list = [] + vertex_objects = [] # is there a target? target = self.ui_state.target() @@ -65,12 +66,16 @@ def plot_markers(self): marker_list.append( (plot.Angle(degrees=target.ra)._hours, target.dec, "target") ) + if target.size.is_vertices: + vertex_objects.append(target) marker_brightness = self.config_object.get_option("chart_dso", 128) if marker_brightness == 0: return for obs_target in self.ui_state.observing_list(): + if obs_target.size.is_vertices: + vertex_objects.append(obs_target) marker = OBJ_TYPE_MARKERS.get(obs_target.obj_type) if marker: marker_list.append( @@ -96,6 +101,13 @@ def plot_markers(self): ) self.screen.paste(ImageChops.add(self.screen, marker_image)) + if vertex_objects: + line_color = self.colors.get(marker_brightness) + for obj in vertex_objects: + screen_pts = self.starfield.project_vertices(obj.size.extents) + if len(screen_pts) >= 2: + self.draw.line(screen_pts, fill=line_color, width=1) + def draw_reticle(self): """ draw the reticle if desired diff --git a/python/PiFinder/ui/menu_structure.py b/python/PiFinder/ui/menu_structure.py index 4412aef8..14019a87 100644 --- a/python/PiFinder/ui/menu_structure.py +++ b/python/PiFinder/ui/menu_structure.py @@ -13,6 +13,7 @@ from PiFinder.ui.sqm import UISQM from PiFinder.ui.equipment import UIEquipment from PiFinder.ui.location_list import UILocationList +from PiFinder.ui.obs_list import UIObsList from PiFinder.ui.locationentry import UILocationEntry from PiFinder.ui.radec_entry import UIRADecEntry import PiFinder.ui.callbacks as callbacks @@ -251,6 +252,10 @@ def _(key: str) -> Any: "objects": "recent", "label": "recent", }, + { + "name": _("Obs Lists"), + "class": UIObsList, + }, { "name": _("Custom"), "class": UIRADecEntry, @@ -890,6 +895,45 @@ def _(key: str) -> Any: }, ], }, + { + "name": _("Image..."), + "class": UITextMenu, + "select": "single", + "items": [ + { + "name": _("NSEW Labels"), + "class": UITextMenu, + "select": "single", + "config_option": "image_nsew", + "items": [ + { + "name": _("On"), + "value": True, + }, + { + "name": _("Off"), + "value": False, + }, + ], + }, + { + "name": _("Object Size"), + "class": UITextMenu, + "select": "single", + "config_option": "image_bbox", + "items": [ + { + "name": _("On"), + "value": True, + }, + { + "name": _("Off"), + "value": False, + }, + ], + }, + ], + }, { "name": _("Camera Exp"), "class": UITextMenu, diff --git a/python/PiFinder/ui/object_details.py b/python/PiFinder/ui/object_details.py index 59d867e9..b81686de 100644 --- a/python/PiFinder/ui/object_details.py +++ b/python/PiFinder/ui/object_details.py @@ -9,6 +9,7 @@ from pydeepskylog.exceptions import InvalidParameterError from PiFinder import cat_images +from PiFinder.composite_object import MagnitudeObject from PiFinder.ui.marking_menus import MarkingMenuOption, MarkingMenu from PiFinder.obj_types import OBJ_TYPES from PiFinder.ui.align import align_on_radec @@ -224,26 +225,29 @@ def update_object_info(self): self.config_object.equipment.active_telescope, self.config_object.equipment.active_eyepiece, ) - if self.object.mag_str == "-": + mag = self.object.mag + magnitude = ( + mag.filter_mag + if mag is not None and mag.filter_mag != MagnitudeObject.UNKNOWN_MAG + else None + ) + if magnitude is None: self.contrast = "" else: try: - if self.object.size: - # Check if the size contains 'x' - if "x" in self.object.size: - diameter1, diameter2 = map( - float, self.object.size.split("x") - ) - diameter1 = ( - diameter1 * 60.0 - ) # Convert arc seconds to arc minutes - diameter2 = diameter2 * 60.0 - elif "'" in self.object.size: - # Convert arc minutes to arc seconds - diameter1 = float(self.object.size.replace("'", "")) * 60.0 - diameter2 = diameter1 + size = self.object.size + if ( + size + and size.extents + and not size.is_vertices + and not size.is_segments + ): + # SizeObject.extents are stored in arcseconds. + if len(size.extents) >= 2: + diameter1 = float(size.extents[0]) + diameter2 = float(size.extents[1]) else: - diameter1 = diameter2 = float(self.object.size) * 60.0 + diameter1 = diameter2 = float(size.extents[0]) else: diameter1 = diameter2 = None @@ -252,7 +256,7 @@ def update_object_info(self): telescope_diameter=self.config_object.equipment.active_telescope.aperture_mm, magnification=magnification, surf_brightness=None, - magnitude=float(self.object.mag_str), + magnitude=magnitude, object_diameter1=diameter1, object_diameter2=diameter2, ) @@ -317,6 +321,9 @@ def update_object_info(self): self.display_class, burn_in=self.object_display_mode in [DM_POSS, DM_SDSS], magnification=magnification, + telescope=self.config_object.equipment.active_telescope, + show_nsew=self.config_object.get_option("image_nsew", True), + show_bbox=self.config_object.get_option("image_bbox", True), ) def active(self): @@ -324,11 +331,10 @@ def active(self): def _check_catalog_initialized(self): code = self.object.catalog_code - if code in ["PUSH", "USER"]: - # Special codes for objects pushed from sky-safari or created by user - return True catalog = self.catalogs.get_catalog_by_code(code) - return catalog and catalog.initialized + if catalog is None: + return True + return catalog.initialized def _render_pointing_instructions(self): # Pointing Instructions diff --git a/python/PiFinder/ui/object_list.py b/python/PiFinder/ui/object_list.py index cb64ec93..c9fba56f 100644 --- a/python/PiFinder/ui/object_list.py +++ b/python/PiFinder/ui/object_list.py @@ -396,7 +396,8 @@ def create_aka_text(self, obj: CompositeObject) -> str: def create_info_text(self, obj: CompositeObject) -> str: obj_mag = self._safe_obj_mag(obj) mag = f"m{obj_mag:2.0f}" if obj_mag != MagnitudeObject.UNKNOWN_MAG else "m--" - size = f"{self.ruler}{obj.size.strip()}" if obj.size.strip() else "" + size_str = str(obj.size) + size = f"{self.ruler}{size_str}" if size_str else "" check = f" {self.checkmark}" if obj.logged else "" size_logged = f"{mag} {size}{check}" if len(size_logged) > 12: diff --git a/python/PiFinder/ui/obs_list.py b/python/PiFinder/ui/obs_list.py new file mode 100644 index 00000000..c79541a7 --- /dev/null +++ b/python/PiFinder/ui/obs_list.py @@ -0,0 +1,177 @@ +""" +UI module for browsing and loading +observing lists from ~/PiFinder_data/obslists/ + +Supports all formats handled by obslist_formats: +SkySafari, CSV, Stellarium, Autostar, Argo Navis, NexTour, EQMOD, plain text. +""" + +import os +import logging +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + + def _(a) -> Any: + return a + + +from PiFinder.ui.text_menu import UITextMenu +from PiFinder.ui.object_list import UIObjectList +from PiFinder.ui.ui_utils import TextLayouterScroll +from PiFinder import obslist + +logger = logging.getLogger("UI.ObsList") + + +class UIObsList(UITextMenu): + """Lists available .skylist files and folders, supports subfolder navigation.""" + + __title__ = "Obs Lists" + + def __init__(self, *args, **kwargs): + incoming = kwargs.get("item_definition", {}) + subdir = incoming.get("subdir", "") + + entries = obslist.get_lists(subdir) + items = [] + for entry in entries: + if entry["type"] == "folder": + items.append( + { + "name": f"[{entry['name']}]", + "class": UIObsList, + "subdir": entry["subdir"], + } + ) + else: + items.append( + { + "name": entry["name"], + "value": entry["path"], + } + ) + + title = os.path.basename(subdir) if subdir else "Obs Lists" + kwargs["item_definition"] = { + "name": title, + "select": "single", + "items": items, + } + super().__init__(*args, **kwargs) + self.__title__ = title + self._scroll_text = None + self._scroll_item = None + + def _get_scrollspeed(self): + scroll_dict = { + "Fast": TextLayouterScroll.FAST, + "Med": TextLayouterScroll.MEDIUM, + "Slow": TextLayouterScroll.SLOW, + } + return scroll_dict.get( + self.config_object.get_option("text_scroll_speed", "Med"), + TextLayouterScroll.MEDIUM, + ) + + def update(self, force=False): + self.clear_screen() + self.draw.rectangle((-1, 60, 129, 80), outline=self.colors.get(128), width=1) + + line_number = 0 + line_horiz_pos = 13 + + for i in range(self._current_item_index - 3, self._current_item_index + 4): + if 0 <= i < self.get_nr_of_menu_items(): + line_font = self.fonts.base + if line_number == 0: + line_color = 96 + line_pos = 0 + elif line_number == 1: + line_color = 128 + line_pos = 13 + elif line_number == 2: + line_color = 192 + line_font = self.fonts.bold + line_pos = 25 + elif line_number == 3: + line_color = 256 + line_font = self.fonts.large + line_pos = 40 + elif line_number == 4: + line_color = 192 + line_font = self.fonts.bold + line_pos = 60 + elif line_number == 5: + line_color = 128 + line_pos = 76 + else: + line_color = 96 + line_pos = 89 + + line_pos += 20 + item_text = str(self._menu_items[i]) + + if line_number == 3: + # Scroll the selected item if it's too long + if self._scroll_item != item_text: + self._scroll_item = item_text + self._scroll_text = TextLayouterScroll( + text=_(item_text), + draw=self.draw, + color=self.colors.get(line_color), + font=line_font, + scrollspeed=self._get_scrollspeed(), + ) + self._scroll_text.draw((line_horiz_pos, line_pos)) + else: + self.draw.text( + (line_horiz_pos, line_pos), + _(item_text), + font=line_font.font, + fill=self.colors.get(line_color), + ) + + line_number += 1 + + return self.screen_update() + + def key_right(self): + if not self._menu_items: + return False + + selected = self._menu_items[self._current_item_index] + item_def = self.get_item(selected) + + if item_def and item_def.get("class"): + self.add_to_stack(item_def) + return False + + list_name = item_def["value"] + + result = obslist.read_list(self.catalogs, list_name) + catalog_objects = result.get("catalog_objects", []) + + parsed = result.get("objects_parsed", 0) + matched = len(catalog_objects) + + if result["result"] != "success": + self.message(f"Error loading\n{parsed} parsed", 2) + return False + + self.ui_state.set_observing_list(catalog_objects) + display_name = os.path.splitext(os.path.basename(list_name))[0] + self.message(f"{display_name}\n{matched}/{parsed} objects", 2) + + object_list_def = { + "name": display_name, + "class": UIObjectList, + "objects": "custom", + "object_list": catalog_objects, + "label": "obs_list", + } + self.add_to_stack(object_list_def) + return False + + def key_left(self): + return True diff --git a/python/PiFinder/utils.py b/python/PiFinder/utils.py index 52322853..4d130d98 100644 --- a/python/PiFinder/utils.py +++ b/python/PiFinder/utils.py @@ -127,25 +127,3 @@ def is_number(s): return False -def format_size_value(value): - """ - Format a size value, removing unnecessary .0 decimals but preserving meaningful decimals. - - Examples: - 17.0 -> "17" - 17.5 -> "17.5" - 17.25 -> "17.3" (rounded to 1 decimal) - """ - if value is None or value == "": - return "" - - try: - num_val = float(value) - # If it's a whole number, return as integer - if num_val == int(num_val): - return str(int(num_val)) - # Otherwise, round to 1 decimal and remove trailing zeros - formatted = f"{num_val:.1f}" - return formatted.rstrip("0").rstrip(".") - except (ValueError, TypeError): - return str(value) # Return as-is if not a number diff --git a/python/tests/test_cat_images.py b/python/tests/test_cat_images.py new file mode 100644 index 00000000..4ea48352 --- /dev/null +++ b/python/tests/test_cat_images.py @@ -0,0 +1,297 @@ +import math +import pytest +from PiFinder.cat_images import ( + cardinal_vectors, + size_overlay_points, + vertex_overlay_points, +) +from PiFinder.composite_object import SizeObject + + +def approx_pt(pt, abs=1e-6): + return pytest.approx(pt, abs=abs) + + +# --- cardinal_vectors --- + + +@pytest.mark.unit +class TestCardinalVectors: + def test_no_rotation(self): + """image_rotate=0: POSS north-up, east-left → N at (0, -1), E at (-1, 0).""" + (nx, ny), (ex, ey) = cardinal_vectors(0) + assert (nx, ny) == approx_pt((0, -1)) + assert (ex, ey) == approx_pt((-1, 0)) + + def test_180_rotation(self): + """image_rotate=180: N flips to (0, 1), E to (1, 0).""" + (nx, ny), (ex, ey) = cardinal_vectors(180) + assert (nx, ny) == approx_pt((0, 1)) + assert (ex, ey) == approx_pt((1, 0)) + + def test_90_rotation(self): + """image_rotate=90: N at (1, 0), E at (0, -1).""" + (nx, ny), (ex, ey) = cardinal_vectors(90) + assert (nx, ny) == approx_pt((1, 0)) + assert (ex, ey) == approx_pt((0, -1)) + + def test_flip_mirrors_x(self): + """flip negates x components of both vectors.""" + (nx, ny), (ex, ey) = cardinal_vectors(0, fx=-1) + assert (nx, ny) == approx_pt((0, -1)) + assert (ex, ey) == approx_pt((1, 0)) + + def test_flop_mirrors_y(self): + """flop negates y components of both vectors.""" + (nx, ny), (ex, ey) = cardinal_vectors(0, fy=-1) + assert (nx, ny) == approx_pt((0, 1)) + assert (ex, ey) == approx_pt((-1, 0)) + + def test_flip_and_flop(self): + """Both flip and flop: equivalent to 180° rotation of vectors.""" + (nx, ny), (ex, ey) = cardinal_vectors(0, fx=-1, fy=-1) + assert (nx, ny) == approx_pt((0, 1)) + assert (ex, ey) == approx_pt((1, 0)) + + def test_orthogonality(self): + """N and E should always be perpendicular.""" + for angle in [0, 45, 90, 135, 180, 270]: + for fx, fy in [(1, 1), (-1, 1), (1, -1), (-1, -1)]: + (nx, ny), (ex, ey) = cardinal_vectors(angle, fx, fy) + dot = nx * ex + ny * ey + assert dot == pytest.approx(0, abs=1e-10), ( + f"Not orthogonal at angle={angle}, fx={fx}, fy={fy}" + ) + + def test_unit_length(self): + """N and E vectors should have unit length.""" + for angle in [0, 30, 45, 90, 180, 270]: + (nx, ny), (ex, ey) = cardinal_vectors(angle) + assert math.hypot(nx, ny) == pytest.approx(1) + assert math.hypot(ex, ey) == pytest.approx(1) + + +# --- size_overlay_points --- + + +@pytest.mark.unit +class TestSizeOverlayPoints: + def test_single_extent_returns_none(self): + """1 extent → None (caller uses native ellipse).""" + assert size_overlay_points([100], 0, 0, 1.0, 64, 64) is None + + def test_empty_returns_none(self): + assert size_overlay_points([], 0, 0, 1.0, 64, 64) is None + + def test_two_extents_point_count(self): + """2 extents → 36-point ellipse polygon.""" + pts = size_overlay_points([120, 60], 0, 0, 1.0, 64, 64) + assert len(pts) == 36 + + def test_two_extents_centered(self): + """Ellipse centroid should be at (cx, cy).""" + cx, cy = 64, 64 + pts = size_overlay_points([120, 60], 0, 0, 1.0, cx, cy) + avg_x = sum(p[0] for p in pts) / len(pts) + avg_y = sum(p[1] for p in pts) / len(pts) + assert avg_x == pytest.approx(cx, abs=0.1) + assert avg_y == pytest.approx(cy, abs=0.1) + + def test_two_extents_symmetry(self): + """No rotation, no PA: major axis aligned with North (vertical).""" + cx, cy = 64, 64 + pts = size_overlay_points([120, 60], 0, 0, 1.0, cx, cy) + xs = [p[0] - cx for p in pts] + ys = [p[1] - cy for p in pts] + # PA=0 → major axis along North → vertical + assert max(abs(x) for x in xs) == pytest.approx(30, abs=0.5) + assert max(abs(y) for y in ys) == pytest.approx(60, abs=0.5) + + def test_two_extents_rotation(self): + """90° image rotation moves major axis from vertical to horizontal.""" + cx, cy = 64, 64 + pts = size_overlay_points([120, 60], 0, 90, 1.0, cx, cy) + xs = [p[0] - cx for p in pts] + ys = [p[1] - cy for p in pts] + # 90° rotation: North moves to +X, major axis now horizontal + assert max(abs(x) for x in xs) == pytest.approx(60, abs=0.5) + assert max(abs(y) for y in ys) == pytest.approx(30, abs=0.5) + + def test_position_angle(self): + """PA=90 rotates opposite to image_rotate (PA goes N→E, image_rotate goes CW).""" + cx, cy = 64, 64 + pts_rot = size_overlay_points([120, 60], 0, 270, 1.0, cx, cy) + pts_pa = size_overlay_points([120, 60], 90, 0, 1.0, cx, cy) + for a, b in zip(pts_rot, pts_pa): + assert a[0] == pytest.approx(b[0], abs=1e-6) + assert a[1] == pytest.approx(b[1], abs=1e-6) + + def test_pa90_aligns_with_east(self): + """PA=90° major axis must align with the East vector from cardinal_vectors.""" + cx, cy = 64, 64 + for rot in [0, 90, 180, 270]: + _, (ex, ey) = cardinal_vectors(rot) + pts = size_overlay_points([200, 40], 90, rot, 1.0, cx, cy) + dists = [(p[0] - cx, p[1] - cy) for p in pts] + farthest = max(dists, key=lambda d: math.hypot(*d)) + direction = ( + farthest[0] / math.hypot(*farthest), + farthest[1] / math.hypot(*farthest), + ) + dot = abs(direction[0] * ex + direction[1] * ey) + assert dot == pytest.approx(1.0, abs=0.02), ( + f"PA=90 major axis not along East at image_rotate={rot}" + ) + + def test_pa0_aligns_with_north(self): + """PA=0 major axis must align with the North vector from cardinal_vectors.""" + cx, cy = 64, 64 + for rot in [0, 90, 180, 270]: + (nx, ny), _ = cardinal_vectors(rot) + pts = size_overlay_points([200, 40], 0, rot, 1.0, cx, cy) + # Find the point farthest from center — should be along North + dists = [(p[0] - cx, p[1] - cy) for p in pts] + farthest = max(dists, key=lambda d: math.hypot(*d)) + direction = ( + farthest[0] / math.hypot(*farthest), + farthest[1] / math.hypot(*farthest), + ) + # Should be parallel to North (same or opposite direction) + dot = abs(direction[0] * nx + direction[1] * ny) + assert dot == pytest.approx(1.0, abs=0.02), ( + f"PA=0 major axis not along North at image_rotate={rot}" + ) + + def test_flip_mirrors_x(self): + """fx=-1 mirrors all points horizontally around cx.""" + cx, cy = 64, 64 + pts_normal = size_overlay_points([120, 60], 30, 180, 1.0, cx, cy) + pts_flip = size_overlay_points([120, 60], 30, 180, 1.0, cx, cy, fx=-1) + for a, b in zip(pts_normal, pts_flip): + assert a[0] - cx == pytest.approx(-(b[0] - cx), abs=1e-6) + assert a[1] == pytest.approx(b[1], abs=1e-6) + + def test_flop_mirrors_y(self): + """fy=-1 mirrors all points vertically around cy.""" + cx, cy = 64, 64 + pts_normal = size_overlay_points([120, 60], 30, 180, 1.0, cx, cy) + pts_flop = size_overlay_points([120, 60], 30, 180, 1.0, cx, cy, fy=-1) + for a, b in zip(pts_normal, pts_flop): + assert a[0] == pytest.approx(b[0], abs=1e-6) + assert a[1] - cy == pytest.approx(-(b[1] - cy), abs=1e-6) + + def test_three_extents_point_count(self): + """3+ extents → polygon with len(extents) points.""" + pts = size_overlay_points([100, 80, 60, 90], 0, 0, 1.0, 64, 64) + assert len(pts) == 4 + + def test_px_per_arcsec_scaling(self): + """Doubling px_per_arcsec doubles the distance from center.""" + cx, cy = 64, 64 + pts1 = size_overlay_points([120, 60], 0, 0, 1.0, cx, cy) + pts2 = size_overlay_points([120, 60], 0, 0, 2.0, cx, cy) + for a, b in zip(pts1, pts2): + assert (b[0] - cx) == pytest.approx(2 * (a[0] - cx), abs=1e-6) + assert (b[1] - cy) == pytest.approx(2 * (a[1] - cy), abs=1e-6) + + +# --- SizeObject vertex mode --- + + +@pytest.mark.unit +class TestSizeObjectVertices: + def test_from_vertices_stores_nested_pairs(self): + verts = [[10.0, 20.0], [10.1, 20.1], [10.2, 20.0]] + s = SizeObject.from_vertices(verts) + assert s.extents == verts + assert s.position_angle == 0.0 + + def test_is_vertices_true(self): + s = SizeObject.from_vertices([[10.0, 20.0], [10.1, 20.1]]) + assert s.is_vertices is True + + def test_is_vertices_false_for_numeric(self): + s = SizeObject.from_arcsec(100, 50) + assert s.is_vertices is False + + def test_is_vertices_false_for_empty(self): + s = SizeObject([]) + assert s.is_vertices is False + + def test_max_extent_arcsec_same_dec(self): + """Two points at same dec, 1° apart in RA at dec=0.""" + s = SizeObject.from_vertices([[10.0, 0.0], [11.0, 0.0]]) + expected = 3600.0 # 1 degree = 3600 arcsec + assert s.max_extent_arcsec == pytest.approx(expected, rel=1e-3) + + def test_max_extent_arcsec_same_ra(self): + """Two points at same RA, 0.5° apart in dec.""" + s = SizeObject.from_vertices([[10.0, 20.0], [10.0, 20.5]]) + expected = 1800.0 # 0.5 degree + assert s.max_extent_arcsec == pytest.approx(expected, rel=1e-3) + + def test_max_extent_arcsec_numeric_fallback(self): + s = SizeObject.from_arcsec(100, 200, 150) + assert s.max_extent_arcsec == 200 + + def test_to_display_string_vertices(self): + """Vertex mode shows ~span format.""" + s = SizeObject.from_vertices([[10.0, 20.0], [10.0, 20.5]]) + display = s.to_display_string() + assert display.startswith("~") + assert "'" in display # 1800 arcsec = 30 arcmin + + def test_json_roundtrip(self): + verts = [[10.0, 20.0], [10.1, 20.1]] + s = SizeObject.from_vertices(verts) + s2 = SizeObject.from_json(s.to_json()) + assert s2.is_vertices is True + assert s2.extents == verts + + +# --- vertex_overlay_points --- + + +@pytest.mark.unit +class TestVertexOverlayPoints: + def test_center_vertex_at_center(self): + """A vertex at the object center projects to (cx, cy).""" + pts = vertex_overlay_points([[10.0, 20.0]], 10.0, 20.0, 0, 1.0, 64, 64) + assert len(pts) == 1 + assert pts[0][0] == pytest.approx(64, abs=0.1) + assert pts[0][1] == pytest.approx(64, abs=0.1) + + def test_offset_vertex_north(self): + """A vertex 100" north of center should appear above center (lower y).""" + dec_offset = 100.0 / 3600.0 # 100 arcsec in degrees + pts = vertex_overlay_points( + [[10.0, 20.0 + dec_offset]], 10.0, 20.0, 0, 1.0, 64, 64 + ) + # image_rotate=0: POSS has N at top of raw image but after + # the 180+roll rotation in get_display_image, here we test + # raw projection + assert len(pts) == 1 + # With image_rotate=0 and no flip, north (positive dec) goes to negative dy + assert pts[0][1] < 64 + + def test_two_vertices_produce_two_points(self): + pts = vertex_overlay_points( + [[10.0, 20.0], [10.01, 20.01]], 10.0, 20.0, 0, 1.0, 64, 64 + ) + assert len(pts) == 2 + + def test_scaling(self): + """Doubling px_per_arcsec doubles offset from center.""" + dec_off = 100.0 / 3600.0 + pts1 = vertex_overlay_points( + [[10.0, 20.0 + dec_off]], 10.0, 20.0, 0, 1.0, 64, 64 + ) + pts2 = vertex_overlay_points( + [[10.0, 20.0 + dec_off]], 10.0, 20.0, 0, 2.0, 64, 64 + ) + dx1 = pts1[0][0] - 64 + dy1 = pts1[0][1] - 64 + dx2 = pts2[0][0] - 64 + dy2 = pts2[0][1] - 64 + assert dx2 == pytest.approx(2 * dx1, abs=0.1) + assert dy2 == pytest.approx(2 * dy1, abs=0.1) diff --git a/python/tests/test_obslist_formats.py b/python/tests/test_obslist_formats.py new file mode 100644 index 00000000..08a13290 --- /dev/null +++ b/python/tests/test_obslist_formats.py @@ -0,0 +1,231 @@ +""" +Roundtrip tests for obslist_formats. + +Each format is tested by creating an ObsList, writing it to a string, +reading it back, and verifying the entries match. +""" + +import pytest +from PiFinder.obslist_formats import ( + ObsList, + ObsListEntry, + detect_format, + ra_to_hms, + dec_to_dms, + hms_to_ra, + dms_to_dec, + read_skylist, + write_skylist, + read_csv, + write_csv, + read_text, + write_text, + read_stellarium, + write_stellarium, + read_autostar, + write_autostar, + read_argo, + write_argo, + read_nextour, + write_nextour, + read_eqmod, + write_eqmod, +) + + +def _sample_entries(): + return [ + ObsListEntry( + name="NGC 224", + ra=10.6847, + dec=41.2689, + obj_type="Gx", + mag=3.4, + catalog_code="NGC", + sequence=224, + description="Andromeda Galaxy", + ), + ObsListEntry( + name="M 42", + ra=83.8221, + dec=-5.3911, + obj_type="Nb", + mag=4.0, + catalog_code="M", + sequence=42, + ), + ObsListEntry( + name="NGC 7789", + ra=359.33, + dec=56.726, + obj_type="OC", + mag=6.7, + catalog_code="NGC", + sequence=7789, + ), + ] + + +def _sample_list(): + return ObsList(name="Test List", entries=_sample_entries()) + + +def _assert_entries_close(original, parsed, check_type=True, check_mag=True, ra_tol=0.1, dec_tol=0.05): + """Verify parsed entries match originals within tolerance.""" + assert len(parsed) == len(original) + for orig, got in zip(original, parsed): + assert got.name == orig.name + if orig.ra > 0 or orig.dec != 0: + assert got.ra == pytest.approx(orig.ra, abs=ra_tol) + assert got.dec == pytest.approx(orig.dec, abs=dec_tol) + if check_type: + assert got.obj_type == orig.obj_type + if check_mag and orig.mag is not None: + assert got.mag == pytest.approx(orig.mag, abs=0.15) + assert got.catalog_code == orig.catalog_code + assert got.sequence == orig.sequence + + +# ── Coordinate helper tests ───────────────────────────────────────────── + + +@pytest.mark.unit +class TestCoordinateHelpers: + def test_ra_roundtrip(self): + for ra in [0.0, 45.0, 90.0, 180.0, 270.0, 359.99]: + h, m, s = ra_to_hms(ra) + assert hms_to_ra(h, m, s) == pytest.approx(ra, abs=0.01) + + def test_dec_roundtrip(self): + for dec in [-89.5, -45.0, 0.0, 41.27, 89.99]: + sign, d, m, s = dec_to_dms(dec) + assert dms_to_dec(sign, d, m, s) == pytest.approx(dec, abs=0.01) + + +# ── Format roundtrip tests ────────────────────────────────────────────── + + +@pytest.mark.unit +def test_skylist_roundtrip(): + obs = _sample_list() + text = write_skylist(obs) + assert "SkySafariObservingListVersion" in text + parsed = read_skylist(text) + assert len(parsed.entries) == 3 + for orig, got in zip(obs.entries, parsed.entries): + assert got.catalog_code == orig.catalog_code + assert got.sequence == orig.sequence + if orig.ra: + assert got.ra == pytest.approx(orig.ra, abs=0.1) + if orig.dec: + assert got.dec == pytest.approx(orig.dec, abs=0.05) + if orig.description: + assert got.description == orig.description + + +@pytest.mark.unit +def test_csv_roundtrip(): + obs = _sample_list() + text = write_csv(obs) + parsed = read_csv(text) + _assert_entries_close(obs.entries, parsed.entries, ra_tol=0.15, dec_tol=0.15) + + +@pytest.mark.unit +def test_text_roundtrip(): + obs = _sample_list() + text = write_text(obs) + parsed = read_text(text) + assert len(parsed.entries) == 3 + for orig, got in zip(obs.entries, parsed.entries): + assert got.name == orig.name + assert got.catalog_code == orig.catalog_code + assert got.sequence == orig.sequence + + +@pytest.mark.unit +def test_stellarium_roundtrip(): + obs = _sample_list() + text = write_stellarium(obs) + parsed = read_stellarium(text) + assert parsed.name == "Test List" + _assert_entries_close(obs.entries, parsed.entries, ra_tol=0.15, dec_tol=0.15) + + +@pytest.mark.unit +def test_autostar_roundtrip(): + obs = _sample_list() + text = write_autostar(obs) + parsed = read_autostar(text) + assert parsed.name == "Test List" + # Autostar rounds coordinates to whole seconds + _assert_entries_close( + obs.entries, parsed.entries, ra_tol=0.5, dec_tol=0.5, check_type=True + ) + + +@pytest.mark.unit +def test_argo_roundtrip(): + obs = _sample_list() + text = write_argo(obs) + parsed = read_argo(text) + # Argo rounds to whole seconds, type map loses some info + _assert_entries_close(obs.entries, parsed.entries, ra_tol=0.5, dec_tol=0.5) + + +@pytest.mark.unit +def test_nextour_roundtrip(): + obs = _sample_list() + text = write_nextour(obs) + parsed = read_nextour(text) + # NexTour stores fractional minutes, moderate precision + _assert_entries_close( + obs.entries, parsed.entries, ra_tol=0.5, dec_tol=0.5, check_type=False + ) + + +@pytest.mark.unit +def test_eqmod_roundtrip(): + obs = _sample_list() + text = write_eqmod(obs) + parsed = read_eqmod(text) + assert parsed.name == "Test List" + # EQMOD stores 4 decimal places, good precision + _assert_entries_close( + obs.entries, parsed.entries, ra_tol=0.02, dec_tol=0.01, check_type=False, check_mag=False + ) + + +# ── Format detection tests ────────────────────────────────────────────── + + +@pytest.mark.unit +class TestDetectFormat: + def test_by_extension(self): + assert detect_format("", "mylist.skylist") == "skylist" + assert detect_format("", "mylist.sol") == "stellarium" + assert detect_format("", "mylist.hct") == "nextour" + assert detect_format("", "mylist.lst") == "eqmod" + assert detect_format("", "mylist.csv") == "csv" + + def test_by_content_skylist(self): + assert detect_format("SkySafariObservingListVersion=3.0\n") == "skylist" + + def test_by_content_stellarium(self): + assert detect_format('{"version": "1.0"}') == "stellarium" + + def test_by_content_eqmod(self): + assert detect_format("!J2000\n1.234; 45.678; NGC 224\n") == "eqmod" + + def test_by_content_argo(self): + assert detect_format("NGC 224|00:42:44|+41:16:09|GALAXY|3.4|\n") == "argo" + + def test_by_content_autostar(self): + text = '/ comment\nTITLE "Test"\nUSER 00:42:44 41d16m09s "NGC 224" "Gx"\n' + assert detect_format(text) == "autostar" + + def test_by_content_csv(self): + assert detect_format("Name,RA,Dec,Magnitude\n") == "csv" + + def test_by_content_text_fallback(self): + assert detect_format("NGC 224\nM 42\n") == "text"