Source code for hpmcm.classify

from __future__ import annotations

from collections import OrderedDict
from typing import TYPE_CHECKING, Any

import numpy as np

if TYPE_CHECKING:
    from .cluster import ClusterData
    from .match import Match


def clusterStats(cluster_dict: OrderedDict[int, ClusterData]) -> np.ndarray:
    """Helper function to get stats about the clusters

    Parameters
    ----------
    cluster_dict:
        Dict from clusterId to ClusterData object

    Returns
    -------
    Cluster Statistics (nClusters, n_orphan, n_mixed, n_confused)

    Notes
    -----
    Return array contains

    nClusters:
        Total number of clusters, i.e., `len(cluster_dict)`

    n_orphan:
        Number of clusters with `n_src == 1` (i.e., single detections)

    n_mixed:
        Number of clusters for which `n_src != n_unique`

    n_confused:
        Number of clusters for which `n_src > n_unique + 3`

    The three conditions are tested independently, so a single cluster
    can contribute to more than one counter.
    """
    clusters = list(cluster_dict.values())

    n_orphan = sum(1 for cluster in clusters if cluster.n_src == 1)
    n_mixed = sum(1 for cluster in clusters if cluster.n_src != cluster.n_unique)
    n_confused = sum(
        1 for cluster in clusters if cluster.n_src > cluster.n_unique + 3
    )

    return np.array([len(cluster_dict), n_orphan, n_mixed, n_confused])
def printSummaryStats(matcher: Match) -> np.ndarray:
    """Helper function to print info about clusters

    For every entry of `matcher.cell_dict` this prints one line with the
    cell key followed by the four numbers returned by `clusterStats` for
    that cell's `cluster_dict`, each formatted to a minimum width of 5.

    Parameters
    ----------
    matcher:
        Object whose `cell_dict` maps a cell key to cell data that
        exposes a `cluster_dict`

    Returns
    -------
    Integer array of length 4 holding the element-wise sum of the
    per-cell statistics (all zeros if there are no cells)
    """
    totals = np.zeros((4), int)

    for cell_key, cell in matcher.cell_dict.items():
        per_cell = clusterStats(cell.cluster_dict)
        line = (
            f"{cell_key:5}: {per_cell[0]:5} {per_cell[1]:5} "
            f"{per_cell[2]:5} {per_cell[3]:5}"
        )
        print(line)
        # Accumulate in place so the integer dtype of `totals` is kept
        totals += per_cell

    return totals
