Source code for kcwidrp.primitives.ExtractArcs
from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.primitives.kcwi_file_primitives import read_table, kcwi_fits_reader
from kcwidrp.core.bokeh_plotting import bokeh_plot
from kcwidrp.primitives.kcwi_file_primitives import strip_fname, plotlabel
import numpy as np
from numpy.polynomial import polynomial as poly
from scipy.signal import savgol_filter
import os
from skimage import transform as tf
from bokeh.plotting import figure
[docs]class ExtractArcs(BasePrimitive):
"""
Use derived traces to extract arc spectra along continuum bars.
Reads in traces from continuum bars and then uses them to extract spectra
along each trace. Also performs a background subtraction for each extracted
spectrum. Adds a HISTORY record to the FITS header.
Uses the following configuration parameter:
* saveintims: if set to ``True`` write out a warped version of arc image in \*_warped.fits. Defaults to ``False``.
Adds extracted arcs to returned arguments.
"""
def __init__(self, action, context):
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
self.action.args.reference_bar_separation = None
self.action.args.contbar_image_number = None
self.action.args.contbar_image = None
self.action.args.arc_number = None
self.action.args.arc_image = None
self.action.args.source_control_points = None
self.action.args.destination_control_points = None
self.action.args.bar_id = None
self.action.args.slice_id = None
def _pre_condition(self):
marcs_in_proctable = self.context.proctab.search_proctab(
frame=self.action.args.ccddata, target_type='MARC',
nearest=True
)
self.logger.info("%d master arc frames found" %
len(marcs_in_proctable))
contbars_in_proctable = self.context.proctab.search_proctab(
frame=self.action.args.ccddata, target_type='MCBARS',
nearest=True)
self.logger.info("%d master continuum bars frames found" %
len(contbars_in_proctable))
if len(contbars_in_proctable) > 0 and len(marcs_in_proctable) > 0:
self.action.args.original_filename = strip_fname(
contbars_in_proctable['filename'][0]) + "_trace.fits"
return True
else:
self.action.args.original_filename = None
return False
def _perform(self):
do_plot = (self.config.instrument.plot_level >= 3)
plab = plotlabel(self.action.args)
self.logger.info("Extracting arc spectra")
# Double check
if not self.action.args.original_filename:
self.logger.error("No traces found")
return self.action.args
# All is ready
original_filename = self.action.args.original_filename
self.logger.info("Trace table found: %s" % original_filename)
# trace = read_table(tab=tab, indir='redux', suffix='trace')
# Find and read control points from continuum bars
trace = read_table(
input_dir=os.path.join(self.config.instrument.cwd,
self.config.instrument.output_directory),
file_name=original_filename)
self.context.trace = {}
for key in trace.meta.keys():
self.context.trace[key] = trace.meta[key]
middle_row = self.context.trace['MIDROW']
window = self.context.trace['WINDOW']
self.action.args.reference_bar_separation = self.context.trace[
'REFDELX']
self.action.args.contbar_image_number = self.context.trace['CBARSNO']
self.action.args.contbar_image = self.context.trace['CBARSFL']
self.action.args.arc_number = self.action.args.ccddata.header['FRAMENO']
self.action.args.arc_image = self.action.args.ccddata.header['OFNAME']
self.action.args.source_control_points = trace['src']
self.action.args.destination_control_points = trace['dst']
self.action.args.bar_id = trace['barid']
self.action.args.slice_id = trace['slid']
self.logger.info("Fitting spatial control points")
# This is strictly for AIT data!
if self.config.instrument.NBARS < 60:
# get trimmed bars image so geometry is consistent
barsfn = self.action.args.contbar_image.split(
'.fits')[0] + '_int.fits'
barim, bartab = kcwi_fits_reader(
os.path.join(self.config.instrument.output_directory, barsfn))
arcs = []
bars = []
for ib in range(self.config.instrument.NBARS):
bind = np.where(trace['barid'] == ib)
xp = trace['dst'][bind][:, 0]
yp = trace['dst'][bind][:, 1]
sind = np.argsort(yp)
yps = yp[sind]
xps = xp[sind]
c, stats = poly.polyfit(yps, xps, deg=3, full=True)
arc = []
bar = []
bkg = []
for iy in range(self.action.args.ccddata.header['NAXIS2']):
xv = int(poly.polyval(iy, c))
yy = self.action.args.ccddata.data[
iy, (xv - window):(xv + window + 1)]
byy = barim.data[iy, (xv - window):(xv + window + 1)]
nyy = len(yy)
if nyy > 0:
bkg.append(np.nanmin(yy) * float(nyy))
arc.append(np.nansum(yy))
bar.append(np.nansum(byy))
else:
bkg.append(0.)
arc.append(0.)
bar.append(0.)
arc = np.asarray(arc, dtype=float)
bar = np.asarray(bar, dtype=float)
bkg = np.asarray(bkg, dtype=float)
bkgf = savgol_filter(bkg, 51, 5)
if do_plot:
xp = np.arange(len(arc))
p = figure(title=plab + "ARC # %d" % len(arcs),
x_axis_label="Y CCD Pixel",
y_axis_label="Flux",
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(xp, arc, legend_label='Arc', color='blue')
p.line(xp, bkg, legend_label='Bkg', color='red')
p.line(xp, bkgf, legend_label='BkgFit', color='green')
bokeh_plot(p, self.context.bokeh_session)
q = input("Next? <cr>, q to quit: ")
if 'Q' in q.upper():
do_plot = False
arc -= bkgf
arcs.append(arc)
bars.append(bar)
else:
# fit transform
# NOTE: we do not need an asymmetric polynomial for this
# global fit (only for per slice fitting: see SolveGeom.py)
tform = tf.estimate_transform(
'polynomial', self.action.args.source_control_points,
self.action.args.destination_control_points, order=3)
self.logger.info("Transforming arc image")
warped_image = tf.warp(self.action.args.ccddata.data, tform)
# Write warped arcs if requested
if self.config.instrument.saveintims:
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer
# write out warped image
self.action.args.ccddata.data = warped_image
kcwi_fits_writer(
self.action.args.ccddata, table=self.action.args.table,
output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="warped")
self.logger.info("Transformed arcs produced")
# extract arcs
self.logger.info("Extracting arcs")
arcs = []
bars = []
# sectors for bkgnd subtraction
sectors = 16
for xyi, xy in enumerate(self.action.args.source_control_points):
if xy[1] == middle_row:
xi = int(xy[0]+0.5)
arc = np.nanmedian(
warped_image[:, (xi - window):(xi + window + 1)],
axis=1)
# divide spectrum into sectors
div = int((len(arc)-100) / sectors)
# get minimum for each sector
xv = []
yv = []
for i in range(sectors):
try:
mi = np.nanargmin(arc[50+i*div:50+(i+1)*div])
mn = np.nanmin(arc[50+i*div:50+(i+1)*div])
xv.append(mi+50+i*div)
yv.append(mn)
except ValueError:
self.logger.warning("Bad sector %d" % i)
# fit minima to model background
res = np.polyfit(xv, yv, 3)
xp = np.arange(len(arc))
bkg = np.polyval(res, xp) # resulting model
# plot if requested
if do_plot:
p = figure(
title=plab + "ARC # %d" % len(arcs),
x_axis_label="Y CCD Pixel", y_axis_label="Flux",
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(xp, arc, legend_label='Arc', color='blue')
p.line(xp, bkg, legend_label='Bkg', color='red')
bokeh_plot(p, self.context.bokeh_session)
q = input("Next? <cr>, q to quit: ")
if 'Q' in q.upper():
do_plot = False
# subtract model background
arc -= bkg
# add to arcs list
arcs.append(arc)
# Did we get the correct number of arcs?
if len(arcs) == self.config.instrument.NBARS:
self.logger.info("Extracted %d arcs" % len(arcs))
self.context.arcs = arcs
if self.config.instrument.NBARS < 60:
self.context.bars = bars
else:
self.logger.error("Did not extract %d arcs, extracted %d" %
(self.config.instrument.NBARS, len(arcs)))
log_string = ExtractArcs.__module__
self.action.args.ccddata.header['HISTORY'] = log_string
self.logger.info(log_string)
return self.action.args
# END: class ExtractArcs()