Source code for acdc.utils.utils

import os
import datetime
import numpy as np
import pandas as pd
from astropy.io import fits

from acdc.database.calculate_dark import get_aperture_region

def sql_to_df(sql_results, returncols):
    """
    Convert query results (a list of objects with attributes) to a
    pandas DataFrame.

    Parameters
    ----------
    sql_results: iterable
        Result objects; each requested column is read from them with
        ``getattr``.
    returncols: iterable of str
        Attribute names to turn into DataFrame columns.

    Returns
    -------
    df: pandas.DataFrame
        One column per name in `returncols` that every result object
        provides. A column is skipped entirely if any result object lacks
        that attribute.
    """
    d = {}
    for col in returncols:
        try:
            d[col] = [getattr(x, col) for x in sql_results]
        except AttributeError:
            # Best effort: a column missing from the results is left out.
            # Only AttributeError is tolerated so that genuine failures
            # (and KeyboardInterrupt) are not silently discarded.
            continue
    return pd.DataFrame(d)
def timefunc(func):  # IN USE
    """
    Decorator to wrap and time functions.

    Parameters
    ----------
    func: func
        Function to be timed

    Returns
    -------
    wrapper: func
        Timed version of the function. It prints the elapsed wall-clock
        time after each call and returns the wrapped function's result.
    """
    # Imported locally so this block does not rely on a module-level import.
    import functools

    # Fall back to repr() for callables without __name__ (e.g. partials).
    label = getattr(func, "__name__", repr(func))

    # functools.wraps keeps the wrapped function's name and docstring.
    @functools.wraps(func)
    def wrapper(*args, **kw):
        t1 = datetime.datetime.now()
        result = func(*args, **kw)
        t2 = datetime.datetime.now()
        print(f"{label} executed in {t2 - t1}")
        return result

    return wrapper
def get_binning_pars(af):
    """
    Extract the binning description of a superdark.

    Looks up the spatial and PHA bin sizes together with the start/end
    bounds in `af` (anything indexable by these string keys) and returns
    them as a new plain dictionary. A missing key raises whatever `af`
    raises for an unknown key.
    """
    wanted = (
        "bin_pha",
        "bin_x",
        "bin_y",
        "xstart",
        "xend",
        "ystart",
        "yend",
        "phastart",
        "phaend",
    )
    return {name: af[name] for name in wanted}
def bin_coords(xs, ys, bin_x, bin_y, xstart=0, ystart=0, make_int=False, as_float=False):
    """
    Map X & Y coordinates into the superdark's binned (and possibly offset)
    coordinate system.

    The start offset is subtracted from each coordinate, and the result is
    divided by the bin size: true division when `as_float` is True, floor
    division otherwise. When `make_int` is True the results are cast to
    int. Inputs that are not already numpy arrays are converted first.

    Returns the pair ``(binned_x, binned_y)`` of numpy arrays.
    """
    x_arr = xs if isinstance(xs, np.ndarray) else np.array(xs)
    y_arr = ys if isinstance(ys, np.ndarray) else np.array(ys)

    x_shifted = x_arr - xstart
    y_shifted = y_arr - ystart

    if as_float is True:
        binned_x = x_shifted / bin_x
        binned_y = y_shifted / bin_y
    else:
        binned_x = x_shifted // bin_x
        binned_y = y_shifted // bin_y

    if make_int is True:
        return binned_x.astype(int), binned_y.astype(int)
    return binned_x, binned_y
def unbin_coords(xs, ys, bin_x, bin_y, xstart=0, ystart=0):
    """
    Map binned X & Y coordinates back to the unbinned, native coordinate
    system.

    Each binned coordinate is expanded to the first and last native
    coordinate it covers: ``first = binned * bin + start`` and
    ``last = first + bin - 1``. Inputs that are not already numpy arrays
    are converted first.

    Returns ``((x_first, x_last), (y_first, y_last))``.
    """
    x_binned = xs if isinstance(xs, np.ndarray) else np.array(xs)
    y_binned = ys if isinstance(ys, np.ndarray) else np.array(ys)

    x_first = (x_binned * bin_x) + xstart
    y_first = (y_binned * bin_y) + ystart

    x_span = (x_first, x_first + bin_x - 1)
    y_span = (y_first, y_first + bin_y - 1)
    return x_span, y_span
