"""
desisim.archetypes
==================
Archetype routines for desisim.
"""
from __future__ import absolute_import, division, print_function
import os
import numpy as np
from desiutil.log import get_logger
log = get_logger()
[docs]def compute_chi2(flux, ferr=None):
"""Compute the chi2 distance matrix.
Parameters
----------
flux : numpy.ndarray
Array [Nspec, Npix] of spectra or templates where Nspec is the number of
spectra and Npix is the number of pixels.
ferr : numpy.ndarray
Uncertainty spectra ccorresponding to flux (default None).
Returns
-------
Tuple of (chi2, amp) where:
chi2 : numpy.ndarray
Chi^2 matrix [Nspec, Nspec] between all combinations of normalized spectra.
amp : numpy.ndarray
Amplitude matrix [Nspec, Nspec] between all combinations of spectra.
"""
nspec, npix = flux.shape
chi2 = np.zeros((nspec, nspec), dtype='f4')
amp = np.zeros((nspec, nspec), dtype='f4')
flux = flux.copy()
rescale = np.sqrt(npix/np.sum(flux**2,axis=1))
flux *= rescale[:,None]
if ferr is None:
for ii in range(nspec):
if ii % 500 == 0:
log.info('Computing chi2 matrix for spectra {}-{} out of {}.'.format(
int(ii/500) * 500, np.min(((ii+1) * int(ii/500), nspec-1)), nspec))
amp1 = np.sum(flux[ii]*flux,axis=1)/npix
chi2[ii,:] = npix*(1.-amp1**2)
amp[ii,:] = amp1
else:
from SetCoverPy import mathutils
ferr = ferr.copy()
ferr *= rescale[:,None]
for ii in range(nspec):
if ii % 500 == 0 or ii == 0:
log.info('Computing chi2 matrix for spectra {}-{} out of {}.'.format(
int(ii/500) * 500, np.min(((ii+1) * int(ii/500), nspec-1)), nspec))
xx = flux[ii, :].reshape(1, npix)
xxerr = ferr[ii, :].reshape(1, npix)
amp[ii,:], chi2[ii,:] = mathutils.quick_amplitude(xx, flux, xxerr, ferr, niter=1)
amp *= rescale[:,None]/rescale[None,:]
np.fill_diagonal(chi2,0.)
np.fill_diagonal(amp,1.)
return chi2, amp
[docs]class ArcheTypes(object):
"""Object for generating archetypes and determining their responsibility.
Parameters
----------
chi2 : numpy.ndarray
Chi^2 matrix computed by desisim.archetypes.compute_chi2().
"""
def __init__(self, chi2):
self.chi2 = chi2
[docs] def get_archetypes(self, chi2_thresh=0.1, responsibility=False):
"""Solve the SCP problem to get the final set of archetypes and, optionally,
their responsibility.
Note: We assume that each template has uniform "cost" but a more general
model in principle could be used / implemented.
Parameters
----------
chi2 : numpy.ndarray
Chi^2 matrix computed by archetypes.compute_chi2().
chi2_thresh : float
Threshold chi2 value to differentiate "different" templates.
responsibility : bool
If True, then compute and return the responsibility of each archetype.
Returns
-------
If responsibility==True then returns a tuple of (iarch, resp, respindx) where:
iarch : integer numpy.array
Indices of the archetypes [N].
resp : integer numpy.array
Responsibility of each archetype [N].
respindx : list of
Indices the parent sample each archetype is responsible for [N].
If responsibility==False then only iarch is returned.
"""
from SetCoverPy import setcover
nspec = self.chi2[0].shape
cost = np.ones(nspec) # uniform cost
a_matrix = (self.chi2 <= chi2_thresh) * 1
gg = setcover.SetCover(a_matrix, cost)
sol, time = gg.SolveSCP()
iarch = np.nonzero(gg.s)[0]
if responsibility:
resp, respindx = self.responsibility(iarch, a_matrix)
return iarch, resp, respindx
else:
return iarch
[docs] def responsibility(self, iarch, a_matrix):
"""Method to determine the responsibility of each archetype.
In essence, the responsibility is the number of templates described by each
archetype.
Parameters
----------
iarch : indices of the archetypes
a_matrix : distance matrix
Returns
-------
resp : responsibility of each archetype (number of objects represented by each archetype)
respindx : list containing the indices of the parent objects represented by each archetype
"""
narch = len(iarch)
resp = np.zeros(narch).astype('int16')
respindx = []
for ii, this in enumerate(iarch):
respindx.append(np.where(a_matrix[:, this] == 1)[0])
resp[ii] = np.count_nonzero(a_matrix[:, this])
return resp, respindx