Source code for autoprof.pipeline_steps.Isophote_Fit

import numpy as np
from scipy.stats import iqr
from scipy.fftpack import fft, ifft
from scipy.optimize import minimize
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import RANSACRegressor, HuberRegressor
from time import time
from astropy.visualization import SqrtStretch, LogStretch
from astropy.visualization.mpl_normalize import ImageNormalize
from photutils.isophote import EllipseSample, EllipseGeometry, Isophote, IsophoteList
from photutils.isophote import Ellipse as Photutils_Ellipse
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from copy import copy, deepcopy
import logging
import sys
import os

from ..autoprofutils.SharedFunctions import (
    _iso_extract,
    _x_to_pa,
    _x_to_eps,
    _inv_x_to_eps,
    _inv_x_to_pa,
    Angle_TwoAngles_sin,
    Angle_TwoAngles_cos,
    Angle_Scatter,
    LSBImage,
    AddLogo,
    PA_shift_convention,
    autocolours,
)
from ..autoprofutils.Diagnostic_Plots import Plot_Isophote_Fit

__all__ = (
    "Photutils_Fit",
    "Isophote_Fit_FixedPhase",
    "Isophote_Fit_FFT_Robust",
    "Isophote_Fit_Forced",
    "Isophote_Fit_FFT_mean",
)