def unbin_image(binned_im, bin_x, bin_y, xstart=0, ystart=0, xend=16384, yend=1024,
                out_shape=(1024, 16384)):
    """
    Expand a binned 2D image back onto an unbinned pixel grid.

    Every binned value is divided by ``bin_x * bin_y`` and written to all
    of the unbinned pixels that bin covers, so the total over the filled
    region matches the total of the binned values used.

    Arguments:
        binned_im (array-like): 2D binned image, indexed [row (y), column (x)].
        bin_x (int): Bin size in X.
        bin_y (int): Bin size in Y.
        xstart (int): Unbinned X coordinate where the first bin starts.
        ystart (int): Unbinned Y coordinate where the first bin starts.
        xend (int): Unbinned X coordinate where binning stops (exclusive).
        yend (int): Unbinned Y coordinate where binning stops (exclusive).
        out_shape (tuple): Shape (rows, columns) of the returned image.
    Returns:
        unbinned_im (array): Float image of shape `out_shape`; pixels that
            are not covered by any bin are zero.
    """
    per_pixel = np.asarray(binned_im) / bin_x / bin_y
    unbinned_im = np.zeros(out_shape)

    # Use every bin that starts inside [start, end), including the last
    # one, which the previous len(...)-1 loop bounds skipped. The count is
    # capped by the binned image's own shape, so an input holding one bin
    # fewer per axis still yields the same result as before.
    n_rows = min(per_pixel.shape[0], len(range(ystart, yend, bin_y)))
    n_cols = min(per_pixel.shape[1], len(range(xstart, xend, bin_x)))

    # Replicate each bin value over its bin_y x bin_x footprint.
    expanded = np.repeat(
        np.repeat(per_pixel[:n_rows, :n_cols], bin_y, axis=0), bin_x, axis=1
    )
    # Trim a partial final bin so nothing is written at or beyond the end.
    expanded = expanded[:yend - ystart, :xend - xstart]

    # Slicing clips at the output edges; match the source to what fits.
    target = unbinned_im[ystart:ystart + expanded.shape[0],
                         xstart:xstart + expanded.shape[1]]
    target[...] = expanded[:target.shape[0], :target.shape[1]]
    return unbinned_im
def get_psa_wca(segment, cenwave, lp, binning, pad_psa=[0,0], pad_wca=[0,0]):
    """
    Find the binned superdark rows covered by the PSA and WCA apertures
    for one segment, cenwave, and lifetime position.

    For each aperture the region box is looked up with
    `get_aperture_region`, its Y limits are converted to binned
    coordinates, widened outward to whole rows (floor of the lower limit,
    ceiling of the upper limit), and then padded.

    Arguments:
        segment (str): Get PSA/WCA regions for this segment.
        cenwave (str): Get PSA/WCA regions for this cenwave.
        lp (str): Get PSA/WCA regions for this lifetime position.
        binning (dict): Binning information; the keys "bin_x", "bin_y",
            "xstart" and "ystart" are read here.
        pad_psa (sequence): Rows to add to the PSA range; element 0 is
            subtracted from the first row, element 1 is added to the end.
        pad_wca (sequence): Same as `pad_psa`, for the WCA.
    Returns:
        excluded_rows (array): Integer rows to exclude, PSA rows followed
            by WCA rows.
        apertures (dict): The excluded rows for each aperture, PSA and WCA.
    """
    padding = {"PSA": pad_psa, "WCA": pad_wca}
    apertures = {"PSA": None, "WCA": None}
    # Start from an empty array so the combined result is built the same
    # way whether or not an aperture contributes any rows.
    row_chunks = [np.array(())]

    for name in ("PSA", "WCA"):
        regions = get_aperture_region(cenwave=cenwave, aperture=name,
                                      segments=[segment], life_adj=[lp])
        xmin0, xmax0, ymin0, ymax0 = regions[segment][f"lp{lp}_{name.lower()}_{cenwave}"]

        # Only the Y result is needed; X is passed through for the call.
        _, binned_y = bin_coords(np.array([xmin0, xmax0]),
                                 np.array([ymin0, ymax0]),
                                 binning["bin_x"], binning["bin_y"],
                                 binning["xstart"], binning["ystart"],
                                 as_float=True, make_int=False)
        y_low, y_high = binned_y

        first_row = int(np.floor(y_low)) - padding[name][0]
        stop_row = int(np.ceil(y_high)) + padding[name][1]

        aperture_rows = np.arange(first_row, stop_row)
        apertures[name] = aperture_rows
        row_chunks.append(aperture_rows)

    excluded_rows = np.concatenate(row_chunks).astype(int)
    return excluded_rows, apertures
