import os
import numpy as np
from scipy.interpolate import interp1d
from astropy import units as u
from astropy.coordinates import SkyCoord, EarthLocation
from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
kcwi_fits_reader, \
strip_fname
[docs]class WavelengthCorrections(BasePrimitive):
"""
Perform wavelength corrections
Based on the type specified in the kcwidrp/configs/kcwi.cfg file in the
``radial_velocity_correction`` parameter, perform the requested correction.
The current options are "heliocentric", "barycentric", and "none". The
default is "heliocentric".
Also, if the ``air_to_vacuum`` parameter is set to ``True`` (default),
apply air-to-vacuum correction.
"""
def __init__(self, action, context):
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
def _pre_condition(self):
"""
Checks if we can correct wavelengths based on the processing table
"""
self.logger.info("Checking precondition for WavelengthCorrections")
precondition = False
suffix = 'icube'
obj = self.locate_object_file(suffix)
if obj:
precondition = True
else:
self.logger.error("Precondition for WavelengthCorrections failed!")
return precondition
def _perform(self):
# Determine which radial velocity correction to make
correction_mode = self.config.instrument.radial_velocity_correction
options = ["none", "barycentric", "heliocentric"]
# If the config file has an invalid option, return
if not bool([el for el in options if el in correction_mode]):
self.logger.error('Bad config option for radial_velocity_correction\
, options are ["none", "heliocentric", "barycentric"]')
return self.action.args
suffix = 'icube' # Can be ammended to handle ocube files
obj = self.locate_object_file(suffix)
if "none" in correction_mode:
self.logger.info("Skipping radial velocity correction")
obj.header['VCORR'] = (0.0, 'km/s')
obj.header['VCORRTYP'] = (correction_mode, 'Vcorr type')
else:
self.logger.info(f"Performing {correction_mode} correction")
obj = self.heliocentric(obj, correction_mode)
if self.config.instrument.air_to_vacuum:
self.logger.info("Performing Air to Vacuum Conversion")
obj = self.air2vac(obj)
log_string = WavelengthCorrections.__module__
obj.header['HISTORY'] = log_string
kcwi_fits_writer(obj,
table=self.action.args.table,
output_file=self.action.args.name,
output_dir=self.config.instrument.output_directory,
suffix=f'{suffix}w')
self.context.proctab.update_proctab(frame=self.action.args.ccddata,
suffix=f'{suffix}w',
filename=self.action.args.name)
self.context.proctab.write_proctab(tfil=self.config.instrument.procfile)
# Unsure here: Is this right? it seems to make DAR happy
self.action.args.ccddata = obj
return self.action.args
[docs] def air2vac(self, obj, mask=False):
"""
Convert wavelengths in a cube from standard air to vacuum.
Args:
obj (astropy HDU / HDUList): Input HDU/HDUList with 3D data.
mask (bool): Set if the cube is a mask cube.
:returns:
HDU / HDUList: Trimmed FITS object with updated header.
Return type matches type of obj argument.
"""
cube = np.nan_to_num(obj.data, nan=0, posinf=0, neginf=0)
if obj.header['CTYPE3'] == 'WAVE':
self.logger.warn("FITS already in vacuum wavelength.")
return
wave_air = self.get_wav_axis(obj.header) * u.AA
wave_vac = self.a2v_conversion(wave_air)
cwave_air = wave_air[int(wave_air.shape[0] / 2)]
cwave_vac = wave_vac[int(wave_vac.shape[0] / 2)]
self.logger.info("Air to Vacuum for (%.3f) gives %.3f" %
(cwave_air.value, cwave_vac.value))
# resample to uniform grid
cube_new = np.zeros_like(cube)
for i in range(cube.shape[2]):
for j in range(cube.shape[1]):
spec0 = cube[:, j, i]
if not mask:
f_cubic = interp1d(
wave_vac,
spec0,
kind='cubic',
fill_value='extrapolate'
)
spec_new = f_cubic(wave_air)
else:
f_pre = interp1d(
wave_vac,
spec0,
kind='previous',
bounds_error=False,
fill_value=128
)
spec_pre = f_pre(wave_air)
f_nex = interp1d(
wave_vac,
spec0,
kind='next',
bounds_error=False,
fill_value=128
)
spec_nex = f_nex(wave_air)
spec_new = np.zeros_like(spec0)
for k in range(spec0.shape[0]):
spec_new[k] = max(spec_pre[k], spec_nex[k])
cube_new[:, j, i] = spec_new
obj.header['CTYPE3'] = ('WAVE', 'Vacuum Wavelengths')
obj.data = cube_new
return obj
[docs] def a2v_conversion(self, wave):
"""
Convert air-based wavelengths to vacuum
Adapted from wave.py in: https://github.com/pypeit/PypeIt/
Formula from https://ui.adsabs.harvard.edu/abs/1996ApOpt..35.1566C/
Args:
wave (ndarray): Wavelengths
Returns:
Wavelength array corrected to vacuum wavelengths
"""
# Convert to AA
wave = wave.to(u.AA)
wavelength = wave.value
# Standard conversion format
sigma_sq = (1.e4/wavelength)**2. # wavenumber squared
factor = 1 + (5.792105e-2/(238.0185-sigma_sq)) + \
(1.67918e-3/(57.362-sigma_sq))
rind = factor[int(factor.shape[0] / 2)]
rwav = wavelength[int(wavelength.shape[0] / 2)]
self.logger.info("Refractive index = %.10f at %.3f Ang" % (rind, rwav))
# only modify above 2000A
factor = factor*(wavelength >= 2000.) + 1.*(wavelength < 2000.)
# Convert
wavelength = wavelength*factor
# Units
new_wave = wavelength*u.AA
new_wave.to(wave.unit)
return new_wave
[docs] def heliocentric(self, obj, correction_mode, mask=False, resample=True,
vcorr=None):
"""
Apply heliocentric correction to the cubes.
*Note that this only works for KCWI data because the location of
Keck Observatory is hard-coded in the function.*
Adapted from https://github.com/dbosul/cwitools.git
Args:
obj (astropy HDU / HDUList): Input HDU/HDUList with 3D data.
correction_mode (str): "none", "barycentric", or "heliocentric"
mask (bool): Set if the cube is a mask cube. This only works for
resampled cubes.
resample (bool): Resample the cube to the original wavelength grid?
vcorr (float): Use a different correction velocity.
Returns:
HDU / HDUList: Trimmed FITS object with updated header.
vcorr (float): (if vcorr is True) Correction velocity in km/s.
Return type matches type of fits_in argument.
Examples:
To apply heliocentric correction,
>>> hdu_new = heliocentric(hdu_old)
However, this resamples the wavelengths back to the original grid.
To use the new grid without resampling the data,
>>> hdu_new = heliocentric(hdu_old, resample=False)
"""
barycentric = ("barycentric" in correction_mode)
cube = np.nan_to_num(obj.data,
nan=0, posinf=0, neginf=0)
v_old = 0.
if 'VCORR' in obj.header:
v_old = obj.header['VCORR']
self.logger.info("Rolling back the existing correction with:")
self.logger.info("Vcorr = %.2f km/s." % v_old)
if vcorr is None:
targ = SkyCoord(
obj.header['TARGRA'],
obj.header['TARGDEC'],
unit='deg',
obstime=obj.header['DATE-BEG']
)
lat = self.config.instrument.latitude
lon = self.config.instrument.longitude
alt = self.config.instrument.altitude
keck = EarthLocation.from_geodetic(lat=lat, lon=lon, height=alt)
if barycentric:
vcorr = targ.radial_velocity_correction(
kind='barycentric', location=keck)
else:
vcorr = targ.radial_velocity_correction(
kind='heliocentric', location=keck)
vcorr = vcorr.to('km/s').value
self.logger.info("Helio/Barycentric correction:")
self.logger.info("Vcorr = %.2f km/s." % vcorr)
v_tot = vcorr-v_old
if not resample:
obj.header['CRVAL3'] *= (1 + v_tot / 2.99792458e5)
obj.header['CD3_3'] *= (1 + v_tot / 2.99792458e5)
obj.header['VCORR'] = (vcorr, 'km/s')
obj.header['VCORRTYP'] = (correction_mode, 'Vcorr type')
return obj
wav_old = self.get_wav_axis(obj.header)
wav_hel = wav_old * (1 + v_tot / 2.99792458e5)
cwave_hel = self.action.args.cwave * (1 + v_tot / 2.99792458e5)
self.logger.info("Vcorr for CWAVE (%.3f) gives %.3f" %
(self.action.args.cwave, cwave_hel))
# resample to uniform grid
self.logger.info("Resampling to uniform grid")
cube_new = np.zeros_like(cube)
for i in range(cube.shape[2]):
for j in range(cube.shape[1]):
spc0 = cube[:, j, i]
if not mask:
f_cubic = interp1d(wav_hel, spc0, kind='cubic',
fill_value='extrapolate')
spec_new = f_cubic(wav_old)
else:
f_pre = interp1d(wav_hel, spc0, kind='previous',
bounds_error=False, fill_value=128)
spec_pre = f_pre(wav_old)
f_nex = interp1d(wav_hel, spc0, kind='next',
bounds_error=False, fill_value=128)
spec_nex = f_nex(wav_old)
spec_new = np.zeros_like(spc0)
for k in range(spc0.shape[0]):
spec_new[k] = max(spec_pre[k], spec_nex[k])
cube_new[:, j, i] = spec_new
obj.header['VCORR'] = (vcorr, 'km/s')
obj.header['VCORRTYP'] = (correction_mode, 'Vcorr type')
obj.data = cube_new
return obj
[docs] def get_wav_axis(self, header):
"""Returns a NumPy array representing the wavelength axis of a cube.
Adapted from https://github.com/dbosul/cwitools.git
Args:
header (astropy.io.fits.Header): header that contains wavelength
or velocity axis that is specified in 'CTYPE' keywords in any
dimension.
Returns:
numpy.ndarray: Wavelength axis for this data.
"""
# Select the appropriate axis.
naxis = header['NAXIS']
axis = None
for i in range(naxis):
# Keyword entry
card = "CTYPE{0}".format(i + 1)
if card not in header:
self.logger.warning.error(
"Header must contain 'CTYPE' keywords.")
# Possible wave types.
if header[card] in ['AWAV', 'WAVE', 'VELO']:
axis = i + 1
break
# No wavelength axis
if axis is None:
self.logger.error("Header must contain a wavelength/velocity axis.")
retval = None
else:
# Get keywords defining wavelength axis
nwav = header["NAXIS{0}".format(axis)]
wav0 = header["CRVAL{0}".format(axis)]
dwav = header["CD{0}_{0}".format(axis)]
pix0 = header["CRPIX{0}".format(axis)]
# Calculate and return
retval = np.array([wav0 + (i - pix0) * dwav for i in range(nwav)])
return retval
[docs] def locate_object_file(self, suffix):
"""
Return FITS HDU list if current file with requested suffix can be found.
Will return ``None`` if file cannot be found.
Args:
suffix (str): The file suffix that you want to operate on.
Returns:
FITS HDU List from kcwi_fits_reader routine, or ``None`` if file
with suffix not found.
"""
ofn = self.action.args.name
objfn = strip_fname(ofn) + f'_{suffix}.fits'
full_path = os.path.join(
self.config.instrument.cwd,
self.config.instrument.output_directory, objfn)
if os.path.exists(full_path):
return kcwi_fits_reader(full_path)[0]
else:
self.logger.error(f'Unable to read file {objfn}')
return None