Source code for kcwidrp.primitives.FluxCalibrate

from keckdrpframework.primitives.base_primitive import BasePrimitive
from kcwidrp.primitives.kcwi_file_primitives import kcwi_fits_writer, \
    kcwi_fits_reader, get_master_name, strip_fname
from kcwidrp.core.kcwi_correct_extin import kcwi_correct_extin

import os
import numpy as np
from scipy.interpolate import interp1d

from astropy.io import fits as pf
from astropy import units as u
from astropy.nddata import CCDData


[docs]class FluxCalibrate(BasePrimitive): """ Perform flux calibration. Uses inverse sensitivity curve derived from MakeInvsens to flux calibrate input observation. """ def __init__(self, action, context): BasePrimitive.__init__(self, action, context) self.logger = context.pipeline_logger def _pre_condition(self): """ Checks if we can calibrate flux based on the processing table """ self.logger.info("Checking precondition for FluxCalibrate") target_type = 'INVSENS' tab = self.context.proctab.search_proctab( frame=self.action.args.ccddata, target_type=target_type, nearest=True) self.logger.info("pre condition got %d invsens files, expected >= 1" % len(tab)) if len(tab) <= 0: self.action.args.invsname = None return False else: self.action.args.invsname = get_master_name(tab, target_type) return True def _perform(self): # Header keyword to update key = 'STDCOR' keycom = 'std corrected?' target_type = 'INVSENS' obj = None sky = None self.logger.info("Calibrating object flux") tab = self.context.proctab.search_proctab( frame=self.action.args.ccddata, target_type=target_type, nearest=True) self.logger.info("%d invsens files found" % len(tab)) if self.action.args.invsname is not None: # read in master calibration (inverse sensitivity) invsname = self.action.args.invsname self.logger.info("Reading invsens: %s" % invsname) hdul = pf.open(os.path.join(self.config.instrument.cwd, 'redux', invsname)) mcal = hdul[0].data[1, :] mchdr = hdul[0].header hdul.close() # get dimensions mcsz = mcal.shape # get master std waves mcw0 = mchdr['CRVAL1'] mcdw = mchdr['CDELT1'] mcwav = mcw0 + np.arange(mcsz[0]) * mcdw # get master std image number msimgno = mchdr['FRAMENO'] # get input object image dimensions sz = self.action.args.ccddata.data.shape # get object waves w0 = self.action.args.ccddata.header['CRVAL3'] dw = self.action.args.ccddata.header['CD3_3'] wav = w0 + np.arange(sz[0]) * dw # get exposure time expt = self.action.args.ccddata.header['XPOSURE'] if expt <= 0: self.logger.warning("XPOSURE at 0.0, trying TTIME") expt = self.action.args.ccddata.header['TTIME'] if expt <= 0: self.logger.warning("No valid exposure time found, " "using 1s") expt = 1.0 # resample onto object waves, if needed if w0 != mcw0 or dw != mcdw or wav[-1] != mcwav[-1] or \ sz[0] != mcsz[0]: self.logger.warning("wavelength scales not identical, " "resampling standard") print(w0, mcw0, dw, mcdw, wav[-1], mcwav[-1], sz[0], mcsz[0]) mcint = interp1d(mcwav, mcal, kind='cubic', fill_value='extrapolate') mscal = mcint(wav) * 1.e16 / expt else: mscal = mcal * 1.e16 / expt # extinction correct calibration kcwi_correct_extin(mscal, self.action.args.ccddata.header, logger=self.logger) # do calibration for isl in range(sz[2]): for ix in range(sz[1]): self.action.args.ccddata.data[:, ix, isl] *= mscal if self.action.args.ccddata.noskysub is not None: self.action.args.ccddata.noskysub[:, ix, isl] *= mscal self.action.args.ccddata.uncertainty.array[:, ix, isl] *= \ mscal # check for obj, sky cubes if self.action.args.nasmask and self.action.args.numopen > 1: ofn = self.action.args.name # obj cube objfn = strip_fname(ofn) + '_ocubed.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, objfn) if os.path.exists(full_path): obj = kcwi_fits_reader(full_path)[0] # do calibration for isl in range(sz[2]): for ix in range(sz[1]): obj.data[:, ix, isl] *= mscal # sky cube skyfn = strip_fname(ofn) + '_scubed.fits' full_path = os.path.join( self.config.instrument.cwd, self.config.instrument.output_directory, skyfn) if os.path.exists(full_path): sky = kcwi_fits_reader(full_path)[0] # do calibration for isl in range(sz[2]): for ix in range(sz[1]): sky.data[:, ix, isl] *= mscal # units flam16_u = 1.e16 * u.erg / (u.angstrom * u.cm ** 2 * u.s) self.action.args.ccddata.unit = flam16_u self.action.args.ccddata.uncertainty.unit = flam16_u if obj is not None: obj.unit = flam16_u if sky is not None: sky.unit = flam16_u # update header keywords self.action.args.ccddata.header[key] = (True, keycom) self.action.args.ccddata.header['MSFILE'] = (invsname, "Master std filename") self.action.args.ccddata.header['MSIMNO'] = ( msimgno, 'master std image number') else: self.action.args.ccddata.header[key] = (False, keycom) log_string = FluxCalibrate.__module__ self.action.args.ccddata.header['HISTORY'] = log_string # write out icubes image kcwi_fits_writer(self.action.args.ccddata, table=self.action.args.table, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="icubes") self.context.proctab.update_proctab(frame=self.action.args.ccddata, suffix="icubes", filename=self.action.args.name) self.context.proctab.write_proctab(tfil=self.config.instrument.procfile) # check for sky, obj cube if obj is not None: out_obj = CCDData(obj, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_obj, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="ocubes") if sky is not None: out_sky = CCDData(sky, meta=self.action.args.ccddata.header, unit=self.action.args.ccddata.unit) kcwi_fits_writer( out_sky, output_file=self.action.args.name, output_dir=self.config.instrument.output_directory, suffix="scubes") self.logger.info(log_string) return self.action.args
# END: class FluxCalibrate()