from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy as np
import pandas
import tables_io
from .shear_data import ShearData
if TYPE_CHECKING:
from .cell import CellData
from .shear_match import ShearMatch
# These are the names of the various shear catalogs
# They are in reverse "alphabetic" order
SHEAR_NAMES = ["ns", "2p", "2m", "1p", "1m"]
# These are the coeffs for the various shear catalogs
DESHEAR_COEFFS = np.array(
[
[0, 0, 0, 0],
[0, 1, 1, 0],
[0, -1, -1, 0],
[1, 0, 0, -1],
[-1, 0, 0, 1],
]
)
# These parameters will have to change if the cells change
CELL_INNER_SIZE = 150
CELL_BUFFER = 25
CELL_OFFSET = 0.5
N_PATCH = 20
# These are calculated from the above
CELL_OUTER_SIZE = CELL_INNER_SIZE + (2 * CELL_BUFFER)
CLEAN_CELL_CUT = int(CELL_INNER_SIZE) / 2
UNCLEAN_CELL_CUT = CLEAN_CELL_CUT + 5
PATCH_OFFSET = (N_PATCH + 1 / 2)
[docs]
def shearStats(df: pandas.DataFrame) -> dict:
"""Return the shear statistics
{st} is the shear type, one of "gauss", "pgauss", "wmom"
{i}, {j} index the shear parameters 1, 2
Parameters
----------
df:
Input DataFrame, must have :py:class:`hpmcm.ShearTable` schema
Returns
-------
Shear stats in a dict.
Notes
-----
Shear stats include:
+-----------------+-----------------------------------------------------+
| Key | Description |
+=================+=====================================================+
| n_{st} | Number of sources from that catalog |
+-----------------+-----------------------------------------------------+
| g_{i}_{st} | g_{i} shear parameter for that catalog |
+-----------------+-----------------------------------------------------+
| delta_g_{i}_{j} | g_{i,j} shear measurment: g_{i}_{j}p - g_{i}_{j}m |
+-----------------+-----------------------------------------------------+
| good | True if every catalog has one source in this object |
+-----------------+-----------------------------------------------------+
If the matching is not good, then delta_g_1 = delta_g_2 = np.nan
"""
out_dict: dict[str, float | int] = {}
all_good = True
for i, name_ in enumerate(SHEAR_NAMES):
mask = df.i_cat == i
n_cat = mask.sum()
if n_cat != 1:
all_good = False
out_dict[f"n_{name_}"] = int(n_cat)
if n_cat:
out_dict[f"g_1_{name_}"] = df[mask].g_1.values.mean()
out_dict[f"g_2_{name_}"] = df[mask].g_2.values.mean()
else:
out_dict[f"g_1_{name_}"] = np.nan
out_dict[f"g_2_{name_}"] = np.nan
if all_good:
out_dict["delta_g_1_1"] = out_dict["g_1_1p"] - out_dict["g_1_1m"]
out_dict["delta_g_2_2"] = out_dict["g_2_2p"] - out_dict["g_2_2m"]
out_dict["delta_g_1_2"] = out_dict["g_1_2p"] - out_dict["g_1_2m"]
out_dict["delta_g_2_1"] = out_dict["g_2_1p"] - out_dict["g_2_1m"]
else:
out_dict["delta_g_1_1"] = np.nan
out_dict["delta_g_2_2"] = np.nan
out_dict["delta_g_1_2"] = np.nan
out_dict["delta_g_2_1"] = np.nan
out_dict["good"] = all_good
return out_dict
[docs]
def shearReport(
basefile: str,
output_file_base: str | None,
shear: float,
cat_type: str,
tract: int,
snr_cut: float = 7.5,
) -> None:
"""Report on the shear calibration
Parameters
----------
basefile
Input base file name (see notes)
output_file_base:
Output file name (see notes)
shear:
Applied shear
cat_type:
Catalog type (one of ["pgauss", "gauss", "wmom"]
tract:
Tract, written to outout data
snr_cut:
Signal-to-noise cut.
Notes
-----
This will read the object shear data from "{basefile}_cluster_shear.pq"
This will read the object statisticis from "{basefile}_cluster_stats.pq"
This will write the shear stats to "{output_file_base}.pkl"
This will write the figures to "{output_file_base}_{figure}.png"
"""
t = tables_io.read(f"{basefile}_cluster_shear.pq")
t2 = tables_io.read(f"{basefile}_cluster_stats.pq")
shear_data = ShearData(t, t2, shear, cat_type, tract, snr_cut=snr_cut)
if output_file_base is not None:
shear_data.save(f"{output_file_base}.pkl")
shear_data.savefigs(output_file_base)
[docs]
def mergeShearReports(
inputs: list[str],
output_file: str,
) -> None:
"""Merge reports on the shear calibration
Parameters
----------
inputs:
List of input ShearData pickle files
output_file:
Where to write the merged file
"""
out_dict: dict[str, Any] = {}
for input_ in inputs:
shear_data = ShearData.load(input_)
input_dict = shear_data.toDict()
for key, val in input_dict.items():
if key in out_dict:
out_dict[key].append(val)
else:
out_dict[key] = [val]
out_df = pandas.DataFrame(out_dict)
out_df.to_parquet(output_file)
[docs]
def splitByTypeAndClean(
basefile: str,
tract: int,
shear: float,
cat_type: str,
*,
clean: bool = False,
) -> None: # pragma: no cover
"""Split a parquet file by shear catalog type and tract
Parameters
----------
basefile:
Original file name
tract:
Tract to select
shear:
Applied shear, saved to output
cat_type:
Catalog type to select
clean:
Remove duplicates
Notes
-----
This will create 5 files with the pattern:
"{basefile}_uncleaned_{tract}_{type}.pq"
+--------------+-------------------------------------+
| Column | Description |
+==============+=====================================+
| id | Index of object inside catalog |
+--------------+-------------------------------------+
| orig_id | Original object id |
+--------------+-------------------------------------+
| cell_idx_x | X-index of Cell |
+--------------+-------------------------------------+
| cell_idx_y | Y-index of Cell |
+--------------+-------------------------------------+
| x_cell_coadd | X-coordinate in cell frame |
+--------------+-------------------------------------+
| y_cell_coadd | Y-coordinate in cell frame |
+--------------+-------------------------------------+
| x_pix | X-coordinate in global WCS frame |
+--------------+-------------------------------------+
| y_pix | Y-coordinate in global WCS frame |
+--------------+-------------------------------------+
| g_1 | Shear g_1 component estimate |
+--------------+-------------------------------------+
| g_2 | Shear g_2 component estimate |
+--------------+-------------------------------------+
| snr | Signal-to-noise ratio |
+--------------+-------------------------------------+
"""
p = tables_io.read(basefile)
if clean:
clean_st = "cleaned"
cell_cut = CLEAN_CELL_CUT
else:
clean_st = "uncleaned"
cell_cut = UNCLEAN_CELL_CUT
for type_ in SHEAR_NAMES:
mask = p["shear_type"] == type_
sub = p[mask].copy(deep=True)
cell_idx_x = (N_PATCH * sub["patch_x"].values + sub["cell_x"].values).astype(
int
)
cell_idx_y = (N_PATCH * sub["patch_y"].values + sub["cell_y"].values).astype(
int
)
cent_x = CELL_INNER_SIZE * (cell_idx_x - CELL_OFFSET)
cent_y = CELL_INNER_SIZE * (cell_idx_y - CELL_OFFSET)
x_cell_coadd = sub["col"] - cent_x
y_cell_coadd = sub["row"] - cent_y
sub["x_pix"] = sub["col"] + CELL_BUFFER
sub["y_pix"] = sub["row"] + CELL_BUFFER
sub["x_cell_coadd"] = x_cell_coadd
sub["y_cell_coadd"] = y_cell_coadd
sub["snr"] = sub[f"{cat_type}_band_flux_r"] / sub[f"{cat_type}_band_flux_err_r"]
sub["g_1"] = sub[f"{cat_type}_g_1"]
sub["g_2"] = sub[f"{cat_type}_g_2"]
sub["cell_idx_x"] = cell_idx_x
sub["cell_idx_y"] = cell_idx_y
sub["orig_id"] = sub.id
sub["id"] = np.arange(len(sub))
central_to_cell = np.bitwise_and(
np.fabs(x_cell_coadd) < cell_cut, np.fabs(y_cell_coadd) < cell_cut
)
central_to_patch = np.bitwise_and(
np.fabs(sub["cell_x"].values - PATCH_OFFSET) < (N_PATCH/2),
np.fabs(sub["cell_y"].values - PATCH_OFFSET) < (N_PATCH/2)
)
right_tract = sub["tract"] == tract
central = np.bitwise_and(central_to_cell, central_to_patch)
selected = np.bitwise_and(right_tract, central)
cleaned = sub[selected].copy(deep=True)
cleaned["shear"] = np.repeat(shear, len(cleaned))
cleaned.to_parquet(basefile.replace(".parq", f"_{clean_st}_{tract}_{type_}.pq"))
[docs]
def reduceShearDataForCell(
cell: CellData, i_cat: int, dataframe: pandas.DataFrame
) -> pandas.DataFrame:
"""Filters dataframe to keep only sources in the cell
Parameters
----------
cell:
The cell being analyzed
i_cat:
Catalog index
dataframe:
Input dataframe
Returns
-------
Filtered datasets
Notes
-----
This will optionally deshear the source positions if `matcher.deshear`
is not None.
This will add these columns to the output dataframes
+-----------+-------------------------------------+
| Column | Description |
+===========+=====================================+
| x_cell | X-coordinate in cell frame |
+-----------+-------------------------------------+
| y_cell | Y-coordinate in cell frame |
+-----------+-------------------------------------+
| x_pix | X-coordinate in global WCS frame |
+-----------+-------------------------------------+
| y_pix | Y-coordinate in global WCS frame |
+-----------+-------------------------------------+
| dx_shear | Change in X position when desheared |
+-----------+-------------------------------------+
| dy_shear | Change in X position when desheared |
+-----------+-------------------------------------+
"""
matcher = cell.matcher
if TYPE_CHECKING:
assert isinstance(matcher, ShearMatch)
filtered_idx = matcher.getCellIndices(dataframe) == cell.idx
reduced = dataframe[filtered_idx].copy(deep=True)
x_cell_orig = reduced["x_cell_coadd"]
y_cell_orig = reduced["y_cell_coadd"]
x_pix_orig = reduced["x_pix"]
y_pix_orig = reduced["y_pix"]
if matcher.deshear is not None:
# De-shear in the cell frame to do matching
dx_shear = matcher.deshear * (
x_cell_orig * DESHEAR_COEFFS[i_cat][0]
+ y_cell_orig * DESHEAR_COEFFS[i_cat][2]
)
dy_shear = matcher.deshear * (
x_cell_orig * DESHEAR_COEFFS[i_cat][1]
+ y_cell_orig * DESHEAR_COEFFS[i_cat][3]
)
x_cell = x_cell_orig + dx_shear
y_cell = y_cell_orig + dy_shear
x_pix = x_pix_orig + dx_shear
y_pix = y_pix_orig + dy_shear
else: # pragma: no cover
dx_shear = np.zeros(len(dataframe))
dy_shear = np.zeros(len(dataframe))
x_cell = x_cell_orig
y_cell = y_cell_orig
x_pix = x_pix_orig
y_pix = y_pix_orig
x_cell = (x_cell + (CELL_OUTER_SIZE/2)) / matcher.pixel_match_scale
y_cell = (y_cell + (CELL_OUTER_SIZE/2)) / matcher.pixel_match_scale
filtered_x = np.bitwise_and(x_cell >= 0, x_cell < cell.n_pix[0])
filtered_y = np.bitwise_and(y_cell >= 0, y_cell < cell.n_pix[1])
filtered_bounds = np.bitwise_and(filtered_x, filtered_y)
red = reduced[filtered_bounds].copy(deep=True)
red["x_cell"] = x_cell[filtered_bounds]
red["y_cell"] = y_cell[filtered_bounds]
red["x_pix"] = x_pix[filtered_bounds]
red["y_pix"] = y_pix[filtered_bounds]
if matcher.deshear is not None:
red["dx_shear"] = dx_shear[filtered_bounds]
red["dy_shear"] = dy_shear[filtered_bounds]
return red
[docs]
def makeMatchedShearSourceCatalogs(
source_base_name: str,
match_base_name: str,
) -> dict[str, pandas.DataFrame]:
"""Use the associations to join the source tables to their match obects
Parameters
----------
source_base_name:
_base file name for souces catalogs
match_base_name:
_base file naem for match tables
Returns
-------
Dict of tables, keyed by shear type, which have the
souces catalogs joined to the associated objects
"""
keys = ["object_stats", "object_assoc", "object_shear"]
shear_types = {v: k for k, v in enumerate(SHEAR_NAMES)}
td = tables_io.read(match_base_name, keys=keys)
itd = tables_io.read(source_base_name, keys=list(shear_types.keys()))
td["object_stats"]["idx"] = np.arange(len(td["object_stats"]))
td["object_shear"]["idx"] = np.arange(len(td["object_shear"]))
merged_object = td["object_stats"].merge(
td["object_shear"], on="idx", how="inner", suffixes=["_l", "_r"]
)
merged_object_assoc = td["object_assoc"].merge(
merged_object, on="objectId", how="inner", suffixes=["_l", "_r"]
)
out_dict = {}
for cat_type_, i_cat_ in shear_types.items():
merged_object_assoc_mask = merged_object_assoc.catalogId == i_cat_
merged_object_assoc_masked = merged_object_assoc[merged_object_assoc_mask]
sources = itd[cat_type_]
sources["sourceId"] = sources["id"]
matched_source = merged_object_assoc_masked.merge(
sources, on="sourceId", how="inner", suffixes=["_l", "_r"]
)
out_dict[cat_type_] = matched_source
return out_dict