Source code for autoprof.pipeline_steps.PSF

from photutils import DAOStarFinder, IRAFStarFinder
import numpy as np
from scipy.stats import iqr
import matplotlib.pyplot as plt
from astropy.visualization import SqrtStretch, LogStretch, HistEqStretch
from astropy.visualization.mpl_normalize import ImageNormalize
from matplotlib.patches import Ellipse
import matplotlib.cm as cm
import logging
from astropy.io import fits
from itertools import product
from scipy.optimize import minimize
from scipy.stats import norm, multivariate_normal
from scipy.fftpack import fft, ifft
import sys
import os

from ..autoprofutils.SharedFunctions import (
    StarFind,
    AddLogo,
    LSBImage,
    autocolours,
    interpolate_Lanczos,
    interpolate_bicubic,
    Read_Image,
)
from ..autoprofutils.Diagnostic_Plots import Plot_PSF_Stars
from copy import deepcopy

__all__ = ("PSF_IRAF", "PSF_Assumed", "PSF_StarFind", "PSF_Image", "PSF_deconvolve")


[docs] def PSF_IRAF(IMG, results, options): """PSF routine which identifies stars and averages the FWHM. Uses the photutil IRAF wrapper to identify stars in the image and computes the average FWHM. Parameters ----------------- ap_guess_psf : float, default None Initialization value for the PSF calculation in pixels. If not given, AutoProf will default with a guess of 1/*ap_pixscale* ap_set_psf : float, default None force AutoProf to use this PSF value (in pixels) instead of calculating its own. Notes ---------- :References: - 'background' (float) - 'background noise' (float) Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'psf fwhm': # FWHM of the average PSF for the image } """ if "ap_set_psf" in options: logging.info("%s: PSF set by user: %.4e" % (options["ap_name"], options["ap_set_psf"])) return IMG, {"psf fwhm": options["ap_set_psf"]} elif "ap_guess_psf" in options: logging.info( "%s: PSF initialized by user: %.4e" % (options["ap_name"], options["ap_guess_psf"]) ) fwhm_guess = options["ap_guess_psf"] else: fwhm_guess = max(1.0, 1.0 / options["ap_pixscale"]) edge_mask = np.zeros(IMG.shape, dtype=bool) edge_mask[ int(IMG.shape[0] / 5.0) : int(4.0 * IMG.shape[0] / 5.0), int(IMG.shape[1] / 5.0) : int(4.0 * IMG.shape[1] / 5.0), ] = True dat = IMG - results["background"] # photutils wrapper for IRAF star finder count = 0 sources = 0 psf_iter = deepcopy(psf_guess) try: while count < 5 and sources < 20: iraffind = IRAFStarFinder( fwhm=psf_iter, threshold=6.0 * results["background noise"], brightest=50 ) irafsources = iraffind.find_stars(dat, edge_mask) psf_iter = np.median(irafsources["fwhm"]) if np.median(irafsources["sharpness"]) >= 0.95: break count += 1 sources = len(irafsources["fwhm"]) except: return IMG, {"psf fwhm": fwhm_guess} if len(irafsources) < 5: return IMG, {"psf fwhm": fwhm_guess} psf = np.median(irafsources["fwhm"]) if "ap_doplot" in options and options["ap_doplot"]: Plot_PSF_Stars( IMG, irafsources["xcentroid"], irafsources["ycentroid"], irafsources["fwhm"], psf, results, options, ) return IMG, {"psf fwhm": psf, "auxfile psf": "psf fwhm: %.3f pix" % psf}
[docs] def PSF_Assumed(IMG, results, options): """ Most astronomical data is assumed to be nyquist sampled, thus we assume a PSF scale of 4.0 pix to speed things up. Note that AutoProf just uses the PSF FWHM to initialize some length scales on the image, accuracy is not very important for the PSF. """ if "ap_set_psf" in options: logging.info("%s: PSF set by user: %.4e pix" % (options["ap_name"], options["ap_set_psf"])) fwhm_guess = options["ap_set_psf"] elif "ap_guess_psf" in options: logging.info( "%s: PSF initialized by user: %.4e pix" % (options["ap_name"], options["ap_guess_psf"]) ) fwhm_guess = options["ap_guess_psf"] else: logging.info("%s: PSF assumed to be: %.4e pix" % (options["ap_name"], 4.0)) fwhm_guess = 4.0 return IMG, {"psf fwhm": fwhm_guess, "auxfile psf": "psf fwhm: %.3f pix" % fwhm_guess}
[docs] def PSF_StarFind(IMG, results, options): """PSF routine which identifies stars and averages the FWHM. The PSF method uses an edge finding convolution filter to identify candidate star pixels, then averages their FWHM. Randomly iterates through the pixels and searches for a local maximum. An FFT is used to identify non-circular star candidate (artifacts or galaxies) which may have been picked up by the edge finder. Circular apertures are placed around the star until half the central flux value is reached, This is recorded as the FWHM for that star. A collection of 50 stars are identified and the most circular (by FFT coefficients) half are kept, a median is taken as the image PSF. Parameters ----------------- ap_guess_psf : float, default None Initialization value for the PSF calculation in pixels. If not given, AutoProf will default with a guess of 1/*ap_pixscale* ap_set_psf : float, default None force AutoProf to use this PSF value (in pixels) instead of calculating its own. Notes ---------- :References: - 'background' - 'background noise' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'psf fwhm': # FWHM of the average PSF for the image } """ if "ap_set_psf" in options: logging.info("%s: PSF set by user: %.4e" % (options["ap_name"], options["ap_set_psf"])) return IMG, {"psf fwhm": options["ap_set_psf"]} elif "ap_guess_psf" in options: logging.info( "%s: PSF initialized by user: %.4e" % (options["ap_name"], options["ap_guess_psf"]) ) fwhm_guess = options["ap_guess_psf"] else: fwhm_guess = max(1.0, 1.0 / options["ap_pixscale"]) if "mask" in results: use_mask = results["mask"] else: use_mask = np.zeros(IMG.shape, dtype=bool) use_mask[ int(IMG.shape[0] / 5.0) : int(4.0 * IMG.shape[0] / 5.0), int(IMG.shape[1] / 5.0) : int(4.0 * IMG.shape[1] / 5.0), ] = True stars = StarFind( IMG - results["background"], fwhm_guess, results["background noise"], use_mask, maxstars=50, ) if len(stars["fwhm"]) <= 10: logging.error( "%s: unable to detect enough stars! PSF results not valid, using 1 arcsec estimate psf of %f" % (options["ap_name"], fwhm_guess) ) return IMG, {"psf fwhm": fwhm_guess} def_clip = 0.1 while np.sum(stars["deformity"] < def_clip) < max(10, len(stars["fwhm"]) / 2): def_clip += 0.1 psf = np.median(stars["fwhm"][stars["deformity"] < def_clip]) if "ap_doplot" in options and options["ap_doplot"]: Plot_PSF_Stars( IMG, stars["x"], stars["y"], stars["fwhm"], psf, results, options, flagstars=stars["deformity"] >= def_clip, ) logging.info( "%s: found psf: %f with deformity clip of: %f" % (options["ap_name"], psf, def_clip) ) return IMG, {"psf fwhm": psf, "auxfile psf": "psf fwhm: %.3f pix" % psf}
[docs] def PSF_Image(IMG, results, options): """PSF routine which identifies stars and averages the FWHM. Constructs an averaged PSF image. Extracts a window of pixels around each identified star (+-10 PSF) and normalizes the flux total to 1. All extraced normalized stars are median stacked. The final PSF is saved as "<name>_psf.fits" and added to the results dictionary. Also calculates the PSF FWHM and adds it to the results dictionary. This method is currently very slow. Parameters ----------------- ap_guess_psf : float, default None Initialization value for the PSF calculation in pixels. If not given, AutoProf will default with a guess of 1/*ap_pixscale* ap_set_psf : float, default None force AutoProf to use this PSF value (in pixels) instead of calculating its own. Notes ---------- :References: - 'background' - 'background noise' Returns ------- IMG : ndarray Unaltered galaxy image results : dict .. code-block:: python {'psf fwhm': # FWHM of the average PSF for the image 'auxfile psf': # aux file message giving the PSF 'psf img': # image of the PSF as numpy array } """ if "ap_set_psf" in options: logging.info("%s: PSF set by user: %.4e" % (options["ap_name"], options["ap_set_psf"])) return IMG, {"psf fwhm": options["ap_set_psf"]} elif "ap_guess_psf" in options: logging.info( "%s: PSF initialized by user: %.4e" % (options["ap_name"], options["ap_guess_psf"]) ) fwhm_guess = options["ap_guess_psf"] else: fwhm_guess = max(1.0, 1.0 / options["ap_pixscale"]) edge_mask = np.zeros(IMG.shape, dtype=bool) edge_mask[ int(IMG.shape[0] / 4.0) : int(3.0 * IMG.shape[0] / 4.0), int(IMG.shape[1] / 4.0) : int(3.0 * IMG.shape[1] / 4.0), ] = True dat = IMG - results["background"] stars = StarFind( dat, fwhm_guess, results["background noise"], edge_mask, detect_threshold=5.0, maxstars=100, ) if len(stars["fwhm"]) <= 10: logging.error( "%s: unable to detect enough stars! PSF results not valid, using 1 arcsec estimate psf of %f" % (options["ap_name"], fwhm_guess) ) def_clip = 0.1 while np.sum(stars["deformity"] < def_clip) < max(10, len(stars["fwhm"]) * 2 / 3): def_clip += 0.1 psf = np.median(stars["fwhm"][stars["deformity"] < def_clip]) psf_iqr = np.quantile(stars["fwhm"][stars["deformity"] < def_clip], [0.1, 0.9]) psf_size = int(psf * 10) if psf_size % 2 == 0: # make PSF odd for easier calculations psf_size += 1 psf_img = None XX, YY = np.meshgrid( np.array(range(psf_size)) - psf_size // 2, np.array(range(psf_size)) - psf_size // 2, ) XX, YY = np.ravel(XX), np.ravel(YY) for i in range(len(stars["x"])): # ignore objects that likely aren't stars if ( stars["deformity"][i] > def_clip or stars["fwhm"][i] < psf_iqr[0] or stars["fwhm"][i] > psf_iqr[1] ): continue # ignore objects that are too close to the edge if ( stars["x"][i] < psf_size // 2 or (dat.shape[1] - stars["x"][i]) < psf_size // 2 or stars["y"][i] < psf_size // 2 or (dat.shape[1] - stars["y"][i]) < psf_size // 2 ): continue flux = interpolate_Lanczos(dat, XX + stars["x"][i], YY + stars["y"][i], 10).reshape( (1, psf_size, psf_size) ) flux /= np.sum(flux) psf_img = flux if psf_img is None else np.concatenate((psf_img, flux)) # stack the PSF psf_img = np.median(psf_img, axis=0) # normalize the PSF psf_img /= np.sum(psf_img) hdul = fits.HDUList([fits.PrimaryHDU(psf_img)]) hdul.writeto( os.path.join( options["ap_saveto"] if "ap_saveto" in options else "", "%s_psf.fits" % options["ap_name"], ), overwrite=True, ) if "ap_doplot" in options and options["ap_doplot"]: plt.imshow( psf_img, origin="lower", cmap="Greys", norm=ImageNormalize(stretch=HistEqStretch(psf_img)), ) my_cmap = cm.Greys_r my_cmap.set_under("k", alpha=0) fluxpeak = psf_img[psf_size // 2 + 1, psf_size // 2 + 1] / 2 plt.imshow( np.clip(psf_img, a_min=fluxpeak / 10, a_max=None), origin="lower", cmap=my_cmap, norm=ImageNormalize(stretch=LogStretch(), clip=False), clim=[fluxpeak / 9, None], vmin=fluxpeak / 9, ) plt.axis("off") plt.tight_layout() if not ("ap_nologo" in options and options["ap_nologo"]): AddLogo(plt.gcf()) plt.savefig( f"{options.get('ap_plotpath','')}PSF_{options['ap_name']}.{options.get('ap_plot_extension', 'jpg')}", dpi=options["ap_plotdpi"] if "ap_plotdpi" in options else 300, ) plt.close() return IMG, { "psf fwhm": psf, "auxfile psf": "psf fwhm: %.3f pix" % psf, "psf img": psf_img, }
[docs] def PSF_deconvolve(IMG, results, options): """routine which deconvolves the PSF from the primary image. Performs Richardson-Lucy deconvolution on the primary galaxy image using the sci-kit image implementation (the user must have skimage in their python installation). This deconvolution procedure is more stable than standard FFT deconvolution. This method is currently very slow. If the user provides an image via 'ap_psf_file' then that will be taken as the psf and deconvolved from the image. If there is no file given, but 'psf img' exists in the results dictionary (ie from the 'psf img' pipeline step) then that will be used. If no other option is available, the 'psf fwhm' will be taken from the results dictionary and a PSF image will be constructed using a Gaussian of the given PSF out to 20 times the PSF size. Parameters ----------------- ap_psf_file : string, default None Optional argument. Path to PSF fits file. For best results the image should have an odd number of pixels with the PSF centered in the image. ap_psf_deconvolution_iterations : int, default 50 number of itterations of the Richardson-Lucy deconvolution algorithm to perform. Notes ---------- :References: - 'psf img' (optional) - 'psf fwhm' (optional) Returns ------- IMG : ndarray deconvolved galaxy image results : dict .. code-block:: python {} """ from skimage import restoration if "ap_psf_file" in options: psf_img = Read_Image(options["ap_psf_file"], options) elif "psf img" in results: psf_img = results["psf img"] else: psf_size = int(results["psf fwhm"] * 20) if psf_size % 2 == 0: # make PSF odd for easier calculations psf_size += 1 XX, YY = np.meshgrid( np.array(range(psf_size)) - psf_size // 2, np.array(range(psf_size)) - psf_size // 2, ) psf_std = results["psf fwhm"] / np.sqrt(8 * np.log(2)) psf_img = np.exp(-(XX**2 + YY**2) / (2 * psf_std**2)) / np.sqrt(2 * np.pi * psf_std**2) psf_img /= np.sum(psf_img) if np.abs(np.sum(psf_img) - 1) > 1e-7: logging.warn("PSF image not normalized! sum(PSF) = %.3e" % np.sum(psf_img)) dmax = np.max(IMG) dmin = np.min(IMG) dat_deconv = restoration.richardson_lucy( (IMG - dmin) / (dmax - dmin) - 0.5, psf_img, iterations=( options["ap_psf_deconvolution_iterations"] if "ap_psf_deconvolution_iterations" in options else 50 ), ) dat_deconv = (dat_deconv + 0.5) * (dmax - dmin) + dmin if "ap_psf_deconvolve_save" in options and options["ap_psf_deconvolve_save"]: header = fits.Header() hdul = fits.HDUList([fits.PrimaryHDU(header=header), fits.ImageHDU(dat_deconv)]) hdul.writeto( os.path.join( options["ap_saveto"] if "ap_saveto" in options else "", "%s_deconvolved.fits" % options["ap_name"], ), overwrite=True, ) return dat_deconv, {}