from keckdrpframework.models.arguments import Arguments
from astropy.io import fits
from astropy.nddata import CCDData
from astropy.table import Table
# from astropy import units as u
import numpy as np
from keckdrpframework.primitives.base_primitive import BasePrimitive
import os
import logging
import pkg_resources
import subprocess
from pathlib import Path
logger = logging.getLogger('KCWI')
def parse_imsec(section=None):
xfor = True
yfor = True
p1 = int(section[1:-1].split(',')[0].split(':')[0])
p2 = int(section[1:-1].split(',')[0].split(':')[1])
p3 = int(section[1:-1].split(',')[1].split(':')[0])
p4 = int(section[1:-1].split(',')[1].split(':')[1])
# tests for individual axes
if p1 > p2:
x0 = p2 - 1
x1 = p1 - 1
xfor = False
else:
x0 = p1 - 1
x1 = p2 - 1
if p3 > p4:
y0 = p4 - 1
y1 = p3 - 1
yfor = False
else:
y0 = p3 - 1
y1 = p4 - 1
# package output
sec = (y0, y1, x0, x1)
rfor = (yfor, xfor)
# use python axis ordering
return sec, rfor
[docs]class ingest_file(BasePrimitive):
def __init__(self, action, context):
"""
Constructor
"""
BasePrimitive.__init__(self, action, context)
self.logger = context.pipeline_logger
def get_keyword(self, keyword):
try:
keyval = self.ccddata.header[keyword]
except KeyError:
keyval = None
# return self.context.data_set.get_info_column(self.name, keyword)
return keyval
def camera(self):
camera = self.get_keyword('CAMERA')
if 'BLUE' in camera:
return 0
elif 'RED' in camera:
return 1
else:
return -1
def camang(self):
if self.camera() == 0: # Blue
key = 'BARTANG'
elif self.camera() == 1: # Red
key = 'RARTANG'
else:
raise ValueError("unable to determine camera angle: "
"CAMERA undefined")
return self.get_keyword(key)
def filter(self):
if self.camera() == 0: # Blue
filt = self.get_keyword('BFILTNAM')
elif self.camera() == 1: # Red
filt = 'None'
else:
raise ValueError("unable to determine filter: "
"CAMERA undefined")
return filt
def grangle(self):
if self.camera() == 0: # Blue
key = 'BGRANGLE'
elif self.camera() == 1: # Red
key = 'RGRANGLE'
else:
raise ValueError("unable to determine grating angle: "
"CAMERA undefined")
return self.get_keyword(key)
def grating(self):
if self.camera() == 0: # Blue
key = 'BGRATNAM'
elif self.camera() == 1: # Red
key = 'RGRATNAM'
else:
raise ValueError("unable to determine grating: CAMERA undefined")
return self.get_keyword(key)
def adjang(self):
if 'BH' in self.grating() or 'RH' in self.grating():
return 180.0
if 'BM' in self.grating() or 'RM' in self.grating():
return 0.0
if 'BL' in self.grating() or 'RL' in self.grating():
return 0.0
def atsig(self):
if 'H' in self.grating():
if self.ifunum() > 2:
return 0.75
else:
return 2.5
elif 'M' in self.grating():
if self.ifunum() >= 2:
return 2.0
else:
return 4.0
elif 'L' in self.grating():
if self.ifunum() == 2:
return 10.0
elif self.ifunum() == 3:
return 5.0
else:
return 14.0
def rho(self):
if 'BH1' in self.grating():
return 3.751
elif 'BH2' in self.grating():
return 3.255
elif 'BH3' in self.grating():
return 2.800
elif 'RH1' in self.grating():
return 2.420
elif 'RH2' in self.grating():
return 2.068 # kcwi_sim_kcrm value is 2.030
elif 'RH3' in self.grating():
return 1.705
elif 'RH4' in self.grating():
return 1.435
elif 'BM' in self.grating():
return 1.900
elif 'RM1' in self.grating():
return 1.220
elif 'RM2' in self.grating():
return 0.921
elif 'BL' in self.grating():
return 0.870
elif 'RL' in self.grating():
return 0.514
else:
return None
def cwave(self):
if self.camera() == 0: # Blue
key = 'BCWAVE'
elif self.camera() == 1: # Red
key = 'RCWAVE'
else:
raise ValueError("unable to determine central wavelength: "
"CAMERA undefined")
return self.get_keyword(key)
def dich(self):
if self.camera() == 0: # Blue
if self.get_keyword('RCWAVE'):
return True
else:
return False
elif self.camera() == 1: # Red
return True
[docs] def resolution(self):
"""Return FWHM resolution in Angstroms for the given grating"""
# get reference wavelength
refwave = self.cwave()
if refwave:
rw = refwave
else:
if 'B' in self.grating():
rw = 4500.
else:
rw = 7500.
# Calc rez from grating resolution (d-lambda = lambda/R)
# First, assume large slicer IFU
if 'BH' in self.grating():
rez = rw / 5000.
elif 'RH' in self.grating():
rez = rw / 4600.
elif 'BM' in self.grating():
rez = rw / 2500.
elif 'RM' in self.grating():
rez = rw / 1900.
elif 'BL' in self.grating():
rez = rw / 1250.
elif 'RL' in self.grating():
rez = rw / 800.
else:
raise ValueError("unable to compute atlas resolution: "
"grating undefined")
# Adjust for slicer
if self.ifunum() == 2: # Medium slicer
rez /= 2.
elif self.ifunum() == 3: # Small slicer
rez /= 4.
return rez
[docs] def delta_wave_out(self):
"""Return output delta lambda in Angstroms for the given grating"""
# Calc delta wave out from grating
if 'BH' in self.grating():
dw = 0.125 * float(self.ybinsize())
elif 'RH' in self.grating():
dw = 0.125 * float(self.ybinsize())
elif 'BM' in self.grating():
dw = 0.25 * float(self.ybinsize())
elif 'RM' in self.grating():
dw = 0.25 * float(self.ybinsize())
elif 'BL' in self.grating():
dw = 0.5 * float(self.ybinsize())
elif 'RL' in self.grating():
dw = 0.5 * float(self.ybinsize())
else:
raise ValueError("unable to compute output delta lambda: "
"grating undefined")
return dw
def namps(self):
return self.get_keyword('NVIDINP')
def nasmask(self):
if self.camera() == 0: # Blue
if 'Mask' in self.get_keyword('BNASNAM'):
return True
else:
return False
elif self.camera() == 1: # Red
if 'Mask' in self.get_keyword('RNASNAM'):
return True
else:
return False
else:
raise ValueError("unable to determine mask: CAMERA undefined")
def numopen(self):
return self.get_keyword('NUMOPEN')
def shufrows(self):
return self.get_keyword('SHUFROWS')
def ampmode(self):
return self.get_keyword('AMPMODE')
def xbinsize(self):
return int(self.get_keyword('BINNING').split(',')[0])
def ybinsize(self):
return int(self.get_keyword('BINNING').split(',')[-1])
def plotlabel(self):
lab = "[ Img # %d " % self.get_keyword('FRAMENO')
lab += "(%s) " % self.illum()
lab += "Slicer: %s, " % self.ifuname()
lab += "Grating: %s, " % self.grating()
lab += "CWave: %d, " % int(self.cwave())
lab += "Filter: %s " % self.filter()
lab += "] "
return lab
def ifuname(self):
return self.get_keyword('IFUNAM')
def ifunum(self):
return self.get_keyword('IFUNUM')
def imtype(self):
return self.get_keyword('IMTYPE')
def illum(self):
# set ILLUM keyword
# ARCS
if self.get_keyword('IMTYPE') == 'ARCLAMP':
if self.get_keyword('LMP0STAT') == 1 and \
self.get_keyword('LMP0SHST') == 1:
illum = self.get_keyword('LMP0NAM')
elif self.get_keyword('LMP1STAT') == 1 and \
self.get_keyword('LMP1SHST') == 1:
illum = self.get_keyword('LMP1NAM')
else:
illum = 'Test'
# Internal FLATS
elif self.get_keyword('IMTYPE') == 'FLATLAMP':
if self.get_keyword('LMP3STAT') == 1:
illum = 'Contin'
else:
illum = 'Test'
# DOMES
elif self.get_keyword('IMTYPE') == 'DOMEFLAT':
if self.get_keyword('FLIMAGIN') == 'on' or \
self.get_keyword('FLSPECTR') == 'on':
illum = 'Dome'
else:
illum = 'Test'
# Twilight FLATS
elif self.get_keyword('IMTYPE') == 'TWIFLAT':
illum = 'Twilit'
# BARS
elif self.get_keyword('IMTYPE') == 'CONTBARS':
if self.get_keyword('LMP3STAT') == 1:
illum = 'Contin'
else:
illum = 'Test'
# OBJECT
elif self.get_keyword('IMTYPE') == 'OBJECT':
obnam = self.get_keyword('OBJECT')
if obnam is None:
obnam = self.get_keyword('TARGNAME')
if obnam is None:
obnam = 'Object'
# clean up the string
illum = obnam.replace("/", "_").replace(" ", "").replace(".", "_")
else:
illum = 'Test'
return illum
def calibration_lamp(self):
if self.get_keyword('IMTYPE') != 'ARCLAMP':
return None
else:
lamps_dictionary = {
0: "FeAr",
1: "ThAr",
2: "Aux",
3: "Continuum A"
}
for key in lamps_dictionary.keys():
status = self.get_keyword('LMP%dSTAT' % key)
shutter = self.get_keyword('LMP%dSHST' % key)
if status == 1 and shutter == 1:
return lamps_dictionary[key]
[docs] def map_ccd(self):
"""Return CCD section variables useful for processing
Uses FITS keyword NVIDINP to determine how many amplifiers were used
to read out the CCD. Then reads the corresponding BSECn, and
DSECn keywords, where n is the amplifier number. The indices are
converted to Python (0-biased, y axis first) indices and an array
is constructed for each of the two useful sections of the CCD as
follows:
Bsec[0][0] - First amp, y lower limit
Bsec[0][1] - First amp, y upper limit
Bsec[0][2] - First amp, x lower limit
Bsec[0][3] - First amp, x upper limit
Bsec[1][0] - Second amp, y lower limit
etc.
Bsec is the full overscan region for the given amplifier and is used
to calculate and perform the overscan subtraction.
Dsec is the full CCD region for the given amplifier and is used to
trim the image after overscan subtraction has been performed.
Tsec accounts for trimming the image according to Dsec.
Amps are assumed to be organized as follows:
(0,ny) --------- (nx,ny)
| 3 | 4 |
---------
| 1 | 2 |
(0,0) --------- (nx, 0)
Args:
-----
self: instance of CcdPrimitive class (automatic)
Returns:
--------
list: (int) y0, y1, x0, x1 for bias section
list: (int) y0, y1, x0, x1 for data section
list: (int) y0, y1, x0, x1 for trimmed section
list: (bool) y-direction, x-direction, True if forward, else False
"""
namps = int(self.get_keyword('NVIDINP'))
# TODO: check namps
# section lists
bsec = []
dsec = []
tsec = []
direc = []
# loop over amps
for i in range(namps):
section = self.get_keyword('BSEC%d' % (i + 1))
sec, rfor = parse_imsec(section)
bsec.append(sec)
section = self.get_keyword('DSEC%d' % (i + 1))
sec, rfor = parse_imsec(section)
dsec.append(sec)
direc.append(rfor)
if i == 0:
y0 = 0
y1 = sec[1] - sec[0]
x0 = 0
x1 = sec[3] - sec[2]
elif i == 1:
y0 = 0
y1 = sec[1] - sec[0]
x0 = tsec[0][3] + 1
x1 = x0 + sec[3] - sec[2]
elif i == 2:
y0 = tsec[0][1] + 1
y1 = y0 + sec[1] - sec[0]
x0 = 0
x1 = sec[3] - sec[2]
elif i == 3:
y0 = tsec[0][1] + 1
y1 = y0 + sec[1] - sec[0]
x0 = tsec[0][3] + 1
x1 = x0 + sec[3] - sec[2]
else:
# should not get here
y0 = -1
y1 = -1
x0 = -1
x1 = -1
# self.log.info("ERROR - bad amp number: %d" % i)
tsec.append((y0, y1, x0, x1))
return bsec, dsec, tsec, direc
def _perform(self):
# if self.context.data_set is None:
# self.context.data_set = DataSet(None, self.logger, self.config,
# self.context.event_queue)
# self.context.data_set.append_item(self.action.args.name)
self.logger.info(
"------------------- Ingesting file %s -------------------" %
self.action.args.name)
self.action.event._recurrent = True
self.name = self.action.args.name
out_args = Arguments()
ccddata, table = kcwi_fits_reader(self.name)
# save the ccd data into an object
# that can be shared across the functions
self.ccddata = ccddata
out_args.ccddata = ccddata
out_args.table = table
imtype = self.get_keyword("IMTYPE")
groupid = self.get_keyword("GROUPID")
if imtype is None:
fname = os.path.basename(self.action.args.name)
self.logger.warn(f"Unknown IMTYPE {fname}")
self.action.event._reccurent = False
if self.check_if_file_can_be_processed(imtype) is False:
# self.logger.warn("Object frame cannot be reduced. Rescheduling")
self.action.new_event = None
return None
else:
self.action.event._recurrent = False
out_args.name = self.action.args.name
out_args.imtype = imtype
out_args.groupid = groupid
# CAMERA
out_args.camera = self.camera()
# DICH
out_args.dich = self.dich()
# CAMANGLE
out_args.camangle = self.camang()
# FILTER
out_args.filter = self.filter()
# GRANGLE
out_args.grangle = self.grangle()
# GRATING
out_args.grating = self.grating()
# ADJANGLE
out_args.adjang = self.adjang()
# RHO
out_args.rho = self.rho()
# CWAVE
out_args.cwave = self.cwave()
# RESOLUTION
out_args.resolution = self.resolution()
# ATSIG
out_args.atsig = self.atsig()
# DELTA WAVE OUT
out_args.dwout = self.delta_wave_out()
# NAMPS
out_args.namps = int(self.get_keyword('NVIDINP'))
# NASMASK
out_args.nasmask = self.nasmask()
# SHUFROWS
out_args.shufrows = self.shufrows()
# NUMOPEN
out_args.numopen = self.numopen()
# AMPMODE
out_args.ampmode = self.ampmode()
# BINNING
out_args.xbinsize, out_args.ybinsize = \
map(int, self.get_keyword('BINNING').split(','))
# IFUNUM
out_args.ifunum = int(self.get_keyword('IFUNUM'))
# IFUNAM
out_args.ifuname = self.get_keyword('IFUNAM')
# PLOTLABEL
out_args.plotlabel = self.plotlabel()
# ILUM
out_args.illum = self.illum()
# MAPCCD
out_args.map_ccd = self.map_ccd()
# CALIBRATION LAMP
out_args.calibration_lamp = self.calibration_lamp()
# TTIME
out_args.ttime = self.get_keyword('TTIME')
# Are we already in proctab?
out_args.in_proctab = self.context.proctab.in_proctab(
frame=out_args.ccddata)
return out_args
def apply(self):
if self._pre_condition():
try:
output = self._perform()
except ValueError as e:
self.logger.warn("UNABLE TO INGEST THE FILE")
self.logger.warn("Reason: %s" % e)
return None
if self._post_condition():
self.output = output
return self.output
def check_if_file_can_be_processed(self, imtype):
# bias frames
bias_frames = self.context.proctab.search_proctab(frame=self.ccddata, target_type='MBIAS', nearest=True)
# continuum bars
contbars_frames = self.context.proctab.search_proctab(frame=self.ccddata, target_type='CONTBARS', nearest=True)
# master flats
masterflat_frames = self.context.proctab.search_proctab(frame=self.ccddata, target_type='MFLAT', nearest=True)
# inverse sensitivity
inverse_sensitivity_frames = self.context.proctab.search_proctab(frame=self.ccddata, target_type='INVSENS', nearest=True)
# arclamp
arclamp_frames = self.context.proctab.search_proctab(frame=self.ccddata, target_type='ARCLAMP', nearest=True)
if imtype == 'OBJECT':
if len(bias_frames) > 0 \
and len(masterflat_frames) > 0 \
and len(arclamp_frames) > 0:
return True
else:
self.logger.warn("Cannot reduce OBJECT frame. Rescheduling for later. Found:")
self.logger.warn(f"\tMBIAS: {len(bias_frames)}")
self.logger.warn(f"\tMFLAT: {len(masterflat_frames)}")
self.logger.warn(f"\tARCLAMP: {len(arclamp_frames)}")
return False
if imtype == 'ARCLAMP':
if (len(contbars_frames)) > 0:
return True
else:
self.logger.warn("Cannot reduce ARCLAMP frame. Missing continuum bars. Rescheduling for later.")
return False
if imtype in ['FLATLAMP', 'TWIFLAT', 'DOMEFLAT']:
if len(bias_frames) > 0:
return True
else:
self.logger.warn(f"Cannot reduce {imtype} frame. Missing master bias. Rescheduling for later.")
return False
return True
[docs]def kcwi_fits_reader(file):
"""A reader for KeckData objects.
Currently this is a separate function, but should probably be
registered as a reader similar to fits_ccddata_reader.
Arguments:
file -- The filename (or pathlib.Path) of the FITS file to open.
"""
try:
hdul = fits.open(file)
except (FileNotFoundError, OSError) as e:
print(e)
raise e
read_imgs = 0
read_tabs = 0
# primary image
ccddata = CCDData(hdul['PRIMARY'].data, meta=hdul['PRIMARY'].header,
unit='adu')
read_imgs += 1
# check for other legal components
if 'UNCERT' in hdul:
ccddata.uncertainty = hdul['UNCERT'].data
read_imgs += 1
if 'FLAGS' in hdul:
ccddata.flags = hdul['FLAGS'].data
read_imgs += 1
if 'MASK' in hdul:
ccddata.mask = hdul['MASK'].data
read_imgs += 1
if 'Exposure Events' in hdul:
table = hdul['Exposure Events']
read_tabs += 1
else:
table = None
# prepare for floating point
ccddata.data = ccddata.data.astype(np.float64)
# Check for CCDCFG keyword
if 'CCDCFG' not in ccddata.header:
ccdcfg = ccddata.header['CCDSUM'].replace(" ", "")
ccdcfg += "%1d" % ccddata.header['CCDMODE']
ccdcfg += "%02d" % ccddata.header['GAINMUL']
ccdcfg += "%02d" % ccddata.header['AMPMNUM']
ccddata.header['CCDCFG'] = ccdcfg
if ccddata:
if 'BUNIT' in ccddata.header:
ccddata.unit = ccddata.header['BUNIT']
if ccddata.uncertainty:
ccddata.uncertainty.unit = ccddata.header['BUNIT']
# print("setting image units to " + ccddata.header['BUNIT'])
logger.info("<<< read %d imgs and %d tables out of %d hdus in %s" %
(read_imgs, read_tabs, len(hdul), file))
return ccddata, table
def write_table(output_dir=None, table=None, names=None, comment=None,
keywords=None, output_name=None, clobber=False):
output_file = os.path.join(output_dir, output_name)
# check if file already exists
if os.path.exists(output_file) and clobber is False:
logger.warning("Table %s already exists and overwrite is set to False"
% output_file)
return
if os.path.exists(output_file) and clobber is True:
logger.warning("Table %s already exists and will be overwritten"
% output_file)
logger.info("Removing file: %s" % output_file)
os.remove(output_file)
t = Table(table, names=names)
if comment:
t.meta['COMMENT'] = comment
if keywords:
for k, v in keywords.items():
t.meta[k] = v
try:
t.write(output_file, format='fits')
logger.info("Output table: %s" % output_file)
except (FileExistsError, OSError):
logger.error("Something went wrong creating the table: "
"table already exists")
def read_table(input_dir=None, file_name=None):
# Set up return table
input_file = os.path.join(input_dir, file_name)
logger.info("Trying to read table: %s" % input_file)
try:
retab = Table.read(input_file, format='fits')
except FileNotFoundError:
logger.warning("No table to read")
retab = None
return retab
def kcwi_fits_writer(ccddata, table=None, output_file=None, output_dir=None,
suffix=None):
# Determine if the version info is already in the header
contains_version = False
for h in ccddata.header["HISTORY"]:
if "kcwidrp version" in h:
contains_version = True
if not contains_version:
# Add setup.py version number to header
version = pkg_resources.get_distribution('kcwidrp').version
ccddata.header.add_history(f"kcwidrp version={version}")
# Get string filepath to .git dir, relative to this primitive
primitive_loc = os.path.dirname(os.path.abspath(__file__))
git_loc = primitive_loc[:-18] + ".git"
# Attempt to gather git version information
git1 = subprocess.run(["git", "--git-dir", git_loc, "describe",
"--tags", "--long"], capture_output=True)
git2 = subprocess.run(["git", "--git-dir", git_loc, "log", "-1",
"--format=%cd"], capture_output=True)
# If all went well, save to the header
if not bool(git1.stderr) and not bool(git2.stderr):
git_v = git1.stdout.decode('utf-8')[:-1]
git_d = git2.stdout.decode('utf-8')[:-1]
ccddata.header.add_history(f"git version={git_v}")
ccddata.header.add_history(f"git date={git_d}")
else:
logger.debug("Package not installed from a git repo, skipping")
out_file = os.path.join(output_dir, os.path.basename(output_file))
if suffix is not None:
(main_name, extension) = os.path.splitext(out_file)
out_file = main_name + "_" + suffix + extension
hdus_to_save = ccddata.to_hdu()
# check for flags
flags = getattr(ccddata, "flags", None)
if flags is not None:
hdus_to_save.append(fits.ImageHDU(flags, name='FLAGS',
do_not_scale_image_data=True))
# something about the way the original table is written out is wrong
# and causes problems. Leaving it off for now.
# if table is not None:
# hdus_to_save.append(table)
# log
logger.info(">>> Saving %d hdus to %s" % (len(hdus_to_save), out_file))
hdus_to_save.writeto(out_file, overwrite=True)
def strip_fname(filename):
if not filename:
logger.error(f"Failed to strip file {filename}")
return
strip = Path(filename).stem
return strip
def get_master_name(tab, target_type, loc=0):
res = Path(strip_fname(tab['filename'][loc]) + '_' + \
target_type.lower() + ".fits").name
return res
def master_bias_name(ccddata, target_type='MBIAS'):
# Delivers a mbias filename that is unique for each CCD configuration
# Any KCWI frame with a shared CCD configuration can use the same bias
name = target_type.lower() + '_' + ccddata.header['CCDCFG'] + '.fits'
return name
def master_flat_name(ccddata, target_type):
# Delivers a name that is unqie across an observing block
name = target_type.lower() + '_' + ccddata.header['STATEID'] + '.fits'
return name