# Source code for desisim.pixelsplines

"""
desisim.pixelsplines
====================

Pixel-integrated spline utilities.

Written by A. Bolton, U. of Utah, 2010-2013.
"""
from __future__ import absolute_import, division, print_function

import numpy as n
from scipy import linalg as la
from scipy import sparse as sp
from scipy import special as sf

[docs]def compute_duck_slopes(pixbound, flux):
"""
Compute the slope of the illuminating quadratic spline at
the locations of the 'ducks', i.e., the pixel boundaries,
given the integrated flux per unit baseline within the pixels.

ARGUMENTS:
pixbound: (npix + 1) ndarray of pixel boundaries, in units of
wavelength or log-wavelength or frequency or whatever you like.
flux: (npix) ndarray of spectral flux (energy or counts) per
abscissa unit, averaged over the extent of the pixel

RETURNS:
an (npix+1) ndarray of the slope of the underlying/illuminating
flux per unit abscissa spectrum at the position of the pixel
boundaries, a.k.a. 'ducks'.  The end conditions are taken to
be zero slope, so the exterior points of the output are zeros.
"""
npix = len(flux)
# Test for correct argument dimensions:
if (len(pixbound) - npix) != 1:
print('Need one more element in pixbound than in flux!')
return 0
# The array of "delta-x" values:
dxpix = pixbound[1:] - pixbound[:-1]
# Test for monotonif increase:
if dxpix.min() <= 0.:
print('Pixel boundaries not monotonically increasing!')
return 0
# Encode the tridiagonal matrix that needs to be solved:
maindiag = (dxpix[:-1] + dxpix[1:]) / 3.
offdiag = dxpix[1:-1] / 6.
upperdiag = n.append(0., offdiag)
lowerdiag = n.append(offdiag, 0.)
band_matrix = n.vstack((upperdiag, maindiag, lowerdiag))
# The right-hand side:
rhs = flux[1:] - flux[:-1]
# Solve the banded matrix and return:
acoeff = la.solve_banded((1,1), band_matrix, rhs)
acoeff = n.append(n.append(0., acoeff), 0.)
return acoeff

[docs]def cen2bound(pixelcen):
"""
Convenience function to do the obvious thing to transform
pixel centers to pixel boundaries.
"""
pixbound = 0.5 * (pixelcen[1:] + pixelcen[:-1])
lo_val = 2. * pixbound - pixbound
hi_val = 2. * pixbound[-1] - pixbound[-2]
pixbound = n.append(n.append(lo_val, pixbound), hi_val)
return pixbound

[docs]def gauss_blur_matrix(pixbound, sig_conv):
"""
Function to generate a Gaussian blurring matrix for a pixelized
spectrum, from specified pixel boundaries and 'sigma' vector.
The matrix will be flux-conserving if the spectrum to which it is
applied has units of 'counts per unit x', and pixbound and sig_conv
both have units of x.

pixbound should have one more element than sig_conv.

Output is a scipy sparse matrix that can implement the blurring as:
blurflux = gauss_blur_matrix * flux
where 'flux' has the same dimensions as 'sig_conv'.
"""
# Derived values and error checks:
npix = len(pixbound) - 1
if (len(sig_conv) != npix):
raise PixSplineError('Need one more element in pixbound than in \
sig_conv!')
if (sig_conv.min() <= 0.):
raise PixSplineError('sig_conv must be > 0 everywhere!')
xcen = 0.5 * (pixbound[1:] + pixbound[:-1])
dxpix = pixbound[1:] - pixbound[:-1]
if (dxpix.min() <= 0.):
raise PixSplineError('Pixel boundaries not monotonically increasing!')
# Which "new" pixels does each "old" pixel touch?
# Let's go +/- 6 sigma for all:
sig_width = 6.0
# A minor correction factor to preserve flux conservation:
cfact = 1./sf.erf(sig_width / n.sqrt(2.))
xblur_lo = xcen - sig_width * sig_conv
xblur_hi = xcen + sig_width * sig_conv
bin_lo = n.digitize(xblur_lo, pixbound) - 1
bin_hi = n.digitize(xblur_hi, pixbound) - 1
# Restrict the ranges:
#xblur_lo = n.where((xblur_lo > pixbound), xblur_lo, pixbound)
#xblur_lo = n.where((xblur_lo < pixbound[-1]), xblur_lo, pixbound[-1])
#xblur_hi = n.where((xblur_hi > pixbound), xblur_hi, pixbound)
#xblur_hi = n.where((xblur_hi < pixbound[-1]), xblur_hi, pixbound[-1])
bin_lo = n.where((bin_lo >= 0), bin_lo, 0)
#bin_lo = n.where((bin_lo < npix), bin_lo, npix-1)
#bin_hi = n.where((bin_hi >= 0), bin_hi, 0)
bin_hi = n.where((bin_hi < npix), bin_hi, npix-1)
# Compute total number of non-zero elements in the broadening matrix:
n_each = bin_hi - bin_lo + 1
n_entries = n_each.sum()
ij = n.zeros((2, n_entries), dtype=int)
v_vec = n.zeros(n_entries, dtype=float)
# Loop over pixels in the "old" spectrum:
pcount = 0
roottwo = n.sqrt(2.)
bin_vec = n.arange(npix, dtype=int)
for k in range(npix):
xbound = pixbound[bin_lo[k]:bin_hi[k]+2]
# Gaussian integral in terms of error function:
erf_terms = cfact * 0.5 * sf.erf((xbound - xcen[k]) / (roottwo *
sig_conv[k]))
erf_int = (erf_terms[1:] - erf_terms[:-1]) * \
dxpix[k] / dxpix[bin_lo[k]:bin_hi[k]+1]
ij[0,pcount:pcount+n_each[k]] = bin_vec[bin_lo[k]:bin_hi[k]+1]
ij[1,pcount:pcount+n_each[k]] = k
v_vec[pcount:pcount+n_each[k]] = erf_int
pcount += n_each[k]
conv_matrix = sp.coo_matrix((v_vec, ij), shape=(npix,npix))
return conv_matrix.tocsr()

