import sys
import os
from .pipeline_steps import *
# from .pipeline_steps.Plotting_Steps import Plot_Galaxy_Image
# from .pipeline_steps.Background import Background_Mode, Background_DilatedSources, Background_Unsharp, Background_Basic
# from .pipeline_steps.PSF import PSF_IRAF, PSF_StarFind, PSF_Image, PSF_deconvolve
# from .pipeline_steps.Center import Center_2DGaussian, Center_1DGaussian, Center_OfMass, Center_HillClimb, Center_Forced, Center_HillClimb_mean
# from .pipeline_steps.Isophote_Initialize import Isophote_Initialize, Isophote_Initialize_mean, Isophote_Init_Forced
# from .pipeline_steps.Isophote_Fit import Isophote_Fit_FFT_Robust, Isophote_Fit_Forced, Photutils_Fit, Isophote_Fit_FFT_mean, Isophote_Fit_FixedPhase
# from .pipeline_steps.Mask import Star_Mask_IRAF, Mask_Segmentation_Map, Bad_Pixel_Mask, Star_Mask
# from .pipeline_steps.Isophote_Extract import Isophote_Extract, Isophote_Extract_Forced, Isophote_Extract_Photutils
# from .pipeline_steps.Check_Fit import Check_Fit
# from .pipeline_steps.Write_Prof import WriteProf
# from .pipeline_steps.Write_Fi import WriteFi
# from .pipeline_steps.Ellipse_Model import EllipseModel
# from .pipeline_steps.Radial_Profiles import Radial_Profiles
# from .pipeline_steps.Axial_Profiles import Axial_Profiles
# from .pipeline_steps.Slice_Profiles import Slice_Profile
from .autoprofutils.ImageTransform import Crop
from .autoprofutils.SharedFunctions import GetOptions, Read_Image, PA_shift_convention
from multiprocessing import Pool, current_process
from astropy.io import fits
from scipy.stats import iqr
from itertools import starmap
from functools import partial
import importlib
import numpy as np
from time import time, sleep
import logging
import warnings
import traceback
from astropy.io.fits.verify import VerifyWarning
warnings.simplefilter("ignore", category=VerifyWarning)
[docs]
class Isophote_Pipeline(object):
def __init__(self, loggername=None):
"""
Initialize pipeline object, user can replace functions with their own if they want, otherwise defaults are used.
loggername: String to use for logging messages
"""
# Functions avaiable by default for building the pipeline
self.pipeline_methods = {
"background": Background_Mode,
"background dilatedsources": Background_DilatedSources,
"background unsharp": Background_Unsharp,
"background basic": Background_Basic,
"psf": PSF_Assumed,
"psf starfind": PSF_StarFind,
"psf IRAF": PSF_IRAF,
"psf img": PSF_Image,
"psf deconvolve": PSF_deconvolve,
"center": Center_HillClimb,
"center mean": Center_HillClimb_mean,
"center forced": Center_Forced,
"center 2DGaussian": Center_2DGaussian,
"center 1DGaussian": Center_1DGaussian,
"center OfMass": Center_OfMass,
"crop": Crop,
"isophoteinit": Isophote_Initialize,
"isophoteinit forced": Isophote_Init_Forced,
"isophoteinit mean": Isophote_Initialize_mean,
"plot image": Plot_Galaxy_Image,
"writefi": WriteFi,
"isophotefit": Isophote_Fit_FFT_Robust,
"isophotefit fixed": Isophote_Fit_FixedPhase,
"isophotefit mean": Isophote_Fit_FFT_mean,
"isophotefit forced": Isophote_Fit_Forced,
"isophotefit photutils": Photutils_Fit,
"mask badpixels": Bad_Pixel_Mask,
"starmask": Star_Mask,
"starmask IRAF": Star_Mask_IRAF,
"mask segmentation map": Mask_Segmentation_Map,
"isophoteextract": Isophote_Extract,
"isophoteextract photutils": Isophote_Extract_Photutils,
"isophoteextract forced": Isophote_Extract_Forced,
"checkfit": Check_Fit,
"writeprof": WriteProf,
"ellipsemodel": EllipseModel,
"radialprofiles": Radial_Profiles,
"sliceprofile": Slice_Profile,
"axialprofiles": Axial_Profiles,
}
# Default pipeline analysis order
self.pipeline_steps = {
"head": [
"background",
"psf",
"center",
"isophoteinit",
"isophotefit",
"isophoteextract",
"checkfit",
"writeprof",
]
}
# Start the logger
logging.basicConfig(
level=logging.INFO,
filename="AutoProf.log" if loggername is None else loggername,
filemode="w",
)
[docs]
def UpdatePipeline(self, new_pipeline_methods=None, new_pipeline_steps=None):
"""
modify steps in the AutoProf pipeline.
new_pipeline_methods: update the dictionary of methods used by the pipeline. This can either add
new methods or replace existing ones.
new_pipeline_steps: update the list of pipeline step strings. These strings refer to keys in
pipeline_methods. It is posible to add/remove/rearrange steps here. Alternatively
one can supply a dictionary with current pipeline steps as keys and new pipeline
steps as values, the corresponding steps will be replaced.
"""
if new_pipeline_methods:
logging.info(
"PIPELINE updating these pipeline methods: %s" % str(new_pipeline_methods.keys())
)
self.pipeline_methods.update(new_pipeline_methods)
if new_pipeline_steps:
logging.info("PIPELINE new steps: %s" % (str(new_pipeline_steps)))
if type(new_pipeline_steps) == list:
self.pipeline_steps["head"] = new_pipeline_steps
elif type(new_pipeline_steps) == dict:
assert "head" in new_pipeline_steps.keys()
self.pipeline_steps = new_pipeline_steps
[docs]
def Process_Image(self, options={}):
"""
Function which runs the pipeline for a single image. Each sub-function of the pipeline is run
in order and the outputs are passed along. If multiple images are given, the pipeline is
excecuted on the first image and the isophotes are applied to the others.
returns list of times for each pipeline step if successful. else returns 1
"""
# Remove any options with None value so they don't interfere with analysis logic
for key in list(options.keys()):
if options[key] is None:
del options[key]
# Seed the random number generator in numpy so each thread gets unique random numbers
try:
sleep(0.01)
np.random.seed(
int(np.random.randint(10000) * current_process().pid * (time() % 1) % 2**15)
)
except:
pass
# use filename if no name is given
if not ("ap_name" in options and type(options["ap_name"]) == str):
base = os.path.split(options["ap_image_file"])[1]
options["ap_name"] = os.path.splitext(base)[0]
# Read the primary image
try:
dat = Read_Image(options["ap_image_file"], options)
except:
logging.error(
"%s: could not read image %s" % (options["ap_name"], options["ap_image_file"])
)
return 1
# Check that image data exists and is not corrupted
if dat is None or np.all(
dat[
int(len(dat) / 2.0) - 10 : int(len(dat) / 2.0) + 10,
int(len(dat[0]) / 2.0) - 10 : int(len(dat[0]) / 2.0) + 10,
]
== 0
):
logging.error(
"%s Large chunk of data missing, impossible to process image" % options["ap_name"]
)
return 1
# Track time to run analysis
start = time()
# Run the Pipeline
timers = {}
results = {}
key = "head"
step = 0
while step < len(self.pipeline_steps[key]):
try:
logging.info(
"%s: %s %s at: %.1f sec"
% (options["ap_name"], key, self.pipeline_steps[key][step], time() - start)
)
print(
"%s: %s %s at: %.1f sec"
% (options["ap_name"], key, self.pipeline_steps[key][step], time() - start)
)
if "branch" in self.pipeline_steps[key][step]:
decision, newoptions = self.pipeline_methods[self.pipeline_steps[key][step]](
dat, results, options
)
options.update(newoptions)
if type(decision) == str:
key = decision
step = 0
else:
step += 1
else:
step_start = time()
dat, res = self.pipeline_methods[self.pipeline_steps[key][step]](
dat, results, options
)
results.update(res)
timers[self.pipeline_steps[key][step]] = time() - step_start
step += 1
except Exception as e:
logging.error(
"%s: on step %s got error: %s"
% (options["ap_name"], self.pipeline_steps[key][step], str(e))
)
logging.error(
"%s: with full trace: %s" % (options["ap_name"], traceback.format_exc())
)
return 1
print("%s: Processing Complete! (at %.1f sec)" % (options["ap_name"], time() - start))
logging.info(
"%s: Processing Complete! (at %.1f sec)" % (options["ap_name"], time() - start)
)
return timers
[docs]
def Process_List(self, options):
"""
Wrapper function to run "Process_Image" in parallel for many images.
"""
assert type(options["ap_image_file"]) == list
# Format the inputs so that they can be zipped with the images files
# and passed to the Process_Image function.
use_options = []
for i in range(len(options["ap_image_file"])):
tmp_options = {}
for k in options.keys():
if type(options[k]) == list and not k in ["ap_new_pipeline_steps"]:
tmp_options[k] = options[k][i]
else:
tmp_options[k] = options[k]
use_options.append(tmp_options)
# Track how long it takes to run the analysis
start = time()
# Create a multiprocessing pool to parallelize image processing
n_procs = options["ap_n_procs"] if "ap_n_procs" in options else 1
if n_procs > 1:
with Pool(int(n_procs)) as pool:
res = pool.map(
self.Process_Image,
use_options,
chunksize=5 if len(options["ap_image_file"]) > 100 else 1,
)
else:
res = list(map(self.Process_Image, use_options))
# Report completed processing, and track time used
logging.info("All Images Finished Processing at %.1f" % (time() - start))
timers = dict()
counts = dict()
for r in res:
if r == 1:
continue
for s in r.keys():
if s in timers:
timers[s] += r[s]
counts[s] += 1.0
else:
timers[s] = r[s]
counts[s] = 1.0
if len(timers) == 0:
logging.error("All images failed to process!")
return
for s in timers:
timers[s] /= counts[s]
logging.info("%s took %.3f seconds on average" % (s, timers[s]))
# Return the success/fail indicators for every Process_Image excecution
return res
[docs]
def Process_ConfigFile(self, config_file):
"""
Reads in a configuration file and sets parameters for the pipeline. The configuration
file should have variables corresponding to the desired parameters to be set.
congig_file: string path to configuration file
returns: timing of each pipeline step if successful. Else returns 1
"""
# Import the config file regardless of where it is from
if "/" in config_file:
startat = config_file.rfind("/") + 1
else:
startat = 0
if "." in config_file:
use_config = config_file[startat : config_file.rfind(".")]
else:
use_config = config_file[startat:]
if "/" in config_file:
sys.path.append(config_file[: config_file.rfind("/")])
try:
c = importlib.import_module(use_config)
except:
sys.path.append(os.getcwd())
c = importlib.import_module(use_config)
if "forced" in c.ap_process_mode:
self.UpdatePipeline(
new_pipeline_steps=[
"background",
"psf",
"center forced",
"isophoteinit forced",
"isophoteextract forced",
"writeprof",
]
)
try:
self.UpdatePipeline(new_pipeline_methods=c.ap_new_pipeline_methods)
except:
pass
try:
self.UpdatePipeline(new_pipeline_steps=c.ap_new_pipeline_steps)
except:
pass
use_options = GetOptions(c)
if c.ap_process_mode in ["image", "forced image"]:
return self.Process_Image(use_options)
elif c.ap_process_mode in ["image list", "forced image list"]:
return self.Process_List(use_options)
else:
logging.error(
"Unrecognized process_mode! Should be in: [image, image list, forced image, forced image list]"
)
return 1