from keckdrpframework.primitives.base_img import BaseImg
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_reader, \
kcwi_fits_writer, strip_fname, plotlabel
from kcwidrp.core.kcwi_plotting import get_plot_lims
from kcwidrp.core.bokeh_plotting import bokeh_plot
from kcwidrp.core.kcwi_plotting import save_plot
from kcwidrp.core.bspline import Bspline
from bokeh.plotting import figure
from bokeh.models import Range1d
import os
import time
import numpy as np
from scipy.signal.windows import boxcar
import scipy as sp
from scipy.signal import find_peaks
def bm_ledge_position(cwave, dich):
if dich:
fit = [0.240742, 4060.56]
else:
fit = [0.240742, 4044.56]
return fit[1] + fit[0] * cwave
[docs]class MakeMasterFlat(BaseImg):
"""
Generate illumination correction from a stacked flat image.
Uses b-spline fits along with geometry maps to generate a master image for
illumination correction. If the flat is internal, accounts for vignetting
along one edge. Also accounts for ledge seen in BM grating.
Depending on the type of input stack, the following files are written out
and entries are made in the proc file:
* SFLAT - a \*_mflat.fits file and an MFLAT entry
* SDOME - a \*_mdome.fits file and an MDOME entry
* STWIF - a \*_mtwif.fits file and an MTWIF entry
"""
def __init__(self, action, context):
BaseImg.__init__(self, action, context)
self.logger = context.pipeline_logger
def _pre_condition(self):
"""
Checks if we can create a master flat based on the processing table
"""
# check for flat stack to use in generating master flat
self.logger.info("Checking precondition for MakeMasterFlat")
tab = self.context.proctab.search_proctab(
frame=self.action.args.ccddata,
target_type=self.action.args.new_type, nearest=True)
if len(tab) > 0:
self.logger.info("already have %d master %s flats, "
" expecting 0" % (len(tab),
self.action.args.new_type))
return False
else:
self.logger.info("No %s master flat found, "
"checking for flat stack." %
self.action.args.new_type)
self.stack_list = self.context.proctab.search_proctab(
frame=self.action.args.ccddata,
target_type=self.action.args.stack_type,
target_group=self.action.args.groupid)
self.logger.info(f"pre condition got {len(self.stack_list)},"
f" expecting {self.action.args.min_files}")
# do we meet the criterion?
if len(self.stack_list) >= 1:
return True
else:
return False
def _perform(self):
"""
Returns an Argument() with the parameters that depends on this operation
"""
self.logger.info("Creating master illumination correction")
olab = plotlabel(self.action.args)
suffix = self.action.args.new_type.lower()
insuff = self.action.args.stack_type.lower()
stack_list = list(self.stack_list['filename'])
if len(stack_list) <= 0:
self.logger.warning("No flat stack found!")
return self.action.args
# get root for maps
tab = self.context.proctab.search_proctab(
frame=self.action.args.ccddata, target_type='MARC',
target_group=self.action.args.groupid)
if len(tab) <= 0:
self.logger.error("Geometry not solved!")
return self.action.args
mroot = strip_fname(tab['filename'][-1])
# Wavelength map image
wmf = mroot + '_wavemap.fits'
self.logger.info("Reading image: %s" % wmf)
wavemap = kcwi_fits_reader(
os.path.join(self.config.instrument.cwd, 'redux', wmf))[0]
# Slice map image
slf = mroot + '_slicemap.fits'
self.logger.info("Reading image: %s" % slf)
slicemap = kcwi_fits_reader(os.path.join(
self.config.instrument.cwd, 'redux', slf))[0]
# Position map image
pof = mroot + '_posmap.fits'
self.logger.info("Reading image: %s" % pof)
posmap = kcwi_fits_reader(os.path.join(
self.config.instrument.cwd, 'redux', pof))[0]
# Read in stacked flat image
stname = strip_fname(stack_list[0]) + '_' + insuff + '.fits'
self.logger.info("Reading image: %s" % stname)
stacked = kcwi_fits_reader(os.path.join(
self.config.instrument.cwd, 'redux', stname))[0]
plab = " ".join(olab.split()[0:3]) + (
" %d " % stacked.header['FRAMENO']
) + " ".join(olab.split()[4:])
# get type of flat
internal = ('SFLAT' in stacked.header['IMTYPE'])
twiflat = ('STWIF' in stacked.header['IMTYPE'])
domeflat = ('SDOME' in stacked.header['IMTYPE'])
if internal:
self.logger.info("Internal Flat")
elif twiflat:
self.logger.info("Twilight Flat")
elif domeflat:
self.logger.info("Dome Flat")
else:
self.logger.error("Flat of Unknown Type!")
return self.action.args
# knots per pixel
knotspp = self.config.instrument.KNOTSPP
# get image size
ny = stacked.header['NAXIS2']
# get binning
xbin = self.action.args.xbinsize
# Parameters for fitting
if self.action.args.camera == 0: # Blue
# vignetted slice position range
fitl = int(4/xbin)
fitr = int(24/xbin)
# flat fitting slice position range
ffleft = int(10 / xbin)
ffright = int(70 / xbin)
corlim = 0
# ymap not needed for Blue
ymap = None
else: # Red
# vignetted slice position range
fitl = int(114 / xbin)
fitr = int(134 / xbin)
# flat fitting slice position range
ffleft = int(70 / xbin)
ffright = int(130 / xbin)
corlim = int(140 / xbin)
# Get ymap for trimming junk at ends
ymap = posmap.copy()
for i in range(ny):
ymap.data[i, :] = float(i)
# un-vignetted slice position range
flatl = int(34 / xbin)
flatr = int(72 / xbin)
nrefx = int(ffright - ffleft)
buffer = 6.0/float(xbin)
# reference slice
refslice = 9
allidx = np.arange(int(140/xbin))
newflat = stacked.data.copy()
# dichroic fraction
# try:
# dichroic_fraction = wavemap.header['DICHFRAC']
# except KeyError:
# dichroic_fraction = 1.
# get reference slice data
q = [i for i, v in enumerate(slicemap.data.flat) if v == refslice]
# get wavelength limits
waves = wavemap.data.compress((wavemap.data > 0.).flat)
waves = [waves.min(), waves.max()]
self.logger.info("Wavelength limits: %.1f - %1.f" % (waves[0],
waves[1]))
# correct vignetting if we are using internal flats
if internal:
self.logger.info("Internal flats require vignetting correction")
# get good region for fitting
if self.action.args.camera == 0: # Blue
wmin = waves[0]
wmax = min([waves[1], 5620.])
elif self.action.args.camera == 1: # Red
wmin = max([waves[0], 5580.])
wmax = waves[1]
else:
self.logger.warning("Camera keyword not defined")
wmin = waves[0]
wmax = waves[1]
# Use central fraction to calculate vignetting
dw = (wmax - wmin) / 30.0 # fraction of full wavelength range
wavemin = (wmin+wmax) / 2.0 - dw
wavemax = (wmin+wmax) / 2.0 + dw
self.logger.info("Using %.1f - %.1f A of slice %d" % (wavemin,
wavemax,
refslice))
xflat = []
yflat = []
wflat = []
qq = []
for i in q:
if wavemin < wavemap.data.flat[i] < wavemax:
xflat.append(posmap.data.flat[i])
yflat.append(stacked.data.flat[i])
wflat.append(wavemap.data.flat[i])
qq.append(i)
# get un-vignetted portion
qflat = [i for i, v in enumerate(xflat) if flatl <= v <= flatr]
xflat = [xflat[i] for i in qflat]
yflat = [yflat[i] for i in qflat]
wflat = [wflat[i] for i in qflat]
# sort on wavelength
sw = np.argsort(wflat)
ywflat = [yflat[i] for i in sw]
wwflat = [wflat[i] for i in sw]
ww0 = np.min(wwflat)
# fit wavelength slope
wavelinfit = np.polyfit(wwflat-ww0, ywflat, 2)
wslfit = np.polyval(wavelinfit, wflat-ww0)
# plot slope fit
if self.config.instrument.plot_level >= 1:
p = figure(title=plab + ' WAVE SLOPE FIT',
x_axis_label='wave px',
y_axis_label='counts',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.circle(wwflat, ywflat, legend_label="Data")
p.line(wflat, wslfit, line_color='red', line_width=3,
legend_label="Fit")
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# take out slope
yflat = yflat / wslfit
# now sort on slice position
ss = np.argsort(xflat)
xflat = [xflat[i] for i in ss]
yflat = [yflat[i] for i in ss]
# fit un-vignetted slope
resflat = np.polyfit(xflat, yflat, 1)
# select the points we will fit for the vignetting
# get reference region
xfit = [posmap.data.flat[i] for i in qq]
yfit = [stacked.data.flat[i] for i in qq]
wflat = [wavemap.data.flat[i] for i in qq]
# take out wavelength slope
yfit = yfit / np.polyval(wavelinfit, wflat-ww0)
# select the vignetted region
qfit = [i for i, v in enumerate(xfit) if fitl <= v <= fitr]
xfit = [xfit[i] for i in qfit]
yfit = [yfit[i] for i in qfit]
# sort on slice position
s = np.argsort(xfit)
xfit = [xfit[i] for i in s]
yfit = [yfit[i] for i in s]
# fit vignetted slope
resfit = np.polyfit(xfit, yfit, 1)
# corrected data
ycdata = stacked.data.flat[qq] / \
np.polyval(wavelinfit, wavemap.data.flat[qq]-ww0)
ycmin = 0.5 # np.min(ycdata)
ycmax = 1.25 # np.max(ycdata)
# compute the intersection
xinter = -(resflat[1] - resfit[1]) / (resflat[0] - resfit[0])
# plot slice profile and fits
if self.config.instrument.plot_level >= 1:
p = figure(title=plab + ' Vignetting',
x_axis_label='Slice Pos (px)',
y_axis_label='Ratio',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.circle(posmap.data.flat[qq], ycdata, legend_label='Data')
p.line(allidx, resfit[1] + resfit[0]*allidx,
line_color='purple', legend_label='Vign.')
p.line(allidx, resflat[1] + resflat[0]*allidx, line_color='red',
legend_label='UnVign.')
p.line([fitl, fitl], [ycmin, ycmax], line_color='blue')
p.line([fitr, fitr], [ycmin, ycmax], line_color='blue')
p.line([flatl, flatl], [ycmin, ycmax], line_color='green')
p.line([flatr, flatr], [ycmin, ycmax], line_color='green')
p.line([xinter-buffer, xinter-buffer], [ycmin, ycmax],
line_color='black')
p.line([xinter + buffer, xinter + buffer], [ycmin, ycmax],
line_color='black')
p.line([xinter, xinter], [ycmin, ycmax], line_color='red')
p.y_range = Range1d(ycmin, ycmax)
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# figure out where the correction applies
if self.action.args.camera == 0:
qcor = [i for i, v in enumerate(posmap.data.flat)
if corlim <= v <= (xinter-buffer)]
else:
qcor = [i for i, v in enumerate(posmap.data.flat)
if (xinter + buffer) <= v <= corlim]
# apply the correction!
self.logger.info("Applying vignetting correction...")
for i in qcor:
newflat.flat[i] = (resflat[1]+resflat[0]*posmap.data.flat[i]) \
/ (resfit[1]+resfit[0]*posmap.data.flat[i]) * \
stacked.data.flat[i]
# now deal with the intermediate (buffer) region
self.logger.info("Done, now handling buffer region")
# get buffer points to fit in reference region
qbff = [i for i in qq if (xinter-buffer) <=
posmap.data.flat[i] <= (xinter+buffer)]
# get slice pos and data for buffer fitting
xbuff = [posmap.data.flat[i] for i in qbff]
ybuff = [stacked.data.flat[i] / np.polyval(wavelinfit,
wavemap.data.flat[i]-ww0)
for i in qbff]
# sort on slice position
ssp = np.argsort(xbuff)
xbuff = [xbuff[i] for i in ssp]
ybuff = [ybuff[i] for i in ssp]
# fit buffer with low-order poly
buffit = np.polyfit(xbuff, ybuff, 3)
# plot buffer fit
if self.config.instrument.plot_level >= 1:
p = figure(title=plab + ' Buffer Region',
x_axis_label='Slice Pos (px)',
y_axis_label='Ratio',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.circle(xbuff, ybuff)
p.line(xbuff, np.polyval(buffit, xbuff), line_color='red')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# get all buffer points in image
qbuf = [i for i, v in enumerate(posmap.data.flat)
if (xinter-buffer) <= v <= (xinter+buffer)]
# apply buffer correction to all buffer points in newflat
for i in qbuf:
newflat.flat[i] = \
(resflat[1] + resflat[0] * posmap.data.flat[i]) / \
np.polyval(buffit, posmap.data.flat[i]) * newflat.flat[i]
self.logger.info("Vignetting correction complete.")
self.logger.info("Fitting master illumination")
# now fit master flat
# get reference slice points
if ymap is not None:
self.logger.info("Using ymap to limit yrange of data in qref")
qref = [i for i in q if ffleft <= posmap.data.flat[i] <= ffright and
50 <= ymap.data.flat[i] <= (ny-50)]
else:
qref = [i for i in q if ffleft <= posmap.data.flat[i] <= ffright]
xfr = wavemap.data.flat[qref]
yfr = newflat.flat[qref]
# sort on wavelength
s = np.argsort(xfr)
xfr = xfr[s]
yfr = yfr[s]
wavegood0 = wavemap.header['WAVGOOD0']
wavegood1 = wavemap.header['WAVGOOD1']
# correction for BM where we see a ledge
if 'BM' in self.action.args.grating:
ledge_wave = bm_ledge_position(self.action.args.cwave,
self.action.args.dich)
self.logger.info("BM ledge calculated wavelength "
"for ref slice = %.2f (A)" % ledge_wave)
if wavegood0 <= ledge_wave <= wavegood1:
self.logger.info("BM grating requires correction")
qledge = [i for i, v in enumerate(xfr)
if ledge_wave-25 <= v <= ledge_wave+25]
xledge = [xfr[i] for i in qledge]
yledge = [yfr[i] for i in qledge]
s = np.argsort(xledge)
xledge = [xledge[i] for i in s]
yledge = [yledge[i] for i in s]
win = boxcar(250)
smyledge = sp.signal.convolve(yledge,
win, mode='same') / sum(win)
ylmax = np.max(yledge)
ylmin = np.min(yledge)
fpoints = np.arange(0, 100) / 100. * 50 + (ledge_wave-25)
ledgefit, ledgemsk = Bspline.iterfit(np.asarray(xledge),
smyledge, fullbkpt=fpoints,
upper=1, lower=1)
ylfit, _ = ledgefit.value(np.asarray(fpoints))
deriv = -(np.roll(ylfit, 1) - np.roll(ylfit, -1)) / 2.0
# trim edges
trm = int(len(deriv)/5)
deriv = deriv[trm:-trm]
xvals = fpoints[trm:-trm]
peaks, _ = find_peaks(deriv, height=100)
p = figure(title=plab +
' Ledge', x_axis_label='Wavelength (A)',
y_axis_label='Value',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.circle(xledge, smyledge, fill_color='green')
p.line(fpoints, ylfit)
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
p = figure(title=plab + ' Deriv',
x_axis_label='px (Wavelength)',
y_axis_label='Value',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
xx = list(range(len(deriv)))
ylim = get_plot_lims(deriv)
p.circle(xx, deriv)
for pk in peaks:
p.line([pk, pk], ylim)
bokeh_plot(p, self.context.bokeh_session)
if len(peaks) != 1:
self.logger.warning("Extra peak found!")
print("Please indicate the integer pixel value of the peak")
ipk = int(input("Peak? <int>: "))
else:
ipk = peaks[0]
apk = xvals[ipk]
if self.config.instrument.plot_level >= 3:
p = figure(
title=plab + ' Peak of ledge',
x_axis_label='Wave (A)',
y_axis_label='Value',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.circle(xvals, deriv, legend_label='Data')
p.line([apk, apk], [-50, 200], line_color='red',
legend_label='Pk')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
xlow = apk - 3 - 5
xhi = apk - 3
zlow = apk + 3
zhi = apk + 3 + 5
qlow = [i for i, v in enumerate(fpoints) if xlow <= v <= xhi]
xlf = np.asarray([fpoints[i] for i in qlow])
ylf = np.asarray([ylfit[i] for i in qlow])
lowfit = np.polyfit(xlf, ylf, 1)
qhi = [i for i, v in enumerate(fpoints) if zlow <= v <= zhi]
xlf = np.asarray([fpoints[i] for i in qhi])
ylf = np.asarray([ylfit[i] for i in qhi])
hifit = np.polyfit(xlf, ylf, 1)
ratio = (hifit[1] + hifit[0] * apk) / \
(lowfit[1] + lowfit[0] * apk)
self.logger.info("BM ledge ratio: %.3f" % ratio)
# correct flat data
qcorr = [i for i, v in enumerate(xfr) if v >= apk]
for i in qcorr:
yfr[i] /= ratio
# plot BM ledge
if self.config.instrument.plot_level >= 1:
p = figure(
title=plab + ' BM Ledge Region',
x_axis_label='Wave (A)',
y_axis_label='Value',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
# Input data
p.circle(xledge, yledge, fill_color='blue',
legend_label='Data')
# correct input data
qcorrect = [i for i, v in enumerate(xledge) if v >= apk]
xplt = []
yplt = []
for i in qcorrect:
xplt.append(xledge[i])
yplt.append(yledge[i] / ratio)
p.circle(xplt, yplt, fill_color='orange',
legend_label='Corrected')
p.line(fpoints, ylfit, line_color='red', legend_label='Fit')
p.line([xlow, xlow], [ylmin, ylmax], line_color='blue')
p.line([xhi, xhi], [ylmin, ylmax], line_color='blue')
p.line([zlow, zlow], [ylmin, ylmax], line_color='black')
p.line([zhi, zhi], [ylmin, ylmax], line_color='black')
p.line(fpoints, lowfit[1] + lowfit[0] * fpoints,
line_color='purple')
p.line(fpoints, hifit[1] + hifit[0] * fpoints,
line_color='green')
p.line([apk, apk], [ylmin, ylmax], line_color='green',
legend_label='Pk')
p.y_range = Range1d(ylmin, ylmax)
p.legend.location = 'top_left'
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# END: handling BM grating ledge
# if we are fitting a twilight flat, treat it like a sky image with a
# larger number of knots
if twiflat:
knots = int(ny * knotspp)
else:
knots = 1000
self.logger.info("Using %d knots for bspline fit" % knots)
# generate a fit from ref slice points
bkpt = np.min(xfr) + np.arange(knots+1) * \
(np.max(xfr) - np.min(xfr)) / knots
sftr, _ = Bspline.iterfit(xfr[nrefx:-nrefx], yfr[nrefx:-nrefx],
fullbkpt=bkpt)
yfitr, _ = sftr.value(xfr)
# generate a blue slice spectrum bspline fit
if self.action.args.camera == 0: # Blue
blueslice = 12
else:
blueslice = 11
blueleft = 60 / xbin
blueright = 80 / xbin
qb = [i for i, v in enumerate(slicemap.data.flat) if v == blueslice]
qblue = [i for i in qb if blueleft <= posmap.data.flat[i] <= blueright]
xfb = wavemap.data.flat[qblue]
yfb = newflat.flat[qblue]
s = np.argsort(xfb)
xfb = xfb[s]
yfb = yfb[s]
bkpt = np.min(xfb) + np.arange(knots+1) * \
(np.max(xfb) - np.min(xfb)) / knots
sftb, _ = Bspline.iterfit(xfb[nrefx:-nrefx], yfb[nrefx:-nrefx],
fullbkpt=bkpt)
yfitb, _ = sftb.value(xfb)
# generate a red slice spectrum bspline fit
if self.action.args.camera == 0: # Blue
redslice = 23
else:
redslice = 0
redleft = 60 / xbin
redright = 80 / xbin
qr = [i for i, v in enumerate(slicemap.data.flat) if v == redslice]
qred = [i for i in qr if redleft <= posmap.data.flat[i] <= redright]
xfd = wavemap.data.flat[qred]
yfd = newflat.flat[qred]
s = np.argsort(xfd)
xfd = xfd[s]
yfd = yfd[s]
bkpt = np.min(xfd) + np.arange(knots + 1) * \
(np.max(xfd) - np.min(xfd)) / knots
sftd, _ = Bspline.iterfit(xfd[nrefx:-nrefx], yfd[nrefx:-nrefx],
fullbkpt=bkpt)
yfitd, _ = sftd.value(xfd)
# waves
minwave = np.min(xfb)
maxwave = np.max(xfd)
# are we a twilight flat?
if twiflat:
nwaves = int(ny * knotspp)
else:
nwaves = 1000
waves = minwave + (maxwave - minwave) * np.arange(nwaves+1) / nwaves
if self.config.instrument.plot_level >= 1:
# output filename stub
rbfnam = "redblue_%05d_%s_%s_%s" % \
(stacked.header['FRAMENO'],
self.action.args.illum, self.action.args.grating,
self.action.args.ifuname)
if xbin == 1:
stride = int(len(xfr) / 8000.)
if stride <= 0:
stride = 1
else:
stride = 1
xrplt = xfr[::stride]
yrplt = yfitr[::stride]
yrplt_d = yfr[::stride]
xbplt = xfb[::stride]
ybplt = yfitb[::stride]
ybplt_d = yfb[::stride]
xdplt = xfd[::stride]
ydplt = yfitd[::stride]
ydplt_d = yfd[::stride]
p = figure(
title=plab + ' Blue/Red fits',
x_axis_label='Wave (A)',
y_axis_label='Flux (e-)',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(xrplt, yrplt, line_color='black', legend_label='Ref')
p.circle(xrplt, yrplt_d, size=1, line_alpha=0., fill_color='black')
p.line(xbplt, ybplt, line_color='blue', legend_label='Blue')
p.circle(xbplt, ybplt_d, size=1, line_alpha=0., fill_color='blue')
p.line(xdplt, ydplt, line_color='red', legend_label='Red')
p.circle(xdplt, ydplt_d, size=1, line_alpha=0., fill_color='red')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
save_plot(p, filename=rbfnam+'.png')
wavebuffer = 0.1
minrwave = np.min(xfr)
maxrwave = np.max(xfr)
wavebuffer2 = 0.05
wlb0 = minrwave+(maxrwave-minrwave)*wavebuffer2
wlb1 = minrwave+(maxrwave-minrwave)*wavebuffer
wlr0 = minrwave+(maxrwave-minrwave)*(1.-wavebuffer)
wlr1 = minrwave+(maxrwave-minrwave)*(1.-wavebuffer2)
qbluefit = [i for i, v in enumerate(waves) if wlb0 < v < wlb1]
qredfit = [i for i, v in enumerate(waves) if wlr0 < v < wlr1]
nqb = len(qbluefit)
nqr = len(qredfit)
self.logger.info("Wavelength regions: blue = %.1f - %.1f, "
"red = %.1f - %.1f" % (wlb0, wlb1, wlr0, wlr1))
self.logger.info("Fit points: blue = %d, red = %d" % (nqb, nqr))
if nqb > 0:
bluefit, _ = sftb.value(waves[qbluefit])
refbluefit, _ = sftr.value(waves[qbluefit])
blue_zero_cross = np.nanmin(bluefit) <= 0. or np.nanmin(
refbluefit) <= 0.
bluelinfit = np.polyfit(waves[qbluefit], refbluefit/bluefit, 1)
bluelinfity = bluelinfit[1] + bluelinfit[0] * waves[qbluefit]
else:
bluefit = None
blue_zero_cross = False
refbluefit = None
bluelinfit = None
bluelinfity = None
if nqr > 0:
redfit, _ = sftd.value(waves[qredfit])
refredfit, _ = sftr.value(waves[qredfit])
red_zero_cross = np.nanmin(redfit) <= 0. or np.nanmin(
refredfit) <= 0.
redlinfit = np.polyfit(waves[qredfit], refredfit/redfit, 1)
redlinfity = redlinfit[1] + redlinfit[0] * waves[qredfit]
else:
redfit = None
red_zero_cross = False
refredfit = None
redlinfit = None
redlinfity = None
if blue_zero_cross:
self.logger.info("Blue extension zero crossing detected")
if red_zero_cross:
self.logger.info("Red extension zero crossing detected")
if self.config.instrument.plot_level >= 1:
if nqb > 1:
# plot blue fits
p = figure(
title=plab + ' Blue fits',
x_axis_label='Wave (A)',
y_axis_label='Flux (e-)',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(waves[qbluefit], refbluefit, line_color='black',
legend_label='Ref')
p.circle(waves[qbluefit], refbluefit, fill_color='black')
p.line(waves[qbluefit], bluefit, line_color='blue',
legend_label='Blue')
p.circle(waves[qbluefit], bluefit, fill_color='blue')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# plot blue ratios
p = figure(
title=plab + ' Blue ratios',
x_axis_label='Wave (A)',
y_axis_label='Ratio',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(waves[qbluefit], refbluefit/bluefit, line_color='black',
legend_label='Ref')
p.circle(waves[qbluefit], refbluefit/bluefit,
fill_color='black')
p.line(waves[qbluefit], bluelinfity,
line_color='blue', legend_label='Blue')
p.circle(waves[qbluefit], bluelinfity,
fill_color='blue')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
if nqr > 1:
# plot red fits
p = figure(
title=plab + ' Red fits',
x_axis_label='Wave (A)',
y_axis_label='Flux (e-)',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(waves[qredfit], refredfit, line_color='black',
legend_label='Ref')
p.circle(waves[qredfit], refredfit, fill_color='black')
p.line(waves[qredfit], redfit, line_color='red',
legend_label='Red')
p.circle(waves[qredfit], redfit, fill_color='red')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# plot red ratios
p = figure(
title=plab + ' Red ratios',
x_axis_label='Wave (A)',
y_axis_label='Ratio',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.line(waves[qredfit], refredfit/redfit, line_color='black',
legend_label='Ref')
p.circle(waves[qredfit], refredfit/redfit, fill_color='black')
p.line(waves[qredfit], redlinfity,
line_color='red', legend_label='Red')
p.circle(waves[qredfit], redlinfity, fill_color='red')
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
# at this point we are going to try to merge the points
self.logger.info("Correcting points outside %.1f - %.1f A"
% (minrwave, maxrwave))
qselblue = [i for i, v in enumerate(xfb) if v <= minrwave]
qselred = [i for i, v in enumerate(xfd) if v >= maxrwave]
nqsb = len(qselblue)
nqsr = len(qselred)
blue_all_tie = yfitr[0]
red_all_tie = yfitr[-1]
self.logger.info("Blue/Red ref tie values: %.3f, %.3f"
% (blue_all_tie, red_all_tie))
if nqsb > 0:
self.logger.info("Blue ext tie value: %.3f" % yfb[qselblue[-1]])
if blue_zero_cross:
blue_offset = yfb[qselblue[-1]] - blue_all_tie
bluefluxes = [yfb[i] - blue_offset for i in qselblue]
self.logger.info("Blue zero crossing, only applying offset")
else:
blue_offset = yfb[qselblue[-1]] * \
(bluelinfit[1]+bluelinfit[0]*xfb[qselblue[-1]]) \
- blue_all_tie
bluefluxes = [yfb[i] * (bluelinfit[1]+bluelinfit[0]*xfb[i])
- blue_offset for i in qselblue]
self.logger.info("Blue linear ratio fit scaling applied")
self.logger.info("Blue offset of %.2f applied" % blue_offset)
else:
bluefluxes = None
if nqsr > 0:
self.logger.info("Red ext tie value: %.3f" % yfd[qselred[0]])
if red_zero_cross:
red_offset = yfd[qselred[0]] - red_all_tie
redfluxes = [yfd[i] - red_offset for i in qselred]
self.logger.info("Red zero crossing, only applying offset")
else:
red_offset = yfd[qselred[0]] * \
(redlinfit[1]+redlinfit[0]*xfd[qselred[0]]) \
- red_all_tie
redfluxes = [yfd[i] * (redlinfit[1]+redlinfit[0]*xfd[i])
- red_offset for i in qselred]
self.logger.info("Red linear ratio fit scaling applied")
self.logger.info("Red offset of %.2f applied" % red_offset)
else:
redfluxes = None
allx = xfr
allfx = xfr[nrefx:-nrefx]
ally = yfr[nrefx:-nrefx]
if nqsb > 0:
bluex = xfb[qselblue]
allx = np.append(bluex, allx)
allfx = np.append(bluex[nrefx:], allfx)
ally = np.append(bluefluxes[nrefx:], ally)
if nqsr > 0:
redx = xfd[qselred]
allx = np.append(allx, redx)
allfx = np.append(allfx, redx[:-nrefx])
ally = np.append(ally, redfluxes[:-nrefx])
s = np.argsort(allx)
allx = allx[s]
s = np.argsort(allfx)
allfx = allfx[s]
ally = ally[s]
bkpt = np.min(allx) + np.arange(knots+1) * \
(np.max(allx) - np.min(allx)) / knots
sftall, _ = Bspline.iterfit(allfx, ally, fullbkpt=bkpt)
yfitall, _ = sftall.value(allx)
if self.config.instrument.plot_level >= 1:
# output filename stub
fltfnam = "flat_%05d_%s_%s_%s" % \
(stacked.header['FRAMENO'],
self.action.args.illum, self.action.args.grating,
self.action.args.ifuname)
if xbin == 1:
stride = int(len(allx) / 8000.)
else:
stride = 1
xplt = allfx[::stride]
yplt = ally[::stride]
fxplt = allx[::stride]
fplt = yfitall[::stride]
yran = [np.nanmin(ally), np.nanmax(ally)]
p = figure(
title=plab + ' Master Illumination',
x_axis_label='Wave (A)',
y_axis_label='Flux (e-)',
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.circle(xplt, yplt, size=1, line_alpha=0., fill_color='black',
legend_label='Data')
p.line(fxplt, fplt, line_color='red', legend_label='Fit',
line_width=2)
p.line([minrwave, minrwave], yran, line_color='orange',
legend_label='Cor lim')
p.line([maxrwave, maxrwave], yran, line_color='orange')
p.legend.location = "top_left"
bokeh_plot(p, self.context.bokeh_session)
if self.config.instrument.plot_level >= 2:
input("Next? <cr>: ")
else:
time.sleep(self.config.instrument.plot_pause)
save_plot(p, filename=fltfnam+".png")
# OK, Now we have extended to the full range... so... we are going to
# make a ratio flat!
comflat = np.zeros(newflat.shape, dtype=float)
qz = [i for i, v in enumerate(wavemap.data.flat) if v >= 0]
comvals = sftall.value(wavemap.data.flat[qz])
comflat.flat[qz] = comvals
ratio = np.zeros(newflat.shape, dtype=float)
qzer = [i for i, v in enumerate(newflat.flat) if v != 0]
ratio.flat[qzer] = comflat.flat[qzer] / newflat.flat[qzer]
# trim negative points
qq = [i for i, v in enumerate(ratio.flat) if v < 0]
if len(qq) > 0:
ratio.flat[qq] = 0.0
# trim the high points near edges of slice
qq = [i for i, v in enumerate(ratio.flat) if v >= 3. and
(posmap.data.flat[i] <= 4/xbin or
posmap.data.flat[i] >= 136/xbin)]
if len(qq) > 0:
ratio.flat[qq] = 0.0
# don't correct low signal points
qq = [i for i, v in enumerate(newflat.flat) if v < 30.]
if len(qq) > 0:
ratio.flat[qq] = 1.0
# get master flat output name
mfname = stack_list[0].split('.fits')[0] + '_' + suffix + '.fits'
log_string = MakeMasterFlat.__module__
stacked.header['IMTYPE'] = self.action.args.new_type
stacked.header['HISTORY'] = log_string
stacked.header['MASTFLAT'] = (True, 'master flat image?')
stacked.header['WAVMAPF'] = wmf
stacked.header['SLIMAPF'] = slf
stacked.header['POSMAPF'] = pof
# store flat in output frame
stacked.data = ratio
# output master flat
kcwi_fits_writer(stacked, output_file=mfname,
output_dir=self.config.instrument.output_directory)
self.context.proctab.update_proctab(frame=stacked, suffix=suffix,
newtype=self.action.args.new_type,
filename=stacked.header['OFNAME'])
self.context.proctab.write_proctab(tfil=self.config.instrument.procfile)
self.action.args.name = stacked.header['OFNAME']
self.logger.info(log_string)
return self.action.args
# END: class MakeMasterFlat()