Source code for kcwidrp.primitives.CorrectDar

from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.core.kcwi_get_std import kcwi_get_std
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
    kcwi_fits_reader, strip_fname

import numpy as np
from scipy.ndimage import shift
from astropy.nddata import CCDData
import math
import ref_index
import os


[docs]def atm_disper(w0, w1, airmass, temperature=10.0, pressure_pa=61100.0, humidity=50.0, co2=400.0): """ Calculate atmospheric dispersion at w1 relative to w0 Args: w0 (float): reference wavelength (Angstroms) w1 (float): offset wavelength (Angstroms) airmass (float): unitless airmass temperature (float): atmospheric temperature (C) pressure_pa (float): atmospheric pressure (Pa) humidity (float): relative humidity (%) co2 (float): Carbon-Dioxide (mu-mole/mole) """ # Calculate z = math.acos(1.0/airmass) n0 = ref_index.ciddor(wave=w0/10., t=temperature, p=pressure_pa, rh=humidity, co2=co2) n1 = ref_index.ciddor(wave=w1/10., t=temperature, p=pressure_pa, rh=humidity, co2=co2) return 206265.0 * (n0 - n1) * math.tan(z)
[docs]class CorrectDar(BasePrimitive): """ Correct for Differential Atmospheric Refraction Accounts for rotator orientation, zenith angle, and parallactic angle to correct input data cube into a padded, DAR corrected output cube. Calculates the DAR correction for each wavelength slice and adjusts position in cube using scipy.nddata.shift to implement correction. Sets flags in padded region of output cube to a value of 128. Also corrects delta wavelength cube if it is a standard star observation. """ def __init__(self, action, context): BasePrimitive.__init__(self, action, context) self.logger = context.pipeline_logger def _pre_condition(self): """Checks if DAR correction is appropriate""" self.logger.info("Checking precondition for CorrectDar") # Check image if 'GEOMCOR' not in self.action.args.ccddata.header: self.logger.error("Can only correct DAR on geometrically corrected " "images") self.action.args.ccddata.header['DARCOR'] = (False, 'DAR corrected?') return False else: if not self.action.args.ccddata.header['GEOMCOR']: self.logger.error( "Can only correct DAR on geometrically corrected " "images") self.action.args.ccddata.header['DARCOR'] = (False, 'DAR corrected?') return False else: return True def _perform(self): """Correct for differential atmospheric refraction""" self.logger.info("Correcting for DAR") # image size image_size = self.action.args.ccddata.data.shape # get wavelengths w0 = self.action.args.ccddata.header['CRVAL3'] dw = self.action.args.ccddata.header['CD3_3'] waves = w0 + np.arange(image_size[0]) * dw wgoo0 = self.action.args.ccddata.header['WAVGOOD0'] wgoo1 = self.action.args.ccddata.header['WAVGOOD1'] wref = self.action.args.ccddata.header['WAVMID'] self.logger.info("Ref WL = %.1f, good WL range = (%.1f - %.1f)" % (wref, wgoo0, wgoo1)) # spatial scales in arcsec/item y_scale = self.action.args.ccddata.header['PXSCL'] * 3600. x_scale = self.action.args.ccddata.header['SLSCL'] * 3600. # padding depends on grating if 'H' in self.action.args.grating: padding_as = 2.0 elif 'M' in self.action.args.grating: padding_as = 3.0 else: padding_as = 4.0 padding_x = int(padding_as / x_scale) padding_y = int(padding_as / y_scale) # update WCS crpix1 = self.action.args.ccddata.header['CRPIX1'] crpix2 = self.action.args.ccddata.header['CRPIX2'] self.action.args.ccddata.header['CRPIX1'] = crpix1 + float(padding_x) self.action.args.ccddata.header['CRPIX2'] = crpix2 + float(padding_y) # airmass airmass = self.action.args.ccddata.header['AIRMASS'] self.logger.info("Airmass: %.3f" % airmass) # IFU orientation ifu_pa = self.action.args.ccddata.header['IFUPA'] # Parallactic angle parallactic_angle = self.action.args.ccddata.header['PARANG'] # Projection angle in radians projection_angle_deg = ifu_pa - parallactic_angle projection_angle = math.radians(projection_angle_deg) self.logger.info("DAR Angles: ifu_pa, parang, projang (deg): " "%.2f, %.2f, %.2f" % (ifu_pa, parallactic_angle, projection_angle_deg)) # dispersion over goo wl range in arcsec dispersion_max_as = atm_disper(wgoo1, wgoo0, airmass) # projected onto IFU xdmax_as = dispersion_max_as * math.sin(projection_angle) ydmax_as = dispersion_max_as * math.cos(projection_angle) self.logger.info("DAR over GOOD WL range: total, x, y (asec): " "%.2f, %.2f, %.2f" % (dispersion_max_as, xdmax_as, ydmax_as)) # now in pixels xdmax_px = xdmax_as / x_scale ydmax_px = ydmax_as / y_scale dmax_px = math.sqrt(xdmax_px**2 + ydmax_px**2) self.logger.info("DAR over GOOD WL range: total, x, y (pix): " "%.2f, %.2f, %.2f" % (dmax_px, xdmax_px, ydmax_px)) # prepare output cubes output_image = np.zeros((image_size[0], image_size[1]+2*padding_y, image_size[2]+2*padding_x), dtype=np.float64) output_stddev = output_image.copy() output_mask = np.zeros((image_size[0], image_size[1]+2*padding_y, image_size[2]+2*padding_x), dtype=np.uint8) output_flags = np.zeros((image_size[0], image_size[1] + 2*padding_y, image_size[2] + 2 * padding_x), dtype=np.uint8) if self.action.args.ccddata.noskysub is not None: output_noskysub = np.zeros((image_size[0], image_size[1] + 2*padding_y, image_size[2] + 2 * padding_x), dtype=np.float64) else: output_noskysub = None # DAR padded pixel flag output_flags += 128 output_image[:, padding_y:(padding_y+image_size[1]), padding_x:(padding_x+image_size[2])] = \ self.action.args.ccddata.data output_stddev[:, padding_y:(padding_y+image_size[1]), padding_x:(padding_x+image_size[2])] = \ self.action.args.ccddata.uncertainty.array output_mask[:, padding_y:(padding_y+image_size[1]), padding_x:(padding_x+image_size[2])] = \ self.action.args.ccddata.mask output_flags[:, padding_y:(padding_y+image_size[1]), padding_x:(padding_x+image_size[2])] = \ self.action.args.ccddata.flags if output_noskysub is not None: output_noskysub[:, padding_y:(padding_y + image_size[1]), padding_x:(padding_x + image_size[2])] = \ self.action.args.ccddata.noskysub # check for obj, sky cubes output_obj = None output_sky = None if self.action.args.nasmask and self.action.args.numopen > 1: ofn = self.action.args.name objfn = strip_fname(ofn) + '_ocube.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, objfn) if os.path.exists(full_path): obj = kcwi_fits_reader(full_path)[0] output_obj = np.zeros( (image_size[0], image_size[1] + 2 * padding_y, image_size[2] + 2 * padding_x), dtype=np.float64) output_obj[:, padding_y:(padding_y + image_size[1]), padding_x:(padding_x + image_size[2])] = obj.data skyfn = strip_fname(ofn) + '_scube.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, skyfn) if os.path.exists(full_path): sky = kcwi_fits_reader(full_path)[0] output_sky = np.zeros( (image_size[0], image_size[1] + 2 * padding_y, image_size[2] + 2 * padding_x), dtype=np.float64) output_sky[:, padding_y:(padding_y + image_size[1]), padding_x:(padding_x + image_size[2])] = sky.data # check if we have a standard star observation output_del = None stdfile, _ = kcwi_get_std(self.action.args.ccddata.header['OBJECT'], self.logger) if stdfile is not None: afn = self.action.args.ccddata.header['ARCFL'] delfn = strip_fname(afn) + '_dcube.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, delfn) if os.path.exists(full_path): dew = kcwi_fits_reader(full_path)[0] output_del = np.zeros( (image_size[0], image_size[1] + 2 * padding_y, image_size[2] + 2 * padding_x), dtype=np.float64) output_del[:, padding_y:(padding_y + image_size[1]), padding_x:(padding_x + image_size[2])] = dew.data # Perform correction for j, wl in enumerate(waves): dispersion_correction = atm_disper(wref, wl, airmass) x_shift = dispersion_correction * \ math.sin(projection_angle) / x_scale y_shift = dispersion_correction * \ math.cos(projection_angle) / y_scale output_image[j, :, :] = shift(output_image[j, :, :], (y_shift, x_shift)) output_stddev[j, :, :] = shift(output_stddev[j, :, :], (y_shift, x_shift)) output_mask[j, :, :] = shift(output_mask[j, :, :], (y_shift, x_shift)) output_flags[j, :, :] = shift(output_flags[j, :, :], (y_shift, x_shift)) if output_noskysub is not None: output_noskysub[j, :, :] = shift(output_noskysub[j, :, :], (y_shift, x_shift)) # for obj, sky if they exist if output_obj is not None: for j, wl in enumerate(waves): dispersion_correction = atm_disper(wref, wl, airmass) x_shift = dispersion_correction * \ math.sin(projection_angle) / x_scale y_shift = dispersion_correction * \ math.cos(projection_angle) / y_scale output_obj[j, :, :] = shift(output_obj[j, :, :], (y_shift, x_shift)) if output_sky is not None: for j, wl in enumerate(waves): dispersion_correction = atm_disper(wref, wl, airmass) x_shift = dispersion_correction * \ math.sin(projection_angle) / x_scale y_shift = dispersion_correction * \ math.cos(projection_angle) / y_scale output_sky[j, :, :] = shift(output_sky[j, :, :], (y_shift, x_shift)) # for delta wavelength cube, if it exists if output_del is not None: for j, wl in enumerate(waves): dispersion_correction = atm_disper(wref, wl, airmass) x_shift = dispersion_correction * \ math.sin(projection_angle) / x_scale y_shift = dispersion_correction * \ math.cos(projection_angle) / y_scale output_del[j, :, :] = shift(output_del[j, :, :], (y_shift, x_shift)) self.action.args.ccddata.data = output_image self.action.args.ccddata.uncertainty.array = output_stddev self.action.args.ccddata.mask = output_mask self.action.args.ccddata.flags = output_flags self.action.args.ccddata.noskysub = output_noskysub log_string = CorrectDar.__module__ # update header self.action.args.ccddata.header['HISTORY'] = log_string self.action.args.ccddata.header['DARCOR'] = (True, 'DAR corrected?') self.action.args.ccddata.header['DARANG'] = (projection_angle_deg, 'DAR projection angle ' '(deg)') self.action.args.ccddata.header['DARPADX'] = (padding_x, 'DAR X padding (pix)') self.action.args.ccddata.header['DARPADY'] = (padding_y, 'DAR Y padding (pix)') self.action.args.ccddata.header['DAREFWL'] = (wref, 'DAR reference wl (Ang)') # write out corrected image kcwi_fits_writer(self.action.args.ccddata, table=self.action.args.table, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="icubed") self.context.proctab.update_proctab(frame=self.action.args.ccddata, suffix="icubed", filename=self.action.args.name) self.context.proctab.write_proctab(tfil=self.config.instrument.procfile) # check for sky, obj cube if output_obj is not None: out_obj = CCDData(output_obj, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_obj, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="ocubed") if output_sky is not None: out_sky = CCDData(output_sky, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_sky, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="scubed") # check for delta wave cube if output_del is not None: out_del = CCDData(output_del, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_del, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="dcubed") self.logger.info(log_string) return self.action.args
# END: class CorrectDar()