"""
desisim.pixsim
==============
Tools for DESI pixel level simulations using specter
"""
from __future__ import absolute_import, division, print_function
import sys
import os
import os.path
import random
from time import asctime
import socket
import astropy.units as u
import numpy as np
import desimodel.io
import desispec.io
from desispec.image import Image
import desispec.cosmics
from . import obs, io
from desiutil.log import get_logger
log = get_logger()
# Inhibit download of IERS-A catalog, even from a good server.
# Note that this is triggered by a call to astropy.time.Time(),
# which is subsequently used to compute sidereal_time().
# It's the initialization of astropy.time.Time() itself that makes the call.
from desiutil.iers import freeze_iers
from astropy.time import Time
[docs]def simulate_exposure(simspecfile, rawfile, cameras=None,
ccdshape=None, simpixfile=None, addcosmics=None, comm=None, keywords=None, outfibermap=None,
**kwargs):
"""
Simulate frames from an exposure, including I/O
Args:
simspecfile: input simspec format file with spectra
rawfile: output raw data file to write
Options:
cameras: str or list of str, e.g. b0, r1, .. z9
ccdshape: (npix_y, npix_x) primarily used to limit memory while testing
simpixfile: output file for noiseless truth pixels
addcosmics: if True (must be specified via command input), add cosmics from real data
comm: MPI communicator object
keywords: dictionnary with keywords to add to the headers
outfibermap: output fibermap
Additional keyword args are passed to pixsim.simulate()
For a lower-level pixel simulation interface that doesn't perform I/O,
see pixsim.simulate()
Note: call desi_preproc or desispec.preproc.preproc to pre-process the
output desi*.fits file for overscan subtraction, noise estimation, etc.
"""
#- Split communicator by nodes; each node processes N frames
#- Assumes / requires equal number of ranks per node
if comm is not None:
rank, size = comm.rank, comm.size
num_nodes = mpi_count_nodes(comm)
comm_node, node_index, num_nodes = mpi_split_by_node(comm, 1)
node_rank = comm_node.rank
node_size = comm_node.size
else:
log.debug('Not using MPI')
rank, size = 0, 1
comm_node = None
node_index = 0
num_nodes = 1
node_rank = 0
node_size = 1
if rank == 0:
log.debug('Starting simulate_exposure at {}'.format(asctime()))
if keywords is None :
keywords = dict()
adds={"OBSTYPE":"SCIENCE","FLAVOR":"science"}
for k in adds :
if k not in keywords :
keywords[k]=adds[k]
fibermap=desispec.io.read_fibermap(simspecfile)
log.info("Setting in fibermap PETAL_LOC=FIBER//500")
fibermap["PETAL_LOC"]=fibermap['FIBER']//500
if outfibermap is not None :
desispec.io.write_fibermap(outfibermap,fibermap)
log.info("Wrote "+outfibermap)
if cameras is None:
if rank == 0:
cameras = io.fibers2cameras(fibermap['FIBER'])
log.debug('Found cameras {} in input simspec file'.format(cameras))
if len(cameras) % num_nodes != 0:
raise ValueError('Number of cameras {} should be evenly divisible by number of nodes {}'.format(
len(cameras), num_nodes))
if comm is not None:
cameras = comm.bcast(cameras, root=0)
#- Fail early if camera alreaady in output file
if rank == 0 and os.path.exists(rawfile):
from astropy.io import fits
err = False
with fits.open(rawfile) as fx:
for camera in cameras:
if camera in fx:
log.error('Camera {} already in {}'.format(camera, rawfile))
err = True
if err:
raise ValueError('Some cameras already in output file')
#- Read simspec input; I/O layer handles MPI broadcasting
if rank == 0:
log.debug('Reading simspec at {}'.format(asctime()))
mycameras = cameras[node_index::num_nodes]
if node_rank == 0:
log.info("Assigning cameras {} to comm_exp node {}".format(mycameras, node_index))
simspec = io.read_simspec(simspecfile, cameras=mycameras,
readflux=False, comm=comm)
night = simspec.header['NIGHT']
expid = simspec.header['EXPID']
if rank == 0:
log.debug('Reading PSFs at {}'.format(asctime()))
psfs = dict()
#need to initialize previous channel
previous_channel = 'a'
for camera in mycameras:
#- Note: current PSF object can't be pickled and thus every
#- rank must read it instead of rank 0 read + bcast
channel = camera[0]
if channel not in psfs:
log.info('Reading {} PSF at {}'.format(channel, asctime()))
psfs[channel] = desimodel.io.load_psf(channel)
#- Trim effective CCD size; mainly to limit memory for testing
if ccdshape is not None:
psfs[channel].npix_y, psfs[channel].npix_x = ccdshape
psf = psfs[channel]
cosmics=None
#avoid re-broadcasting cosmics if we can
if previous_channel != channel:
if (addcosmics is True) and (node_rank == 0):
cosmics_file = io.find_cosmics(camera, simspec.header['EXPTIME'])
log.info('Reading cosmics templates {} at {}'.format(
cosmics_file, asctime()))
shape = (psf.npix_y, psf.npix_x)
cosmics = io.read_cosmics(cosmics_file, expid, shape=shape)
if (addcosmics is True) and (comm_node is not None):
if node_rank == 0:
log.info('Broadcasting cosmics at {}'.format(asctime()))
cosmics = comm_node.bcast(cosmics, root=0)
else:
log.debug("Cosmics not requested")
if node_rank == 0:
log.info("Starting simulate for camera {} on node {}".format(camera,node_index))
image, rawpix, truepix = simulate(camera, simspec, psf, comm=comm_node, preproc=False, cosmics=cosmics, **kwargs)
if keywords is not None :
for k in keywords :
image.meta[k] = keywords[k]
#- Use input communicator as barrier since multiple sub-communicators
#- will write to the same output file
if rank == 0:
log.debug('Writing outputs at {}'.format(asctime()))
tmprawfile = rawfile + '.tmp'
if comm is not None:
for i in range(comm.size):
if (i == comm.rank) and (comm_node.rank == 0):
desispec.io.write_raw(tmprawfile, rawpix, image.meta,
camera=camera, primary_header=keywords)
if simpixfile is not None:
io.write_simpix(simpixfile, truepix, camera=camera,
meta=image.meta)
comm.barrier()
else:
desispec.io.write_raw(tmprawfile, rawpix, image.meta, camera=camera, primary_header=keywords)
if simpixfile is not None:
io.write_simpix(simpixfile, truepix, camera=camera,
meta=image.meta)
if rank == 0:
log.info('Wrote {}'.format(rawfile))
log.debug('done at {}'.format(asctime()))
previous_channel = channel
#- All done; rename temporary raw file to final location
if comm is None or comm.rank == 0:
os.rename(tmprawfile, rawfile)
[docs]def simulate(camera, simspec, psf, nspec=None, ncpu=None,
cosmics=None, wavemin=None, wavemax=None, preproc=True, comm=None):
"""Run pixel-level simulation of input spectra
Args:
camera (string) : b0, r1, .. z9
simspec : desispec.io.SimSpec object from desispec.io.read_simspec()
psf : subclass of specter.psf.psf.PSF, e.g. from desimodel.io.load_psf()
Options:
nspec (int): number of spectra to simulate
ncpu (int): number of CPU cores to use in parallel
cosmics (desispec.image.Image): e.g. from desisim.io.read_cosmics()
wavemin (float): minimum wavelength range to simulate
wavemax (float): maximum wavelength range to simulate
preproc (boolean, optional) : also preprocess raw data (default True)
Returns:
(image, rawpix, truepix) tuple, where image is the preproc Image object
(only header is meaningful if preproc=False), rawpix is a 2D
ndarray of unprocessed raw pixel data, and truepix is a 2D ndarray
of truth for image.pix
"""
freeze_iers()
if (comm is None) or (comm.rank == 0):
log.info('Starting pixsim.simulate camera {} at {}'.format(camera,
asctime()))
#- parse camera name into channel and spectrograph number
channel = camera[0].lower()
ispec = int(camera[1])
assert channel in 'brz', \
'unrecognized channel {} camera {}'.format(channel, camera)
assert 0 <= ispec < 10, \
'unrecognized spectrograph {} camera {}'.format(ispec, camera)
assert len(camera) == 2, \
'unrecognized camera {}'.format(camera)
#- Load DESI parameters
params = desimodel.io.load_desiparams()
#- this is not necessarily true, the truth in is the fibermap
nfibers = params['spectro']['nfibers']
phot = simspec.cameras[camera].phot
if simspec.cameras[camera].skyphot is not None:
phot += simspec.cameras[camera].skyphot
if nspec is not None:
phot = phot[0:nspec]
else:
nspec = phot.shape[0]
#- Trim wavelengths if needed
wave = simspec.cameras[camera].wave
if wavemin is not None:
ii = (wave >= wavemin)
phot = phot[:, ii]
wave = wave[ii]
if wavemax is not None:
ii = (wave <= wavemax)
phot = phot[:, ii]
wave = wave[ii]
#- Project to image and append that to file
if (comm is None) or (comm.rank == 0):
log.info('Starting {} projection at {}'.format(camera, asctime()))
# The returned true pixel values will only exist on rank 0 in the
# MPI case. Otherwise it will be None.
truepix = parallel_project(psf, wave, phot, ncpu=ncpu, comm=comm)
if (comm is None) or (comm.rank == 0):
log.info('Finished {} projection at {}'.format(camera,
asctime()))
image = None
rawpix = None
if (comm is None) or (comm.rank == 0):
#- Start metadata header
header = simspec.header.copy()
header['CAMERA'] = camera
header['DOSVER'] = 'SIM'
header['FEEVER'] = 'SIM'
header['DETECTOR'] = 'SIM'
#- Add cosmics from library of dark images
ny = truepix.shape[0] // 2
nx = truepix.shape[1] // 2
if cosmics is not None:
# set to zeros values with mask bit 0 (= dead column or hot pixels)
cosmics_pix = cosmics.pix*((cosmics.mask&1)==0)
pix = np.random.poisson(truepix) + cosmics_pix
try: #- cosmics templates >= v0.3
rdnoiseA = cosmics.meta['OBSRDNA']
rdnoiseB = cosmics.meta['OBSRDNB']
rdnoiseC = cosmics.meta['OBSRDNC']
rdnoiseD = cosmics.meta['OBSRDND']
except KeyError: #- cosmics templates <= v0.2
print(cosmic.meta)
rdnoiseA = cosmics.meta['RDNOISE0']
rdnoiseB = cosmics.meta['RDNOISE1']
rdnoiseC = cosmics.meta['RDNOISE2']
rdnoiseD = cosmics.meta['RDNOISE3']
else:
pix = truepix
readnoise = params['ccd'][channel]['readnoise']
rdnoiseA = rdnoiseB = rdnoiseC = rdnoiseD = readnoise
#- data already has noise if cosmics were added
noisydata = (cosmics is not None)
#- Split by amplifier and expand into raw data
nprescan = params['ccd'][channel]['prescanpixels']
if 'overscanpixels' in params['ccd'][channel]:
noverscan = params['ccd'][channel]['overscanpixels']
else:
noverscan = 50
#- Reproducibly random overscan bias level offsets across diff exp
assert channel in 'brz'
if channel == 'b':
irand = ispec
elif channel == 'r':
irand = 10 + ispec
elif channel == 'z':
irand = 20 + ispec
seeds = np.random.RandomState(0).randint(2**32-1, size=30)
rand = np.random.RandomState(seeds[irand])
nyraw = ny
nxraw = nx + nprescan + noverscan
rawpix = np.empty( (nyraw*2, nxraw*2), dtype=np.int32 )
gain = params['ccd'][channel]['gain']
#- Amp A/1 Lower Left
rawpix[0:nyraw, 0:nxraw] = \
photpix2raw(pix[0:ny, 0:nx], gain, rdnoiseA,
readorder='lr', nprescan=nprescan, noverscan=noverscan,
offset=rand.uniform(100, 200),
noisydata=noisydata)
#- Amp B/2 Lower Right
rawpix[0:nyraw, nxraw:nxraw+nxraw] = \
photpix2raw(pix[0:ny, nx:nx+nx], gain, rdnoiseB,
readorder='rl', nprescan=nprescan, noverscan=noverscan,
offset=rand.uniform(100, 200),
noisydata=noisydata)
#- Amp C/3 Upper Left
rawpix[nyraw:nyraw+nyraw, 0:nxraw] = \
photpix2raw(pix[ny:ny+ny, 0:nx], gain, rdnoiseC,
readorder='lr', nprescan=nprescan, noverscan=noverscan,
offset=rand.uniform(100, 200),
noisydata=noisydata)
#- Amp D/4 Upper Right
rawpix[nyraw:nyraw+nyraw, nxraw:nxraw+nxraw] = \
photpix2raw(pix[ny:ny+ny, nx:nx+nx], gain, rdnoiseD,
readorder='rl', nprescan=nprescan, noverscan=noverscan,
offset=rand.uniform(100, 200),
noisydata=noisydata)
def xyslice2header(xyslice):
'''
convert 2D slice into IRAF style [a:b,c:d] header value
e.g. xyslice2header(np.s_[0:10, 5:20]) -> '[6:20,1:10]'
'''
yy, xx = xyslice
value = '[{}:{},{}:{}]'.format(xx.start+1, xx.stop,
yy.start+1, yy.stop)
return value
#- Amp order from DESI-1964 (previously 1-4 instead of A-D)
#- C D
#- A B
xoffset = nprescan+nx+noverscan
header['PRESECA'] = xyslice2header(np.s_[0:nyraw, 0:0+nprescan])
header['DATASECA'] = xyslice2header(np.s_[0:nyraw, nprescan:nprescan+nx])
header['BIASSECA'] = xyslice2header(np.s_[0:nyraw, nprescan+nx:nprescan+nx+noverscan])
header['CCDSECA'] = xyslice2header(np.s_[0:ny, 0:nx])
header['PRESECB'] = xyslice2header(np.s_[0:nyraw, xoffset+noverscan+nx:xoffset+noverscan+nx+nprescan])
header['DATASECB'] = xyslice2header(np.s_[0:nyraw, xoffset+noverscan:xoffset+noverscan+nx])
header['BIASSECB'] = xyslice2header(np.s_[0:nyraw, xoffset:xoffset+noverscan])
header['CCDSECB'] = xyslice2header(np.s_[0:ny, nx:2*nx])
header['PRESECC'] = xyslice2header(np.s_[nyraw:2*nyraw, 0:0+nprescan])
header['DATASECC'] = xyslice2header(np.s_[nyraw:2*nyraw, nprescan:nprescan+nx])
header['BIASSECC'] = xyslice2header(np.s_[nyraw:2*nyraw, nprescan+nx:nprescan+nx+noverscan])
header['CCDSECC'] = xyslice2header(np.s_[ny:2*ny, 0:nx])
header['PRESECD'] = xyslice2header(np.s_[nyraw:2*nyraw, xoffset+noverscan+nx:xoffset+noverscan+nx+nprescan])
header['DATASECD'] = xyslice2header(np.s_[nyraw:2*nyraw, xoffset+noverscan:xoffset+noverscan+nx])
header['BIASSECD'] = xyslice2header(np.s_[nyraw:2*nyraw, xoffset:xoffset+noverscan])
header['CCDSECD'] = xyslice2header(np.s_[ny:2*ny, nx:2*nx])
#- Add additional keywords to mimic real raw data
header['INSTRUME'] = 'DESI'
header['PROCTYPE'] = 'RAW'
header['PRODTYPE'] = 'image'
header['EXPFRAME'] = 0
header['REQTIME'] = simspec.header['EXPTIME']
header['TIMESYS'] = 'UTC'
#- DATE-OBS format YEAR-MM-DDThh:mm:ss.sss -> OBSID kpnoYEARMMDDthhmmss
header['OBSID']='kp4m'+header['DATE-OBS'][0:19].replace('-','').replace(':','').lower()
header['TIME-OBS'] = header['DATE-OBS'].split('T')[1]
header['DELTARA'] = 0.0
header['DELTADEC'] = 0.0
header['SPECGRPH'] = ispec
header['CCDNAME'] = 'CCDS' + str(ispec) + str(channel).upper()
header['CCDPREP'] = 'purge,clear'
header['CCDSIZE'] = '{},{}'.format(rawpix.shape[0], rawpix.shape[1])
header['CCDTEMP'] = 850.0
header['CPUTEMP'] = 63.7
header['CASETEMP'] = 62.8
header['CCDTMING'] = 'sim_timing.txt'
header['CCDCFG'] = 'sim.cfg'
header['SETTINGS'] = 'sim_detectors.json'
header['VESSEL'] = 7 #- I don't know what this is
header['FEEBOX'] = 'sim097'
header['PGAGAIN'] = 5
header['OCSVER'] = 'SIM'
header['CONSTVER'] = 'SIM'
header['BLDTIME'] = 0.35
header['DIGITIME'] = 61.9
#- Remove some spurious header keywords from upstream
if 'BUNIT' in header and header['BUNIT'] == 'Angstrom':
del header['BUNIT']
if 'MJD' in header and 'MJD-OBS' not in header:
header['MJD-OBS'] = header['MJD']
del header['MJD']
for key in ['RA', 'DEC']:
if key in header:
del header[key]
#- Drive MJD-OBS from DATE-OBS if needed
if 'MJD-OBS' not in header:
header['MJD-OBS'] = Time(header['DATE-OBS']).mjd
#- from http://www-kpno.kpno.noao.edu/kpno-misc/mayall_params.html
kpno_longitude = -(111. + 35/60. + 59.6/3600) * u.deg
#- Convert DATE-OBS to sexigesimal (sigh) Local Sidereal Time
#- Use mean ST as close enough for sims to avoid nutation calc
t = Time(header['DATE-OBS'])
# This calculation can raise a non-catastrophic "overflow encountered in
# double_scalars" error in astropy (see
# https://github.com/desihub/specsim/pull/120); catch it here.
with np.errstate(all='ignore'):
st = t.sidereal_time('mean', kpno_longitude).to('deg').value
hour = st/15
minute = (hour % 1)*60
second = (minute % 1)*60
header['ST'] = '{:02d}:{:02d}:{:0.3f}'.format(
int(hour), int(minute), second)
if preproc:
log.debug('Running preprocessing at {}'.format(asctime()))
image = desispec.preproc.preproc(rawpix, header, primary_header=simspec.header)
else:
log.debug('Skipping preprocessing')
image = Image(np.zeros(truepix.shape), np.zeros(truepix.shape), meta=header)
if (comm is None) or (comm.rank == 0):
log.info('Finished pixsim.simulate for camera {} at {}'.format(camera,
asctime()))
return image, rawpix, truepix
[docs]def photpix2raw(phot, gain=1.0, readnoise=3.0, offset=None,
nprescan=7, noverscan=50, readorder='lr', noisydata=True):
'''
Add prescan, overscan, noise, and integerization to an image
Args:
phot: 2D float array of mean input photons per pixel
gain (float, optional): electrons/ADU
readnoise (float, optional): CCD readnoise in electrons
offset (float, optional): bias offset to add
nprescan (int, optional): number of prescan pixels to add
noverscan (int, optional): number of overscan pixels to add
readorder (str, optional): 'lr' or 'rl' to indicate readout order
'lr' : add prescan on left and overscan on right of image
'rl' : add prescan on right and overscan on left of image
noisydata (boolean, optional) : if True, don't add noise,
e.g. because input signal already had noise from a cosmics image
Returns 2D integer ndarray:
image = int((poisson(phot) + offset + gauss(readnoise))/gain)
Integerization happens twice: the mean photons are poisson sampled
into integers, but then offets, readnoise, and gain are applied before
resampling into ADU integers
This is intended to be used per-amplifier, not for an entire CCD image.
'''
ny = phot.shape[0]
nx = phot.shape[1] + nprescan + noverscan
#- reading from right to left is effectively swapping pre/overscan counts
if readorder.lower() in ('rl', 'rightleft'):
nprescan, noverscan = noverscan, nprescan
img = np.zeros((ny, nx), dtype=float)
img[:, nprescan:nprescan+phot.shape[1]] = phot
if offset is None:
offset = np.random.uniform(100, 200)
if noisydata:
#- Data already has noise; just add offset and noise to pre/overscan
img += offset
img[0:ny, 0:nprescan] += np.random.normal(scale=readnoise, size=(ny, nprescan))
ix = phot.shape[1] + nprescan
img[0:ny, ix:ix+noverscan] += np.random.normal(scale=readnoise, size=(ny, noverscan))
img /= gain
else:
#- Add offset and noise to everything
noise = np.random.normal(loc=offset, scale=readnoise, size=img.shape)
img = np.random.poisson(img) + noise
img /= gain
return img.astype(np.int32)
#- Helper function for multiprocessing parallel project
[docs]def _project(args):
"""
Helper function to project photons onto a subimage
Args:
tuple/array of [psf, wave, phot, specmin]
Returns (xyrange, subimage) such that
xmin, xmax, ymin, ymax = xyrange
image[ymin:ymax, xmin:xmax] += subimage
"""
try:
psf, wave, phot, specmin = args
nspec = phot.shape[0]
if phot.shape[-1] != wave.shape[-1]:
raise ValueError('phot.shape {} vs. wave.shape {} mismatch'.format(phot.shape, wave.shape))
xyrange = psf.xyrange( [specmin, specmin+nspec], wave )
img = psf.project(wave, phot, specmin=specmin, xyrange=xyrange)
return (xyrange, img)
except Exception as e:
if os.getenv('UNITTEST_SILENT') is None:
import traceback
print('-'*60)
print('ERROR in _project', psf.wmin, psf.wmax, wave[0], wave[-1], phot.shape, specmin)
traceback.print_exc()
print('-'*60)
raise e
#- Move this into specter itself?
[docs]def parallel_project(psf, wave, phot, specmin=0, ncpu=None, comm=None):
"""
Using psf, project phot[nspec, nw] vs. wave[nw] onto image
Return 2D image
"""
img = None
if comm is not None:
# MPI version
# Get a smaller communicator if not enough spectra
nspec = phot.shape[0]
if nspec < comm.size:
keep = int(comm.rank < nspec)
comm = comm.Split(color=keep)
if not keep:
return None
specs = np.arange(phot.shape[0], dtype=np.int32)
myspecs = np.array_split(specs, comm.size)[comm.rank]
nspec = phot.shape[0]
iispec = np.linspace(specmin, nspec, int(comm.size+1)).astype(int)
args = list()
if comm.rank == 0:
for i in range(comm.size):
if iispec[i+1] > iispec[i]:
args.append( [psf, wave, phot[iispec[i]:iispec[i+1]], iispec[i]] )
args=comm.scatter(args,root=0)
#now that all ranks have args, we can call _project
xy_subimg=_project(args)
#_project calls project calls spotgrid etc
xy_subimg=comm.gather(xy_subimg,root=0)
if comm.rank ==0:
#now all the data should be back at rank 0
# use same technique as multiprocessing to recombine the data
img = np.zeros( (psf.npix_y, psf.npix_x) )
for xyrange, subimg in xy_subimg:
xmin, xmax, ymin, ymax = xyrange
img[ymin:ymax, xmin:xmax] += subimg
#end of mpi section
else:
import multiprocessing as mp
if ncpu is None:
# Avoid hyperthreading
ncpu = mp.cpu_count() // 2
if ncpu <= 1:
#- Serial version
log.debug('Not using multiprocessing (ncpu={})'.format(ncpu))
img = psf.project(wave, phot, specmin=specmin)
else:
#- multiprocessing version
#- Split the spectra into ncpu groups
log.debug('Using multiprocessing (ncpu={})'.format(ncpu))
nspec = phot.shape[0]
iispec = np.linspace(specmin, nspec, ncpu+1).astype(int)
args = list()
for i in range(ncpu):
if iispec[i+1] > iispec[i]: #- can be false if nspec < ncpu
args.append( [psf, wave, phot[iispec[i]:iispec[i+1]], iispec[i]] )
#- Create pool of workers to do the projection using _project
#- xyrange, subimg = _project( [psf, wave, phot, specmin] )
pool = mp.Pool(ncpu)
xy_subimg = pool.map(_project, args)
#print("xy_subimg from pool")
#print(xy_subimg)
#print(len(xy_subimg))
img = np.zeros( (psf.npix_y, psf.npix_x) )
for xyrange, subimg in xy_subimg:
xmin, xmax, ymin, ymax = xyrange
img[ymin:ymax, xmin:xmax] += subimg
#- Prevents hangs of Travis tests
pool.close()
pool.join()
return img
[docs]def get_nodes_per_exp(nnodes,nexposures,ncameras,user_nodes_per_comm_exp=None):
"""
Calculate how many nodes to use per exposure
Args:
nnodes: number of nodes in MPI COMM_WORLD (not number of ranks)
nexposures: number of exposures to process
ncameras: number of cameras per exposure
user_nodes_per_comm_exp (int, optional): user override of number of
nodes to use; used to check requirements
Returns number of nodes to include in sub-communicators used to process
individual exposures
Notes:
* Uses the largest number of nodes per exposure that will still
result in efficient node usage
* requires that (nexposures*ncameras) / nnodes = int
* the derived nodes_per_comm_exp * nexposures / nodes = int
* See desisim.test.test_pixsim.test_get_nodes_per_exp() for examples
* if user_nodes_per_comm_exp is given, requires that
GreatestCommonDivisor(nnodes, ncameras) / user_nodes_per_comm_exp = int
"""
from math import gcd
import desiutil.log as logging
log = logging.get_logger()
log.setLevel(logging.INFO)
#check if nframes is evenly divisible by nnodes
nframes = ncameras*nexposures
if nframes % nnodes !=0:
### msg=("nframes {} must be evenly divisible by nnodes {}, try again".format(nframes, nnodes))
### raise ValueError(msg)
msg=("nframes {} is not evenly divisible by nnodes {}; packing will be inefficient".format(nframes, nnodes))
log.warning(msg)
else:
log.debug("nframes {} is evenly divisible by nnodes {}, check passed".format(nframes, nnodes))
#find greatest common divisor between nnodes and ncameras
#greatest common divisor = greatest common factor
#we use python's built in gcd
greatest_common_factor=gcd(nnodes,ncameras)
#the greatest common factor must be greater than one UNLESS we are on one node
if nnodes > 1:
if greatest_common_factor == 1:
msg=("greatest common factor {} between nnodes {} and nframes {} must be larger than one, try again".format(greatest_common_factor, nnodes, nframes))
raise ValueError(msg)
else:
log.debug("greatest common factor {} between nnodes {} and nframes {} is greater than one, check passed".format(greatest_common_factor, nnodes, nframes))
#check to make sure the user hasn't specified a really asinine value of user_nodes_per_comm_exp
if user_nodes_per_comm_exp is not None:
if greatest_common_factor % user_nodes_per_comm_exp !=0:
msg=("user-specified value of user_nodes_per_comm_exp {} is bad, try again".format(user_nodes_per_comm_exp))
raise ValueError(msg)
else:
log.debug("user-specified value of user_nodes_per_comm_exp {} is good, check passed".format(user_nodes_per_comm_exp))
nodes_per_comm_exp=user_nodes_per_comm_exp
#if the user didn't specify anything, use the greatest common factor
if user_nodes_per_comm_exp is None:
nodes_per_comm_exp=greatest_common_factor
#finally check to make sure exposures*gcf/nnodes is an integer to avoid inefficient node use
if (nexposures*nodes_per_comm_exp) % nnodes != 0:
### msg=("nexposures {} * nodes_per_comm_exp {} does not divide evenly into nnodes {}, try again".format(nexposures, nodes_per_comm_exp, nnodes))
### raise ValueError(msg)
msg=("nexposures {} * nodes_per_comm_exp {} does not divide evenly into nnodes {}; packing will be inefficient".format(nexposures, nodes_per_comm_exp, nnodes))
log.warning(msg)
else:
log.debug("nexposures {} * nodes_per_comm_exp {} divides evenly into nnodes {}, check passed".format(nexposures, nodes_per_comm_exp, nnodes))
return nodes_per_comm_exp
#-------------------------------------------------------------------------
#- MPI utility functions
#- These functions assist with splitting a communicator across node boundaries.
#- That constraint isn't required by MPI, but can be convenient for humans
#- thinking about "I want to process one camera with one node" or "I want to
#- process 6 exposures with 20 nodes using 10 nodes per exposure"
[docs]def mpi_count_nodes(comm):
'''
Return the number of nodes in this communicator
'''
nodenames = comm.allgather(socket.gethostname())
num_nodes=len(set(nodenames))
return num_nodes
[docs]def mpi_split_by_node(comm, nodes_per_communicator):
'''
Split an MPI communicator into sub-communicators with integer numbers
of nodes per communicator
Args:
comm: MPI communicator
nodes_per_communicator: number of nodes per sub-communicator
Returns:
MPI sub-communicator, node_index, total_num_nodes
Notes:
* total number of nodes in original communicator must be an integer
multiple of nodes_per_communicator
* if comm is split into N sub-communicators, node_index is the index
of which of the N is returned for this rank
* total_num_nodes = number of nodes in original communicator
'''
num_nodes = mpi_count_nodes(comm)
if comm.size % num_nodes != 0:
raise ValueError('Variable number of ranks per node')
if num_nodes % nodes_per_communicator != 0:
raise ValueError('Input number of nodes {} must be divisible by nodes_per_communicator {}'.format(
num_nodes, nodes_per_communicator))
ranks_per_communicator = comm.size // (num_nodes // nodes_per_communicator)
node_index = comm.rank // ranks_per_communicator
comm_node = comm.Split(color = node_index)
return comm_node, node_index, num_nodes