def classifyClusters(matcher: Match, **kwargs: Any) -> dict[str, list]:
    """Sort clusters by their properties

    Parameters
    ----------
    matcher:
        Object providing `red_data` (its length is used as the number of
        input catalogs, `n_cat`) and `cell_dict` (cell key -> cell data
        with a `cluster_dict`)

    Keywords
    --------
    cell_edge:
        Threshold applied to `|x_cell_coadd|` and `|y_cell_coadd|` for the
        two position cuts (default 75)

    edge_cut:
        A cluster counts as an edge case if any of its sources has
        `|x_cell_coadd|` or `|y_cell_coadd|` above `cell_edge - edge_cut`
        (default 2)

    snr_cut:
        A cluster counts as faint if any of its sources has `snr` below
        this value (default 7.5)

    Returns
    -------
    Dict of lists. `nsrcs` holds `n_src` for every cluster visited; every
    other list holds `(cell key, cluster key)` tuples.

    Notes
    -----
    `cut1`: all sources are beyond `cell_edge` in x, or all are in y.
    `cut2`: the mean x or the mean y is beyond `cell_edge`.
    Clusters passing both cuts are listed in `used` and in exactly one of
    the remaining categories; the first matching test below wins.
    """
    cell_edge = kwargs.get("cell_edge", 75)
    edge_cut = kwargs.get("edge_cut", 2)
    snr_cut = kwargs.get("snr_cut", 7.5)
    n_cat = len(matcher.red_data)

    # Insertion order here is the key order of the returned dict
    buckets: dict[str, list] = {
        name: []
        for name in (
            "nsrcs",
            "cut1",
            "cut2",
            "used",
            "ideal_faint",
            "ideal",
            "faint",
            "edge_mixed",
            "mixed",
            "edge_missing",
            "edge_extra",
            "missing",
            "two_missing",
            "many_missing",
            "extra",
            "caught",
        )
    }

    for cell_id, cell_data in matcher.cell_dict.items():
        for cluster_id, cluster in cell_data.cluster_dict.items():
            label = (cell_id, cluster_id)
            assert cluster.data is not None
            buckets["nsrcs"].append(cluster.n_src)

            # Cut 1: every source is outside the cell in x, or in y
            if (np.fabs(cluster.data.x_cell_coadd) > cell_edge).all() or (
                np.fabs(cluster.data.y_cell_coadd) > cell_edge
            ).all():
                buckets["cut1"].append(label)
                continue

            # Cut 2: the mean position is outside the cell
            if (
                np.fabs(cluster.data.x_cell_coadd.mean()) > cell_edge
                or np.fabs(cluster.data.y_cell_coadd.mean()) > cell_edge
            ):
                buckets["cut2"].append(label)
                continue

            buckets["used"].append(label)

            edge_case = bool(
                (np.fabs(cluster.data.x_cell_coadd) > cell_edge - edge_cut).any()
                or (np.fabs(cluster.data.y_cell_coadd) > cell_edge - edge_cut).any()
            )
            is_faint = bool((cluster.data.snr < snr_cut).any())

            n_src = cluster.n_src
            n_unique = cluster.n_unique

            if n_src == n_unique and n_src == n_cat:
                category = "ideal_faint" if is_faint else "ideal"
            elif n_src < n_cat and is_faint:
                category = "faint"
            elif n_src == n_cat and n_unique != n_cat and edge_case:  # pragma: no cover
                category = "edge_mixed"
            elif n_src == n_cat and n_unique != n_cat:  # pragma: no cover
                category = "mixed"
            elif n_src < n_cat and edge_case:  # pragma: no cover
                category = "edge_missing"
            elif n_src > n_cat and edge_case:  # pragma: no cover
                category = "edge_extra"
            elif n_src == n_cat - 1:  # pragma: no cover
                category = "missing"
            elif n_src == n_cat - 2:  # pragma: no cover
                category = "two_missing"
            elif n_src < n_cat - 2:
                category = "many_missing"
            elif n_src > n_cat:
                category = "extra"
            else:  # pragma: no cover
                category = "caught"

            buckets[category].append(label)

    return buckets
def matchObjectsAgainstRef(matcher: Match, **kwargs: Any) -> dict[str, list]:
    """Match objects against the reference catalog

    An object is treated as present in the reference catalog when any
    entry of its `catalog_id` equals 0.

    Parameters
    ----------
    matcher:
        Object providing `red_data` (its length is used as the number of
        input catalogs, `n_cat`) and `cell_dict` (cell key -> cell data
        with an `object_dict`)

    Keywords
    --------
    snr_cut:
        An object counts as faint if any of its sources has `snr` below
        this value (default 7.5)

    Returns
    -------
    Dict of lists. `nsrcs` holds `n_src` for every object; every other
    list holds `(cell key, object key)` tuples.

    Notes
    -----
    Every object is listed in `used`. Objects without a `catalog_id` of 0
    go to `not_in_ref_faint` or `not_in_ref` and are not classified any
    further. All other objects go to `in_ref` and to exactly one of the
    remaining categories; the first matching test wins.
    """
    snr_cut = kwargs.get("snr_cut", 7.5)
    n_cat = len(matcher.red_data)

    # Insertion order here is the key order of the returned dict
    buckets: dict[str, list] = {
        name: []
        for name in (
            "nsrcs",
            "used",
            "ideal_faint",
            "ideal",
            "faint",
            "missing",
            "in_ref",
            "not_in_ref",
            "not_in_ref_faint",
            "extra",
            "two_missing",
            "many_missing",
            "caught",
        )
    }

    for cell_id, cell_data in matcher.cell_dict.items():
        for object_id, obj in cell_data.object_dict.items():
            label = (cell_id, object_id)
            assert obj.data is not None
            buckets["nsrcs"].append(obj.n_src)
            buckets["used"].append(label)

            is_faint = bool((obj.data.snr < snr_cut).any())

            # Objects with no reference-catalog source stop here
            if not (obj.catalog_id == 0).any():
                unmatched = "not_in_ref_faint" if is_faint else "not_in_ref"
                buckets[unmatched].append(label)
                continue
            buckets["in_ref"].append(label)

            n_src = obj.n_src

            if n_src == obj.n_unique and n_src == n_cat:
                category = "ideal_faint" if is_faint else "ideal"
            elif is_faint:
                category = "faint"
            elif n_src == n_cat - 1:
                category = "missing"
            elif n_src == n_cat - 2:  # pragma: no cover
                category = "two_missing"
            elif n_src < n_cat - 2:  # pragma: no cover
                category = "many_missing"
            elif n_src > n_cat:
                category = "extra"
            else:  # pragma: no cover
                category = "caught"

            buckets[category].append(label)

    return buckets
