from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
kcwi_fits_reader, strip_fname
import time
import os
import math
import pickle
import numpy as np
from skimage import transform as tf
from astropy.coordinates import SkyCoord
from astropy import units as u
from astropy.nddata import CCDData
from kcwidrp.core.bokeh_plotting import bokeh_plot
from bokeh.plotting import figure
from multiprocessing import Pool
[docs]def make_cube_helper(argument):
"""
Warp each slice.
Helper program for threaded warping of all slices.
Args:
argument (dict): Dictionary of params for warping an individual slice.
:returns:
- int: Slice number being warped
- ndimage: warped intensity image
- ndimage: warped uncertainty image
- ndimage: warped mask image
- ndimage: warped flag image
- ndimage: warped image without sky subtraction
- ndimage: warped nod-and-shuffle object image
- ndimage: warped nod-and-shuffle sky image
- ndimage: warped delta-wavelength image
"""
logger = argument['logger']
logger.info("Transforming image slice %d" % (argument['slice_number']+1))
# input params
slice_number = argument['slice_number']
xsize = argument['xsize']
ysize = argument['ysize']
tform = argument['geom']['tform'][slice_number]
xl0 = argument['geom']['xl0'][slice_number]
xl1 = argument['geom']['xl1'][slice_number]
# slice data
slice_img = argument['img'][:, xl0:xl1]
slice_unc = argument['std'][:, xl0:xl1]
slice_msk = argument['msk'][:, xl0:xl1]
slice_flg = argument['flg'][:, xl0:xl1]
if 'nsk' in argument:
slice_nsk = argument['nsk'][:, xl0:xl1]
else:
slice_nsk = None
if 'obj' in argument:
slice_obj = argument['obj'][:, xl0:xl1]
else:
slice_obj = None
if 'sky' in argument:
slice_sky = argument['sky'][:, xl0:xl1]
else:
slice_sky = None
if 'del' in argument:
slice_del = argument['del'][:, xl0:xl1]
else:
slice_del = None
# do the warping
warped = tf.warp(slice_img, tform, order=3,
output_shape=(ysize, xsize))
uarped = tf.warp(slice_unc, tform, order=3,
output_shape=(ysize, xsize))
marped = tf.warp(slice_msk, tform, order=3,
output_shape=(ysize, xsize))
farped = tf.warp(slice_flg, tform, order=3,
output_shape=(ysize, xsize), preserve_range=True)
if slice_nsk is not None:
karped = tf.warp(slice_nsk, tform, order=3,
output_shape=(ysize, xsize))
else:
karped = None
if slice_obj is not None:
oarped = tf.warp(slice_obj, tform, order=3,
output_shape=(ysize, xsize))
else:
oarped = None
if slice_sky is not None:
sarped = tf.warp(slice_sky, tform, order=3,
output_shape=(ysize, xsize))
else:
sarped = None
if slice_del is not None:
darped = tf.warp(slice_del, tform, order=3,
output_shape=(ysize, xsize), preserve_range=True)
else:
darped = None
return argument['slice_number'], warped, uarped, marped, farped, karped, \
oarped, sarped, darped
[docs]class MakeCube(BasePrimitive):
"""
Transform 2D images to 3D data cubes.
Uses thread pool to warp each slice in parallel and then
assemble a 3D image cube from the slices. Rotates RED channel
data by 180 degress to align with BLUE channel data.
Creates WCS keywords that reflect the new geometry of the
cube based on position keywords from the telescope.
Outputs FITS 3d cube images:
* icube - main cube with primary, uncertainty, mask, flag, and noskysub extensions
* ocube - for nod-and-shuffle, the object frame as a cube
* scube - for nod-and-shuffle, the sky frame as a cube
* dcube - the delta-wavelength cube
"""
def __init__(self, action, context):
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
def _perform(self):
self.logger.info("Creating data cube")
log_string = MakeCube.__module__
# Are we interactive?
if self.config.instrument.plot_level >= 3:
do_inter = True
else:
do_inter = False
self.logger.info("Generating data cube")
# Find and read geometry transformation
tab = self.context.proctab.search_proctab(
frame=self.action.args.ccddata, target_type='MARC',
nearest=True)
if not len(tab):
self.logger.error("No reference geometry, cannot make cube!")
self.action.args.ccddata.header['GEOMCOR'] = (False,
'Geometry corrected?')
self.logger.info(log_string)
return self.action.args
self.logger.info("%d arc frames found" % len(tab))
ofn = strip_fname(tab['filename'][0]) + "_geom.pkl"
geom_file = os.path.join(self.config.instrument.cwd,
self.config.instrument.output_directory,
ofn)
if os.path.exists(geom_file):
self.logger.info("Reading %s" % geom_file)
with open(geom_file, 'rb') as ifile:
geom = pickle.load(ifile)
# Slice size
xsize = geom['xsize']
ysize = geom['ysize']
out_cube = np.zeros((ysize, xsize, 24), dtype=np.float64)
out_uube = np.zeros((ysize, xsize, 24), dtype=np.float64)
out_mube = np.zeros((ysize, xsize, 24), dtype=np.uint8)
out_fube = np.zeros((ysize, xsize, 24), dtype=np.uint8)
out_oube = np.zeros((ysize, xsize, 24), dtype=np.float64)
out_sube = np.zeros((ysize, xsize, 24), dtype=np.float64)
out_dube = np.zeros((ysize, xsize, 24), dtype=np.float64)
# Store original data
data_img = self.action.args.ccddata.data
data_std = self.action.args.ccddata.uncertainty.array
data_msk = self.action.args.ccddata.mask
data_flg = self.action.args.ccddata.flags
# check for noskysub property
if self.action.args.ccddata.noskysub is not None:
out_kube = np.zeros((ysize, xsize, 24), dtype=np.float64)
data_nsk = self.action.args.ccddata.noskysub
else:
data_nsk = None
out_kube = None
# check for obj, sky images
obj = None
data_obj = None
sky = None
data_sky = None
if self.action.args.nasmask and self.action.args.numopen > 1:
ofn = self.action.args.name
objfn = strip_fname(ofn) + '_objf.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, objfn)
if os.path.exists(full_path):
obj = kcwi_fits_reader(full_path)[0]
data_obj = obj.data
skyfn = strip_fname(ofn) + '_skyf.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, skyfn)
if os.path.exists(full_path):
sky = kcwi_fits_reader(full_path)[0]
data_sky = sky.data
# check for delta wavelength map
dew = None
data_dew = None
if 'ARCLAMP' in self.action.args.imtype:
ofn = self.action.args.name
dewfn = strip_fname(ofn) + '_delmap.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, dewfn)
if os.path.exists(full_path):
dew = kcwi_fits_reader(full_path)[0]
data_dew = dew.data
# Loop over 24 slices
my_arguments = []
for isl in range(0, 24):
arguments = {
'slice_number': isl,
'geom': geom,
'img': data_img,
'std': data_std,
'msk': data_msk,
'flg': data_flg,
'xsize': xsize,
'ysize': ysize,
'logger': self.logger
}
if data_nsk is not None:
arguments['nsk'] = data_nsk
if obj is not None:
arguments['obj'] = data_obj
if sky is not None:
arguments['sky'] = data_sky
if dew is not None:
arguments['del'] = data_dew
my_arguments.append(arguments)
p = Pool()
results = p.map(make_cube_helper, list(my_arguments))
p.close()
self.logger.info("Building cube")
for partial_cube in results:
slice_number = partial_cube[0]
out_cube[:, :, slice_number] = partial_cube[1]
out_uube[:, :, slice_number] = partial_cube[2]
out_mube[:, :, slice_number] = partial_cube[3]
out_fube[:, :, slice_number] = partial_cube[4]
if data_nsk is not None:
out_kube[:, :, slice_number] = partial_cube[5]
if obj is not None:
out_oube[:, :, slice_number] = partial_cube[6]
if sky is not None:
out_sube[:, :, slice_number] = partial_cube[7]
if dew is not None:
out_dube[:, :, slice_number] = partial_cube[8]
if self.config.instrument.plot_level >= 3:
for isl in range(0, 24):
warped = out_cube[:, :, isl]
ptitle = self.action.args.plotlabel + \
"WARPED Slice %d" % isl
p = figure(tooltips=[("x", "$x"), ("y", "$y"),
("value", "@image")],
title=ptitle,
x_axis_label="X (px)", y_axis_label="Y (px)",
plot_width=self.config.instrument.plot_width,
plot_height=self.config.instrument.plot_height)
p.x_range.range_padding = p.y_range.range_padding = 0
p.image([warped], x=0, y=0, dw=xsize, dh=ysize,
palette="Spectral11", level="image")
bokeh_plot(p, self.context.bokeh_session)
if do_inter:
q = input("Next? <cr>, q to quit: ")
if 'Q' in q.upper():
do_inter = False
else:
time.sleep(self.config.instrument.plot_pause)
# Rotate RED data by 180 to align with Blue
if 'RED' in self.action.args.ccddata.header['CAMERA'].upper():
out_cube = np.rot90(out_cube, 2, (1, 2))
out_uube = np.rot90(out_uube, 2, (1, 2))
out_mube = np.rot90(out_mube, 2, (1, 2))
out_fube = np.rot90(out_fube, 2, (1, 2))
if data_nsk is not None:
out_kube = np.rot90(out_kube, 2, (1, 2))
if obj is not None:
out_oube = np.rot90(out_oube, 2, (1, 2))
if sky is not None:
out_sube = np.rot90(out_sube, 2, (1, 2))
if dew is not None:
out_dube = np.rot90(out_dube, 2, (1, 2))
# Calculate some WCS parameters
# Get object pointing
try:
if self.action.args.nasmask:
rastr = self.action.args.ccddata.header['RABASE']
decstr = self.action.args.ccddata.header['DECBASE']
else:
rastr = self.action.args.ccddata.header['RA']
decstr = self.action.args.ccddata.header['DEC']
except KeyError:
try:
rastr = self.action.args.ccddata.header['TARGRA']
decstr = self.action.args.ccddata.header['TARGDEC']
except KeyError:
rastr = ''
decstr = ''
if len(rastr) > 0 and len(decstr) > 0:
try:
coord = SkyCoord(rastr, decstr, unit=(u.hourangle, u.deg))
except ValueError:
coord = None
else:
coord = None
# Get rotator position
if 'ROTPOSN' in self.action.args.ccddata.header:
rpos = self.action.args.ccddata.header['ROTPOSN']
else:
rpos = 0.
if 'ROTREFAN' in self.action.args.ccddata.header:
rref = self.action.args.ccddata.header['ROTREFAN']
else:
rref = 0.
skypa = rpos + rref
crota = math.radians(-(skypa + self.config.instrument.ROTOFF))
cdelt1 = -geom['slscl']
cdelt2 = geom['pxscl']
if coord is None:
ra = 0.
dec = 0.
crota = 1
else:
ra = coord.ra.degree
dec = coord.dec.degree
cd11 = cdelt1 * math.cos(crota)
cd12 = abs(cdelt2) * np.sign(cdelt1) * math.sin(crota)
cd21 = -abs(cdelt1) * np.sign(cdelt2) * math.sin(crota)
cd22 = cdelt2 * math.cos(crota)
crpix1 = 12.
crpix2 = xsize / 2.
crpix3 = 1.
porg = self.action.args.ccddata.header['PONAME']
ifunum = self.action.args.ifunum
if 'IFU' in porg:
if ifunum == 1:
off1 = 1.0
off2 = 4.0
elif ifunum == 2:
off1 = 1.0
off2 = 5.0
elif ifunum == 3:
off1 = 0.05
off2 = 5.6
else:
self.logger.warning("Unknown IFU number: %d" % ifunum)
off1 = 0.
off2 = 0.
off1 = off1 / float(self.action.args.xbinsize)
off2 = off2 / float(self.action.args.ybinsize)
crpix1 += off1
crpix2 += off2
# Update header
# Geometry corrected?
self.action.args.ccddata.header['GEOMCOR'] = (
True, 'Geometry corrected?')
#
# Spatial geometry
self.action.args.ccddata.header['BARSEP'] = (
geom['barsep'], 'separation of bars (binned pix)')
self.action.args.ccddata.header['BAR0'] = (
geom['bar0'], 'first bar pixel position')
# Wavelength ranges
if self.action.args.nasmask:
self.action.args.ccddata.header['WAVALL0'] = (
geom['wavensall0'], 'Low inclusive wavelength')
self.action.args.ccddata.header['WAVALL1'] = (
geom['wavensall1'], 'High inclusive wavelength')
self.action.args.ccddata.header['WAVGOOD0'] = (
geom['wavensgood0'], 'Low good wavelength')
self.action.args.ccddata.header['WAVGOOD1'] = (
geom['wavensgood1'], 'High good wavelength')
self.action.args.ccddata.header['WAVMID'] = (
geom['wavensmid'], 'middle wavelength')
else:
self.action.args.ccddata.header['WAVALL0'] = (
geom['waveall0'], 'Low inclusive wavelength')
self.action.args.ccddata.header['WAVALL1'] = (
geom['waveall1'], 'High inclusive wavelength')
self.action.args.ccddata.header['WAVGOOD0'] = (
geom['wavegood0'], 'Low good wavelength')
self.action.args.ccddata.header['WAVGOOD1'] = (
geom['wavegood1'], 'High good wavelength')
self.action.args.ccddata.header['WAVMID'] = (
geom['wavemid'], 'middle wavelength')
# Dichroic fraction
try:
dichroic_fraction = geom['dich_frac']
except AttributeError:
dichroic_fraction = 1.
self.action.args.ccddata.header['DICHFRAC'] = (
dichroic_fraction, 'Dichroic Fraction')
# Wavelength fit statistics
self.action.args.ccddata.header['AVWVSIG'] = (
geom['avwvsig'], 'Avg. bar wave sigma (Ang)')
self.action.args.ccddata.header['SDWVSIG'] = (
geom['sdwvsig'], 'Stdev. var wave sigma (Ang)')
# Pixel scales
self.action.args.ccddata.header['PXSCL'] = (
geom['pxscl'], 'Pixel scale along slice (deg)')
self.action.args.ccddata.header['SLSCL'] = (
geom['slscl'], 'Pixel scale perp. to slices (deg)')
# Geometry origins
self.action.args.ccddata.header['CBARSNO'] = (
geom['cbarsno'], 'Continuum bars image number')
self.action.args.ccddata.header['CBARSFL'] = (
geom['cbarsfl'], 'Continuum bars image filename')
self.action.args.ccddata.header['ARCNO'] = (
geom['arcno'], 'Arc image number')
self.action.args.ccddata.header['ARCFL'] = (
geom['arcfl'], 'Arc image filename')
self.action.args.ccddata.header['GEOMFL'] = (
geom_file.split('/')[-1], 'Geometry file')
# WCS
self.action.args.ccddata.header['IFUPA'] = (
skypa, 'IFU position angle (degrees)')
self.action.args.ccddata.header['IFUROFF'] = (
self.config.instrument.ROTOFF, 'IFU-SKYPA offset (degrees)')
self.action.args.ccddata.header['WCSDIM'] = (
3, 'number of dimensions in WCS')
self.action.args.ccddata.header['WCSNAME'] = 'KCWI'
self.action.args.ccddata.header['EQUINOX'] = 2000.
self.action.args.ccddata.header['RADESYS'] = 'FK5'
self.action.args.ccddata.header['CTYPE1'] = 'RA---TAN'
self.action.args.ccddata.header['CTYPE2'] = 'DEC--TAN'
self.action.args.ccddata.header['CTYPE3'] = ('AWAV',
'Air Wavelengths')
self.action.args.ccddata.header['CUNIT1'] = ('deg', 'RA units')
self.action.args.ccddata.header['CUNIT2'] = ('deg', 'DEC units')
self.action.args.ccddata.header['CUNIT3'] = ('Angstrom',
'Wavelength units')
self.action.args.ccddata.header['CNAME1'] = ('KCWI RA', 'RA name')
self.action.args.ccddata.header['CNAME2'] = ('KCWI DEC', 'DEC name')
self.action.args.ccddata.header['CNAME3'] = ('KCWI Wavelength',
'Wavelength name')
self.action.args.ccddata.header['CRVAL1'] = (ra, 'RA zeropoint')
self.action.args.ccddata.header['CRVAL2'] = (dec, 'DEC zeropoint')
self.action.args.ccddata.header['CRVAL3'] = (geom['wave0out'],
'Wavelength zeropoint')
self.action.args.ccddata.header['CRPIX1'] = (crpix1,
'RA reference pixel')
self.action.args.ccddata.header['CRPIX2'] = (crpix2,
'DEC reference pixel')
self.action.args.ccddata.header['CRPIX3'] = (
crpix3, 'Wavelength reference pixel')
self.action.args.ccddata.header['CD1_1'] = (
cd11, 'RA degrees per column pixel')
self.action.args.ccddata.header['CD2_1'] = (
cd21, 'DEC degrees per column pixel')
self.action.args.ccddata.header['CD1_2'] = (
cd12, 'RA degrees per row pixel')
self.action.args.ccddata.header['CD2_2'] = (
cd22, 'DEC degrees per row pixel')
self.action.args.ccddata.header['CD3_3'] = (
geom['dwout'], 'Wavelength Angstroms per pixel')
self.action.args.ccddata.header['LONPOLE'] = (
180.0, 'Native longitude of Celestial pole')
self.action.args.ccddata.header['LATPOLE'] = (
0.0, 'Native latitude of Celestial pole')
# write out cube
self.action.args.ccddata.header['HISTORY'] = log_string
self.action.args.ccddata.data = out_cube
self.action.args.ccddata.uncertainty.array = out_uube
self.action.args.ccddata.mask = out_mube
self.action.args.ccddata.flags = out_fube
if data_nsk is not None:
self.action.args.ccddata.noskysub = out_kube
# write out int image
kcwi_fits_writer(self.action.args.ccddata,
table=self.action.args.table,
output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="icube")
self.context.proctab.update_proctab(frame=self.action.args.ccddata,
suffix="icube",
filename=self.action.args.name)
self.context.proctab.write_proctab(
tfil=self.config.instrument.procfile)
# check for obj, sky outputs
if obj is not None:
out_obj = CCDData(out_oube,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_obj, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="ocube")
if sky is not None:
out_sky = CCDData(out_sube,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_sky, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="scube")
# check for dew outputs
if dew is not None:
out_dew = CCDData(out_dube,
meta=self.action.args.ccddata.header,
unit=dew.unit)
kcwi_fits_writer(
out_dew, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="dcube")
else:
self.logger.error("Geometry file not found: %s" % geom_file)
self.logger.info(log_string)
return self.action.args
# END: class MakeCube()