[docs] def Photutils_Fit(IMG, results, options): """Photutils elliptical isophote wrapper. This simply gives users access to the photutils isophote fitting method. See: `photutils <https://photutils.readthedocs.io/en/stable/isophote.html>`_ for more information. Notes ---------- :References: - 'background' - 'center' - 'init R' - 'init ellip' - 'init pa' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'fit ellip': , # array of ellipticity values (ndarray) 'fit pa': , # array of PA values (ndarray) 'fit R': , # array of semi-major axis values (ndarray) 'fit ellip_err': , # optional, array of ellipticity error values (ndarray) 'fit pa_err': , # optional, array of PA error values (ndarray) 'auxfile fitlimit': # optional, auxfile message (string) } """ dat = IMG - results["background"] geo = EllipseGeometry( x0=results["center"]["x"], y0=results["center"]["y"], sma=results["init R"] / 2, eps=results["init ellip"], pa=results["init pa"], ) ellipse = Photutils_Ellipse(dat, geometry=geo) isolist = ellipse.fit_image(fix_center=True, linear=False) res = { "fit R": isolist.sma[1:], "fit ellip": isolist.eps[1:], "fit ellip_err": isolist.ellip_err[1:], "fit pa": isolist.pa[1:], "fit pa_err": isolist.pa_err[1:], "fit photutils isolist": isolist, "auxfile fitlimit": "fit limit semi-major axis: %.2f pix" % isolist.sma[-1], } if "ap_doplot" in options and options["ap_doplot"]: Plot_Isophote_Fit( dat, res["fit R"], res["fit ellip"], res["fit pa"], res["fit ellip_err"], res["fit pa_err"], results, options, ) return IMG, res
[docs] def Isophote_Fit_FixedPhase(IMG, results, options): """Simply applies fixed position angle and ellipticity at the initialization values. Parameters ----------------- ap_scale : float, default 0.2 growth scale when fitting isophotes, not the same as *ap_sample---scale*. ap_fit_limit : float, default 2 noise level out to which to extend the fit in units of pixel background noise level. Default is 2, smaller values will end fitting further out in the galaxy image. Notes ---------- :References: - 'background' - 'background noise' - 'psf fwhm' - 'center' - 'mask' (optional) - 'init ellip' - 'init pa' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'fit ellip': , # array of ellipticity values (ndarray) 'fit pa': , # array of PA values (ndarray) 'fit R': , # array of semi-major axis values (ndarray) 'fit ellip_err': , # optional, array of ellipticity error values (ndarray) 'fit pa_err': , # optional, array of PA error values (ndarray) 'auxfile fitlimit': # optional, auxfile message (string) } """ if "ap_scale" in options: scale = options["ap_scale"] else: scale = 0.2 # subtract background from image during processing dat = IMG - results["background"] mask = results["mask"] if "mask" in results else None if not np.any(mask): mask = None # Determine sampling radii ###################################################################### shrink = 0 while shrink < 5: sample_radii = [max(1.0, results["psf fwhm"] / 2)] while sample_radii[-1] < (max(IMG.shape) / 2): isovals = _iso_extract( dat, sample_radii[-1], {"ellip": results["init ellip"], "pa": results["init pa"]}, results["center"], more=False, mask=mask, ) if ( np.median(isovals) < (options["ap_fit_limit"] if "ap_fit_limit" in options else 2) * results["background noise"] ): break sample_radii.append(sample_radii[-1] * (1.0 + scale / (1.0 + shrink))) if len(sample_radii) < 15: shrink += 1 else: break if shrink >= 5: raise Exception( "Unable to initialize ellipse fit, check diagnostic plots. Possible missed center." ) ellip = np.ones(len(sample_radii)) * results["init ellip"] pa = np.ones(len(sample_radii)) * results["init pa"] logging.debug("%s: sample radii: %s" % (options["ap_name"], str(sample_radii))) res = { "fit ellip": ellip, "fit pa": pa, "fit R": sample_radii, "auxfile fitlimit": "fit limit semi-major axis: %.2f pix" % sample_radii[-1], } if "init ellip_err" in results: res["fit ellip_err"] = np.ones(len(sample_radii)) * results["init ellip_err"] if "init pa_err" in results: res["fit pa_err"] = np.ones(len(sample_radii)) * results["init pa_err"] return IMG, res
def _ellip_smooth(R, E, deg): model = make_pipeline(PolynomialFeatures(deg), HuberRegressor(epsilon=2.0)) model.fit(np.log10(R).reshape(-1, 1), _inv_x_to_eps(E)) return _x_to_eps(model.predict(np.log10(R).reshape(-1, 1))) def _pa_smooth(R, PA, deg): model_s = make_pipeline(PolynomialFeatures(deg), HuberRegressor()) model_c = make_pipeline(PolynomialFeatures(deg), HuberRegressor()) model_c.fit(np.log10(R).reshape(-1, 1), np.cos(2 * PA)) model_s.fit(np.log10(R).reshape(-1, 1), np.sin(2 * PA)) pred_pa_s = np.clip(model_s.predict(np.log10(R).reshape(-1, 1)), a_min=-1, a_max=1) pred_pa_c = np.clip(model_c.predict(np.log10(R).reshape(-1, 1)), a_min=-1, a_max=1) # np.arctan(pred_pa_s / pred_pa_c) + (np.pi * (pred_pa_c < 0)) return ((np.arctan2(pred_pa_s, pred_pa_c)) % (2 * np.pi)) / 2 def _FFT_Robust_loss( dat, R, PARAMS, i, C, noise, mask=None, reg_scale=1.0, robust_clip=0.15, fit_coefs=None, name="" ): isovals = _iso_extract( dat, R[i], PARAMS[i], C, mask=mask, interp_mask=False if mask is None else True, interp_method="bicubic", ) try: coefs = fft(np.clip(isovals, a_max=np.quantile(isovals, 1.0 - robust_clip), a_min=None)) except: coefs = np.zeros(100) isovals = np.zeros(100) if fit_coefs is None: f2_loss = np.abs(coefs[2]) / ( len(isovals) * (max(0, np.median(isovals)) + noise / np.sqrt(len(isovals))) ) else: f2_loss = np.sum(np.abs(coefs[np.array(fit_coefs)])) / ( len(fit_coefs) * len(isovals) * (max(0, np.median(isovals)) + noise / np.sqrt(len(isovals))) ) reg_loss = 0 if not PARAMS[i]["m"] is None: fmode_scale = 1.0 / len(PARAMS[i]["m"]) if i < (len(R) - 1): reg_loss += abs( (PARAMS[i]["ellip"] - PARAMS[i + 1]["ellip"]) / (1 - PARAMS[i + 1]["ellip"]) ) reg_loss += abs(Angle_TwoAngles_sin(PARAMS[i]["pa"], PARAMS[i + 1]["pa"]) / (0.2)) if not PARAMS[i]["m"] is None: for m in range(len(PARAMS[i]["m"])): reg_loss += fmode_scale * abs((PARAMS[i]["Am"][m] - PARAMS[i + 1]["Am"][m]) / 0.2) reg_loss += fmode_scale * abs( Angle_TwoAngles_cos( PARAMS[i]["m"][m] * PARAMS[i]["Phim"][m], PARAMS[i + 1]["m"][m] * PARAMS[i + 1]["Phim"][m], ) / (PARAMS[i]["m"][m] * 0.1) ) if not PARAMS[i]["C"] is None: reg_loss += abs(np.log10(PARAMS[i]["C"] / PARAMS[i + 1]["C"])) / 0.1 if i > 0: reg_loss += abs( (PARAMS[i]["ellip"] - PARAMS[i - 1]["ellip"]) / (1 - PARAMS[i - 1]["ellip"]) ) reg_loss += abs(Angle_TwoAngles_sin(PARAMS[i]["pa"], PARAMS[i - 1]["pa"]) / (0.2)) if not PARAMS[i]["m"] is None: for m in range(len(PARAMS[i]["m"])): reg_loss += fmode_scale * abs((PARAMS[i]["Am"][m] - PARAMS[i - 1]["Am"][m]) / 0.2) reg_loss += fmode_scale * abs( Angle_TwoAngles_cos( PARAMS[i]["m"][m] * PARAMS[i]["Phim"][m], PARAMS[i - 1]["m"][m] * PARAMS[i - 1]["Phim"][m], ) / (PARAMS[i]["m"][m] * 0.1) ) if not PARAMS[i]["C"] is None: reg_loss += abs(np.log10(PARAMS[i]["C"] / PARAMS[i - 1]["C"])) / 0.1 return f2_loss * (1 + reg_loss * reg_scale) def _FFT_Robust_Errors( dat, R, PARAMS, C, noise, mask=None, reg_scale=1.0, robust_clip=0.15, fit_coefs=None, name="" ): PA_err = np.zeros(len(R)) E_err = np.zeros(len(R)) for ri in range(len(R)): temp_fits = [] for i in range(10): low_ri = max(0, ri - 1) high_ri = min(len(R) - 1, ri + 1) temp_fits.append( minimize( lambda x: _FFT_Robust_loss( dat, [R[low_ri], R[ri] * (1 - 0.05 + i * 0.1 / 9), R[high_ri]], [ PARAMS[low_ri], { "ellip": np.clip(x[0], 0, 0.999), "pa": x[1] % np.pi, "m": PARAMS[ri]["m"], "Am": PARAMS[ri]["Am"], "Phim": PARAMS[ri]["Phim"], }, PARAMS[high_ri], ], 1, C, noise, mask=mask, reg_scale=reg_scale, robust_clip=robust_clip, fit_coefs=fit_coefs, name=name, ), x0=[PARAMS[ri]["ellip"], PARAMS[ri]["pa"]], method="SLSQP", options={"ftol": 0.001}, ).x ) temp_fits = np.array(temp_fits) E_err[ri] = iqr(np.clip(temp_fits[:, 0], 0, 1), rng=[16, 84]) / 2 PA_err[ri] = ( Angle_Scatter(2 * (temp_fits[:, 1] % np.pi)) / 4.0 ) # multiply by 2 to get [0, 2pi] range return E_err, PA_err
[docs] def Isophote_Fit_FFT_Robust(IMG, results, options): """Fit elliptical isophotes to a galaxy image using FFT coefficients and regularization. The isophotal fitting routine simultaneously optimizes a collection of elliptical isophotes by minimizing the 2nd FFT coefficient power, regularized for robustness. A series of isophotes are constructed which grow geometrically until they begin to reach the background level. Then the algorithm iteratively updates the position angle and ellipticity of each isophote individually for many rounds. Each round updates every isophote in a random order. Each round cycles between three options: optimizing position angle, ellipticity, or both. To optimize the parameters, 5 values (pa, ellip, or both) are randomly sampled and the "loss" is computed. The loss is a combination of the relative amplitude of the second FFT coefficient (compared to the median flux), and a regularization term. The regularization term penalizes adjacent isophotes for having different position angle or ellipticity (using the l1 norm). Thus, all the isophotes are coupled and tend to fit smoothly varying isophotes. When the optimization has completed three rounds without any isophotes updating, the profile is assumed to have converged. An uncertainty for each ellipticity and position angle value is determined by repeatedly re-optimizing each ellipse with slight adjustments to it's semi-major axis length (+- 5%). The standard deviation of the PA/ellipticity after repeated fitting gives the uncertainty. Parameters ----------------- ap_scale : float, default 0.2 growth scale when fitting isophotes, not the same as *ap_sample---scale*. ap_fit_limit : float, default 2 noise level out to which to extend the fit in units of pixel background noise level. Default is 2, smaller values will end fitting further out in the galaxy image. ap_regularize_scale : float, default 1 scale factor to apply to regularization coupling factor between isophotes. Default of 1, larger values make smoother fits, smaller values give more chaotic fits. ap_isofit_robustclip : float, default 0.15 quantile of flux values at which to clip when extracting values along an isophote. Clipping outlier values (such as very bright stars) while fitting isophotes allows for robust computation of FFT coefficients along an isophote. ap_isofit_losscoefs : tuple, default (2,) Tuple of FFT coefficients to use in optimization procedure. AutoProf will attemp to minimize the power in all listed FFT coefficients. Must be a tuple, not a list. ap_isofit_superellipse : bool, default False If True, AutoProf will fit superellipses instead of regular ellipses. A superellipse is typically used to represent boxy/disky isophotes. The variable controlling the transition from a rectangle to an ellipse to a four-armed-star like shape is C. A value of C = 2 represents an ellipse and is the starting point of the optimization. ap_isofit_fitcoefs : tuple, default None Tuple of FFT coefficients to use in fitting procedure. AutoProf will attemp to fit ellipses with these Fourier mode perturbations. Such perturbations allow for lopsided, boxy, disky, and other types of isophotes beyond straightforward ellipses. Must be a tuple, not a list. Note that AutoProf will first fit ellipses, then turn on the Fourier mode perturbations, thus the fitting time will always be longer. ap_isofit_fitcoefs_FFTinit : bool, default False If True, the coefficients for the Fourier modes fitted from ap_isofit_fitcoefs will be initialized using an FFT decomposition along fitted elliptical isophotes. This can improve the fit result, though it is less stable and so users should examine the results after fitting. ap_isofit_perturbscale_ellip : float, default 0.03 Sampling scale for random adjustments to ellipticity made while optimizing isophotes. Smaller values will converge faster, but get stuck in local minima; larger values will escape local minima, but takes longer to converge. ap_isofit_perturbscale_pa : float, default 0.06 Sampling scale for random adjustments to position angle made while optimizing isophotes. Smaller values will converge faster, but get stuck in local minima; larger values will escape local minima, but takes longer to converge. ap_isofit_iterlimitmax : int, default 300 Maximum number of iterations (each iteration adjusts every isophote once) before automatically stopping optimization. For galaxies with lots of structure (ie detailed spiral arms) more iterations may be needed to fully fit the light distribution, but runtime will be longer. ap_isofit_iterlimitmin : int, default 0 Minimum number of iterations before optimization is allowed to stop. ap_isofit_iterstopnochange : float, default 3 Number of iterations with no updates to parameters before optimization procedure stops. Lower values will process galaxies faster, but may still be stuck in local minima, higher values are more likely to converge on the global minimum but can take a long time to run. Fractional values are allowed though not recomended. Notes ---------- :References: - 'background' - 'background noise' - 'psf fwhm' - 'center' - 'mask' (optional) - 'init ellip' - 'init pa' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'fit ellip': , # array of ellipticity values (ndarray) 'fit pa': , # array of PA values (ndarray) 'fit R': , # array of semi-major axis values (ndarray) 'fit ellip_err': , # optional, array of ellipticity error values (ndarray) 'fit pa_err': , # optional, array of PA error values (ndarray) 'fit C': , # optional, superellipse scale parameter (ndarray) 'fit Fmodes': , # optional, fitted Fourier mode indices (tuple) 'fit Fmode A*': , # optional, fitted Fourier mode amplitudes, * for each index (ndarray) 'fit Fmode Phi*': , # optional, fitted Fourier mode phases, * for each index (ndarray) 'auxfile fitlimit': # optional, auxfile message (string) } """ if "ap_scale" in options: scale = options["ap_scale"] else: scale = 0.2 # subtract background from image during processing dat = IMG - results["background"] mask = results["mask"] if "mask" in results else None if not np.any(mask): mask = None if "ap_isoinit_R_set" in options: minR = options["ap_isoinit_R_set"] elif "init R" in results: minR = results["init R"] * 0.8 else: minR = 0.0 # Determine sampling radii ###################################################################### shrink = 0 while shrink < 5: sample_radii = [max(1.0, results["psf fwhm"] / 2)] while sample_radii[-1] < (max(IMG.shape) / 2): isovals = _iso_extract( dat, sample_radii[-1], {"ellip": results["init ellip"], "pa": results["init pa"]}, results["center"], more=False, mask=mask, ) if ( sample_radii[-1] > minR and np.median(isovals) < (options["ap_fit_limit"] if "ap_fit_limit" in options else 2) * results["background noise"] ): break sample_radii.append(sample_radii[-1] * (1.0 + scale / (1.0 + shrink))) if len(sample_radii) < 15: shrink += 1 else: break if shrink >= 5: raise Exception( "Unable to initialize ellipse fit, check diagnostic plots. Possible missed center." ) ellip = np.ones(len(sample_radii)) * results["init ellip"] pa = np.ones(len(sample_radii)) * results["init pa"] logging.debug("%s: sample radii: %s" % (options["ap_name"], str(sample_radii))) # Fit isophotes ###################################################################### perturb_scale = 0.03 regularize_scale = options["ap_regularize_scale"] if "ap_regularize_scale" in options else 1.0 robust_clip = options["ap_isofit_robustclip"] if "ap_isofit_robustclip" in options else 0.15 N_perturb = 5 fit_coefs = options["ap_isofit_losscoefs"] if "ap_isofit_losscoefs" in options else None fit_params = options["ap_isofit_fitcoefs"] if "ap_isofit_fitcoefs" in options else None fit_superellipse = ( options["ap_isofit_superellipse"] if "ap_isofit_superellipse" in options else False ) parameters = list( { "ellip": ellip[i], "pa": pa[i], "m": fit_params, "C": 2 if fit_superellipse else None, "Am": None if fit_params is None else np.zeros(len(fit_params)), "Phim": None if fit_params is None else np.zeros(len(fit_params)), } for i in range(len(ellip)) ) count = 0 iterlimitmax = ( options["ap_isofit_iterlimitmax"] if "ap_isofit_iterlimitmax" in options else 1000 ) iterlimitmin = options["ap_isofit_iterlimitmin"] if "ap_isofit_iterlimitmin" in options else 0 iterstopnochange = ( options["ap_isofit_iterstopnochange"] if "ap_isofit_iterstopnochange" in options else 3 ) count_nochange = 0 use_center = copy(results["center"]) I = np.array(range(len(sample_radii))) param_cycle = 2 base_params = 2 + int(fit_superellipse) while count < iterlimitmax: # Periodically include logging message if count % 10 == 0: logging.debug("%s: count: %i" % (options["ap_name"], count)) count += 1 np.random.shuffle(I) N_perturb = int(1 + (10 / np.sqrt(count))) for i in I: perturbations = [] perturbations.append(deepcopy(parameters)) perturbations[-1][i]["loss"] = _FFT_Robust_loss( dat, sample_radii, perturbations[-1], i, use_center, results["background noise"], mask=mask, reg_scale=regularize_scale if count > 4 else 0, robust_clip=robust_clip, fit_coefs=fit_coefs, name=options["ap_name"], ) for n in range(N_perturb): perturbations.append(deepcopy(parameters)) if count % param_cycle == 0: perturbations[-1][i]["ellip"] = _x_to_eps( _inv_x_to_eps(perturbations[-1][i]["ellip"]) + np.random.normal(loc=0, scale=perturb_scale) ) elif count % param_cycle == 1: perturbations[-1][i]["pa"] = ( perturbations[-1][i]["pa"] + np.random.normal(loc=0, scale=np.pi * perturb_scale) ) % np.pi elif (count % param_cycle) == 2 and not parameters[i]["C"] is None: perturbations[-1][i]["C"] = 10 ** ( np.log10(perturbations[-1][i]["C"]) + np.random.normal(loc=0, scale=np.log10(1.0 + perturb_scale)) ) elif count % param_cycle < (base_params + len(parameters[i]["m"])): perturbations[-1][i]["Am"][ (count % param_cycle) - base_params ] += np.random.normal(loc=0, scale=perturb_scale) elif count % param_cycle < (base_params + 2 * len(parameters[i]["m"])): phim_index = (count % param_cycle) - base_params - len(parameters[i]["m"]) perturbations[-1][i]["Phim"][phim_index] = ( perturbations[-1][i]["Phim"][phim_index] + np.random.normal( loc=0, scale=2 * np.pi * perturb_scale / parameters[i]["m"][phim_index], ) ) % (2 * np.pi / parameters[i]["m"][phim_index]) else: raise Exception( "Unrecognized optimization parameter id: %i" % (count % param_cycle) ) perturbations[-1][i]["loss"] = _FFT_Robust_loss( dat, sample_radii, perturbations[-1], i, use_center, results["background noise"], mask=mask, reg_scale=regularize_scale if count > 4 else 0, robust_clip=robust_clip, fit_coefs=fit_coefs, name=options["ap_name"], ) best = np.argmin(list(p[i]["loss"] for p in perturbations)) if best > 0: parameters = deepcopy(perturbations[best]) del parameters[i]["loss"] count_nochange = 0 else: count_nochange += 1 if not ( count_nochange < (iterstopnochange * (len(sample_radii) - 1)) or count < iterlimitmin ): if param_cycle > 2 or (parameters[i]["m"] is None and not fit_superellipse): break elif parameters[i]["m"] is None and fit_superellipse: logging.info( "%s: Started C fitting at iteration %i" % (options["ap_name"], count) ) param_cycle = 3 iterstopnochange = max(iterstopnochange, param_cycle) count_nochange = 0 count = 0 if fit_coefs is None: fit_coefs = (2, 4) else: logging.info( "%s: Started Fmode fitting at iteration %i" % (options["ap_name"], count) ) if fit_superellipse: logging.info( "%s: Started C fitting at iteration %i" % (options["ap_name"], count) ) param_cycle = base_params + 2 * len(parameters[i]["m"]) iterstopnochange = max(iterstopnochange, param_cycle) count_nochange = 0 count = 0 if fit_coefs is None and not fit_params is None: fit_coefs = fit_params if not 2 in fit_coefs: fit_coefs = tuple(sorted(set([2] + list(fit_coefs)))) if not parameters[i]["C"] is None and ( not "ap_isofit_losscoefs" in options or options["ap_isofit_losscoefs"] is None ): fit_coefs = tuple(sorted(set([4] + list(fit_coefs)))) if ( "ap_isofit_fitcoefs_FFTinit" in options and options["ap_isofit_fitcoefs_FFTinit"] ): for ii in I: isovals = _iso_extract( dat, sample_radii[ii], parameters[ii], use_center, mask=mask, interp_mask=False if mask is None else True, interp_method="bicubic", ) if mask is None: coefs = fft( np.clip( isovals, a_max=np.quantile(isovals, 0.85), a_min=None, ) ) else: coefs = fft( np.clip( isovals, a_max=np.quantile(isovals, 0.9), a_min=None, ) ) for m in range(len(parameters[ii]["m"])): parameters[ii]["Am"][m] = np.abs( coefs[parameters[ii]["m"][m]] / coefs[0] ) * np.sign(np.angle(coefs[parameters[ii]["m"][m]])) parameters[ii]["Phim"][m] = np.angle( coefs[parameters[ii]["m"][m]] ) % (2 * np.pi) if not ( count_nochange < (iterstopnochange * (len(sample_radii) - 1)) or count < iterlimitmin ): break logging.info("%s: Completed isohpote fit in %i itterations" % (options["ap_name"], count)) # Compute errors ###################################################################### ellip_err, pa_err = _FFT_Robust_Errors( dat, sample_radii, parameters, use_center, results["background noise"], mask=mask, reg_scale=regularize_scale, robust_clip=robust_clip, fit_coefs=fit_coefs, name=options["ap_name"], ) for i in range(len(ellip)): parameters[i]["ellip err"] = ellip_err[i] parameters[i]["pa err"] = pa_err[i] # Plot fitting results ###################################################################### if "ap_doplot" in options and options["ap_doplot"]: Plot_Isophote_Fit(dat, sample_radii, parameters, results, options) res = { "fit ellip": np.array(list(parameters[i]["ellip"] for i in range(len(parameters)))), "fit pa": np.array(list(parameters[i]["pa"] for i in range(len(parameters)))), "fit R": sample_radii, "fit ellip_err": ellip_err, "fit pa_err": pa_err, "auxfile fitlimit": "fit limit semi-major axis: %.2f pix" % sample_radii[-1], } if not fit_params is None: res.update({"fit Fmodes": fit_params}) for m in range(len(fit_params)): res.update( { "fit Fmode A%i" % fit_params[m]: np.array( list(parameters[i]["Am"][m] for i in range(len(parameters))) ), "fit Fmode Phi%i" % fit_params[m]: np.array( list(parameters[i]["Phim"][m] for i in range(len(parameters))) ), } ) if fit_superellipse: res.update({"fit C": np.array(list(parameters[i]["C"] for i in range(len(parameters))))}) return IMG, res
[docs] def Isophote_Fit_Forced(IMG, results, options): """Read previously fit PA/ellipticity profile. Reads a .prof file and extracts the corresponding PA/ellipticity profile. The profile is extracted generically, so any csv file with columns for 'R', 'pa', 'ellip', and optionally 'pa_e' and 'ellip_e' will be able to create a forced fit. This can be used for testing purposes, such as selecting a specific isophote to extract or comparing AutoProf SB extraction methods with other softwares. Parameters ----------------- ap_forcing_profile : string, default None File path to .prof file providing forced photometry PA and ellip values to apply to *ap_image_file* (required for forced photometry) Notes ---------- :References: - 'background' - 'background noise' - 'center' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'fit ellip': , # array of ellipticity values (ndarray) 'fit pa': , # array of PA values (ndarray) 'fit R': , # array of semi-major axis values (ndarray) 'fit ellip_err': , # optional, array of ellipticity error values (ndarray) 'fit pa_err': , # optional, array of PA error values (ndarray) } """ with open(options["ap_forcing_profile"], "r") as f: raw = f.readlines() for i, l in enumerate(raw): if l[0] != "#": readfrom = i break header = list(h.strip() for h in raw[readfrom].split(",")) force = dict((h, []) for h in header) for l in raw[readfrom + 2 :]: for d, h in zip(l.split(","), header): force[h].append(float(d.strip())) force["pa"] = PA_shift_convention(np.array(force["pa"]), deg=True) if "ap_doplot" in options and options["ap_doplot"]: parameters = [] for i in range(len(force["R"])): parameters.append({"ellip": force["ellip"][i], "pa": force["pa"][i], "C": None}) Plot_Isophote_Fit( IMG - results["background"], np.array(force["R"]), parameters, results, options, ) res = { "fit ellip": np.array(force["ellip"]), "fit pa": np.array(force["pa"]) * np.pi / 180, "fit R": list(np.array(force["R"]) / options["ap_pixscale"]), } if "ellip_e" in force and "pa_e" in force: res["fit ellip_err"] = np.array(force["ellip_e"]) res["fit pa_err"] = np.array(force["pa_e"]) * np.pi / 180 return IMG, res
###################################################################### def _FFT_mean_loss(dat, R, E, PA, i, C, noise, mask=None, reg_scale=1.0, name=""): isovals = _iso_extract( dat, R[i], {"ellip": E[i], "pa": PA[i]}, C, mask=mask, interp_mask=False if mask is None else True, ) if not np.all(np.isfinite(isovals)): logging.warning( "Failed to evaluate isophotal flux values, skipping this ellip/pa combination" ) return np.inf coefs = fft(isovals) f2_loss = np.abs(coefs[2]) / (len(isovals) * (max(0, np.mean(isovals)) + noise)) reg_loss = 0 if i < (len(R) - 1): reg_loss += abs((E[i] - E[i + 1]) / (1 - E[i + 1])) reg_loss += abs(Angle_TwoAngles_sin(PA[i], PA[i + 1]) / (0.3)) if i > 0: reg_loss += abs((E[i] - E[i - 1]) / (1 - E[i - 1])) reg_loss += abs(Angle_TwoAngles_sin(PA[i], PA[i - 1]) / (0.3)) return f2_loss * (1 + reg_loss * reg_scale)
[docs] def Isophote_Fit_FFT_mean(IMG, results, options): """Fit elliptical isophotes to a galaxy image using FFT coefficients and regularization. Same as the standard isophote fitting routine, except uses less robust mean/std measures. This is only intended for low S/N data where pixels have low integer counts. Parameters ----------------- ap_scale : float, default 0.2 growth scale when fitting isophotes, not the same as *ap_sample---scale*. ap_fit_limit : float, default 2 noise level out to which to extend the fit in units of pixel background noise level. Default is 2, smaller values will end fitting further out in the galaxy image. ap_regularize_scale : float, default 1 scale factor to apply to regularization coupling factor between isophotes. Default of 1, larger values make smoother fits, smaller values give more chaotic fits. Notes ---------- :References: - 'background' - 'background noise' - 'center' - 'psf fwhm' - 'init ellip' - 'init pa' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'fit ellip': , # array of ellipticity values (ndarray) 'fit pa': , # array of PA values (ndarray) 'fit R': , # array of semi-major axis values (ndarray) 'fit ellip_err': , # optional, array of ellipticity error values (ndarray) 'fit pa_err': , # optional, array of PA error values (ndarray) 'auxfile fitlimit': # optional, auxfile message (string) } """ if "ap_scale" in options: scale = options["ap_scale"] else: scale = 0.2 # subtract background from image during processing dat = IMG - results["background"] mask = results["mask"] if "mask" in results else None if not np.any(mask): mask = None # Determine sampling radii ###################################################################### shrink = 0 while shrink < 5: sample_radii = [3 * results["psf fwhm"] / 2] while sample_radii[-1] < (max(IMG.shape) / 2): isovals = _iso_extract( dat, sample_radii[-1], {"ellip": results["init ellip"], "pa": results["init pa"]}, results["center"], more=False, mask=mask, ) if ( np.mean(isovals) < (options["ap_fit_limit"] if "ap_fit_limit" in options else 1) * results["background noise"] ): break sample_radii.append(sample_radii[-1] * (1.0 + scale / (1.0 + shrink))) if len(sample_radii) < 15: shrink += 1 else: break if shrink >= 5: raise Exception( "Unable to initialize ellipse fit, check diagnostic plots. Possible missed center." ) ellip = np.ones(len(sample_radii)) * results["init ellip"] pa = np.ones(len(sample_radii)) * results["init pa"] logging.debug("%s: sample radii: %s" % (options["ap_name"], str(sample_radii))) # Fit isophotes ###################################################################### perturb_scale = np.array([0.03, 0.06]) regularize_scale = options["ap_regularize_scale"] if "ap_regularize_scale" in options else 1.0 N_perturb = 5 count = 0 count_nochange = 0 use_center = copy(results["center"]) I = np.array(range(len(sample_radii))) while count < 300 and count_nochange < (3 * len(sample_radii)): # Periodically include logging message if count % 10 == 0: logging.debug("%s: count: %i" % (options["ap_name"], count)) count += 1 np.random.shuffle(I) for i in I: perturbations = [] perturbations.append({"ellip": copy(ellip), "pa": copy(pa)}) perturbations[-1]["loss"] = _FFT_mean_loss( dat, sample_radii, perturbations[-1]["ellip"], perturbations[-1]["pa"], i, use_center, results["background noise"], mask=mask, reg_scale=regularize_scale if count > 4 else 0, name=options["ap_name"], ) for n in range(N_perturb): perturbations.append({"ellip": copy(ellip), "pa": copy(pa)}) if count % 3 in [0, 1]: perturbations[-1]["ellip"][i] = _x_to_eps( _inv_x_to_eps(perturbations[-1]["ellip"][i]) + np.random.normal(loc=0, scale=perturb_scale[0]) ) if count % 3 in [1, 2]: perturbations[-1]["pa"][i] = ( perturbations[-1]["pa"][i] + np.random.normal(loc=0, scale=perturb_scale[1]) ) % np.pi perturbations[-1]["loss"] = _FFT_mean_loss( dat, sample_radii, perturbations[-1]["ellip"], perturbations[-1]["pa"], i, use_center, results["background noise"], mask=mask, reg_scale=regularize_scale if count > 4 else 0, name=options["ap_name"], ) best = np.argmin(list(p["loss"] for p in perturbations)) if best > 0: ellip = copy(perturbations[best]["ellip"]) pa = copy(perturbations[best]["pa"]) count_nochange = 0 else: count_nochange += 1 logging.info("%s: Completed isohpote fit in %i itterations" % (options["ap_name"], count)) # detect collapsed center ###################################################################### for i in range(5): if (_inv_x_to_eps(ellip[i]) - _inv_x_to_eps(ellip[i + 1])) > 0.5: ellip[: i + 1] = ellip[i + 1] pa[: i + 1] = pa[i + 1] # Smooth ellip and pa profile ###################################################################### smooth_ellip = copy(ellip) smooth_pa = copy(pa) ellip[:3] = min(ellip[:3]) smooth_ellip = _ellip_smooth(sample_radii, smooth_ellip, 5) smooth_pa = _pa_smooth(sample_radii, smooth_pa, 5) if "ap_doplot" in options and options["ap_doplot"]: ranges = [ [ max(0, int(use_center["x"] - sample_radii[-1] * 1.2)), min(dat.shape[1], int(use_center["x"] + sample_radii[-1] * 1.2)), ], [ max(0, int(use_center["y"] - sample_radii[-1] * 1.2)), min(dat.shape[0], int(use_center["y"] + sample_radii[-1] * 1.2)), ], ] LSBImage( dat[ranges[1][0] : ranges[1][1], ranges[0][0] : ranges[0][1]], results["background noise"], ) # plt.imshow(np.clip(dat[ranges[1][0]: ranges[1][1], ranges[0][0]: ranges[0][1]], # a_min = 0,a_max = None), origin = 'lower', cmap = 'Greys', norm = ImageNormalize(stretch=LogStretch())) for i in range(len(sample_radii)): plt.gca().add_patch( Ellipse( xy=(use_center["x"] - ranges[0][0], use_center["y"] - ranges[1][0]), width=2 * sample_radii[i], height=2 * sample_radii[i] * (1.0 - ellip[i]), angle=pa[i] * 180 / np.pi, fill=False, linewidth=((i + 1) / len(sample_radii)) ** 2, color="r", ) ) if not ("ap_nologo" in options and options["ap_nologo"]): AddLogo(plt.gcf()) plt.savefig( f"{options.get('ap_plotpath','')}fit_ellipse_{options['ap_name']}.{options.get('ap_plot_extension', 'jpg')}", dpi=options["ap_plotdpi"] if "ap_plotdpi" in options else 300, ) plt.close() plt.scatter(sample_radii, ellip, color="r", label="ellip") plt.scatter(sample_radii, pa / np.pi, color="b", label="pa/$np.pi$") show_ellip = _ellip_smooth(sample_radii, ellip, deg=5) show_pa = _pa_smooth(sample_radii, pa, deg=5) plt.plot( sample_radii, show_ellip, color="orange", linewidth=2, linestyle="--", label="smooth ellip", ) plt.plot( sample_radii, show_pa / np.pi, color="purple", linewidth=2, linestyle="--", label="smooth pa/$np.pi$", ) # plt.xscale('log') plt.legend() if not ("ap_nologo" in options and options["ap_nologo"]): AddLogo(plt.gcf()) plt.savefig( f"{options.get('ap_plotpath','')}phaseprofile_{options['ap_name']}.{options.get('ap_plot_extension', 'jpg')}", dpi=options["ap_plotdpi"] if "ap_plotdpi" in options else 300, ) plt.close() # Compute errors ###################################################################### ellip_err = np.zeros(len(ellip)) ellip_err[:2] = np.sqrt(np.sum((ellip[:4] - smooth_ellip[:4]) ** 2) / 4) ellip_err[-1] = np.sqrt(np.sum((ellip[-4:] - smooth_ellip[-4:]) ** 2) / 4) pa_err = np.zeros(len(pa)) pa_err[:2] = np.sqrt(np.sum((pa[:4] - smooth_pa[:4]) ** 2) / 4) pa_err[-1] = np.sqrt(np.sum((pa[-4:] - smooth_pa[-4:]) ** 2) / 4) for i in range(2, len(pa) - 1): ellip_err[i] = np.sqrt( np.sum((ellip[i - 2 : i + 2] - smooth_ellip[i - 2 : i + 2]) ** 2) / 4 ) pa_err[i] = np.sqrt(np.sum((pa[i - 2 : i + 2] - smooth_pa[i - 2 : i + 2]) ** 2) / 4) res = { "fit ellip": ellip, "fit pa": pa, "fit R": sample_radii, "fit ellip_err": ellip_err, "fit pa_err": pa_err, "auxfile fitlimit": "fit limit semi-major axis: %.2f pix" % sample_radii[-1], } return IMG, res