[docs]class PixSplineError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)

[docs]class PixelSpline:
"""
Pixel Spline object class.

Initialize as follows:
PS = PixelSpline(pixbound, flux)
where
pixbound = array of pixel boundaries in baseline units
and
flux = array of specific flux values in baseline units.

Assumptions:
'pixbound' should have one more element than 'flux', and
units of 'flux' are -per-unit-baseline, for the baseline
units in which pixbound is expressed, averaged over the
extent of each pixel.
"""
def __init__(self, pixbound, flux):
npix = len(flux)
# Test for correct argument dimensions:
if (len(pixbound) - npix) != 1:
raise PixSplineError('Need one more element in pixbound \
than in flux!')
# The array of "delta-x" values:
dxpix = pixbound[1:] - pixbound[:-1]
# Test for monotonic increase:
if dxpix.min() <= 0.:
raise PixSplineError('Pixel boundaries not monotonically \
increasing!')
self.npix = npix
self.pixbound = pixbound.copy()
self.dxpix = dxpix.copy()
self.xcen = 0.5 * (pixbound[1:] + pixbound[:-1]).copy()
self.flux = flux.copy()
maindiag = (dxpix[:-1] + dxpix[1:]) / 3.
offdiag = dxpix[1:-1] / 6.
upperdiag = n.append(0., offdiag)
lowerdiag = n.append(offdiag, 0.)
band_matrix = n.vstack((upperdiag, maindiag, lowerdiag))
# The right-hand side:
rhs = flux[1:] - flux[:-1]
# Solve the banded matrix for the slopes at the ducks:
acoeff = la.solve_banded((1,1), band_matrix, rhs)
self.duckslopes = n.append(n.append(0., acoeff), 0.)
[docs]    def point_evaluate(self, xnew, missing=0.):
"""
Evaluate underlying pixel spline at array of points
BUG: input currently needs to be at least 1D array.
"""
# Initialize output array:
outflux = 0. * self.flux * xnew + missing
# Digitize into bins:
bin_idx = n.digitize(xnew, self.pixbound)
# Find the indices of those that are actually in-bounds:
wh_in = n.where((bin_idx > 0) * (bin_idx < len(self.pixbound)))
if len(wh_in) == 0:
return outflux
xnew_in = xnew[wh_in]
idx_in = bin_idx[wh_in] - 1
# The pixel centers as per the algorithm in use:
asum = self.duckslopes[idx_in+1] + self.duckslopes[idx_in]
xdiff = xnew_in - self.xcen[idx_in]
fluxvals = adiff * xdiff**2 / (2. * self.dxpix[idx_in]) + asum * xdiff \
/ 2. + self.flux[idx_in] - adiff * self.dxpix[idx_in] / 24.
outflux[wh_in] = fluxvals
return outflux
def find_extrema(self, minima=False):
# Find the formal extrema positions:
x_ext = self.xcen - 0.5 * self.dxpix * \
(self.duckslopes[1:] + self.duckslopes[:-1]) / \
(self.duckslopes[1:] - self.duckslopes[:-1])
# Digitize these into bins:
bin_ext = n.digitize(x_ext, self.pixbound) - 1
# The second derivatives, flipped in sign if minima is set:
curvat = (-1)**(minima == True) * (self.duckslopes[1:] -
self.duckslopes[:-1]) / self.dxpix
# Find in-bin maxima:
wh_ext = n.where((bin_ext == n.arange(self.npix)) * (curvat < 0))
if len(wh_ext) < 1:
return n.array([])
x_ext = x_ext[wh_ext]
return x_ext
def subpixel_average(self, ipix, xlo, xhi):
asum = self.duckslopes[ipix+1] + self.duckslopes[ipix]
xlo_c = xlo - self.xcen[ipix]
xhi_c = xhi - self.xcen[ipix]
outval = adiff * ((xhi-xlo)**2 / 6. + xhi_c * xlo_c / 2.) / \
self.dxpix[ipix] + asum * (xhi_c + xlo_c) / 4. - adiff * \
self.dxpix[ipix] / 24. + self.flux[ipix]
return outval
[docs]    def resample(self, pb_new):
"""
Method to resample a pixelspline analytically onto a new
set of pixel boundaries.
"""
npix_new = len(pb_new) - 1
xnew_lo = pb_new[:-1].copy()
xnew_hi = pb_new[1:].copy()
# Test for monotonic:
new_fulldx = xnew_hi - xnew_lo
if new_fulldx.min() <= 0.:
raise PixSplineError('New pixel boundaries not monotonically \
increasing!')
# Digitize the new boundaries into the original bins:
bin_idx = n.digitize(pb_new, self.pixbound) - 1
bin_lo = bin_idx[:-1].copy()
bin_hi = bin_idx[1:].copy()
# Array for accumulating new counts:
new_counts = n.zeros(npix_new, dtype=self.flux.dtype)
# Array for accumulating new pixel widths by pieces.
# Only used for debugging so far, but may be useful in future.
#new_dxpix = n.zeros(npix_new, dtype=self.flux.dtype)
# For convenience, we define the following.
# Careful not to modify them... they are views, not copies!
xold_lo = self.pixbound[:-1]
xold_hi = self.pixbound[1:]
# 4 cases to cover:
# Case 1: both bin_hi and bin_lo in the same bin:
wh_this = n.where((bin_hi == bin_lo) * (bin_lo >= 0) * \
(bin_hi < self.npix))
if (len(wh_this) > 0):
dx_this = xnew_hi[wh_this] - xnew_lo[wh_this]
avgval_this = self.subpixel_average(bin_lo[wh_this],
xnew_lo[wh_this],
xnew_hi[wh_this])
#new_dxpix[wh_this] += dx_this
new_counts[wh_this] += avgval_this * dx_this
# Case 2: more than one bin, lower segment:
wh_this = n.where((bin_hi > bin_lo) * (bin_lo >= 0))
if (len(wh_this) > 0):
dx_this = xold_hi[bin_lo[wh_this]] - xnew_lo[wh_this]
avgval_this = self.subpixel_average(bin_lo[wh_this],
xnew_lo[wh_this],
xold_hi[bin_lo[wh_this]])
#new_dxpix[wh_this] += dx_this
new_counts[wh_this] += avgval_this * dx_this
# Case 3: more than one bin, upper segment:
wh_this = n.where((bin_hi > bin_lo) * (bin_hi < self.npix))
if (len(wh_this) > 0):
dx_this = xnew_hi[wh_this] - xold_lo[bin_hi[wh_this]]
avgval_this = self.subpixel_average(bin_hi[wh_this],
xold_lo[bin_hi[wh_this]],
xnew_hi[wh_this])
#new_dxpix[wh_this] += dx_this
new_counts[wh_this] += avgval_this * dx_this
# Case 4: enire bins covered, whole pixels:
wh_this = n.where(bin_hi > (bin_lo+1))
nwhole = len(wh_this)
if (nwhole > 0):
pcounts = self.flux * self.dxpix
icounts_this = n.array([pcounts[bin_lo[wh_this[ii]]+1:\
bin_hi[wh_this[ii]]].sum()
for ii in range(nwhole)])
#new_dxpix[wh_this] += dx_this
new_counts[wh_this] += icounts_this
# Divide out for average and return:
return new_counts / new_fulldx