def printObjectMatchTypes(o_dict: dict) -> None:
    """Print numbers of different types of object matches

    Parameters
    ----------
    o_dict:
        Dict of lists keyed by match category; the keys read here are
        the ones listed in the table below. One line is printed per
        category, showing a label and the length of the matching list.
    """
    # (label, key) pairs in print order
    rows = (
        ("All ", "nsrcs"),
        ("Used ", "used"),
        (" New ", "not_in_ref"),
        (" New (faint) ", "not_in_ref_faint"),
        ("In Ref ", "in_ref"),
        ("Faint ", "faint"),
        ("Good ", "ideal"),
        (" Good (faint) ", "ideal_faint"),
        ("Missing ", "missing"),
        ("Two Missing ", "two_missing"),
        ("All Missing ", "many_missing"),
        ("Extra ", "extra"),
        ("Caught ", "caught"),
    )
    for label, key in rows:
        print(label, len(o_dict[key]))
def classifyObjects(matcher: Match, **kwargs: Any) -> dict[str, list]:
    """Sort objects by their properties

    Parameters
    ----------
    matcher:
        Object providing `red_data` (its length is used as the number of
        input catalogs, `n_cat`) and `cell_dict` (cell key -> cell data
        with an `object_dict`)

    Keywords
    --------
    cell_edge:
        Threshold applied to `|x_cell_coadd|` and `|y_cell_coadd|` for the
        two position cuts (default 75)

    edge_cut:
        An object counts as an edge case if any of its sources has
        `|x_cell_coadd|` or `|y_cell_coadd|` above `cell_edge - edge_cut`
        (default 2)

    snr_cut:
        An object counts as faint if any of its sources has `snr` below
        this value (default 7.5)

    Returns
    -------
    Dict of lists. `nsrcs` holds `n_src` for every object visited; every
    other list holds `(cell key, object key)` tuples.

    Notes
    -----
    The three position tests are best effort: any exception raised while
    evaluating one of them is swallowed and the test is treated as not
    satisfied.

    `orphan` collects objects with fewer than `n_cat` sources whose
    `parent_cluster` has at least `n_cat` sources. `parent_cluster` is
    only read for objects that reach that test.
    """
    cell_edge = kwargs.get("cell_edge", 75)
    edge_cut = kwargs.get("edge_cut", 2)
    snr_cut = kwargs.get("snr_cut", 7.5)
    n_cat = len(matcher.red_data)

    # Insertion order here is the key order of the returned dict
    buckets: dict[str, list] = {
        name: []
        for name in (
            "nsrcs",
            "cut1",
            "cut2",
            "used",
            "ideal_faint",
            "ideal",
            "faint",
            "edge_mixed",
            "mixed",
            "edge_missing",
            "edge_extra",
            "orphan",
            "missing",
            "two_missing",
            "many_missing",
            "extra",
            "caught",
        )
    }

    for cell_id, cell_data in matcher.cell_dict.items():
        for object_id, obj in cell_data.object_dict.items():
            label = (cell_id, object_id)
            assert obj.data is not None
            buckets["nsrcs"].append(obj.n_src)

            # Cut 1: every source is outside the cell in x, or in y.
            # bool() stays inside the try so truth-value errors are
            # swallowed exactly like any other failure of this test.
            try:
                all_outside = bool(
                    (np.fabs(obj.data.x_cell_coadd) > cell_edge).all()
                    or (np.fabs(obj.data.y_cell_coadd) > cell_edge).all()
                )
            except Exception:
                all_outside = False
            if all_outside:
                buckets["cut1"].append(label)
                continue

            # Cut 2: the mean position is outside the cell
            try:
                mean_outside = bool(
                    np.fabs(obj.data.x_cell_coadd.mean()) > cell_edge
                    or np.fabs(obj.data.y_cell_coadd.mean()) > cell_edge
                )
            except Exception:
                mean_outside = False
            if mean_outside:
                buckets["cut2"].append(label)
                continue

            buckets["used"].append(label)

            try:
                edge_case = bool(
                    (np.fabs(obj.data.x_cell_coadd) > cell_edge - edge_cut).any()
                    or (np.fabs(obj.data.y_cell_coadd) > cell_edge - edge_cut).any()
                )
            except Exception:
                edge_case = False

            is_faint = bool((obj.data.snr < snr_cut).any())

            n_src = obj.n_src
            n_unique = obj.n_unique

            if n_src == n_unique and n_src == n_cat:
                category = "ideal_faint" if is_faint else "ideal"
            elif n_src < n_cat and is_faint:
                category = "faint"
            elif n_src == n_cat and n_unique != n_cat and edge_case:  # pragma: no cover
                category = "edge_mixed"
            elif n_src == n_cat and n_unique != n_cat:
                category = "mixed"
            elif n_src < n_cat and edge_case:  # pragma: no cover
                category = "edge_missing"
            elif n_src < n_cat and obj.parent_cluster.n_src >= n_cat:  # pragma: no cover
                category = "orphan"
            elif n_src == n_cat - 1:
                category = "missing"
            elif n_src == n_cat - 2:  # pragma: no cover
                category = "two_missing"
            elif n_src < n_cat - 2:
                category = "many_missing"
            elif n_src > n_cat and edge_case:  # pragma: no cover
                category = "edge_extra"
            elif n_src > n_cat:
                category = "extra"
            else:  # pragma: no cover
                category = "caught"

            buckets[category].append(label)

    return buckets
