from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.core.kcwi_get_std import kcwi_get_std
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
kcwi_fits_reader, strip_fname
import numpy as np
from scipy.ndimage import shift
from astropy.nddata import CCDData
import math
import ref_index
import os
[docs]def atm_disper(w0, w1, airmass, temperature=10.0, pressure_pa=61100.0,
humidity=50.0, co2=400.0):
"""
Calculate atmospheric dispersion at w1 relative to w0
Args:
w0 (float): reference wavelength (Angstroms)
w1 (float): offset wavelength (Angstroms)
airmass (float): unitless airmass
temperature (float): atmospheric temperature (C)
pressure_pa (float): atmospheric pressure (Pa)
humidity (float): relative humidity (%)
co2 (float): Carbon-Dioxide (mu-mole/mole)
"""
# Calculate
z = math.acos(1.0/airmass)
n0 = ref_index.ciddor(wave=w0/10., t=temperature, p=pressure_pa,
rh=humidity, co2=co2)
n1 = ref_index.ciddor(wave=w1/10., t=temperature, p=pressure_pa,
rh=humidity, co2=co2)
return 206265.0 * (n0 - n1) * math.tan(z)
[docs]class CorrectDar(BasePrimitive):
"""
Correct for Differential Atmospheric Refraction
Accounts for rotator orientation, zenith angle, and parallactic angle to
correct input data cube into a padded, DAR corrected output cube.
Calculates the DAR correction for each wavelength slice and adjusts position
in cube using scipy.nddata.shift to implement correction.
Sets flags in padded region of output cube to a value of 128.
Also corrects delta wavelength cube if it is a standard star observation.
"""
def __init__(self, action, context):
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
def _pre_condition(self):
"""Checks if DAR correction is appropriate"""
self.logger.info("Checking precondition for CorrectDar")
# Check image
if 'GEOMCOR' not in self.action.args.ccddata.header:
self.logger.error("Can only correct DAR on geometrically corrected "
"images")
self.action.args.ccddata.header['DARCOR'] = (False,
'DAR corrected?')
return False
else:
if not self.action.args.ccddata.header['GEOMCOR']:
self.logger.error(
"Can only correct DAR on geometrically corrected "
"images")
self.action.args.ccddata.header['DARCOR'] = (False,
'DAR corrected?')
return False
else:
return True
def _perform(self):
"""Correct for differential atmospheric refraction"""
self.logger.info("Correcting for DAR")
# image size
image_size = self.action.args.ccddata.data.shape
# get wavelengths
w0 = self.action.args.ccddata.header['CRVAL3']
dw = self.action.args.ccddata.header['CD3_3']
waves = w0 + np.arange(image_size[0]) * dw
wgoo0 = self.action.args.ccddata.header['WAVGOOD0']
wgoo1 = self.action.args.ccddata.header['WAVGOOD1']
wref = self.action.args.ccddata.header['WAVMID']
self.logger.info("Ref WL = %.1f, good WL range = (%.1f - %.1f)" %
(wref, wgoo0, wgoo1))
# spatial scales in arcsec/item
y_scale = self.action.args.ccddata.header['PXSCL'] * 3600.
x_scale = self.action.args.ccddata.header['SLSCL'] * 3600.
# padding depends on grating
if 'H' in self.action.args.grating:
padding_as = 2.0
elif 'M' in self.action.args.grating:
padding_as = 3.0
else:
padding_as = 4.0
padding_x = int(padding_as / x_scale)
padding_y = int(padding_as / y_scale)
# update WCS
crpix1 = self.action.args.ccddata.header['CRPIX1']
crpix2 = self.action.args.ccddata.header['CRPIX2']
self.action.args.ccddata.header['CRPIX1'] = crpix1 + float(padding_x)
self.action.args.ccddata.header['CRPIX2'] = crpix2 + float(padding_y)
# airmass
airmass = self.action.args.ccddata.header['AIRMASS']
self.logger.info("Airmass: %.3f" % airmass)
# IFU orientation
ifu_pa = self.action.args.ccddata.header['IFUPA']
# Parallactic angle
parallactic_angle = self.action.args.ccddata.header['PARANG']
# Projection angle in radians
projection_angle_deg = ifu_pa - parallactic_angle
projection_angle = math.radians(projection_angle_deg)
self.logger.info("DAR Angles: ifu_pa, parang, projang (deg): "
"%.2f, %.2f, %.2f" % (ifu_pa, parallactic_angle,
projection_angle_deg))
# dispersion over goo wl range in arcsec
dispersion_max_as = atm_disper(wgoo1, wgoo0, airmass)
# projected onto IFU
xdmax_as = dispersion_max_as * math.sin(projection_angle)
ydmax_as = dispersion_max_as * math.cos(projection_angle)
self.logger.info("DAR over GOOD WL range: total, x, y (asec): "
"%.2f, %.2f, %.2f" % (dispersion_max_as, xdmax_as,
ydmax_as))
# now in pixels
xdmax_px = xdmax_as / x_scale
ydmax_px = ydmax_as / y_scale
dmax_px = math.sqrt(xdmax_px**2 + ydmax_px**2)
self.logger.info("DAR over GOOD WL range: total, x, y (pix): "
"%.2f, %.2f, %.2f" % (dmax_px, xdmax_px, ydmax_px))
# prepare output cubes
output_image = np.zeros((image_size[0], image_size[1]+2*padding_y,
image_size[2]+2*padding_x), dtype=np.float64)
output_stddev = output_image.copy()
output_mask = np.zeros((image_size[0], image_size[1]+2*padding_y,
image_size[2]+2*padding_x), dtype=np.uint8)
output_flags = np.zeros((image_size[0], image_size[1] + 2*padding_y,
image_size[2] + 2 * padding_x), dtype=np.uint8)
if self.action.args.ccddata.noskysub is not None:
output_noskysub = np.zeros((image_size[0],
image_size[1] + 2*padding_y,
image_size[2] + 2 * padding_x),
dtype=np.float64)
else:
output_noskysub = None
# DAR padded pixel flag
output_flags += 128
output_image[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.data
output_stddev[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.uncertainty.array
output_mask[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.mask
output_flags[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.flags
if output_noskysub is not None:
output_noskysub[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = \
self.action.args.ccddata.noskysub
# check for obj, sky cubes
output_obj = None
output_sky = None
if self.action.args.nasmask and self.action.args.numopen > 1:
ofn = self.action.args.name
objfn = strip_fname(ofn) + '_ocube.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, objfn)
if os.path.exists(full_path):
obj = kcwi_fits_reader(full_path)[0]
output_obj = np.zeros(
(image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.float64)
output_obj[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = obj.data
skyfn = strip_fname(ofn) + '_scube.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, skyfn)
if os.path.exists(full_path):
sky = kcwi_fits_reader(full_path)[0]
output_sky = np.zeros(
(image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.float64)
output_sky[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = sky.data
# check if we have a standard star observation
output_del = None
stdfile, _ = kcwi_get_std(self.action.args.ccddata.header['OBJECT'],
self.logger)
if stdfile is not None:
afn = self.action.args.ccddata.header['ARCFL']
delfn = strip_fname(afn) + '_dcube.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, delfn)
if os.path.exists(full_path):
dew = kcwi_fits_reader(full_path)[0]
output_del = np.zeros(
(image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.float64)
output_del[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = dew.data
# Perform correction
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_image[j, :, :] = shift(output_image[j, :, :],
(y_shift, x_shift))
output_stddev[j, :, :] = shift(output_stddev[j, :, :],
(y_shift, x_shift))
output_mask[j, :, :] = shift(output_mask[j, :, :],
(y_shift, x_shift))
output_flags[j, :, :] = shift(output_flags[j, :, :],
(y_shift, x_shift))
if output_noskysub is not None:
output_noskysub[j, :, :] = shift(output_noskysub[j, :, :],
(y_shift, x_shift))
# for obj, sky if they exist
if output_obj is not None:
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_obj[j, :, :] = shift(output_obj[j, :, :],
(y_shift, x_shift))
if output_sky is not None:
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_sky[j, :, :] = shift(output_sky[j, :, :],
(y_shift, x_shift))
# for delta wavelength cube, if it exists
if output_del is not None:
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_del[j, :, :] = shift(output_del[j, :, :],
(y_shift, x_shift))
self.action.args.ccddata.data = output_image
self.action.args.ccddata.uncertainty.array = output_stddev
self.action.args.ccddata.mask = output_mask
self.action.args.ccddata.flags = output_flags
self.action.args.ccddata.noskysub = output_noskysub
log_string = CorrectDar.__module__
# update header
self.action.args.ccddata.header['HISTORY'] = log_string
self.action.args.ccddata.header['DARCOR'] = (True, 'DAR corrected?')
self.action.args.ccddata.header['DARANG'] = (projection_angle_deg,
'DAR projection angle '
'(deg)')
self.action.args.ccddata.header['DARPADX'] = (padding_x,
'DAR X padding (pix)')
self.action.args.ccddata.header['DARPADY'] = (padding_y,
'DAR Y padding (pix)')
self.action.args.ccddata.header['DAREFWL'] = (wref,
'DAR reference wl (Ang)')
# write out corrected image
kcwi_fits_writer(self.action.args.ccddata,
table=self.action.args.table,
output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="icubed")
self.context.proctab.update_proctab(frame=self.action.args.ccddata,
suffix="icubed",
filename=self.action.args.name)
self.context.proctab.write_proctab(tfil=self.config.instrument.procfile)
# check for sky, obj cube
if output_obj is not None:
out_obj = CCDData(output_obj,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_obj, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="ocubed")
if output_sky is not None:
out_sky = CCDData(output_sky,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_sky, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="scubed")
# check for delta wave cube
if output_del is not None:
out_del = CCDData(output_del,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_del, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="dcubed")
self.logger.info(log_string)
return self.action.args
# END: class CorrectDar()