from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.core.kcwi_get_std import kcwi_get_std
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
kcwi_fits_reader, strip_fname
import numpy as np
from scipy.ndimage import shift
from astropy.nddata import CCDData
import math
import ref_index
import os
[docs]def atm_disper(w0, w1, airmass, temperature=10.0, pressure_pa=61100.0,
humidity=50.0, co2=400.0):
"""Calculate atmospheric dispersion at w1 relative to w0
Args:
w0 (float): reference wavelength (Angstroms)
w1 (float): offset wavelength (Angstroms)
airmass (float): unitless airmass
temperature (float): atmospheric temperature (C)
pressure_pa (float): atmospheric pressure (Pa)
humidity (float): relative humidity (%)
co2 (float): Carbon-Dioxide (mu-mole/mole)
"""
# Calculate
z = math.acos(1.0/airmass)
n0 = ref_index.ciddor(wave=w0/10., t=temperature, p=pressure_pa,
rh=humidity, co2=co2)
n1 = ref_index.ciddor(wave=w1/10., t=temperature, p=pressure_pa,
rh=humidity, co2=co2)
return 206265.0 * (n0 - n1) * math.tan(z)
[docs]class CorrectDar(BasePrimitive):
"""Correct for Differential Atmospheric Refraction"""
def __init__(self, action, context):
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
def _pre_condition(self):
"""Checks if DAR correction is appropriate"""
self.logger.info("Checking precondition for CorrectDar")
# Check image
if 'GEOMCOR' not in self.action.args.ccddata.header:
self.logger.error("Can only correct DAR on geometrically corrected "
"images")
self.action.args.ccddata.header['DARCOR'] = (False,
'DAR corrected?')
return False
else:
if not self.action.args.ccddata.header['GEOMCOR']:
self.logger.error(
"Can only correct DAR on geometrically corrected "
"images")
self.action.args.ccddata.header['DARCOR'] = (False,
'DAR corrected?')
return False
else:
return True
def _perform(self):
"""Correct for differential atmospheric refraction"""
self.logger.info("Correcting for DAR")
# image size
image_size = self.action.args.ccddata.data.shape
# get wavelengths
w0 = self.action.args.ccddata.header['CRVAL3']
dw = self.action.args.ccddata.header['CD3_3']
waves = w0 + np.arange(image_size[0]) * dw
wgoo0 = self.action.args.ccddata.header['WAVGOOD0']
wgoo1 = self.action.args.ccddata.header['WAVGOOD1']
wref = self.action.args.ccddata.header['WAVMID']
self.logger.info("Ref WL = %.1f, good WL range = (%.1f - %.1f" %
(wref, wgoo0, wgoo1))
# spatial scales in arcsec/item
y_scale = self.action.args.ccddata.header['PXSCL'] * 3600.
x_scale = self.action.args.ccddata.header['SLSCL'] * 3600.
# padding depends on grating
if 'H' in self.action.args.grating:
padding_as = 2.0
elif 'M' in self.action.args.grating:
padding_as = 3.0
else:
padding_as = 4.0
padding_x = int(padding_as / x_scale)
padding_y = int(padding_as / y_scale)
# update WCS
crpix1 = self.action.args.ccddata.header['CRPIX1']
crpix2 = self.action.args.ccddata.header['CRPIX2']
self.action.args.ccddata.header['CRPIX1'] = crpix1 + float(padding_x)
self.action.args.ccddata.header['CRPIX2'] = crpix2 + float(padding_y)
# airmass
airmass = self.action.args.ccddata.header['AIRMASS']
self.logger.info("Airmass: %.3f" % airmass)
# IFU orientation
ifu_pa = self.action.args.ccddata.header['IFUPA']
# Parallactic angle
parallactic_angle = self.action.args.ccddata.header['PARANG']
# Projection angle in radians
projection_angle_deg = ifu_pa - parallactic_angle
projection_angle = math.radians(projection_angle_deg)
self.logger.info("DAR Angles: ifu_pa, parang, projang (deg): "
"%.2f, %.2f, %.2f" % (ifu_pa, parallactic_angle,
projection_angle_deg))
# dispersion over goo wl range in arcsec
dispersion_max_as = atm_disper(wgoo1, wgoo0, airmass)
# projected onto IFU
xdmax_as = dispersion_max_as * math.sin(projection_angle)
ydmax_as = dispersion_max_as * math.cos(projection_angle)
self.logger.info("DAR over GOOD WL range: total, x, y (asec): "
"%.2f, %.2f, %.2f" % (dispersion_max_as, xdmax_as,
ydmax_as))
# now in pixels
xdmax_px = xdmax_as / x_scale
ydmax_px = ydmax_as / y_scale
dmax_px = math.sqrt(xdmax_px**2 + ydmax_px**2)
self.logger.info("DAR over GOOD WL range: total, x, y (pix): "
"%.2f, %.2f, %.2f" % (dmax_px, xdmax_px, ydmax_px))
# prepare output cubes
output_image = np.zeros((image_size[0], image_size[1]+2*padding_y,
image_size[2]+2*padding_x), dtype=np.float64)
output_stddev = output_image.copy()
output_mask = np.zeros((image_size[0], image_size[1]+2*padding_y,
image_size[2]+2*padding_x), dtype=np.uint8)
output_flags = np.zeros((image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.uint8)
# DAR padded pixel flag
output_flags += 128
output_image[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.data
output_stddev[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.uncertainty.array
output_mask[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.mask
output_flags[:, padding_y:(padding_y+image_size[1]),
padding_x:(padding_x+image_size[2])] = \
self.action.args.ccddata.flags
# check for obj, sky cubes
output_obj = None
output_sky = None
if self.action.args.nasmask and self.action.args.numopen > 1:
ofn = self.action.args.name
objfn = strip_fname(ofn) + '_ocube.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, objfn)
if os.path.exists(full_path):
obj = kcwi_fits_reader(full_path)[0]
output_obj = np.zeros(
(image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.float64)
output_obj[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = obj.data
skyfn = strip_fname(ofn) + '_scube.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, skyfn)
if os.path.exists(full_path):
sky = kcwi_fits_reader(full_path)[0]
output_sky = np.zeros(
(image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.float64)
output_sky[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = sky.data
# check if we have a standard star observation
output_del = None
stdfile, _ = kcwi_get_std(self.action.args.ccddata.header['OBJECT'],
self.logger)
if stdfile is not None:
afn = self.action.args.ccddata.header['ARCFL']
delfn = strip_fname(afn) + '_dcube.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, delfn)
if os.path.exists(full_path):
dew = kcwi_fits_reader(full_path)[0]
output_del = np.zeros(
(image_size[0], image_size[1] + 2 * padding_y,
image_size[2] + 2 * padding_x), dtype=np.float64)
output_del[:, padding_y:(padding_y + image_size[1]),
padding_x:(padding_x + image_size[2])] = dew.data
# Perform correction
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_image[j, :, :] = shift(output_image[j, :, :], (y_shift,
x_shift))
output_stddev[j, :, :] = shift(output_stddev[j, :, :], (y_shift,
x_shift))
output_mask[j, :, :] = shift(output_mask[j, :, :], (y_shift,
x_shift))
output_flags[j, :, :] = shift(output_flags[j, :, :], (y_shift,
x_shift))
# for obj, sky if they exist
if output_obj is not None:
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_obj[j, :, :] = shift(output_obj[j, :, :], (y_shift,
x_shift))
if output_sky is not None:
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_sky[j, :, :] = shift(output_sky[j, :, :], (y_shift,
x_shift))
# for delta wavelength cube, if it exists
if output_del is not None:
for j, wl in enumerate(waves):
dispersion_correction = atm_disper(wref, wl, airmass)
x_shift = dispersion_correction * \
math.sin(projection_angle) / x_scale
y_shift = dispersion_correction * \
math.cos(projection_angle) / y_scale
output_del[j, :, :] = shift(output_del[j, :, :], (y_shift,
x_shift))
self.action.args.ccddata.data = output_image
self.action.args.ccddata.uncertainty.array = output_stddev
self.action.args.ccddata.mask = output_mask
self.action.args.ccddata.flags = output_flags
log_string = CorrectDar.__module__
# update header
self.action.args.ccddata.header['HISTORY'] = log_string
self.action.args.ccddata.header['DARCOR'] = (True, 'DAR corrected?')
self.action.args.ccddata.header['DARANG'] = (projection_angle_deg,
'DAR projection angle '
'(deg)')
self.action.args.ccddata.header['DARPADX'] = (padding_x,
'DAR X padding (pix)')
self.action.args.ccddata.header['DARPADY'] = (padding_y,
'DAR Y padding (pix)')
self.action.args.ccddata.header['DAREFWL'] = (wref,
'DAR reference wl (Ang)')
# write out corrected image
kcwi_fits_writer(self.action.args.ccddata,
table=self.action.args.table,
output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="icubed")
self.context.proctab.update_proctab(frame=self.action.args.ccddata,
suffix="icubed",
filename=self.action.args.name)
self.context.proctab.write_proctab()
# check for sky, obj cube
if output_obj is not None:
out_obj = CCDData(output_obj,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_obj, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="ocubed")
if output_sky is not None:
out_sky = CCDData(output_sky,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_sky, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="scubed")
# check for delta wave cube
if output_del is not None:
out_del = CCDData(output_del,
meta=self.action.args.ccddata.header,
unit=self.action.args.ccddata.unit)
kcwi_fits_writer(
out_del, output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix="dcubed")
self.logger.info(log_string)
return self.action.args
# END: class CorrectDar()