#!/usr/bin/env python
#
# edenhofer2023.py
# Reads the Edenhofer2023 dust map, which is described in
# Edenhofer et al. (2023).
#
# Copyright (C) 2023 Gordian Edenhofer and Gregory M. Green
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
from __future__ import division, print_function
import os
import sys
from collections import namedtuple
from functools import partial
import astropy.units as units
import numpy as np
from .map_base import DustMap, ensure_flat_galactic
from .std_paths import data_dir
_DustSphere = namedtuple(
"_DustSphere", (
"data", "nside", "nest", "radii", "coo_bounds", "radii0", "data0",
"units", "data_uncertainty", "data0_uncertainty"
)
)
DATA_DIR_SUBDIR = "edenhofer_2023"
def _removeprefix(s, prefix):
return s[len(prefix):] if s.startswith(prefix) else s[:]
def _removesuffix(s, suffix):
return s[:-len(suffix)] if s.endswith(suffix) else s[:]
def _get_sphere(filepath):
from astropy.io import fits
nside = None
nest = None
radii = None
coo_bounds = None
radii0 = None
rec_dust0 = None
units = None
rec_uncertainty = None
rec0_uncertainty = None
with fits.open(filepath, "readonly") as hdul:
for hdu in hdul:
nm = hdu.name.lower()
if isinstance(hdu, fits.PrimaryHDU) and hdu.data is None:
continue
elif nm in ("primary", "mean", "samples", "healpix times distance"):
dust_density = hdu.data
nside = hdu.header["NSIDE"]
nest = hdu.header["ORDERING"].lower().startswith("nest")
units = hdu.header.get("CUNIT")
if dust_density.shape[-1] != 12 * nside**2:
ve = "invalid shape of dust density {!r}"
raise ValueError(ve.format(dust_density.shape))
elif isinstance(hdu, fits.BinTableHDU):
if hdu.data.names == ['radial pixel centers']:
radii = hdu.data["radial pixel centers"]
elif hdu.data.names == ['radial pixel boundaries']:
coo_bounds = hdu.data["radial pixel boundaries"]
else:
ve = "unrecognized entry in BinTableHDU {!r}"
raise ValueError(ve.format(hdu.data.names))
elif isinstance(hdu, fits.ImageHDU) and (
nm.startswith("mean of integrated inner") or
nm.startswith("integrated inner")
):
prfx = "inner density integrated within"
ctp = hdu.header.get("CTYPE")
ctp = hdu.header.get("CTYPE1") if ctp is None else ctp
if ctp.lower().startswith(prfx):
radii0 = _removeprefix(ctp.lower(), prfx)
if not radii0.endswith("pc"):
ve = "unrecognized units {!r}".format(radii0)
raise ValueError(ve)
radii0 = _removesuffix(radii0, "pc")
radii0 = float(radii0.strip())
rec_dust0 = hdu.data
if nest is None or nside is None:
ve = (
"main reconstruction needs to come before inner"
" in FITS file"
)
raise ValueError(ve)
if nest != hdu.header["ORDERING"].lower(
).startswith("nest"):
raise ValueError("ordering mismatch")
if rec_dust0.shape[-1] != 12 * nside**2:
ve = "incompatible shape of dust density {!r}"
raise ValueError(ve.format(dust_density.shape))
elif isinstance(hdu, fits.ImageHDU
) and nm.startswith("std. of integrated inner"):
prfx = "std. of inner density integrated within"
ctp = hdu.header.get("CTYPE")
ctp = hdu.header.get("CTYPE1") if ctp is None else ctp
if hdu.header["CTYPE"].lower().startswith(prfx):
radii0_unc = _removeprefix(ctp.lower(), prfx)
if not radii0_unc.endswith("pc"):
ve = "unrecognized units {!r}".format(radii0)
raise ValueError(ve)
radii0_unc = _removesuffix(radii0_unc, "pc")
radii0_unc = float(radii0_unc.strip())
rec0_uncertainty = hdu.data
if nest is None or nside is None or radii0 is None:
ve = (
"main reconstruction and inner needs to come before"
" inner std. in FITS file"
)
raise ValueError(ve)
if radii0_unc != radii0:
raise ValueError("radii mismatch")
if nest != hdu.header["ORDERING"].lower(
).startswith("nest"):
raise ValueError("ordering mismatch")
if rec0_uncertainty.shape[-1] != 12 * nside**2:
ve = "incompatible shape of dust density uncertainty {!r}"
raise ValueError(ve.format(rec0_uncertainty.shape))
elif isinstance(hdu, fits.ImageHDU) and nm.lower() == "std.":
rec_uncertainty = hdu.data
if nest is None or nside is None:
ve = "main reconstruction needs to come before std."
raise ValueError(ve)
if nest != hdu.header["ORDERING"].lower().startswith("nest"):
raise ValueError("ordering mismatch")
if rec_uncertainty.shape[-1] != 12 * nside**2:
ve = "incompatible shape of dust density uncertainty {!r}"
raise ValueError(ve.format(rec_uncertainty.shape))
else:
raise ValueError("unrecognized HDU\n{!r}".format(hdu.header))
return _DustSphere(
dust_density,
nside=nside,
nest=nest,
radii=radii,
coo_bounds=coo_bounds,
radii0=radii0,
data0=rec_dust0,
units=units,
data_uncertainty=rec_uncertainty,
data0_uncertainty=rec0_uncertainty,
)
def _interp_hpxr2lbd(data, radii, nside, nest, lon, lat, dist):
"""Interpolate a 3D map of HEALPix times radii to arbitrary longitude,
latitude, distance positions."""
from healpy.pixelfunc import get_interp_weights
assert lon.shape == lat.shape == dist.shape
final_shape = data.shape[:-2] + lon.shape
lon, lat, dist = lon.ravel(), lat.ravel(), dist.ravel()
idx_pos, wgt_pos = get_interp_weights(
nside, lon, lat, nest=nest, lonlat=True
)
idx_r = np.searchsorted(radii, dist)
idx_l = idx_r - 1
mask = (idx_l < 0) | (idx_r >= radii.size)
idx_l, idx_r = idx_l.clip(0, radii.size - 1), idx_r.clip(0, radii.size - 1)
wgt_los = np.abs(np.stack((radii[idx_r] - dist, radii[idx_l] - dist)))
wgt_los /= wgt_los.sum(axis=0)
assert wgt_pos.ndim == 2
data = data[...,
np.stack((idx_l, idx_r))[:, np.newaxis],
np.stack((idx_pos, ) * 2)]
# Manual POS and LOS interpolation
data = (data * wgt_pos).sum(axis=-wgt_pos.ndim)
data = (data * wgt_los).sum(axis=-wgt_los.ndim)
return np.where(mask, np.nan, data).reshape(final_shape)
[docs]class Edenhofer2023Query(DustMap):
"""
A class for querying the Edenhofer et al. (2023) 3D dust map.
The map is in units of E of Zhang, Green, and Rix (2023) per parsec but can
be translated to an extinction at any given wavelength by using the
extinction curve published at https://doi.org/10.5281/zenodo.6674521 . For
further details on the map, see the original paper.
If you use this map in your work, please cite Edenhofer et al. (2023).
The data is deposited at https://doi.org/10.5281/zenodo.8187943.
"""
[docs] def __init__(
self,
map_fname=None,
load_samples=False,
integrated=False,
flavor='main',
seed=None,
):
"""
Args:
map_fname (Optional[str]): Filename of the map. Defaults
to :obj:`None`, meaning that the default location
is used.
load_samples (Optional[bool]): Whether to load the posterior samples
of the extinction density. The samples give more accurate
interpolation resoluts and are required for standard deviations
of integrated extinctions. Defaults to :obj:`False`.
integrated (Optional[bool]): Whether to return integrated extinction
density. In this case, the units of the map are E of Zhang,
Green, and Rix (2023). The pre-processing for efficient access
to the integrated extinction map can take a couple of minutes
but subsequent queries will be fast. Defaults to :obj:`False`.
flavor (Optional[str]): Flavor of the map to use. Must be in
('main', 'less_data_but_2kpc').
seed (Optional[int]): A random seed, used when drawing random
samples from the map. Set this seed in order to make the
pseudo-random draws reproducible.
"""
if not load_samples in (True, False):
raise TypeError("`load_samples` must be bool")
if not isinstance(flavor, str):
raise TypeError("`flavor` must be str")
if map_fname is None:
if flavor.lower() == 'main':
if load_samples is False:
fn = 'mean_and_std_healpix.fits'
else:
fn = 'samples_healpix.fits'
elif flavor.lower() == 'less_data_but_2kpc':
if load_samples is False:
fn = 'validation_with_less_data_but_2kpc_mean_and_std_healpix.fits'
else:
fn = 'validation_with_less_data_but_2kpc_samples_healpix.fits'
else:
raise ValueError("unrecognized flavor {!r}".format(flavor))
map_fname = os.path.join(data_dir(), DATA_DIR_SUBDIR, fn)
if not os.path.isfile(map_fname):
from .dustexceptions import data_missing_message
msg = data_missing_message(
"edenhofer2023", "Edenhofer et al. (2023)"
)
print(msg, file=sys.stderr)
err = "{} does not exist".format(repr(map_fname))
raise FileNotFoundError(err)
self._flavor = flavor
self._rec = _get_sphere(map_fname)
self._has_samples = (self._rec.data.ndim == 3)
if not self._has_samples and load_samples:
raise ValueError("failed to load samples")
if map_fname is None and self._rec.data0 is None:
ve = "default map should have zeroth integrated layer but it doesn't"
raise ValueError(ve)
self._allowed_modes = ("mean",)
# Replace the data in-place with its log respectively square because the
# interpolation is done in log- respectively squared-space.
if integrated is True:
dvol = np.diff(self._rec.coo_bounds)
np.multiply(dvol[:, np.newaxis], self._rec.data, out=self._rec.data)
if self._rec.data0 is not None:
self._rec.data[..., 0, :] += self._rec.data0
msg = "Integrating extinction map (this might take a couple of minutes)..."
print(msg, file=sys.stderr)
np.cumsum(self._rec.data, axis=-2, out=self._rec.data)
msg = "Optimizing map for querying (this might take a couple of seconds)..."
print(msg, file=sys.stderr)
np.log(self._rec.data, out=self._rec.data)
if self._has_samples:
self._allowed_modes += ("std", "samples", "random_sample")
elif integrated is False:
msg = "Optimizing map for querying (this might take a couple of seconds)..."
print(msg, file=sys.stderr)
np.log(self._rec.data, out=self._rec.data)
if self._rec.data_uncertainty is not None:
np.square(
self._rec.data_uncertainty, out=self._rec.data_uncertainty
)
self._allowed_modes += ("std",)
if self._has_samples:
self._allowed_modes += ("samples", "random_sample")
else:
te = "`integrated` must be bool; got {}".format(integrated)
raise TypeError(te)
self._integrated = integrated
# If samples are loaded, initialize random number generator
self._rng = None
if self._has_samples:
self._rng = np.random.default_rng(seed)
[docs] @ensure_flat_galactic
def query(self, coords, mode="mean"):
"""
Returns the 3D dust extinction density or integrated extinction
(depending on how the class was initialized) from Edenhofer et al.
(2023) at the given coordinates. The map is in units of E of Zhang,
Green, and Rix (2023) per parsec or if integrated simply in units of E.
Args:
coords (:obj:`astropy.coordinates.SkyCoord`): Coordinates at which
to query the extinction. Must be 3D (i.e., include distance
information).
mode (str): Which mode to return. Allowed values are 'mean' (for the
mean), 'std' (for the standard deviation), 'samples' (for all
posterior samples), or 'random_sample' (for a single sample).
Defaults to 'mean'.
Notes:
To query integrated extinction, set `integrated=True` during
initizalization.
Returns:
Depending on how the class was initialized, either extinction
density or extinction are returned. The output is either a numpy
array or float, with the same shape as the input :obj:`coords`.
"""
if not isinstance(mode, str):
te = "`mode` must be str; got {}".format(type(mode))
raise TypeError(te)
mode = mode.strip().lower()
if mode not in ("mean", "std", "samples", "random_sample"):
ve = (
"`mode` must be 'mean', 'std', 'samples', or 'random_sample'"
"; got {!r}"
)
raise ValueError(ve.format(mode))
if mode not in self._allowed_modes:
ve = "`mode={!r}` requires samples but none are available"
raise ValueError(ve.format(mode))
interp = partial(
_interp_hpxr2lbd,
radii=self._rec.radii,
nside=self._rec.nside,
nest=self._rec.nest,
lon=coords.galactic.l.deg,
lat=coords.galactic.b.deg,
dist=coords.galactic.distance.to("pc").value
)
if self._has_samples:
if mode == "random_sample":
# Same sample for all coordinates
samp_idx = self._rng.choice(self.n_samples)
res = interp(self._rec.data[samp_idx])
else:
res = interp(self._rec.data)
res = np.exp(res, out=res)
if mode == "mean":
res = res.mean(axis=0)
elif mode == "std":
res = res.std(axis=0)
elif mode == "random_sample":
pass # No sample-axis reduction necessary
else:
assert mode == "samples"
# Swap sample and coordinate axes to be consistent with other
# 3D dust maps. The output shape will be (..., sample).
res = np.swapaxes(res, 0, -1)
else:
if mode == "mean":
res = interp(self._rec.data)
res = np.exp(res, out=res)
else:
assert mode == "std"
res = interp(self._rec.data_uncertainty)
res = np.sqrt(res, out=res)
return res
@property
def distance_bounds(self):
"""
Returns the distance bin edges that the map uses. The return type is
:obj:`astropy.units.Quantity`, which stores unit-full quantities.
"""
return self._rec.coo_bounds * units.pc
@property
def distances(self):
"""
Returns the distance bin centers that the map uses. The return type is
:obj:`astropy.units.Quantity`, which stores unit-full quantities.
"""
return self._rec.radii * units.pc
@property
def integrated(self):
"""
Returns ``True`` if the map contains integrated extinction, or
``False`` if it contains extinction density.
"""
return self._integrated
@property
def n_samples(self):
"""
Returns the number of samples stored in the map. If no samples have
been loaded, then ``None`` is returned.
"""
if self._has_samples:
return self._rec.data.shape[0]
else:
return None
@property
def flavor(self):
"""
Returns the flavor of the map. This is a string that is either
``"main"`` or ``"less_data_but_2kpc"``.
"""
return self._flavor
[docs]def fetch(clobber=False, fetch_samples=False, fetch_2kpc=False):
"""
Downloads the 3D dust map of Edenhofer et al. (2023).
Args:
clobber (Optional[bool]): If ``True``, any existing file will be
overwritten, even if it appears to match. If ``False`` (the
default), ``fetch()`` will attempt to determine if the dataset
already exists. This determination is not 100\% robust against data
corruption.
fetch_samples (Optional[bool]): If ``True``, the samples will also be
downloaded. If ``False`` (the default), only the mean and standard
deviation will be downloaded taking up about 3.2 GB in size while
the samples take up 19 GB.
fetch_2kpc (Optional[bool]): If ``True``, the validation run using
less data though which extends out to 2kpc in distance will also be
downloaded.
"""
from . import fetch_utils
dest_dir = os.path.join(data_dir(), DATA_DIR_SUBDIR)
file_spec = [
('mean_and_std_healpix.fits', '10c823a5fcf81b47b6e15530bcdf54dc')
]
if fetch_2kpc:
file_spec += [
(
'validation_with_less_data_but_2kpc_mean_and_std_healpix.fits',
'0826b2f4cdfccad69bdc5e2d85bb4c54'
)
]
if fetch_samples:
file_spec += [
('samples_healpix.fits', 'aa0aa435e013784fe18a5cb24e379b05')
]
if fetch_2kpc:
file_spec += [
(
'validation_with_less_data_but_2kpc_samples_healpix.fits',
'1ef8ca17a68e724d21c357554951959c'
)
]
for fn, md5sum in file_spec:
fname = os.path.join(dest_dir, fn)
# Download from the server
url = 'https://zenodo.org/record/8187943/files/{}'.format(fn)
fetch_utils.download_and_verify(url, md5sum, fname, clobber=clobber)