Source code for desisim.scripts.quicktransients

"""
desisim.scripts.quicktransients
===============================
"""

import os
import numpy as np
import healpy as hp
from datetime import datetime

from desisim.templates import BGS, ELG, LRG
from desisim.transients import transients

from desitarget.mock.mockmaker import BGSMaker, ELGMaker, LRGMaker
from desitarget.cuts import isBGS_colors, isELG_colors, isLRG_colors

from desisim.simexp import reference_conditions
from desisim.transients import transients
from desisim.scripts.quickspectra import sim_spectra

from desispec.io import read_spectra, write_spectra
from desispec.coaddition import coadd_cameras

from desiutil.log import get_logger, DEBUG

from astropy.table import Table, hstack, vstack

import argparse


[docs]def _set_wave(wavemin=None, wavemax=None, dw=0.8): """Set default wavelength grid for simulations. Parameters ---------- wavemin : float or None Minimum wavelength, in Angstroms. wavemax : float or None Maximum wavelength, in Angstroms. dw : float Bin size. Returns ------- wave : ndarray Grid of wavelength values. """ from desimodel.io import load_throughput if wavemin is None: wavemin = load_throughput('b').wavemin - 10.0 if wavemax is None: wavemax = load_throughput('z').wavemax + 10.0 return np.arange(round(wavemin, 1), wavemax, dw)
[docs]def _get_healpixels_in_footprint(nside=64): """Obtain a list of HEALPix pixels in the DESI footprint. Parameters ---------- nside : int HEALPix nside parameter (in form nside=2**k, k=[1,2,3,...]). Returns ------- healpixels : ndarray List of HEALPix pixels within the DESI footprint. """ from desimodel import footprint from desimodel.io import load_tiles # Load DESI tiles. tile_tab = load_tiles() npix = hp.nside2npix(nside) pix_ids = np.arange(npix) ra, dec = hp.pix2ang(nside, pix_ids, lonlat=True) # Get a list of pixel IDs inside the DESI footprint. in_desi = footprint.is_point_in_desi(tile_tab, ra, dec) healpixels = pix_ids[in_desi] return healpixels
[docs]def write_templates(filename, flux, wave, target, truth, objtruth): """Write galaxy templates to a FITS file. Parameters ---------- filename : str Path to output file. flux : ndarray Array of flux data for template spectra. wave : ndarray Array of wavelengths. target : Table Target information. truth : Table Template simulation truth. objtruth : Table Object-specific truth data. """ import astropy.units as u from astropy.io import fits hx = fits.HDUList() # Write the wavelength table. hdu_wave = fits.PrimaryHDU(wave) hdu_wave.header['EXTNAME'] = 'WAVE' hdu_wave.header['BUNIT'] = 'Angstrom' hdu_wave.header['AIRORVAC'] = ('vac', 'Vacuum wavelengths') hx.append(hdu_wave) # Write the flux table. fluxunits = 1e-17 * u.erg / (u.s * u.cm**2 * u.Angstrom) hdu_flux = fits.ImageHDU(flux) hdu_flux.header['EXTNAME'] = 'FLUX' hdu_flux.header['BUNIT'] = str(fluxunits) hx.append(hdu_flux) # Write targets table. hdu_targets = fits.table_to_hdu(target) hdu_targets.header['EXTNAME'] = 'TARGETS' hx.append(hdu_targets) # Write truth table. hdu_truth = fits.table_to_hdu(truth) hdu_truth.header['EXTNAME'] = 'TRUTH' hx.append(hdu_truth) # Write objtruth table. hdu_objtruth = fits.table_to_hdu(objtruth) hdu_objtruth.header['EXTNAME'] = 'OBJTRUTH' hx.append(hdu_objtruth) hx.writeto(filename, overwrite=True)
[docs]def parse(options=None): """Parse command line options. """ parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Fast galaxy + transient simulator') # Set up observing conditions. cond = parser.add_argument_group('Observing conditions') for c, v in reference_conditions['DARK'].items(): cond.add_argument('--{}'.format(c.lower()), type=float, default=v) # Set up simulation. sims = parser.add_argument_group('Simulation settings') sims.add_argument('--nside', type=int, default=64, help='HEALPix NSIDE') sims.add_argument('--nspec', type=int, default=100, help='Spectra per healpixel') sims.add_argument('--nsim', type=int, default=10, help='Number of simulations') sims.add_argument('--seed', type=int, default=None, help='RNG seed') sims.add_argument('--host', choices=['bgs','elg','lrg'], default='bgs', help='Host galaxy type') sims.add_argument('--outdir', default='', help='Absolute path to simulation output') sims.add_argument('--coadd_cameras', action='store_true', help='If true, coadd cameras in generated spectra') # Set up transient model as a simulation setting. None==no transient. tran = parser.add_argument_group('Transient settings') tran.add_argument('--magrange', nargs=2, type=float, default=[0.,2.5], help='Transient-host mag range; e.g., 0 5') tran.add_argument('--epochrange', nargs=2, type=float, default=[-10.,10.], help='Epoch range w.r.t. t0 in days; e.g., -10 10') modelnames = [] mdict = transients.get_type_dict() for t, models in mdict.items(): for m in models: modelnames.append(m) tran.add_argument('--transient', default=None, help='None or one of these models: {}'.format(transients.get_type_dict())) if options is None: args = parser.parse_args() else: args = parser.parse_args(options) return args
def main(args=None): log = get_logger() if isinstance(args, (list, tuple, type(None))): args = parse(args) thedate = datetime.now().strftime('%Y-%m-%d') # Generate transient model if one is specified. trans_model = None if args.transient is not None: trans_model = transients.get_model(args.transient) # Generate list of HEALPix pixels to randomly sample the mocks. rng = np.random.RandomState(args.seed) nside = args.nside healpixels = _get_healpixels_in_footprint(nside=args.nside) npix = np.minimum(10*args.nsim, len(healpixels)) pixels = rng.choice(healpixels, size=npix, replace=False) ipix = iter(pixels) # Set up the template generator. fluxratio_range = 10**(-np.sort(args.magrange)[::-1] / 2.5) epoch_range = np.sort(args.epochrange) if args.host == 'bgs': maker = BGSMaker(seed=args.seed) tmpl_maker = BGS elif args.host == 'elg': maker = ELGMaker(seed=args.seed) tmpl_maker = ELG elif args.host == 'lrg': maker = LRGMaker(seed=args.seed) tmpl_maker = LRG else: raise ValueError('Unusable host type {}'.format(args.host)) maker.template_maker = tmpl_maker(transient=trans_model, tr_fluxratio=fluxratio_range, tr_epoch=epoch_range) for j in range(args.nsim): # Loop until finding a non-empty healpixel with mock galaxies. tdata = [] while len(tdata) == 0: pixel = next(ipix) tdata = maker.read(healpixels=pixel, nside=args.nside) # Generate spectral templates and write them to truth files. # Keep producing templates until we have enough to pass brightness cuts. wave = None flux, targ, truth, objtr = [], [], [], [] ntosim = np.min([args.nspec, len(tdata['RA'])]) ngood = 0 while ngood < args.nspec: idx = rng.choice(len(tdata['RA']), ntosim) tflux, twave, ttarg, ttruth, tobj = maker.make_spectra(tdata, indx=idx) g, r, z, w1, w2 = [ttruth['FLUX_{}'.format(_)] for _ in ['G','R','Z','W1','W2']] rfib = ttarg['FIBERFLUX_R'] # print(g, r, z, w1, w2, rfib) # Apply color cuts. is_bright = isBGS_colors(rfib, g, r, z, w1, w2, targtype='bright') is_faint = isBGS_colors(rfib, g, r, z, w1, w2, targtype='faint') is_wise = isBGS_colors(rfib, g, r, z, w1, w2, targtype='wise') keep = np.logical_or.reduce([is_bright, is_faint, is_wise]) _ngood = np.count_nonzero(keep) if _ngood > 0: ngood += _ngood flux.append(tflux[keep, :]) targ.append(ttarg[keep]) truth.append(ttruth[keep]) objtr.append(tobj[keep]) wave = maker.wave flux = np.vstack(flux)[:args.nspec, :] targ = vstack(targ)[:args.nspec] truth = vstack(truth)[:args.nspec] objtr = vstack(objtr)[:args.nspec] # Set up and verify the TARGETID across all truth tables. n = len(truth) new_id = 10000*pixel + 100*j + np.arange(1, n+1) targ['OBJID'][:] = new_id truth['TARGETID'][:] = new_id objtr['TARGETID'][:] = new_id assert(len(truth) == args.nspec) assert(np.all(targ['OBJID'] == truth['TARGETID'])) assert(len(targ) == len(np.unique(targ['OBJID']))) assert(len(truth) == len(np.unique(truth['TARGETID']))) assert(len(objtr) == len(np.unique(objtr['TARGETID']))) truthfile = os.path.join(args.outdir, '{}_{}_{:04d}s_{:03d}_truth.fits'.format(args.host, thedate, int(args.exptime), j+1)) write_templates(truthfile, flux, wave, targ, truth, objtr) log.info('Wrote {}'.format(truthfile)) # Get observing conditions and generate spectra. obs = dict(AIRMASS=args.airmass, EXPTIME=args.exptime, MOONALT=args.moonalt, MOONFRAC=args.moonfrac, MOONSEP=args.moonsep, SEEING=args.seeing) fcols = dict(BRICKID=targ['BRICKID'], BRICK_OBJID=targ['OBJID'], FLUX_G=targ['FLUX_G'], FLUX_R=targ['FLUX_R'], FLUX_Z=targ['FLUX_Z'], FLUX_W1=targ['FLUX_W1'], FLUX_W2=targ['FLUX_W2'], FLUX_IVAR_G=targ['FLUX_IVAR_G'], FLUX_IVAR_R=targ['FLUX_IVAR_R'], FLUX_IVAR_Z=targ['FLUX_IVAR_Z'], FLUX_IVAR_W1=targ['FLUX_IVAR_W1'], FLUX_IVAR_W2=targ['FLUX_IVAR_W2'], FIBERFLUX_G=targ['FIBERFLUX_G'], FIBERFLUX_R=targ['FIBERFLUX_R'], FIBERFLUX_Z=targ['FIBERFLUX_Z'], FIBERTOTFLUX_G=targ['FIBERTOTFLUX_G'], FIBERTOTFLUX_R=targ['FIBERTOTFLUX_R'], FIBERTOTFLUX_Z=targ['FIBERTOTFLUX_Z'], MW_TRANSMISSION_G=targ['MW_TRANSMISSION_G'], MW_TRANSMISSION_R=targ['MW_TRANSMISSION_R'], MW_TRANSMISSION_Z=targ['MW_TRANSMISSION_Z'], EBV=targ['EBV']) specfile = os.path.join(args.outdir, '{}_{}_{:04d}s_{:03d}_spect.fits'.format(args.host, thedate, int(args.exptime), j+1)) # redshifts = truth['TRUEZ'] if args.host=='bgs' else None redshifts = None sim_spectra(wave, flux, args.host, specfile, sourcetype=args.host, obsconditions=obs, meta=obs, fibermap_columns=fcols, targetid=truth['TARGETID'], redshift=redshifts, ra=targ['RA'], dec=targ['DEC'], seed=args.seed, expid=j) if args.coadd_cameras: coaddfile = specfile.replace('spect', 'coadd') spectra = read_spectra(specfile) spectra = coadd_cameras(spectra) write_spectra(coaddfile, spectra) log.info('Wrote {}'.format(coaddfile))