def _recommended_xtractab(cenwave, lp, date_obs):
    """Ask CRDS for the recommended XTRACTAB and return its local path."""
    if date_obs == "today":
        useafter = datetime.datetime.now().strftime("%Y-%m-%d")
    elif isinstance(date_obs, datetime.date):
        useafter = date_obs.strftime("%Y-%m-%d")
    else:
        # Previously any value other than "today" left `useafter` undefined
        # and raised NameError. Assumes a "YYYY-MM-DD" string here, matching
        # the format used for "today" -- TODO confirm with callers.
        useafter = date_obs

    if "CRDS_PATH" not in os.environ:
        if os.path.exists("/grp/crds/cache"):
            os.environ["CRDS_PATH"] = "/grp/crds/cache"
        else:
            raise AssertionError("CRDS_PATH environment variable must first be defined")
    os.environ["CRDS_SERVER_URL"] = "https://hst-crds.stsci.edu"

    # Imported only after the CRDS environment variables are in place.
    import crds

    current_pmap = crds.get_default_context()
    crds_1dx = crds.getrecommendations(
        parameters={"INSTRUME": "COS",
                    "DETECTOR": "FUV",
                    "LIFE_ADJ": lp,
                    "OBSTYPE": "SPECTROSCOPIC",
                    "DATE-OBS": useafter,
                    "TIME-OBS": "00:00:00",
                    "CENWAVE": cenwave},
        reftypes=["xtractab"],
        context=current_pmap,
        observatory="hst")
    return os.path.join(os.environ["CRDS_PATH"], "references/hst/", crds_1dx["xtractab"])


def get_background_regions(segment, cenwave, lp=None, date_obs="today", round=True, xtractab=None):
    """
    Compute the Y limits of the two background regions at every X pixel
    from the PSA entry of an XTRACTAB.

    Arguments:
        segment (str): Segment to select in the XTRACTAB.
        cenwave: Cenwave to select in the XTRACTAB.
        lp: Lifetime position passed to CRDS as LIFE_ADJ. Only used when
            `xtractab` is None.
        date_obs: "today", a date/datetime object, or a date string passed
            to CRDS as DATE-OBS. Only used when `xtractab` is None.
        round (bool): If True, widen the limits outward to integers (floor
            of the lower limit, ceiling of the upper limit).
        xtractab (str): Path to an XTRACTAB file. If None, the file
            recommended by CRDS is used; this sets CRDS_SERVER_URL (and
            CRDS_PATH if it is unset) in the environment.
    Returns:
        background_d (dict): Keyed by region number (1 and 2). Each value
            is a dict with "bkg_y0" and "bkg_y1", the lower and upper Y
            limits for X = 0..16383.
    Raises:
        AssertionError: CRDS_PATH is unset and no default cache exists.
        ValueError: The XTRACTAB has no PSA row for this segment/cenwave.
    """
    if xtractab is None:
        xtractab = _recommended_xtractab(cenwave, lp, date_obs)

    data_1dx = fits.getdata(xtractab, 1)
    ind = np.where((data_1dx["cenwave"] == cenwave) &
                   (data_1dx["segment"] == segment) &
                   (data_1dx["aperture"] == "PSA"))
    xtract_data = data_1dx[ind]
    if len(xtract_data) == 0:
        # Fail with a clear message rather than a broadcasting error below.
        raise ValueError(
            f"No PSA entry for segment {segment}, cenwave {cenwave} in {xtractab}")

    x = np.arange(0, 16384)
    m = xtract_data["slope"]

    background_d = {}
    for region in [1, 2]:
        b = xtract_data[f"b_bkg{region}"]
        height = xtract_data[f"b_hgt{region}"]
        # the y values tracing the line in the middle of the bkgd parallelogram
        bkg_midy = m*x + b
        bkg_y0 = bkg_midy - height/2.
        bkg_y1 = bkg_midy + height/2.
        if round is True:
            bkg_y0 = np.floor(bkg_y0).astype(int)
            bkg_y1 = np.ceil(bkg_y1).astype(int)
        background_d[region] = {"bkg_y0": bkg_y0, "bkg_y1": bkg_y1}

    return background_d