Source code for acdc.database.query_cos_dark

import yaml
from sqlalchemy.orm import load_only
import pandas as pd

from .connect_db import load_connection
from .schema import Solar, Darks

# Debug switch: when True, all_darks() requests only a reduced set of columns
# and restricts its query to a single hard-coded file.
TESTING = False

def all_darks(dbname="cos_dark"):
    """
    Query the Darks table to get darks for all time for all PHAs.

    Args:
        dbname (str): The location of the SQLite database, with full path,
            e.g. /path/to/cos_dark.db
            If in the current directory, do not include . or ./

    Returns:
        df (:obj:`pandas.DataFrame`): Query results, one column per requested
            attribute and one row per record returned by the query.
    """
    # Connect to database; the engine handle is not needed in this function.
    session, _ = load_connection(dbname)

    # Define columns to return from database
    if TESTING is True:
        cols = ["expstart", "latitude", "longitude", "solar_flux",
                "dark_pha11"]
    else:
        cols = ["expstart", "solar_flux", "latitude", "longitude", "segment",
                "hv", "region", "saa_distance"]
        cols += [f"dark_pha{x}" for x in range(32)]
    cols_attr = [getattr(Darks, name) for name in cols]

    try:
        # Execute SELECT query; no region/segment filter is applied here.
        query = session.query(Darks).options(load_only(*cols_attr))
        if TESTING is True:
            query = query.filter(Darks.filename == "le0a3mcuq_corrtag_a.fits")
        results = query.all()

        # Copy the loaded attributes into plain lists while the session is
        # still open, so nothing depends on the ORM objects afterwards.
        data = {col: [getattr(row, col) for row in results] for col in cols}
    finally:
        # Always release the session, even if the query fails.
        session.close()

    return pd.DataFrame(data)
def files_by_mjd(mjdstart, mjdend, segment="FUVA", hv=167, morecols=None,
                 dbname="cos_dark"):
    """
    Query the Darks table for the distinct file locations in a time range.

    Rows are selected where ``mjdstart <= Darks.expstart < mjdend`` and
    ``Darks.segment == segment``, optionally restricted to one ``hv`` value,
    and ordered by ``Darks.expstart``.

    Args:
        mjdstart: Inclusive lower bound compared against ``Darks.expstart``.
        mjdend: Exclusive upper bound compared against ``Darks.expstart``.
        segment (str): Value that ``Darks.segment`` must equal.
        hv: Value that ``Darks.hv`` must equal. Pass ``"*"`` or ``None`` to
            skip the ``hv`` filter entirely.
        morecols: Currently unused by this function.
        dbname (str): Database name/location passed to ``load_connection``.

    Returns:
        df (:obj:`pandas.DataFrame`): A single ``fileloc`` column holding the
            distinct file locations that matched.
    """
    # Connect to database; the engine handle is not needed in this function.
    session, _ = load_connection(dbname)

    any_hv = hv is None or hv == "*"

    try:
        query = (
            session.query(Darks.fileloc.distinct())
            .filter(Darks.expstart >= mjdstart)
            .filter(Darks.expstart < mjdend)
            .filter(Darks.segment == segment)
        )
        if not any_hv:
            query = query.filter(Darks.hv == hv)
        results = query.order_by(Darks.expstart).all()
    finally:
        # Always release the session, even if the query fails.
        session.close()

    # Each result row is a 1-tuple holding the distinct fileloc value.
    return pd.DataFrame({"fileloc": [row[0] for row in results]})
def counts_by_mjd(mjdstart, mjdend, morecols=None, dbname="cos_dark"):
    """
    Query the Darks table for all records within a time range.

    Rows are selected where ``mjdstart <= Darks.expstart < mjdend``.

    Args:
        mjdstart: Inclusive lower bound compared against ``Darks.expstart``.
        mjdend: Exclusive upper bound compared against ``Darks.expstart``.
        morecols (iterable of str, optional): Names of additional ``Darks``
            attributes to return on top of the default columns.
        dbname (str): Database name/location passed to ``load_connection``.

    Returns:
        df (:obj:`pandas.DataFrame`): One column per requested attribute
            (the default columns, ``dark_pha0`` .. ``dark_pha31``, then any
            ``morecols``) and one row per matching record.
    """
    # Connect to database; the engine handle is not needed in this function.
    session, _ = load_connection(dbname)

    cols = ["expstart", "solar_flux", "latitude", "longitude", "segment",
            "hv", "region", "saa_distance"]
    cols += [f"dark_pha{x}" for x in range(32)]
    if morecols is not None:
        cols.extend(morecols)
    cols_attr = [getattr(Darks, name) for name in cols]

    try:
        query = (
            session.query(Darks)
            .options(load_only(*cols_attr))
            .filter(Darks.expstart >= mjdstart)
            .filter(Darks.expstart < mjdend)
        )
        results = query.all()

        # Copy the loaded attributes into plain lists while the session is
        # still open, so nothing depends on the ORM objects afterwards.
        data = {col: [getattr(row, col) for row in results] for col in cols}
    finally:
        # Always release the session, even if the query fails.
        session.close()

    return pd.DataFrame(data)