from __future__ import annotations
from typing import TYPE_CHECKING
import numpy as np
from . import utils
from .cluster import ClusterData
from .object import ObjectData
if TYPE_CHECKING:
from .cell import CellData
[docs]
def heirarchicalProcessObject(
obj_data: ObjectData, cell_data: CellData, pixel_r2_cut: float, recurse: int = 0
) -> None:
"""Recursively process an object and make sub-objects
Parameters
----------
obj_data:
Object being processed
cell_data:
Associated cell
pixel_r2_cut:
Distance cut for associated, in pixels**2
recurse:
Current recursion level
Notes
-----
This function will test if the input obj_data is good,
i.e., that all the sources lie within the pixel_r2_cut,
and will split obj_data into multiple objects if
some of the sources lie outside the cut
The new objects will be added to the parent cluster of
the original object
"""
if recurse > cell_data.matcher.max_sub_division:
return
obj_data.recurse = recurse
if obj_data.n_src == 0:
print("Empty object", obj_data.n_src, obj_data.n_unique, recurse)
return
assert obj_data.data is not None
if obj_data.mask.sum() == 1:
obj_data.x_cent = obj_data.data.x_pix.values[0]
obj_data.y_cent = obj_data.data.y_pix.values[0]
obj_data.dist_2 = np.zeros((1), float)
obj_data.rms_dist = 0.0
obj_data.snr_mean = obj_data.data.snr.values[0]
obj_data.snr_rms = 0.0
return
sum_snr = np.sum(obj_data.data.snr)
obj_data.x_cent = np.sum(obj_data.data.x_pix * obj_data.data.snr) / sum_snr
obj_data.y_cent = np.sum(obj_data.data.y_pix * obj_data.data.snr) / sum_snr
obj_data.dist_2 = np.array(
(obj_data.x_cent - obj_data.data.x_pix) ** 2
+ (obj_data.y_cent - obj_data.data.y_pix) ** 2
)
obj_data.rms_dist = np.sqrt(np.mean(obj_data.dist_2))
obj_data.snr_mean = np.mean(obj_data.data.snr.values)
obj_data.snr_rms = np.std(obj_data.data.snr.values)
sub_mask = obj_data.dist_2 < pixel_r2_cut
if sub_mask.all():
return
if recurse >= cell_data.matcher.max_sub_division:
return
heirarchicalSplitObject(obj_data, cell_data, pixel_r2_cut, recurse=recurse + 1)
return
[docs]
def heirarchicalSplitObject(
obj_data: ObjectData, cell_data: CellData, pixel_r2_cut: float, recurse: int = 0
) -> None:
"""Split up a cluster keeping only one source per input
catalog
Parameters
----------
obj_data:
Object being processed
cell_data:
Associated cell
pixel_r2_cut:
Distance cut for associated, in pixels**2
recurse:
Current recursion level
Notes
-----
This function actually does the splitting of objects
"""
if recurse > cell_data.matcher.max_sub_division:
return
obj_data.recurse = recurse
obj_data.extract()
assert obj_data.data is not None
slice_x = obj_data.parent_cluster.footprint.slice_x
slice_y = obj_data.parent_cluster.footprint.slice_y
zoom_factors = [1, 2, 4, 8, 16, 32]
zoom_factor = zoom_factors[recurse]
n_pix = zoom_factor * np.array(
[slice_x.stop - slice_x.start, slice_y.stop - slice_y.start]
)
zoom_x = (
zoom_factor
* obj_data.data.x_cluster
/ obj_data.parent_cluster.pixel_match_scale
)
zoom_y = (
zoom_factor
* obj_data.data.y_cluster
/ obj_data.parent_cluster.pixel_match_scale
)
counts_map = utils.fillCountsMapFromArrays(zoom_x, zoom_y, n_pix)
fp_dict = utils.getFootprints(counts_map, buf=0)
footprints = fp_dict["footprints"]
n_footprints = len(footprints.footprints)
if n_footprints == 1:
if recurse >= cell_data.matcher.max_sub_division:
return
heirarchicalSplitObject(obj_data, cell_data, pixel_r2_cut, recurse=recurse + 1)
return
footprint_key = fp_dict["footprint_key"]
footprint_ids = utils.findClusterIdsFromArrays(
zoom_x.values.astype(int), zoom_y.values.astype(int), footprint_key
)
biggest = np.argmax(np.bincount(footprint_ids))
biggest_mask = np.zeros(obj_data.mask.shape, dtype=bool)
for i_fp in range(n_footprints):
sub_mask = footprint_ids == i_fp
count = 0
new_mask = np.zeros(obj_data.mask.shape, dtype=bool)
for i, val in enumerate(obj_data.mask):
if val:
new_mask[i] = sub_mask[count]
count += 1
assert sub_mask.sum() == new_mask.sum()
if i_fp == biggest:
biggest_mask = new_mask
continue
new_object = obj_data.parent_cluster.addObject(cell_data, new_mask)
heirarchicalProcessObject(new_object, cell_data, pixel_r2_cut, recurse=recurse)
obj_data.mask = biggest_mask
obj_data.extract()
heirarchicalProcessObject(obj_data, cell_data, pixel_r2_cut, recurse=recurse)
return
[docs]
def heirarchicalProcessCluster(
cluster: ClusterData, cell_data: CellData, pixel_r2_cut: float
) -> list[ObjectData]:
"""Function that is called recursively to
split clusters until they consist only of sources within
the match radius of the cluster centroid.
Recursively process a cluster and make associated objects
Parameters
----------
cluster:
Cluster being processed
cell_data:
Associated cell
pixel_r2_cut:
Distance cut for associated, in pixels**2
Returns
-------
Objects produced in cluster processing
"""
cluster.extract(cell_data)
assert cluster.data is not None
cluster.n_src = len(cluster.data.i_cat)
cluster.n_unique = len(np.unique(cluster.data.i_cat.values))
if cluster.n_src == 0:
print("Empty cluster", cluster.n_src, cluster.n_unique)
return cluster.objects
if cluster.n_src == 1:
cluster.x_cent = cluster.data.x_pix.values[0]
cluster.y_cent = cluster.data.y_pix.values[0]
cluster.dist_2 = np.zeros((1))
cluster.rms_dist = 0.0
cluster.snr_mean = cluster.data.snr.values[0]
cluster.snr_rms = 0.0
initial_object = cluster.addObject(cell_data)
heirarchicalProcessObject(initial_object, cell_data, pixel_r2_cut)
return cluster.objects
sum_snr = np.sum(cluster.data.snr)
cluster.x_cent = np.sum(cluster.data.x_pix * cluster.data.snr) / sum_snr
cluster.y_cent = np.sum(cluster.data.y_pix * cluster.data.snr) / sum_snr
cluster.dist_2 = (cluster.x_cent - cluster.data.x_pix) ** 2 + (
cluster.y_cent - cluster.data.y_pix
) ** 2
cluster.rms_dist = np.sqrt(np.mean(cluster.dist_2))
cluster.snr_mean = np.mean(cluster.data.snr.values)
cluster.snr_rms = np.std(cluster.data.snr.values)
initial_object = cluster.addObject(cell_data)
heirarchicalProcessObject(initial_object, cell_data, pixel_r2_cut)
return cluster.objects