Source code for kcwidrp.primitives.MakeCube

from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
    kcwi_fits_reader, strip_fname

import time
import os
import math
import pickle
import numpy as np
from skimage import transform as tf
from astropy.coordinates import SkyCoord
from astropy import units as u
from astropy.nddata import CCDData
from kcwidrp.core.bokeh_plotting import bokeh_plot
from bokeh.plotting import figure
from multiprocessing import Pool


[docs]def make_cube_helper(argument): """ Warp each slice. Helper program for threaded warping of all slices. Args: argument (dict): Dictionary of params for warping an individual slice. :returns: - int: Slice number being warped - ndimage: warped intensity image - ndimage: warped uncertainty image - ndimage: warped mask image - ndimage: warped flag image - ndimage: warped image without sky subtraction - ndimage: warped nod-and-shuffle object image - ndimage: warped nod-and-shuffle sky image - ndimage: warped delta-wavelength image """ logger = argument['logger'] logger.info("Transforming image slice %d" % (argument['slice_number']+1)) # input params slice_number = argument['slice_number'] xsize = argument['xsize'] ysize = argument['ysize'] tform = argument['geom']['tform'][slice_number] xl0 = argument['geom']['xl0'][slice_number] xl1 = argument['geom']['xl1'][slice_number] # slice data slice_img = argument['img'][:, xl0:xl1] slice_unc = argument['std'][:, xl0:xl1] slice_msk = argument['msk'][:, xl0:xl1] slice_flg = argument['flg'][:, xl0:xl1] if 'nsk' in argument: slice_nsk = argument['nsk'][:, xl0:xl1] else: slice_nsk = None if 'obj' in argument: slice_obj = argument['obj'][:, xl0:xl1] else: slice_obj = None if 'sky' in argument: slice_sky = argument['sky'][:, xl0:xl1] else: slice_sky = None if 'del' in argument: slice_del = argument['del'][:, xl0:xl1] else: slice_del = None # do the warping warped = tf.warp(slice_img, tform, order=3, output_shape=(ysize, xsize)) uarped = tf.warp(slice_unc, tform, order=3, output_shape=(ysize, xsize)) marped = tf.warp(slice_msk, tform, order=3, output_shape=(ysize, xsize)) farped = tf.warp(slice_flg, tform, order=3, output_shape=(ysize, xsize), preserve_range=True) if slice_nsk is not None: karped = tf.warp(slice_nsk, tform, order=3, output_shape=(ysize, xsize)) else: karped = None if slice_obj is not None: oarped = tf.warp(slice_obj, tform, order=3, output_shape=(ysize, xsize)) else: oarped = None if slice_sky is not None: sarped = tf.warp(slice_sky, tform, order=3, output_shape=(ysize, xsize)) else: sarped = None if slice_del is not None: darped = tf.warp(slice_del, tform, order=3, output_shape=(ysize, xsize), preserve_range=True) else: darped = None return argument['slice_number'], warped, uarped, marped, farped, karped, \ oarped, sarped, darped
[docs]class MakeCube(BasePrimitive): """ Transform 2D images to 3D data cubes. Uses thread pool to warp each slice in parallel and then assemble a 3D image cube from the slices. Rotates RED channel data by 180 degress to align with BLUE channel data. Creates WCS keywords that reflect the new geometry of the cube based on position keywords from the telescope. Outputs FITS 3d cube images: * icube - main cube with primary, uncertainty, mask, flag, and noskysub extensions * ocube - for nod-and-shuffle, the object frame as a cube * scube - for nod-and-shuffle, the sky frame as a cube * dcube - the delta-wavelength cube """ def __init__(self, action, context): BasePrimitive.__init__(self, action, context) self.logger = context.pipeline_logger def _perform(self): self.logger.info("Creating data cube") log_string = MakeCube.__module__ # Are we interactive? if self.config.instrument.plot_level >= 3: do_inter = True else: do_inter = False self.logger.info("Generating data cube") # Find and read geometry transformation tab = self.context.proctab.search_proctab( frame=self.action.args.ccddata, target_type='MARC', nearest=True) if not len(tab): self.logger.error("No reference geometry, cannot make cube!") self.action.args.ccddata.header['GEOMCOR'] = (False, 'Geometry corrected?') self.logger.info(log_string) return self.action.args self.logger.info("%d arc frames found" % len(tab)) ofn = strip_fname(tab['filename'][0]) + "_geom.pkl" geom_file = os.path.join(self.config.instrument.cwd, self.config.instrument.output_directory, ofn) if os.path.exists(geom_file): self.logger.info("Reading %s" % geom_file) with open(geom_file, 'rb') as ifile: geom = pickle.load(ifile) # Slice size xsize = geom['xsize'] ysize = geom['ysize'] out_cube = np.zeros((ysize, xsize, 24), dtype=np.float64) out_uube = np.zeros((ysize, xsize, 24), dtype=np.float64) out_mube = np.zeros((ysize, xsize, 24), dtype=np.uint8) out_fube = np.zeros((ysize, xsize, 24), dtype=np.uint8) out_oube = np.zeros((ysize, xsize, 24), dtype=np.float64) out_sube = np.zeros((ysize, xsize, 24), dtype=np.float64) out_dube = np.zeros((ysize, xsize, 24), dtype=np.float64) # Store original data data_img = self.action.args.ccddata.data data_std = self.action.args.ccddata.uncertainty.array data_msk = self.action.args.ccddata.mask data_flg = self.action.args.ccddata.flags # check for noskysub property if self.action.args.ccddata.noskysub is not None: out_kube = np.zeros((ysize, xsize, 24), dtype=np.float64) data_nsk = self.action.args.ccddata.noskysub else: data_nsk = None out_kube = None # check for obj, sky images obj = None data_obj = None sky = None data_sky = None if self.action.args.nasmask and self.action.args.numopen > 1: ofn = self.action.args.name objfn = strip_fname(ofn) + '_objf.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, objfn) if os.path.exists(full_path): obj = kcwi_fits_reader(full_path)[0] data_obj = obj.data skyfn = strip_fname(ofn) + '_skyf.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, skyfn) if os.path.exists(full_path): sky = kcwi_fits_reader(full_path)[0] data_sky = sky.data # check for delta wavelength map dew = None data_dew = None if 'ARCLAMP' in self.action.args.imtype: ofn = self.action.args.name dewfn = strip_fname(ofn) + '_delmap.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, dewfn) if os.path.exists(full_path): dew = kcwi_fits_reader(full_path)[0] data_dew = dew.data # Loop over 24 slices my_arguments = [] for isl in range(0, 24): arguments = { 'slice_number': isl, 'geom': geom, 'img': data_img, 'std': data_std, 'msk': data_msk, 'flg': data_flg, 'xsize': xsize, 'ysize': ysize, 'logger': self.logger } if data_nsk is not None: arguments['nsk'] = data_nsk if obj is not None: arguments['obj'] = data_obj if sky is not None: arguments['sky'] = data_sky if dew is not None: arguments['del'] = data_dew my_arguments.append(arguments) p = Pool() results = p.map(make_cube_helper, list(my_arguments)) p.close() self.logger.info("Building cube") for partial_cube in results: slice_number = partial_cube[0] out_cube[:, :, slice_number] = partial_cube[1] out_uube[:, :, slice_number] = partial_cube[2] out_mube[:, :, slice_number] = partial_cube[3] out_fube[:, :, slice_number] = partial_cube[4] if data_nsk is not None: out_kube[:, :, slice_number] = partial_cube[5] if obj is not None: out_oube[:, :, slice_number] = partial_cube[6] if sky is not None: out_sube[:, :, slice_number] = partial_cube[7] if dew is not None: out_dube[:, :, slice_number] = partial_cube[8] if self.config.instrument.plot_level >= 3: for isl in range(0, 24): warped = out_cube[:, :, isl] ptitle = self.action.args.plotlabel + \ "WARPED Slice %d" % isl p = figure(tooltips=[("x", "$x"), ("y", "$y"), ("value", "@image")], title=ptitle, x_axis_label="X (px)", y_axis_label="Y (px)", plot_width=self.config.instrument.plot_width, plot_height=self.config.instrument.plot_height) p.x_range.range_padding = p.y_range.range_padding = 0 p.image([warped], x=0, y=0, dw=xsize, dh=ysize, palette="Spectral11", level="image") bokeh_plot(p, self.context.bokeh_session) if do_inter: q = input("Next? <cr>, q to quit: ") if 'Q' in q.upper(): do_inter = False else: time.sleep(self.config.instrument.plot_pause) # Rotate RED data by 180 to align with Blue if 'RED' in self.action.args.ccddata.header['CAMERA'].upper(): out_cube = np.rot90(out_cube, 2, (1, 2)) out_uube = np.rot90(out_uube, 2, (1, 2)) out_mube = np.rot90(out_mube, 2, (1, 2)) out_fube = np.rot90(out_fube, 2, (1, 2)) if data_nsk is not None: out_kube = np.rot90(out_kube, 2, (1, 2)) if obj is not None: out_oube = np.rot90(out_oube, 2, (1, 2)) if sky is not None: out_sube = np.rot90(out_sube, 2, (1, 2)) if dew is not None: out_dube = np.rot90(out_dube, 2, (1, 2)) # Calculate some WCS parameters # Get object pointing try: if self.action.args.nasmask: rastr = self.action.args.ccddata.header['RABASE'] decstr = self.action.args.ccddata.header['DECBASE'] else: rastr = self.action.args.ccddata.header['RA'] decstr = self.action.args.ccddata.header['DEC'] except KeyError: try: rastr = self.action.args.ccddata.header['TARGRA'] decstr = self.action.args.ccddata.header['TARGDEC'] except KeyError: rastr = '' decstr = '' if len(rastr) > 0 and len(decstr) > 0: try: coord = SkyCoord(rastr, decstr, unit=(u.hourangle, u.deg)) except ValueError: coord = None else: coord = None # Get rotator position if 'ROTPOSN' in self.action.args.ccddata.header: rpos = self.action.args.ccddata.header['ROTPOSN'] else: rpos = 0. if 'ROTREFAN' in self.action.args.ccddata.header: rref = self.action.args.ccddata.header['ROTREFAN'] else: rref = 0. skypa = rpos + rref crota = math.radians(-(skypa + self.config.instrument.ROTOFF)) cdelt1 = -geom['slscl'] cdelt2 = geom['pxscl'] if coord is None: ra = 0. dec = 0. crota = 1 else: ra = coord.ra.degree dec = coord.dec.degree cd11 = cdelt1 * math.cos(crota) cd12 = abs(cdelt2) * np.sign(cdelt1) * math.sin(crota) cd21 = -abs(cdelt1) * np.sign(cdelt2) * math.sin(crota) cd22 = cdelt2 * math.cos(crota) crpix1 = 12. crpix2 = xsize / 2. crpix3 = 1. porg = self.action.args.ccddata.header['PONAME'] ifunum = self.action.args.ifunum if 'IFU' in porg: if ifunum == 1: off1 = 1.0 off2 = 4.0 elif ifunum == 2: off1 = 1.0 off2 = 5.0 elif ifunum == 3: off1 = 0.05 off2 = 5.6 else: self.logger.warning("Unknown IFU number: %d" % ifunum) off1 = 0. off2 = 0. off1 = off1 / float(self.action.args.xbinsize) off2 = off2 / float(self.action.args.ybinsize) crpix1 += off1 crpix2 += off2 # Update header # Geometry corrected? self.action.args.ccddata.header['GEOMCOR'] = ( True, 'Geometry corrected?') # # Spatial geometry self.action.args.ccddata.header['BARSEP'] = ( geom['barsep'], 'separation of bars (binned pix)') self.action.args.ccddata.header['BAR0'] = ( geom['bar0'], 'first bar pixel position') # Wavelength ranges if self.action.args.nasmask: self.action.args.ccddata.header['WAVALL0'] = ( geom['wavensall0'], 'Low inclusive wavelength') self.action.args.ccddata.header['WAVALL1'] = ( geom['wavensall1'], 'High inclusive wavelength') self.action.args.ccddata.header['WAVGOOD0'] = ( geom['wavensgood0'], 'Low good wavelength') self.action.args.ccddata.header['WAVGOOD1'] = ( geom['wavensgood1'], 'High good wavelength') self.action.args.ccddata.header['WAVMID'] = ( geom['wavensmid'], 'middle wavelength') else: self.action.args.ccddata.header['WAVALL0'] = ( geom['waveall0'], 'Low inclusive wavelength') self.action.args.ccddata.header['WAVALL1'] = ( geom['waveall1'], 'High inclusive wavelength') self.action.args.ccddata.header['WAVGOOD0'] = ( geom['wavegood0'], 'Low good wavelength') self.action.args.ccddata.header['WAVGOOD1'] = ( geom['wavegood1'], 'High good wavelength') self.action.args.ccddata.header['WAVMID'] = ( geom['wavemid'], 'middle wavelength') # Dichroic fraction try: dichroic_fraction = geom['dich_frac'] except AttributeError: dichroic_fraction = 1. self.action.args.ccddata.header['DICHFRAC'] = ( dichroic_fraction, 'Dichroic Fraction') # Wavelength fit statistics self.action.args.ccddata.header['AVWVSIG'] = ( geom['avwvsig'], 'Avg. bar wave sigma (Ang)') self.action.args.ccddata.header['SDWVSIG'] = ( geom['sdwvsig'], 'Stdev. var wave sigma (Ang)') # Pixel scales self.action.args.ccddata.header['PXSCL'] = ( geom['pxscl'], 'Pixel scale along slice (deg)') self.action.args.ccddata.header['SLSCL'] = ( geom['slscl'], 'Pixel scale perp. to slices (deg)') # Geometry origins self.action.args.ccddata.header['CBARSNO'] = ( geom['cbarsno'], 'Continuum bars image number') self.action.args.ccddata.header['CBARSFL'] = ( geom['cbarsfl'], 'Continuum bars image filename') self.action.args.ccddata.header['ARCNO'] = ( geom['arcno'], 'Arc image number') self.action.args.ccddata.header['ARCFL'] = ( geom['arcfl'], 'Arc image filename') self.action.args.ccddata.header['GEOMFL'] = ( geom_file.split('/')[-1], 'Geometry file') # WCS self.action.args.ccddata.header['IFUPA'] = ( skypa, 'IFU position angle (degrees)') self.action.args.ccddata.header['IFUROFF'] = ( self.config.instrument.ROTOFF, 'IFU-SKYPA offset (degrees)') self.action.args.ccddata.header['WCSDIM'] = ( 3, 'number of dimensions in WCS') self.action.args.ccddata.header['WCSNAME'] = 'KCWI' self.action.args.ccddata.header['EQUINOX'] = 2000. self.action.args.ccddata.header['RADESYS'] = 'FK5' self.action.args.ccddata.header['CTYPE1'] = 'RA---TAN' self.action.args.ccddata.header['CTYPE2'] = 'DEC--TAN' self.action.args.ccddata.header['CTYPE3'] = ('AWAV', 'Air Wavelengths') self.action.args.ccddata.header['CUNIT1'] = ('deg', 'RA units') self.action.args.ccddata.header['CUNIT2'] = ('deg', 'DEC units') self.action.args.ccddata.header['CUNIT3'] = ('Angstrom', 'Wavelength units') self.action.args.ccddata.header['CNAME1'] = ('KCWI RA', 'RA name') self.action.args.ccddata.header['CNAME2'] = ('KCWI DEC', 'DEC name') self.action.args.ccddata.header['CNAME3'] = ('KCWI Wavelength', 'Wavelength name') self.action.args.ccddata.header['CRVAL1'] = (ra, 'RA zeropoint') self.action.args.ccddata.header['CRVAL2'] = (dec, 'DEC zeropoint') self.action.args.ccddata.header['CRVAL3'] = (geom['wave0out'], 'Wavelength zeropoint') self.action.args.ccddata.header['CRPIX1'] = (crpix1, 'RA reference pixel') self.action.args.ccddata.header['CRPIX2'] = (crpix2, 'DEC reference pixel') self.action.args.ccddata.header['CRPIX3'] = ( crpix3, 'Wavelength reference pixel') self.action.args.ccddata.header['CD1_1'] = ( cd11, 'RA degrees per column pixel') self.action.args.ccddata.header['CD2_1'] = ( cd21, 'DEC degrees per column pixel') self.action.args.ccddata.header['CD1_2'] = ( cd12, 'RA degrees per row pixel') self.action.args.ccddata.header['CD2_2'] = ( cd22, 'DEC degrees per row pixel') self.action.args.ccddata.header['CD3_3'] = ( geom['dwout'], 'Wavelength Angstroms per pixel') self.action.args.ccddata.header['LONPOLE'] = ( 180.0, 'Native longitude of Celestial pole') self.action.args.ccddata.header['LATPOLE'] = ( 0.0, 'Native latitude of Celestial pole') # write out cube self.action.args.ccddata.header['HISTORY'] = log_string self.action.args.ccddata.data = out_cube self.action.args.ccddata.uncertainty.array = out_uube self.action.args.ccddata.mask = out_mube self.action.args.ccddata.flags = out_fube if data_nsk is not None: self.action.args.ccddata.noskysub = out_kube # write out int image kcwi_fits_writer(self.action.args.ccddata, table=self.action.args.table, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="icube") self.context.proctab.update_proctab(frame=self.action.args.ccddata, suffix="icube", filename=self.action.args.name) self.context.proctab.write_proctab( tfil=self.config.instrument.procfile) # check for obj, sky outputs if obj is not None: out_obj = CCDData(out_oube, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_obj, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="ocube") if sky is not None: out_sky = CCDData(out_sube, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_sky, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="scube") # check for dew outputs if dew is not None: out_dew = CCDData(out_dube, meta=self.action.args.ccddata.header, unit=dew.unit) kcwi_fits_writer( out_dew, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="dcube") else: self.logger.error("Geometry file not found: %s" % geom_file) self.logger.info(log_string) return self.action.args
# END: class MakeCube()