import os
os.environ["GOTO_NUM_THREADS"] = "5"
os.environ["OMP_NUM_THREADS"] = "5"
os.environ["OPENBLAS_NUM_THREADS"] = "5"
os.environ["CRDS_SERVER_URL"] = "https://hst-crds.stsci.edu"
import crds
import datetime
import argparse
import glob
import copy
import asdf
from astropy.io import fits
from astropy.table import Table
from calcos import ccos
import numpy.ma as ma
import numpy as np
import pandas as pd
import dask
import matplotlib.pyplot as plt
dirname = os.path.dirname(__file__)
stylesheet = os.path.join(dirname, "../analysis", "niceplot.mplstyle")
plt.style.use(stylesheet)
from matplotlib import ticker
from matplotlib.backends.backend_pdf import PdfPages
from astropy.convolution import Gaussian2DKernel, interpolate_replace_nans
from acdc.database.query_cos_dark import files_by_mjd
from acdc.utils.utils import get_psa_wca, unbin_coords, bin_coords
[docs]
class Superdark():
def __init__(self, hv, segment, mjdstarts, mjdends, dayint=100, bin_x=1,
bin_y=1, bin_pha=1, phastart=1, phaend=31, pha_bins=None,
gsagtab=None, bpixtab=None, region="inner", outfile=None,
outdir=".", xylimits=None, overwrite=False, sdqflags=8346):
#gsagtab="/grp/hst/cdbs/lref/71j1935gl_gsag.fits",
#bpixtab="/grp/hst/cdbs/lref/69m20528l_bpix.fits",
"""
To keep things consistent, start and stop range are defined the same
as within python- start is inclusive but stop is EXCLUSIVE. E.g.
using a PHA range of 1 to 31 means you will get PHA=1 up to PHA=30, but
not PHA=31.
"""
self.segment = segment
self.hv = hv
self.region = region
self.mjdstarts = mjdstarts
self.mjdends = mjdends
self.ndays = [mjdends[i] - mjdstarts[i] for i in range(len(mjdstarts))]
self.dayint = dayint
self.bin_x = bin_x
self.bin_y = bin_y
self.overwrite = overwrite
self.sdqflags = sdqflags
self.phastart = phastart
self.phaend = phaend
if bin_pha == "all":
bin_pha = phaend - phastart
self.bin_pha = bin_pha
self.pha_bins = pha_bins
self.get_pha_bins()
self.superdarks = []
if gsagtab is None:
gsagtab = self.get_reffile("gsagtab")
self.gsagtab = gsagtab
if bpixtab is None:
bpixtab = self.get_reffile("bpixtab")
self.bpixtab = bpixtab
self.outfile = outfile
self.outdir = outdir
if xylimits == None:
self.get_xy_limits()
self.gsag_holes = get_gsag_holes(self.gsagtab, self.segment, self.hv)
self.fixed_gsag = False
self.is_binned = False
# self.get_bpix_regions()
@classmethod
def from_asdf(cls, superdark, overwrite=False):
af = asdf.open(superdark)
if "pha_bins" in af.keys():
pha_bins = copy.deepcopy(af["pha_bins"])
else:
pha_bins = None
self = cls(hv=af["hv"], segment=af["segment"], mjdstarts=af["mjdstarts"],
mjdends=af["mjdends"], bin_x=af["bin_x"], bin_y=af["bin_y"],
bin_pha=af["bin_pha"], phastart=af["phastart"],
phaend=af["phaend"], pha_bins=pha_bins, xylimits="predetermined",
outfile=superdark)
for k in ["bin_pha", "bin_x", "bin_y", "xstart", "xend", "ystart",
"yend", "phastart", "phaend"]:
setattr(self, k, af[k])
self.outfile = superdark
self.outfile_basename = os.path.basename(superdark)
self.total_exptime = af["total_exptime"]
self.total_files = af["total_files"]
try:
self.fixed_gsag = af["fixed_gsag"]
except KeyError:
self.fixed_gsag = False
try:
self.is_binned = af["is_binned"]
except KeyError:
if "binned" in os.path.basename(superdark):
self.is_binned = True
else:
self.is_binned = False
self.overwrite = overwrite
self.pha_images = {}
try:
self.dq_image = copy.deepcopy(af["dq_image"])
except KeyError:
sh = af[f"pha{pha_bins[0]}-{pha_bins[1]}"].shape
self.dq_image = np.zeros(sh[0]*sh[1]).reshape(sh).astype(int)
for i in range(len(self.pha_bins)-1):
key = f"pha{self.pha_bins[i]}-{self.pha_bins[i+1]}"
self.pha_images[key] = copy.deepcopy(af[key])
self.superdarks.append(copy.deepcopy(af[key]))
try:
self.creation_date = af["creation_date"]
except KeyError:
self.creation_date = "unknown"
af.close()
return self
def get_reffile(self, filetype):
now = datetime.datetime.now()
current_pmap = crds.get_default_context()
config = {"INSTRUME": "COS",
"CENWAVE": 1291,
"DETECTOR": "FUV",
"DATE-OBS": now.strftime("%Y-%m-%d"),
"TIME-OBS": "00:00:00"}
crds_results = crds.getrecommendations(parameters=config, reftypes=[filetype],
context=current_pmap, observatory="hst")
reffile0 = crds_results[filetype]
try:
lref = os.environ["lref"]
except KeyError as e:
print(e.message)
print("You must define the $lref environment variable- this is where all COS reference files are located")
reffile = os.path.join(lref, reffile0)
return reffile
def get_pha_bins(self):
if self.pha_bins is None:
self.pha_bins = np.arange(self.phastart, self.phaend+1, self.bin_pha)
else:
self.phastart = self.pha_bins[0]
self.phaend = self.pha_bins[-1]
grid = list(set(self.pha_bins[1:] - self.pha_bins[:-1]))
if len(grid) == 1:
self.bin_pha = grid[0]
else:
self.bin_pha = None
def get_xy_limits(self):
if self.segment == "FUVA":
if self.region == "inner":
xstart, xend, ystart, yend = 1260, 15119, 375, 660
elif self.segment == "FUVB":
if self.region == "inner":
xstart, xend, ystart, yend = 1000, 14990, 405, 740
else:
raise Exception(f"Invalid segment specified: {self.segment}")
while (xstart // self.bin_x) * self.bin_x < xstart:
xstart += 1
while (ystart // self.bin_y) * self.bin_y < ystart:
ystart += 1
self.xstart, self.ystart = xstart, ystart
self.xend = (xend // self.bin_x) * self.bin_x
self.yend = (yend // self.bin_y) * self.bin_y
self.bin_xstart = self.xstart // self.bin_x
self.bin_ystart = self.ystart // self.bin_y
self.bin_xend = self.xend // self.bin_x
self.bin_yend = self.yend // self.bin_y
def create_superdark(self, dark_df=None):
@dask.delayed
def _bin_dark(self, dark_df, phastart, phaend):
pha_images = {}
darks = dark_df["fileloc"].values
assert len(darks) != 0, "ERROR: no darks found at all!!"
total_exptime = sum([fits.getval(x, "exptime", 1) for x in darks])
# Handle gainsag holes
gsag_df = self.gsag_holes.loc[(self.gsag_holes["DATE"] > self.mjdstarts[0]) &
(self.gsag_holes["DATE"] < self.mjdends[0])]
print(f" Binning corrtags, {phastart} <= PHA < {phaend}...")
sum_image = self.bin_corrtags(darks, phastart=phastart, phaend=phaend)
ylen,xlen = sum_image.shape
dq_image = np.zeros((ylen, xlen)).astype(int)
# Loop through each gainsag hole and mark the DQ image as DQ=1
for j in range(len(gsag_df)):
dq_image[gsag_df.iloc[j]["Y0"]:gsag_df.iloc[j]["Y1"],
gsag_df.iloc[j]["X0"]:gsag_df.iloc[j]["X1"]] = 1
## Handle bad pixel regions
#bpix_df = self.bpix_regions
#bpix_df = bpix_df.loc[(bpix_df.SDQ == True) & (bpix_df.SEGMENT == self.segment)]
## Loop through each bad pixel region and set affected pixels to a false value of 99999
#for j in range(len(bpix_df)):
# sum_image[bpix_df.iloc[j]["Y0"]:bpix_df.iloc[j]["Y1"],
# bpix_df.iloc[j]["X0"]:bpix_df.iloc[j]["X1"]] = 99999
## Handle hotspots
#spot_df = self.hotspots
#spot_df = spot_df.loc[(spot_df.SEGMENT == self.segment) & (spot_df.START > self.mjdstarts[0]) & (spot_df.STOP < self.mjdends[0])]
## Loop through each hotspot and set affected pixels to a false value of 99999
#for j in range(len(spot_df)):
# sum_image[spot_df.iloc[j]["Y0"]:spot_df.iloc[j]["Y1"], spot_df.iloc[j]["X0"]:spot_df.iloc[j]["X1"]] = 99999
tmp = sum_image.reshape(1024 // self.bin_y, self.bin_y,
16384 // self.bin_x, self.bin_x)
binned = tmp.sum(axis=3).sum(axis=1)
binned_inner = binned[self.bin_ystart:self.bin_yend, self.bin_xstart:self.bin_xend]
tmp_dq = dq_image.reshape(1024 // self.bin_y, self.bin_y,
16384 // self.bin_x, self.bin_x)
binned_dq = tmp_dq.sum(axis=3).sum(axis=1)
binned_inner_dq = binned_dq[self.bin_ystart:self.bin_yend, self.bin_xstart:self.bin_xend]
key = f"pha{phastart}-{phaend}"
pha_images[key] = binned_inner
return pha_images, total_exptime, binned_inner_dq
runstart = datetime.datetime.now()
print("\nStart time: {}".format(runstart))
total_exptime = 0
total_files = 0
self.pha_images = {}
if dark_df is None:
notfilled = True
dark_dfs = []
total_days = 0
for k in range(len(self.mjdstarts)):
start = self.mjdstarts[k]
done = False
while done is False:
total_days += self.dayint
if total_days > self.ndays[k]:
total_days = self.ndays[k]
done = True
end = start + self.dayint
if end > self.mjdends[k]:
end = self.mjdends[k]
print(f"Using darks from MJD {start:,}-{end:,}; {total_days}/{self.ndays[k]} days")
print(" Querying...")
dark_df = files_by_mjd(start, end, segment=self.segment, hv=self.hv)
dark_dfs.append(dark_df)
print(" Query done")
start = end
dark_df = pd.concat(dark_dfs)
delayed_bin = [_bin_dark(self, dark_df, self.pha_bins[i], self.pha_bins[i+1]) for i in range(len(self.pha_bins)-1)]
out = dask.compute(*delayed_bin, scheduler='multiprocessing', num_workers=15)
print(" Binning done")
for grp in out:
dct = grp[0]
dct_dq = grp[2]
key = list(dct.keys())[0]
self.pha_images[key] = dct[key]
self.dq_image = out[0][2]
total_exptime += out[0][1]
## Undo 99999 put in for gainsag holes
#for i in range(len(self.pha_bins)-1):
# key = f"pha{self.pha_bins[i]}-{self.pha_bins[i+1]}"
# inds = np.where(self.pha_images[key] >= 99999)
# print(f"Zeroing out {len(inds[0])} gain sagged pixels for {key}")
# self.pha_images[key][inds] = 0
# inds = np.where(self.pha_images[key] >= 99999)
# assert len(inds[0]) == 0, f"Not all gain sag holes were zeroed for {key}"
# self.superdarks.append(self.pha_images[key])
self.total_exptime = total_exptime
self.total_files = len(dark_df["fileloc"].values)
# Write out ASDF file
self.write_superdark()
runend = datetime.datetime.now()
print("End time: {}".format(runend))
print("Total time: {}".format(runend-runstart))
def write_superdark(self, user_outfile=None, overwrite=False):
data_dict = self.pha_images
data_dict["xstart"] = self.xstart
data_dict["ystart"] = self.ystart
data_dict["phastart"] = self.phastart
data_dict["xend"] = self.xend
data_dict["yend"] = self.yend
data_dict["phaend"] = self.phaend
data_dict["bin_x"] = self.bin_x
data_dict["bin_y"] = self.bin_y
data_dict["bin_pha"] = self.bin_pha
data_dict["pha_bins"] = self.pha_bins
data_dict["mjdstarts"] = self.mjdstarts
data_dict["mjdends"] = self.mjdends
data_dict["segment"] = self.segment
data_dict["hv"] = self.hv
data_dict["total_exptime"] = self.total_exptime
data_dict["total_files"] = self.total_files
data_dict["fixed_gsag"] = self.fixed_gsag
data_dict["is_binned"] = self.is_binned
data_dict["dq_image"] = self.dq_image
now = datetime.datetime.now()
self.creation_date = now.strftime("%Y-%m-%d %H:%M")
data_dict["creation_date"] = self.creation_date
af = asdf.AsdfFile(data_dict)
today = datetime.datetime.now().strftime("%d%b%y-%H:%M:%S")
if self.outfile is None and user_outfile is None:
self.outfile = f"superdark_{self.segment}_{self.hv}_{today}.asdf"
elif self.outfile is None and user_outfile is not None:
self.outfile = user_outfile
if os.path.exists(self.outfile):
assert sum([overwrite, self.overwrite]) > 0, f"Output superdark {self.outfile} already exists and overwrite is False"
af.write_to(self.outfile)
print(f"Wrote {self.outfile}")
def fix_gsag(self, method="interpolate", kernel=None, row_threshold=.3,
lp1_interpolate=None):
superdarks = []
if self.segment == "FUVA":
_, lp1_lims = bin_coords([0, 0], [459, 528], self.bin_x,
self.bin_y, self.xstart, self.ystart)
else:
_, lp1_lims = bin_coords([0, 0], [521, 587], self.bin_x,
self.bin_y, self.xstart, self.ystart)
print(f"Looking for rows where >= {int(row_threshold*100)}% of the row is gain-sagged...")
for i in range(len(self.pha_bins)-1):
key = f"pha{self.pha_bins[i]}-{self.pha_bins[i+1]}"
ylen,xlen = self.pha_images[key].shape
for j in range(ylen):
#if lp1_lims[0] <= j <= lp1_lims[1] and lp1_interpolate is True:
# print(f" lp1_interpolate is True, interpolating over row {j}")
# self.dq_image[j,:] = 1
# continue
if lp1_lims[0] <= j <= lp1_lims[1] and lp1_interpolate is False:
continue
sagged = np.where(self.dq_image[j,:] == 1)
frac_sagged = len(sagged[0])/xlen
if frac_sagged >= row_threshold:
print(f" {frac_sagged*100:.1f}% of row {j} is gain-sagged, entire row will be corrected")
self.dq_image[j,:] = 1
inds = np.where(self.dq_image == 1)
# Zero out gain sagged regions
if method == "zero":
print(f"Zeroing out gain sagged pixels...")
for i in range(len(self.pha_bins)-1):
key = f"pha{self.pha_bins[i]}-{self.pha_bins[i+1]}"
self.pha_images[key][inds] = 0
#inds = np.where(self.pha_images[key] >= 99999)
#assert len(inds[0]) == 0, f"Not all gain sag holes were zeroed for {key}"
superdarks.append(self.pha_images[key])
elif method == "boost":
print("Boosting gain sagged pixels...")
for i in range(len(self.pha_bins)-1):
key = f"pha{self.pha_bins[i]}-{self.pha_bins[i+1]}"
indsbelow = (inds[0]-1, inds[1])
indsabove = (inds[0]+1, inds[1])
avg_around = (self.pha_images[key][indsbelow]+self.pha_images[key][indsabove])/2
self.pha_images[key][inds] = avg_around
superdarks.append(self.pha_images[key])
elif method == "interpolate":
print("Interpolating over gain sagged pixels...")
if kernel is None:
kernel = Gaussian2DKernel(x_stddev=.3, y_stddev=.3, x_size=11, y_size=5)
for i in range(len(self.pha_bins)-1):
key = f"pha{self.pha_bins[i]}-{self.pha_bins[i+1]}"
self.pha_images[key][inds] = np.nan
interp_sd = interpolate_replace_nans(self.pha_images[key], kernel)
self.pha_images[key] = interp_sd
superdarks.append(self.pha_images[key])
self.superdarks = superdarks
self.fixed_gsag = True
def get_hotspots(self, extnum):
spot = Table.read(self.spottab, format="fits", hdu=extnum)
spot_df = spot.to_pandas()
spot_df = spot_df.rename(columns={"LX": "X0", "LY": "Y0"})
spot_df["X1"] = spot_df["X0"] + spot_df["DX"]
spot_df["Y1"] = spot_df["Y0"] + spot_df["DY"]
spot_df = spot_df.astype("int32")
str_df = spot_df.select_dtypes([np.object])
str_df = str_df.stack().str.decode('utf-8').unstack()
for col in str_df:
spot_df[col] = str_df[col]
self.hotspots = spot_df
def get_bpix_regions(self):
bpix = Table.read(self.bpixtab, format="fits", hdu=1)
bpix_df = bpix.to_pandas()
bpix_df = bpix_df.rename(columns={"LX": "X0", "LY": "Y0"})
bpix_df["X1"] = bpix_df["X0"] + bpix_df["DX"]
bpix_df["Y1"] = bpix_df["Y0"] + bpix_df["DY"]
# This is to fix the astropy bytes string columns
str_df = bpix_df.select_dtypes([np.object])
str_df = str_df.stack().str.decode('utf-8').unstack()
for col in str_df:
bpix_df[col] = str_df[col]
bpix_df["SDQ"] = np.where(bpix_df["DQ"]&self.sdqflags == bpix_df["DQ"], True, False)
self.bpix_regions = bpix_df
def bin_superdark(self, bin_x, bin_y, pha_bins=None, outfile=None, verbose=True, writefile=True, makeplots=True):
# Bin across PHA
self.pha_images = {}
if pha_bins is not None:
for b in pha_bins:
if b not in self.pha_bins:
raise IndexError(f"Previous PHA bins not compatible with new bins, {self.pha_bins} vs {pha_bins}")
superdarks = []
inds = np.nonzero(np.in1d(self.pha_bins, pha_bins))[0]
for i in range(len(inds)-1):
superdark = self.superdarks[i]
for j in range(i+1, inds[i+1]):
superdark += self.superdarks[j]
superdarks.append(superdark)
self.pha_bins = np.array(pha_bins)
self.get_pha_bins()
self.superdarks = superdarks
infile = self.outfile
if outfile is None and writefile is True:
nowdt = datetime.datetime.now()
now = nowdt.strftime("%d%b%Y")
outfile = self.outfile.replace(".asdf", f"_binned_{now}.asdf")
self.outfile = outfile
if writefile is True and self.overwrite is False and os.path.exists(outfile):
print(f"WARNING: Output superdark {outfile} already exists and overwrite is False, skipping...")
return
sh = np.shape(self.superdarks[0])
xdim = sh[1]
ydim = sh[0]
b_x0 = 0
b_y0 = 0
b_x1 = (xdim // bin_x) * bin_x
b_y1 = (ydim // bin_y) * bin_y
self.xend = b_x1 * self.bin_x + self.xstart
self.yend = b_y1 * self.bin_y + self.ystart
# TODO
# Should this really be *=, not just =?
self.bin_x *= bin_x
self.bin_y *= bin_y
# Bin DQ image
binned_dq = self.dq_image[b_y0:b_y1, b_x0:b_x1]
binned_sh_dq = np.shape(binned_dq)
binned_xdim_dq = binned_sh_dq[1]
binned_ydim_dq = binned_sh_dq[0]
tmp_dq = binned_dq.reshape(binned_ydim_dq // bin_y, bin_y, binned_xdim_dq // bin_x, bin_x)
tmp_dq2 = np.bitwise_or.reduce(tmp_dq, axis=3)
binned_dq = np.bitwise_or.reduce(tmp_dq2, axis=1)
self.dq_image = binned_dq
# Bin superdark in spatial directions
for i,sd in enumerate(self.superdarks):
phastart = self.pha_bins[i]
phaend = self.pha_bins[i+1]
binned = sd[b_y0:b_y1, b_x0:b_x1]
binned_sh = np.shape(binned)
binned_xdim = binned_sh[1]
binned_ydim = binned_sh[0]
tmp = binned.reshape(binned_ydim // bin_y, bin_y, binned_xdim // bin_x, bin_x)
binned = tmp.sum(axis=3).sum(axis=1)
self.superdarks[i] = binned
key = f"pha{phastart}-{phaend}"
self.pha_images[key] = binned
rate = binned/self.total_exptime
zeroinds = np.where(binned == 0)
nzero = len(zeroinds[0])
if verbose is True:
print("")
print(f"Binning superdark {infile}")
print(f"Binning PHAs {phastart} through {phaend}")
print(f"Binning by X={self.bin_x}, Y={self.bin_y}")
print(f"\tTotal number of events: {np.sum(binned):,}")
print(f"\tTotal exptime [s] of superdark: {self.total_exptime:,.1f}")
print(f"\tMinimum number of events in a binned pixel: {np.min(binned)}")
print(f"\tMaximum number of events in a binned pixel: {np.max(binned)}")
print(f"\tMean number of events per binned pixel: {np.mean(binned):.1f}")
print(f"\t Standard deviation: {np.std(binned):.1f}")
print(f"\tMedian number of events per binned pixel: {np.median(binned):.1f}")
print(f"\tMean countrate per binned pixel: {np.mean(rate):.2e}")
print(f"\t Standard deviation: {np.std(rate):.2e}")
print(f"\tMedian countrate per binned pixel: {np.median(rate):.2e}")
print(f"\tNumber of binned pixels with zero events: {nzero:,}")
self.is_binned = True
if writefile is True:
self.write_superdark()
if makeplots is True:
self.plot_superdarks()
def plot_superdarks(self, pdffile=None, vmin=None, vmax=None, savepng=False, pngroot=None):
if savepng is False:
if pdffile is None:
pdffile = os.path.join(self.outdir, self.outfile.replace("asdf", "pdf"))
pdf = PdfPages(pdffile)
if savepng is True and pngroot is None:
pngroot = os.path.join(self.outdir, self.outfile.replace(".asdf", ""))
for i,sd in enumerate(self.superdarks):
phastart = self.pha_bins[i]
phaend = self.pha_bins[i+1]
rate = sd/self.total_exptime
fig, ax = plt.subplots(figsize=(20,5))
#vmin = np.mean(rate) - 3*np.std(rate)
if vmin is None:
vmin = np.median(rate) - np.median(rate)*0.5
if vmin < 0:
vmin = 0
#vmax = np.mean(rate) + 3*np.std(rate)
if vmax is None:
vmax = np.median(rate) + np.median(rate)*1.
im = ax.imshow(rate, aspect="auto", interpolation="nearest",
origin="lower", cmap="inferno", vmin=vmin, vmax=vmax)
cbar = fig.colorbar(im, label="Counts/s", format="%.1e", pad=0.01)
cbarticks = cbar.get_ticks()
# If there are more than 5 tickmarks, reduce it.
if len(cbarticks) > 7: # 1st and last ticks are not shown
cbar.ax.locator_params(nbins=5)
ax.set_title(f"{self.segment}; HV={self.hv}; MJD {self.mjdstarts}-{self.mjdends}; PHA {phastart}-{phaend}; X bin={self.bin_x}, Y bin={self.bin_y}")
ax.set_xlabel("X (binned)")
ax.set_ylabel("Y (binned)")
xticks = ax.xaxis.get_major_ticks()
xticks[0].set_visible(False)
xticks[1].set_visible(False)
if savepng is False:
pdf.savefig(fig, bbox_inches="tight")
else:
pngfile = pngroot+f"_pha{phastart}-{phaend}.png"
fig.savefig(pngfile, bbox_inches="tight", dpi=200)
print(f"Wrote {pngfile}")
if savepng is False:
pdf.close()
print(f"Wrote {pdffile}")
plt.close('all')
plt.close(fig)
# Should investigate if this can be replaced with typical sigma clipping
def screen_counts(self, verbose=True, sigma=10, mask=False, interpolate=False, interp_kernel=None, method=np.median, exclude_zeros=True):
for i,sd in enumerate(self.superdarks):
phastart = self.pha_bins[i]
phaend = self.pha_bins[i+1]
key = f"pha{phastart}-{phaend}"
if exclude_zeros is True:
nonzeroinds = np.where(sd > 0)
sd_stats = sd[nonzeroinds]
else:
sd_stats = sd
mid = method(sd_stats)
std = np.std(sd_stats)
sigma_cutoff = (std*sigma)
bad = np.where(sd > (mid+sigma_cutoff))
if len(bad[0]) == 0:
continue
if verbose is True:
print(f"{len(bad[0])} pixels have counts above {sigma}sigma for {key}")
if interpolate is True:
if interp_kernel is None:
kernel = Gaussian2DKernel(x_stddev=.3, y_stddev=.3, x_size=11, y_size=5)
# From https://docs.astropy.org/en/stable/convolution/index.html
sd[bad] = np.nan
interp_sd = interpolate_replace_nans(sd, kernel)
self.superdarks[i] = interp_sd
self.pha_images[key] = interp_sd
elif mask is True:
sd_masked = ma.masked_greater_equal(sd, mid+sigma_cutoff)
self.superdarks[i] = sd_masked
self.pha_images[key] = sd_masked
else:
sd[bad] = 0.0
self.superdarks[i] = sd
self.pha_images[key] = sd
def write_fits(self, fitsfile=None):
if fitsfile is None:
nowdt = datetime.datetime.now()
now = nowdt.strftime("%d%b%Y")
fitsfile = self.outfile.replace(".asdf", ".fits")
self.fitsfile = fitsfile
if self.overwrite is False and os.path.exists(fitsfile):
print(f"{fitsfile} already exists and overwrite is False")
return
hdr0 = fits.Header()
hdr0["suprdark"] = os.path.basename(self.outfile)
hdr0["hv"] = self.hv
hdr0["segment"] = self.segment
primary = fits.PrimaryHDU(header=hdr0)
hduims = []
for i,sd in enumerate(self.superdarks):
hdr = fits.Header()
hdr["EXTNAME"] = "SCI"
hdr["phastart"] = self.pha_bins[i]
hdr["phaend"] = self.pha_bins[i+1]
hdr["xstart"] = self.xstart
hdr["ystart"] = self.ystart
hdr["xend"] = self.xend
hdr["yend"] = self.yend
hdr["xbinsize"] = self.bin_x
hdr["ybinsize"] = self.bin_y
hdu = fits.ImageHDU(sd, header=hdr)
hduims.append(hdu)
hdulist = fits.HDUList([primary] + hduims)
hdulist.writeto(fitsfile, overwrite=self.overwrite)
print(f"Wrote {fitsfile}")
def write_cube_fits(self, fitsfile=None):
if fitsfile is None:
nowdt = datetime.datetime.now()
now = nowdt.strftime("%d%b%Y")
fitsfile = self.outfile.replace(".asdf", "_cube.fits")
self.fitsfile = fitsfile
if self.overwrite is False and os.path.exists(fitsfile):
print(f"{fitsfile} already exists and overwrite is False")
return
hdr0 = fits.Header()
hdr0["suprdark"] = os.path.basename(self.outfile)
hdr0["hv"] = self.hv
hdr0["segment"] = self.segment
primary = fits.PrimaryHDU(header=hdr0)
cube = np.stack(self.superdarks, axis=0)
hdr1 = fits.Header()
hdr1["EXTNAME"] = "SCI"
hdr1["phastart"] = self.pha_bins[0]
hdr1["phaend"] = self.pha_bins[-1]
hdr1["xstart"] = self.xstart
hdr1["ystart"] = self.ystart
hdr1["xend"] = self.xend
hdr1["yend"] = self.yend
hdr1["xbinsize"] = self.bin_x
hdr1["ybinsize"] = self.bin_y
hdu1 = fits.ImageHDU(cube, header=hdr1)
hdulist = fits.HDUList([primary, hdu1])
hdulist.writeto(fitsfile, overwrite=self.overwrite)
print(f"Wrote {fitsfile}")
def get_binning_pars(self):
keys = ["bin_pha", "bin_x", "bin_y", "xstart", "xend", "ystart", "yend",
"phastart", "phaend"]
binning = {}
for k in keys:
binning[k] = getattr(self, k)
return binning
[docs]
def get_gsag_holes(gsagtab, segment, hv):
extfound = False
extnum = 1
hvkey = f"HVLEVEL{segment[-1]}"
while extfound is False:
try:
gsag_hv = fits.getval(gsagtab, hvkey, extnum)
gsag_seg = fits.getval(gsagtab, "segment", extnum)
except KeyError:
extnum += 1
continue
if gsag_hv == hv and gsag_seg == segment:
extfound = True
else:
extnum += 1
gsag = Table.read(gsagtab, format="fits", hdu=extnum)
gsag_df = gsag.to_pandas()
gsag_df = gsag_df.rename(columns={"LX": "X0", "LY": "Y0"})
gsag_df["X1"] = gsag_df["X0"] + gsag_df["DX"]
gsag_df["Y1"] = gsag_df["Y0"] + gsag_df["DY"]
gsag_df = gsag_df.astype("int32")
return gsag_df