from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.core.bokeh_plotting import bokeh_plot
from kcwidrp.primitives.kcwi_file_primitives import plotlabel
import numpy as np
import logging
from bokeh.util.logconfig import basicConfig
from bokeh.plotting import figure
from scipy.signal import find_peaks
import time
[docs]class FindBars(BasePrimitive):
"""
Find bars in middle row of cont bars image
Uses the middle row with surrounding rows to find the individual continuum
bar locations in the image. Checks to be sure the correct number of bars
has been found (see NBARS in kcwidrp/configs/kcwi.cfg).
"""
def __init__(self, action, context):
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
basicConfig(level=logging.ERROR)
def _pre_condition(self):
self.logger.info("Checking for master contbars")
if 'MCBARS' in self.action.args.ccddata.header['IMTYPE']:
return True
else:
return False
def _perform(self):
self.logger.info("Finding continuum bars")
# initialize
reference_bar = self.config.instrument.REFBAR
middle_centers = []
# get image dimensions
x_size = self.action.args.ccddata.data.shape[1]
y_size = self.action.args.ccddata.data.shape[0]
# get binning
y_binning = self.action.args.ybinsize
# get camera
camera = self.action.args.camera
# get grating
grating = self.action.args.grating
window = int(10 / y_binning)
# get plot label
plab = plotlabel(self.action.args)
# select from center rows of image
div_fac = 2
middle_y_row = int(y_size / div_fac)
# handle dichroic edge by moving middle until we get enough peaks
n_tries = 0
peaks_found = 0
middle_vector = None
bar_thresh = None
average_value_middle_vector = None
stdev_value_middle_vector = None
peaks_vector = None
while peaks_found != self.config.instrument.NBARS and n_tries < 5:
self.logger.info("Middle row for bars finding: %d" % middle_y_row)
middle_vector = np.median(
self.action.args.ccddata.data[
(middle_y_row-window):(middle_y_row+window+1), :], axis=0)
# set threshold for peak finding
average_value_middle_vector = np.average(middle_vector)
stdev_value_middle_vector = np.nanstd(middle_vector)
bar_thresh = average_value_middle_vector + \
0.5 * stdev_value_middle_vector
self.logger.info("peak threshold = %f" % bar_thresh)
# find peaks above threshold
peaks_in_middle_vector, _ = find_peaks(
middle_vector, height=bar_thresh)
peaks_vector = []
for pk in peaks_in_middle_vector:
if 5 < pk < (self.action.args.ccddata.header['NAXIS1'] - 5):
peaks_vector.append(pk)
peaks_found = len(peaks_vector)
if self.config.instrument.plot_level >= 3:
# plot the peak positions
x = np.arange(len(middle_vector))
p = figure(
title=plab + "BARS TRACE at = %d, TRY %d" % (middle_y_row,
n_tries),
x_axis_label='CCD X (px)', y_axis_label='e-',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(x, middle_vector, color='blue', legend_label="Trace")
p.scatter(peaks_vector,
middle_vector[peaks_vector], marker='x',
color='red', legend_label="FoundBar")
p.line([0, x_size], [bar_thresh, bar_thresh],
color='grey', line_dash='dashed')
p.legend.location = "bottom_center"
bokeh_plot(p, self.context.bokeh_session)
input("Next? <cr>: ")
n_tries += 1
# do we have the requisite number?
if peaks_found != self.config.instrument.NBARS:
self.logger.warning("Did not find %d peaks: n peaks = %d" %
(self.config.instrument.NBARS, peaks_found))
self.logger.info("Calculating new middle row and retrying")
if camera == 0: # BLUE
div_fac += 0.5
else: # RED
if 'RH4' in grating:
div_fac += 0.5
else:
div_fac -= 0.5
middle_y_row = int(y_size / div_fac)
if peaks_found != self.config.instrument.NBARS:
self.logger.error("Did not find %d peaks: n peaks = %d" %
(self.config.instrument.NBARS, peaks_found))
else:
self.logger.info("found %d bars" % peaks_found)
if self.config.instrument.plot_level >= 1:
# plot the peak positions
x = np.arange(len(middle_vector))
p = figure(
title=plab + "BARS MID TRACE Thresh = %.2f" % bar_thresh,
x_axis_label='CCD X (px)', y_axis_label='e-',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(x, middle_vector, color='blue', legend_label="MidTrace")
p.scatter(peaks_vector,
middle_vector[peaks_vector], marker='x',
color='red', legend_label="FoundBar")
p.line([0, x_size], [bar_thresh, bar_thresh],
color='grey', line_dash='dashed')
p.legend.location = "bottom_center"
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# calculate the bar centroids
# Do we plot?
if self.config.instrument.plot_level >= 3:
do_inter = True
else:
do_inter = False
for ip, peak in enumerate(peaks_vector):
xs = list(range(peak-window, peak+window+1))
ys = middle_vector[xs] - np.nanmin(middle_vector[xs])
xc = np.sum(xs*ys) / np.sum(ys)
middle_centers.append(xc)
if do_inter:
p = figure(
title=plab + "BAR %d CENTRD = %.2f" % (ip, xc),
x_axis_label='CCD X (px)', y_axis_label='e-',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(xs, ys, color='blue', legend_label='Bar Trace')
p.circle(xs, ys, color='red', legend_label='Bar Trace')
p.line([xc, xc], [bar_thresh, middle_vector[peak]],
color='green', legend_label='Cntrd')
bokeh_plot(p, self.context.bokeh_session)
if do_inter:
q = input("Next? <cr>, q - quit: ")
if 'Q' in q.upper():
do_inter = False
else:
time.sleep(self.config.instrument.plot_pause)
self.logger.info("Found middle centroids for continuum bars")
# store threshold
self.action.args.bar_thresh = bar_thresh
self.action.args.bar_avg = average_value_middle_vector
self.action.args.bar_std = stdev_value_middle_vector
# store peaks
self.action.args.middle_centers = middle_centers
# store the row where we got them
self.action.args.middle_row = middle_y_row
self.action.args.window = window
# calculate reference delta x based on refbar
self.action.args.reference_delta_x = 0.
try:
if (reference_bar-1) > 0 and \
(reference_bar+3) < self.config.instrument.NBARS:
for ib in range(reference_bar-1, reference_bar+3):
self.action.args.reference_delta_x += \
(middle_centers[ib] - middle_centers[ib-1])
ndiv = 4.
else:
for ib in range(reference_bar, reference_bar+1):
self.action.args.reference_delta_x += \
(middle_centers[ib] - middle_centers[ib-1])
ndiv = 2.
except IndexError:
self.logger.warning(
"Not enough bars per slice to determine ref x sep")
self.action.args.reference_delta_x = 1.
ndiv = 1.
self.action.args.reference_delta_x /= ndiv
# store image info
self.action.args.contbar_image_number = \
self.action.args.ccddata.header['FRAMENO']
self.action.args.contbar_image = \
self.action.args.ccddata.header['OFNAME']
log_string = FindBars.__module__
self.action.args.ccddata.header['HISTORY'] = log_string
self.logger.info(log_string)
return self.action.args
# END: class FindBars()