Source code for kcwidrp.primitives.ExtractArcs

from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.primitives.kcwi_file_primitives import read_table
from kcwidrp.core.bokeh_plotting import bokeh_plot
from kcwidrp.primitives.kcwi_file_primitives import strip_fname, get_master_name

import numpy as np
import os
from skimage import transform as tf
from bokeh.plotting import figure


[docs]class ExtractArcs(BasePrimitive): """Use derived traces to extract arc spectra along bars""" def __init__(self, action, context): BasePrimitive.__init__(self, action, context) self.logger = context.pipeline_logger self.action.args.reference_bar_separation = None self.action.args.contbar_image_number = None self.action.args.contbar_image = None self.action.args.arc_number = None self.action.args.arc_image = None self.action.args.source_control_points = None self.action.args.destination_control_points = None self.action.args.bar_id = None self.action.args.slice_id = None def _pre_condition(self): contbars_in_proctable = self.context.proctab.search_proctab( frame=self.action.args.ccddata, target_type='CONTBARS', nearest=True) self.logger.info("%d continuum bars frames found" % len(contbars_in_proctable)) if len(contbars_in_proctable) > 0: self.action.args.original_filename = strip_fname( contbars_in_proctable['filename'][0])+ "_trace.fits" return True else: self.action.args.original_filename = None return False def _perform(self): do_plot = (self.config.instrument.plot_level >= 3) self.logger.info("Extracting arc spectra") # Double check if not self.action.args.original_filename: self.logger.error("No traces found") return self.action.args # All is ready original_filename = self.action.args.original_filename self.logger.info("Trace table found: %s" % original_filename) # trace = read_table(tab=tab, indir='redux', suffix='trace') # Find and read control points from continuum bars if hasattr(self.context, 'trace'): trace = self.context.trace else: trace = read_table( input_dir=os.path.join(self.config.instrument.cwd, self.config.instrument.output_directory), file_name=original_filename) self.context.trace = {} for key in trace.meta.keys(): self.context.trace[key] = trace.meta[key] middle_row = self.context.trace['MIDROW'] window = self.context.trace['WINDOW'] self.action.args.reference_bar_separation = self.context.trace[ 'REFDELX'] self.action.args.contbar_image_number = self.context.trace['CBARSNO'] self.action.args.contbar_image = self.context.trace['CBARSFL'] self.action.args.arc_number = self.action.args.ccddata.header['FRAMENO'] self.action.args.arc_image = self.action.args.ccddata.header['OFNAME'] self.action.args.source_control_points = trace['src'] self.action.args.destination_control_points = trace['dst'] self.action.args.bar_id = trace['barid'] self.action.args.slice_id = trace['slid'] self.logger.info("Fitting spatial control points") transformation = tf.estimate_transform( 'polynomial', self.action.args.source_control_points, self.action.args.destination_control_points, order=3) self.logger.info("Transforming arc image") warped_image = tf.warp(self.action.args.ccddata.data, transformation) # Write warped arcs if requested if self.config.instrument.saveintims: from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer # write out warped image self.action.args.ccddata.data = warped_image kcwi_fits_writer(self.action.args.ccddata, table=self.action.args.table, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="warped") self.logger.info("Transformed arcs produced") # extract arcs self.logger.info("Extracting arcs") arcs = [] # sectors for bkgnd subtraction sectors = 16 for xyi, xy in enumerate(self.action.args.source_control_points): if xy[1] == middle_row: xi = int(xy[0]+0.5) arc = np.median( warped_image[:, (xi - window):(xi + window + 1)], axis=1) # divide spectrum into sectors div = int((len(arc)-100) / sectors) # get minimum for each sector xv = [] yv = [] for i in range(sectors): mi = np.nanargmin(arc[50+i*div:50+(i+1)*div]) mn = np.nanmin(arc[50+i*div:50+(i+1)*div]) xv.append(mi+50+i*div) yv.append(mn) # fit minima to model background res = np.polyfit(xv, yv, 3) xp = np.arange(len(arc)) bkg = np.polyval(res, xp) # resulting model # plot if requested if do_plot: p = figure(title=self.action.args.plotlabel + "ARC # %d" % len(arcs), x_axis_label="Y CCD Pixel", y_axis_label="Flux", plot_width=self.config.instrument.plot_width, plot_height=self.config.instrument.plot_height) p.line(xp, arc, legend_label='Arc', color='blue') p.line(xp, bkg, legend_label='Bkg', color='red') bokeh_plot(p, self.context.bokeh_session) q = input("Next? <cr>, q to quit: ") if 'Q' in q.upper(): do_plot = False # subtract model background arc -= bkg # add to arcs list arcs.append(arc) # Did we get the correct number of arcs? if len(arcs) == self.config.instrument.NBARS: self.logger.info("Extracted %d arcs" % len(arcs)) self.context.arcs = arcs else: self.logger.error("Did not extract %d arcs, extracted %d" % (self.config.instrument.NBARS, len(arcs))) log_string = ExtractArcs.__module__ self.action.args.ccddata.header['HISTORY'] = log_string self.logger.info(log_string) return self.action.args
# END: class ExtractArcs()