def printClusterTypes(cluster_types: dict[str, list]) -> None:
    """Print numbers of different types of clusters

    Parameters
    ----------
    cluster_types:
        Dict of lists keyed by cluster category; the keys read here are
        the ones listed in the table below. One line is printed per
        category, showing a label and the length of the matching list.

    Notes
    -----
    Each category is printed exactly once. The "faint" row used to be
    emitted a second time between "edge_extra" and "one missing"; that
    duplicate has been removed.
    """
    # (label, key) pairs in print order
    rows = (
        ("All Clusters: ", "nsrcs"),
        ("cut 1 ", "cut1"),
        ("cut 2 ", "cut2"),
        ("Used: ", "used"),
        ("good (n source from n catalogs): ", "ideal"),
        ("good faint ", "ideal_faint"),
        ("faint (< n sources, snr < cut): ", "faint"),
        ("mixed (n source from < n catalogs): ", "mixed"),
        ("edge_mixed (mixed near edge of cell): ", "edge_mixed"),
        ("edge_missing (< n sources, near edge of cell): ", "edge_missing"),
        ("edge_extra (> n sources, near edge of cell): ", "edge_extra"),
        ("one missing (n-1 sources, not near edge): ", "missing"),
        ("two missing (n-2 sources, not near edge): ", "two_missing"),
        ("many missing (< n-2 sources, not near edge): ", "many_missing"),
        ("extra (> n sources, not near edge): ", "extra"),
    )
    for label, key in rows:
        print(label, len(cluster_types[key]))
def printObjectTypes(object_types: dict[str, list]) -> None:
    """Print numbers of different types of objects

    Parameters
    ----------
    object_types:
        Dict of lists keyed by object category; the keys read here are
        the ones listed in the table below. One line is printed per
        category, showing a label and the length of the matching list.

    Notes
    -----
    Each category is printed exactly once. The "faint" row used to be
    emitted a second time between "edge_extra" and "orphan"; that
    duplicate has been removed. The "orphan" label now closes its
    parenthesis and ends in ": " like the other labels.
    """
    # (label, key) pairs in print order
    rows = (
        ("All Objects: ", "nsrcs"),
        ("cut 1 ", "cut1"),
        ("cut 2 ", "cut2"),
        ("Used: ", "used"),
        ("good (n source from n catalogs): ", "ideal"),
        ("good faint ", "ideal_faint"),
        ("faint (< n sources, snr < cut): ", "faint"),
        ("mixed (n source from < n catalogs): ", "mixed"),
        ("edge_mixed (mixed near edge of cell): ", "edge_mixed"),
        ("edge_missing (< n sources, near edge of cell): ", "edge_missing"),
        ("edge_extra (> n sources, near edge of cell): ", "edge_extra"),
        ("orphan (split off from larger cluster): ", "orphan"),
        ("one missing (n-1 sources, not near edge): ", "missing"),
        ("two missing (n-2 sources, not near edge): ", "two_missing"),
        ("many missing (< n-2 sources, not near edge): ", "many_missing"),
        ("extra (> n sources, not near edge): ", "extra"),
    )
    for label, key in rows:
        print(label, len(object_types[key]))