"""
Objet class for weighted rebinning and coaddition of spectra

Initialize as follows:
where
fluxes = list of arrays of specific flux values
invvars = list of arrays of associated inverse variances
pixbounds = list of arrays of pixel boundaries in baseline units
"""
def __init__(self, fluxes, invvars, pixbounds):
# Determine minimum and maximum values of independent variable:
self.min_indep = [this_bound.min() for this_bound in pixbounds]
self.max_indep = [this_bound.max() for this_bound in pixbounds]
self._n_input = len(fluxes)
# Compute pixel widths:
dpixes = [this_bound[1:] - this_bound[:-1] for this_bound in pixbounds]
# Compute "specific inverse variances":
sp_invvars = [invvars[i] / dpixes[i] for i in range(self._n_input)]
# Compute pixelspline objects for fluxes:
self._PXS_fluxes = [PixelSpline(pixbounds[i], fluxes[i]) for i in \
range(self._n_input)]
# Compute pixelspline objects for specific inverse variances:
self._PXS_sp_invvars = [PixelSpline(pixbounds[i], sp_invvars[i]) for \
i in range(self._n_input)]
masks = [(pixbound_out[:-1] > self.min_indep[i]) *
(pixbound_out[1:] < self.max_indep[i]) for i in \
range(self._n_input)]
# Compute output pixel widths:
dpix_out = pixbound_out[1:] - pixbound_out[:-1]
# Compute interpolated fluxes:
new_fluxes = [this_PXS.resample(pixbound_out) for this_PXS in \
self._PXS_fluxes]
# Compute interpolated specific inverse variances (converted
# to inverse variances):
new_invvars = [dpix_out * this_PXS.resample(pixbound_out) for \
this_PXS in self._PXS_sp_invvars]
# Compute coadded flux and inverse variance and return: