import yaml, logging, sys
import numpy as np
import pandas as pd
from pathlib import Path
from warnings import warn, filterwarnings, catch_warnings
from time import time
from cosmoprimo.fiducial import AbacusSummit
from abacusnbody.hod.abacus_hod import AbacusHOD
from densitysplit.pipeline import DensitySplit
from densitysplit.cosmology import Cosmology
from pycorr import TwoPointCorrelationFunction, project_to_multipoles
from CorrHOD.utils import apply_rsd
[docs]class CorrHOD_cubic():
"""
This class is used to compute the 2PCF and the autocorrelation and cross-correlation of the quantiles of the DensitySplit.
It takes HOD parameters and a cosmology as input and uses AbacusHOD to generate a cubic mock.
It then uses DensitySplit to compute the density field and the quantiles.
Finally, it uses pycorr to compute the 2PCF and the autocorrelation and cross-correlation of the quantiles.
"""
def __init__(self,
path2config:str,
HOD_params:dict = None,
los:str = 'z',
boxsize:float = 2000,
cosmo:int = 0):
"""
Initialises the CorrHOD object.
Note that the 'tracer' parameter is fixed to 'LRG' by default. It can be changed later by manually reassigning the parameter.
This tracer is chosen to study LRG and BGS. It can be changed but the code is not guaranteed to work.
Parameters
----------
path2config : str
Path to the config file of AbacusHOD. It is used to load the simulation. See the documentation for more details.
HOD_params : dict, optional
Dictionary containing the HOD parameters. The keys must be the same as the ones used in AbacusHOD.
If not provided, the default parameters of AbacusHOD are used. Defaults to None.
los : str, optional
Line-of-sight direction in which to apply the RSD. Defaults to 'z'.
boxsize : float, optional
Size of the simulation box in Mpc/h. Defaults to 2000.
cosmo : int, optional
Index of the cosmology to use. This index is used to load the cosmology from the AbacusSummit object. Defaults to 0.
See the documentation of AbacusSummit for more details.
Raises
------
ValueError
If the simulation is not precomputed. See the documentation of AbacusHOD for more details.
"""
# Set the cosmology objects
self.cosmo = AbacusSummit(cosmo)
h=self.cosmo.get('h')
omega_m=self.cosmo.get('Omega_m')
self.ds_cosmo = Cosmology(h=h, omega_m=omega_m) # Cosmology object from DensitySplit initialized with the cosmology from cosmoprimo
# Read the config file
config = yaml.safe_load(open(path2config))
self.sim_params = config['sim_params']
self.config_HOD_params = config['HOD_params'] # Temporary HOD parameters from the config file of AbacusHOD
# Make data_params an optional parameter
if 'data_params' in config.keys():
self.data_params = config['data_params']
else:
self.data_params = None
# Set the fixed parameters
self.tracer = 'LRG' # Tracer to use for the HOD. Since this code is only for BGS, this is fixed but can be changed later by reassigning the parameter
self.boxsize = boxsize
self.los = los
self.redshift = self.sim_params['z_mock']
# Set the HOD parameters
self.HOD_params = HOD_params
# Check if the simulation is already precomputed
sim_name = Path(self.sim_params['sim_name'])
subsample_dir = Path(self.sim_params['subsample_dir'])
sim_path = subsample_dir / sim_name / (f'z{self.redshift:4.3f}') # Path to where the simulation is supposed to be
if not sim_path.exists():
err = f'The simulation {sim_path} does not exist. Run prepare_sim first. ',\
'(see https://abacusutils.readthedocs.io/en/latest/hod.html#short-example for more details)'
raise ValueError(err)
[docs] def initialize_halo(self, nthread:int = 16):
"""
Initializes the AbacusHOD object and loads the simulation.
Parameters
----------
nthread : int, optional
Number of threads to use. Defaults to 16.
Returns
-------
Ball : AbacusHOD object
AbacusHOD object containing the simulation.
"""
# Create the AbacusHOD object and load the simulation
if not hasattr(self, 'Ball'): # To avoid loading the slabs if they are already loaded
self.Ball = AbacusHOD(self.sim_params, self.config_HOD_params)
# Update the parameters of the AbacusHOD object
self.Ball.params['Lbox'] = self.boxsize
if self.HOD_params is not None:
# Update the HOD parameters if different ones are provided
for key in self.HOD_params.keys():
self.Ball.tracers[self.tracer][key] = self.HOD_params[key]
# Compute the incompleteness for the tracer (i.e. the fraction of galaxies that are observed)
self.Ball.tracers[self.tracer]['ic'] = 1 # Set the incompleteness to 1 by default
ngal_dict = self.Ball.compute_ngal(Nthread = nthread)[0] # Compute the number of galaxies in the box
N_trc = ngal_dict[self.tracer] # Get the number of galaxies of the tracer type in the box
if self.data_params is not None:
self.Ball.tracers[self.tracer]['ic'] = min(1, self.data_params['tracer_density_mean'][self.tracer]*self.Ball.params['Lbox']**3/N_trc) # Compute the actual incompleteness
return self.Ball
[docs] def populate_halos(self, nthread:int = 16, remove_Ball:bool = False):
"""
Populates the halos with galaxies using the HOD parameters.
Parameters
----------
nthread : int, optional
Number of threads to use. Defaults to 16.
remove_Ball : bool, optional
If True, the Ball object (Dark matter simulation loaded from AbacusHOD) is removed after populating the halos.
This is useful to save memory. Defaults to False.
Returns
-------
cubic_dict : dict
Dictionary containing the HOD catalogue. It contains for each tracer :
* 'x', 'y', 'z' : the positions of the galaxies,
* 'vx', 'vy', 'vz' : their velocities,
* 'id' and 'mass' : the ID and mass of the halo they belong to,
* 'Ncent' : the number of central galaxies in the simulation. The first Ncent galaxies are the centrals.
"""
# Run the HOD and get the dictionary containing the HOD catalogue
self.cubic_dict = self.Ball.run_hod(self.Ball.tracers, self.Ball.want_rsd, Nthread = nthread)
# Log the number density of the populated halos
logger = logging.getLogger('CorrHOD') # Get the logger for the CorrHOD class
actual_n = len(self.cubic_dict['LRG']['x']) / self.boxsize**3
logger.debug(f'Number density of the populated halos : {actual_n:.2e} h^3/Mpc^3')
# Check that the number density of the populated halos is close to the target number density
if self.data_params is not None:
expected_n = self.data_params['tracer_density_mean'][self.tracer]
std_n = self.data_params['tracer_density_std'][self.tracer]
if abs(expected_n - actual_n) > std_n :
warn(f'The number density of the populated halos ({actual_n:.2e}) is not close to the target number density ({expected_n}).', UserWarning)
# Remove the Ball object to save memory
if remove_Ball:
del self.Ball
logger.debug('Removed the Ball object to save memory')
self.nbar = actual_n
return self.cubic_dict
#TODO : Change this : leave it to the user and just run checks to make sure the data is in the right format
[docs] def set_cubic(self,
positions:np.ndarray,
velocities:np.ndarray,
masses:np.ndarray,
halo_id:np.ndarray,
Ncent:int,
tracer:str = 'LRG'):
"""
A method to set a tracer data in the cubic dictionary.
This will overwrite the tracer data if it already exists. Otherwise, it will create it.
Parameters
----------
positions : np.ndarray
The positions of the galaxies. It can be a (N,3) array or a dictionary with keys 'x', 'y', 'z'.
velocities : np.ndarray
The velocities of the galaxies. It can be a (N,3) array or a dictionary with keys 'vx', 'vy', 'vz'.
masses : np.ndarray
The masses of the galaxies. It must be a (N,) array.
halo_id : np.ndarray
The ID of the halo each galaxy belongs to. It must be a (N,) array.
Ncent : int
The number of central galaxies in the simulation. The first Ncent galaxies are the centrals.
tracer : str, optional
The tracer type. Defaults to 'LRG'.
Raises
------
ValueError
If an array is not of the right shape or if a dictionary does not have the right keys.
"""
# Check if positions has keys x, y, z
if not all([key in positions.keys() for key in ['x', 'y', 'z']]):
# If the array has a shape (3,N), we transpose it to (N,3)
if positions.shape[0] == 3:
positions = positions.T
# Then, we check if the array has the right shape (i.e. (N,3))
if positions.shape[1] != 3:
raise ValueError('The positions array must have a shape (N,3) or (3,N)')
x = positions[:,0]
y = positions[:,1]
z = positions[:,2]
else:
x = positions['x']
y = positions['y']
z = positions['z']
# Check if velocities has keys vx, vy, vz
if not all([key in velocities.keys() for key in ['vx', 'vy', 'vz']]):
# If the array has a shape (3,N), we transpose it to (N,3)
if velocities.shape[0] == 3:
velocities = velocities.T
# Then, we check if the array has the right shape (i.e. (N,3))
if velocities.shape[1] != 3:
raise ValueError('The velocities array must have a shape (N,3) or (3,N)')
vx = velocities[:,0]
vy = velocities[:,1]
vz = velocities[:,2]
else:
vx = velocities['vx']
vy = velocities['vy']
vz = velocities['vz']
# Check if masses and halo_id have the right shape
if masses.shape != x.shape:
raise ValueError('The masses array must have the same shape as the positions array')
if halo_id.shape != x.shape:
raise ValueError('The halo_id array must have the same shape as the positions array')
# Set the tracer dictionary
tracer_dict = {
'x': x,
'y': y,
'z': z,
'vx': vx,
'vy': vy,
'vz': vz,
'mass': masses,
'id': halo_id,
'Ncent': Ncent
}
# Set the tracer dictionary in the mock dictionary
if not hasattr(self, 'cubic_dict'):
self.cubic_dict = {}
self.cubic_dict[tracer] = tracer_dict # Overwrite the tracer dictionary if it already exists
# Set the tracer type to the one provided (Warn if it is different from the one already set)
if hasattr(self, 'tracer'):
if self.tracer != tracer:
warn(f"The tracer provided ('{tracer}') will be used instead of the already existing one ('{self.tracer}').", UserWarning)
self.tracer = tracer # Set the tracer type to the one given
[docs] def get_tracer_positions(self):
"""
Get the positions of the galaxies and apply RSD.
Returns
-------
data_positions : np.ndarray
The positions of the galaxies after RSD. It is a (N,3) array.
"""
# Get the positions of the galaxies
self.data_positions = np.c_[apply_rsd(self.cubic_dict, self.boxsize, self.redshift, self.cosmo, tracer=self.tracer, los=self.los)]
return self.data_positions
[docs] def compute_DensitySplit(self,
smooth_radius:float = 10,
cellsize:float = 5,
nquantiles:int = 10,
sampling:str = 'randoms',
randoms=None,
filter_shape:str = 'Gaussian',
return_density:bool = True,
nthread=16):
"""
Compute the density field and the quantiles using DensitySplit.
(see https://github.com/epaillas/densitysplit for more details)
Parameters
----------
smooth_radius : float, optional
The radius of the Gaussian smoothing in Mpc/h. Defaults to 10.
cellsize : float, optional
The size of the cells in the mesh. Defaults to 10.
nquantiles : int, optional
The number of quantiles to compute. Defaults to 10.
sampling : str, optional
The sampling to use. Can be 'randoms' or 'data'. Defaults to 'randoms'.
randoms : np.ndarray, optional
The positions of the randoms. It is a (N,3) array of points in the range [0, boxsize] for each coordinate.
If set to none, a random array will be created of size nquantiles * len(data_positions).
Defaults to None.
filter_shape : str, optional
The shape of the filter to use. Can be 'Gaussian' or 'TopHat'. Defaults to 'Gaussian'.
return_density : bool, optional
If True, the density field is returned. Defaults to True.
nthread : int, optional
The number of threads to use. Defaults to 16.
Returns
-------
quantiles : np.ndarray
The quantiles. It is a (nquantiles,N,3) array.
density : np.ndarray, optional
The density field. It is a (N,3) array.
"""
logger = logging.getLogger('DensitySplit') #tmp
# Get the positions of the galaxies
if not hasattr(self, 'data_positions'):
self.get_tracer_positions()
# Initialize the DensitySplit object and compute the density field
try : #Main branch
logger.debug('Launched densitysplit on main branch')
ds = DensitySplit(data_positions=self.data_positions, boxsize=self.boxsize)
self.density = ds.get_density_mesh(smooth_radius=smooth_radius, cellsize=cellsize, sampling=sampling, filter_shape=filter_shape)
except : #OpenMP branch (an error will be raised because the OpenMP branch used differently)
logger.debug('Launched densitysplit on openmp branch') #tmp
ds = DensitySplit(data_positions=self.data_positions,
boxsize=self.boxsize,
boxcenter=self.boxsize/2,
cellsize=cellsize,
nthreads=nthread)
if sampling == 'randoms' and randoms is None:
# Sample the positions on random points that we have to create in that branch
logger.debug('No randoms provided, creating randoms')
sampling_positions = np.random.uniform(0, self.boxsize, (nquantiles * len(self.data_positions), 3))
elif sampling == 'randoms':
# Sample the positions on the provided randoms
sampling_positions = randoms
elif sampling == 'data':
# Sample the positions on the data positions
sampling_positions = self.data_positions
else:
raise ValueError('The sampling parameter must be either "randoms" or "data"')
self.density = ds.get_density_mesh(sampling_positions=sampling_positions, smoothing_radius=smooth_radius)
# Temporary fix waiting for an update of the code : Remove the unphysical values (density under -1)
self.density[self.density < -1] = -1 # Remove the outliers
self.quantiles = ds.get_quantiles(nquantiles=nquantiles)
if return_density:
return self.quantiles, self.density
return self.quantiles
[docs] def downsample_data(self,
frac: float=None,
new_n: float=None,
npoints: int=None):
"""
Downsample the data to a new number of galaxies.
The quantiles are also downsampled to the same number of galaxies.
(They are generated to have the same number of points as the data in each quantile)
Parameters
----------
new_n : float
The new number density in the box.
Returns
-------
data_positions : np.ndarray
The positions of the galaxies after downsampling. It is a (N,3) array.
quantiles : np.ndarray, optional
The quantiles after downsampling. It is a (nquantiles,N,3) array.
Returns None if the quantiles have not been computed yet.
"""
logger = logging.getLogger('CorrHOD') # Log some info just in case
# First, check that only one of the three parameters is set
if np.sum([frac is not None, new_n is not None, npoints is not None]) != 1:
raise ValueError('Only one of the parameters frac, new_n and npoints must be set.')
if not hasattr(self, 'nbar'):
self.nbar = len(self.data_positions) / self.boxsize**3
N = len(self.data_positions)
# Then, get the other parameters from the one that is set
if frac is not None:
npoints = int(frac * N)
new_n = self.nbar * frac
if npoints is not None:
frac = npoints / N
new_n = self.nbar * frac
if new_n is not None:
frac = new_n / self.nbar
npoints = int(frac * N)
if new_n is None or new_n > self.nbar : # Do nothing
logger.warning(f'Data not downsampled due to number density {new_n} too small or None')
return self.data_positions, self.quantiles
wanted_number = int(new_n*self.boxsize**3) # Get the wanted number of galaxies in the box
sample_indices = np.random.choice(len(self.data_positions), size=wanted_number, replace=False)
self.data_positions = self.data_positions[sample_indices]
logger.info(f'Downsampling the data to a number density of {new_n:.2e} h^3/Mpc^3: {len(self.data_positions)} galaxies remaining from {N} galaxies')
if not hasattr(self, 'quantiles'):
return self.data_positions, None
# Downsample the quantiles to the new number of galaxies (with the same proportions)
for i in range(len(self.quantiles)):
wanted_number = len(self.data_positions)
sample_indices = np.random.choice(len(self.quantiles[i]), size=wanted_number, replace=False)
self.quantiles[i] = self.quantiles[i][sample_indices]
return self.data_positions, self.quantiles
[docs] def compute_auto_corr(self,
quantile:int,
mode:str = 'smu',
edges:list = [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)],
mpicomm = None,
mpiroot = None,
nthread:int = 16):
"""
Compute the auto-correlation of a quantile.
The result will also be saved in the class, as `self.CF['Auto'][f'DS{quantile}']`
Parameters
----------
quantile : int
Index of the quantile to compute. It must be between 0 and nquantiles-1.
mode : str, optional
The mode to use for the 2PCF. Defaults to 'smu'.
See pycorr for more details .
edges : list, optional
The edges to use for the 2PCF. Defaults to [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)].
See pycorr for more details.
mpicomm : _type_, optional
The MPI communicator. Defaults to None.
mpiroot : _type_, optional
The MPI root. Defaults to None.
nthread : int, optional
The number of threads to use. Defaults to 16.
Returns
-------
xi_quantile : pycorr.TwoPointCorrelationFunction
The auto-correlation of the quantile.
"""
if not hasattr(self, 'quantiles'):
raise ValueError('The quantiles have not been computed yet. Run compute_DensitySplit first.')
# Get the positions of the points in the quantile
quantile_positions = self.quantiles[quantile] # An array of 3 columns (x,y,z)
# Initialize the dictionary for the correlations
if not hasattr(self, 'CF'):
self.CF = {}
if not (self.los in self.CF.keys()):
self.CF[self.los] = {}
if not ('Auto' in self.CF[self.los].keys()):
self.CF[self.los]['Auto'] = {}
# Initialize the dictionary for the pycorr objects
if not hasattr(self, 'xi'):
self.xi = {}
if not (self.los in self.xi.keys()):
self.xi[self.los] = {}
if not ('Auto' in self.xi[self.los].keys()):
self.xi[self.los]['Auto'] = {}
# Compute the 2pcf
xi_quantile = TwoPointCorrelationFunction(mode, edges,
data_positions1=quantile_positions,
boxsize=self.boxsize,
los=self.los,
position_type='pos',
mpicomm=mpicomm, mpiroot=mpiroot, num_threads=nthread)
# Add the 2pcf to the dictionary
if not ('s' in self.CF[self.los]):
# Note that the s is the same for all the lines of sight as long as we give the same edges to the 2PCF function
s, poles = project_to_multipoles(xi_quantile)
self.CF[self.los]['s'] = s
else:
poles = project_to_multipoles(xi_quantile, return_sep=False)
self.CF[self.los]['Auto'][f'DS{quantile}'] = poles
self.xi[self.los]['Auto'][f'DS{quantile}'] = xi_quantile
return xi_quantile
[docs] def compute_cross_corr(self,
quantile:int,
mode:str = 'smu',
edges:list = [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)],
mpicomm = None,
mpiroot = None,
nthread:int = 16):
"""
Compute the cross-correlation of a quantile with the galaxies.
The result will also be saved in the class, as `self.CF['Cross'][f'DS{quantile}']`
Parameters
----------
quantile : int
Index of the quantile to compute. It must be between 0 and nquantiles-1.
mode : str, optional
The mode to use for the 2PCF. Defaults to 'smu'.
See pycorr for more details .
edges : list, optional
The edges to use for the 2PCF. Defaults to [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)].
See pycorr for more details.
mpicomm : _type_, optional
The MPI communicator. Defaults to None.
mpiroot : _type_, optional
The MPI root. Defaults to None.
nthread : int, optional
The number of threads to use. Defaults to 16.
Returns
-------
xi_quantile : pycorr.TwoPointCorrelationFunction
The cross-correlation of the quantile.
"""
if not hasattr(self, 'quantiles'):
raise ValueError('The quantiles have not been computed yet. Run compute_DensitySplit first.')
# Get the positions of the points in the quantile
quantile_positions = self.quantiles[quantile] # An array of 3 columns (x,y,z)
# Get the positions of the galaxies
if not hasattr(self, 'data_positions'):
self.get_tracer_positions()
# Initialize the dictionary for the correlations
if not hasattr(self, 'CF'):
self.CF = {}
if not (self.los in self.CF.keys()):
self.CF[self.los] = {}
if not ('Cross' in self.CF[self.los].keys()):
self.CF[self.los]['Cross'] = {}
# Initialize the dictionary for the pycorr objects
if not hasattr(self, 'xi'):
self.xi = {}
if not (self.los in self.xi.keys()):
self.xi[self.los] = {}
if not ('Cross' in self.xi[self.los].keys()):
self.xi[self.los]['Cross'] = {}
# Compute the 2pcf
xi_quantile = TwoPointCorrelationFunction(mode, edges,
data_positions1=quantile_positions,
data_positions2=self.data_positions,
boxsize=self.boxsize,
los=self.los,
position_type='pos',
mpicomm=mpicomm, mpiroot=mpiroot, num_threads=nthread)
# Add the 2pcf to the dictionary
if not ('s' in self.CF[self.los]):
# Note that the s is the same for all the lines of sight as long as we give the same edges to the 2PCF function
s, poles = project_to_multipoles(xi_quantile)
self.CF[self.los]['s'] = s
else:
poles = project_to_multipoles(xi_quantile, return_sep=False)
self.CF[self.los]['Cross'][f'DS{quantile}'] = poles
self.xi[self.los]['Cross'][f'DS{quantile}'] = xi_quantile
return xi_quantile
[docs] def compute_2pcf(self,
mode:str = 'smu',
edges:list = [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)],
mpicomm = None,
mpiroot = None,
nthread:int = 16):
"""
Compute the 2PCF of the galaxies. The result will also be saved in the class, as `self.CF['2PCF']`
Parameters
----------
mode : str, optional
The mode to use for the 2PCF. Defaults to 'smu'.
See pycorr for more details .
edges : list, optional
The edges to use for the 2PCF. Defaults to [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)].
See pycorr for more details.
mpicomm : _type_, optional
The MPI communicator. Defaults to None.
mpiroot : _type_, optional
The MPI root. Defaults to None.
nthread : int, optional
The number of threads to use. Defaults to 16.
Returns
-------
xi : pycorr.TwoPointCorrelationFunction
The 2PCF of the galaxies.
"""
# Get the positions of the galaxies
if not hasattr(self, 'data_positions'):
self.get_tracer_positions()
# Initialize the dictionary for the correlations
if not hasattr(self, 'CF'):
self.CF = {}
if not (self.los in self.CF.keys()):
self.CF[self.los] = {}
if not ('2PCF' in self.CF[self.los].keys()):
self.CF[self.los]['2PCF'] = {}
# Initialize the dictionary for the pycorr objects
if not hasattr(self, 'xi'):
self.xi = {}
if not (self.los in self.xi.keys()):
self.xi[self.los] = {}
if not ('Auto' in self.xi[self.los].keys()):
self.xi[self.los]['2PCF'] = {}
# Compute the 2pcf
xi = TwoPointCorrelationFunction(mode, edges,
data_positions1 = self.data_positions,
boxsize = self.boxsize,
los = self.los,
position_type = 'pos',
mpicomm = mpicomm, mpiroot = mpiroot, num_threads = nthread)
# Add the 2pcf to the dictionary
if not ('s' in self.CF[self.los]):
# Note that the s is the same for all the lines of sight as long as we give the same edges to the 2PCF function
s, poles = project_to_multipoles(xi)
self.CF[self.los]['s'] = s
else:
poles = project_to_multipoles(xi, return_sep=False)
self.CF[self.los]['2PCF'] = poles
self.xi[self.los]['2PCF'] = xi
return xi
[docs] def average_CF(self,
average_on:list = ['x', 'y', 'z']
):
"""
Averages the 2PCF, autocorrelation and cross-correlation of the quantiles on the three lines of sight. available (x, y and z)
Parameters
----------
average_on : list, optional
The lines of sight to average on. Defaults to ['x', 'y', 'z']. The CFs must have been computed on these lines of sight before calling this function.
Returns
-------
CF_average : dict
Dictionary containing the averaged 2PCF, autocorrelation and cross-correlation of the quantiles on the lines of sight.
It contains :
* `s` : the separation bins (identical for all the lines of sight, as long as the same edges are used for the 2PCF),
* `2PCF` : the averaged poles of the 2PCF,
* `Auto` : a dictionary containing the averaged autocorrelation of the quantiles. It contains for each quantile `DS{quantile}` as the poles of the autocorrelation,
* `Cross` : a dictionary containing the averaged cross-correlation of the quantiles. It contains for each quantile `DS{quantile}` as the poles of the cross-correlation.
"""
los_list = average_on
# Initialize the dictionary for the averaged correlations
self.CF['average'] = {}
# Check if the 2PCF, autocorrelation and cross-correlation of the quantiles are already computed
if not hasattr(self, 'CF'):
raise ValueError('No correlation has been computed yet. Run compute_2pcf, compute_auto_corr and/or compute_cross_corr first.')
for los in los_list:
if not (los in self.CF.keys()):
raise ValueError(f'The {los} line of sight has not been computed yet. Run compute_2pcf, compute_auto_corr and/or compute_cross_corr first.')
if not ('2PCF' in self.CF[los].keys()):
raise ValueError(f'The 2PCF of the {los} line of sight has not been computed yet. Run compute_2pcf first.')
if not ('Auto' in self.CF[los].keys()):
raise ValueError(f'The autocorrelation of the {los} line of sight has not been computed yet. Run compute_auto_corr first.')
if not ('Cross' in self.CF[los].keys()):
raise ValueError(f'The cross-correlation of the {los} line of sight has not been computed yet. Run compute_cross_corr first.')
# Note that the s is the same for all the lines of sight as long as we give the same edges to the 2PCF function
s = self.CF[los_list[0]]['s'] # Get the separation bins (Same for all the CFs, we take the first one available)
self.CF['average']['s'] = s
poles=[]
# Average the 2PCF
for los in los_list:
poles.append(self.CF[los]['2PCF'])
self.CF['average']['2PCF'] = np.mean(poles, axis=0)
# Average the autocorrelation of the quantiles
self.CF['average']['Auto'] = {}
for quantile in range(len(self.quantiles)):
poles = []
for los in los_list:
poles.append(self.CF[los]['Auto'][f'DS{quantile}'])
self.CF['average']['Auto'][f'DS{quantile}'] = np.mean(poles, axis=0)
# Average the cross-correlation of the quantiles
self.CF['average']['Cross'] = {}
for quantile in range(len(self.quantiles)):
poles = []
for los in los_list:
poles.append(self.CF[los]['Cross'][f'DS{quantile}'])
self.CF['average']['Cross'][f'DS{quantile}'] = np.mean(poles, axis=0)
return self.CF['average']
[docs] def save(self,
hod_indice:int = 0,
path:str = None,
save_HOD:bool = True,
save_pos:bool = True,
save_density:bool = True,
save_quantiles:bool = True,
save_CF:bool = True,
save_xi:bool = True,
los:str = 'average',
save_all:bool = False):
"""
Save the results of the CorrHOD object.
Some of the results are saved as dictionaries in numpy files. They can be accessed using the `np.load` function,
and the .item() method of the loaded object.
Parameters
----------
hod_indice : int, optional
The indice of the set of HOD parameters provided to the CorrHOD Object. Defaults to 0.
Be careful to set the right indice if you want to save the HOD parameters !
path : str, optional
The path where to save the results. If None, the path is set to the output_dir of config file. Defaults to None.
save_HOD : bool, optional
If True, the HOD parameters are saved.
File saved as `hod{hod_indice}_c{cosmo}_p{phase}.npy`. Defaults to True.
save_pos : bool, optional
If True, the cubic mock dictionary is saved.
File saved as `pos_hod{hod_indice}_c{cosmo}_p{phase}.npy`. Defaults to True.
save_density : bool, optional
If True, the density PDF is saved.
File saved as `density_hod{hod_indice}_c{cosmo}_p{phase}.npy`. Defaults to True.
save_quantiles : bool, optional
If True, the quantiles of the densitysplit are saved.
File saved as `quantiles_hod{hod_indice}_c{cosmo}_p{phase}.npy`. Defaults to True.
save_CF : bool, optional
If True, the 2PCF, the autocorrelation and cross-correlation of the quantiles are saved.
The 2PCF is saved as `tpcf_hod{hod_indice}_c{cosmo}_p{phase}.npy`.
It contains the separation bins in the `s` key and the poles in the `2PCF` key.
The Auto and Corr dictionaries are saved as `ds_auto_hod{hod_indice}_c{cosmo}_p{phase}.npy` and `ds_cross_hod{hod_indice}_c{cosmo}_p{phase}.npy`.
Each dictionnary contains `DS{quantile}` keys with the CF of the quantile. The `s` key contains the separation bins.
Defaults to True.
save_xi : bool, optional
If True, the pycorr objects are saved in an unique dictionary.
The 2PCF can be accessed using the `xi[los]['2PCF']` key.
The auto and cross-correlations of the quantiles can be accessed using the `xi[los]['Auto']['DS{i}']` and `xi[los]['Cross']['DS{i}']` keys.
Defaults to True.
los : str, optional
The line of sight along which to save the 2PCF, the autocorrelation and cross-correlation of the quantiles.
Can be 'x', 'y', 'z' or 'average'. Defaults to 'average'.
save_all : bool, optional
If True, all the results are saved. This overrides the other options. Defaults to False.
"""
if path is None:
output_dir = Path(self.sim_params['output_dir'])
else :
output_dir = Path(path)
sim_name = self.sim_params['sim_name']
# Get the cosmo and phase from sim_name (Naming convention has to end by '_c{cosmo}_p{phase}' !)
cosmo = sim_name.split('_')[-2].split('c')[-1] # Get the cosmology number by splitting the name of the simulation
phase = sim_name.split('_')[-1].split('ph')[-1] # Get the phase number by splitting the name of the simulation
# Get the HOD indice in the right format (same as the cosmology and phase)
hod_indice = f'{hod_indice:03d}'
# Define the base names of the files
base_name = f'hod{hod_indice}_{los}_c{cosmo}_ph{phase}.npy'
hod_name = f'hod{hod_indice}_c{cosmo}_ph{phase}.npy' # HOD parameters don't depend on the line of sight
cubic_name = f'pos_hod{hod_indice}_c{cosmo}_ph{phase}.npy' # The cubic dictionary does not depend on the line of sight either
# Note : If the user explicitly wants to save somethig, it is assumed that it has been computed before.
# No error is explicitly raised if the user tries to save something that has not been computed yet.
# If the user wants to save everything, the results are saved only if they exist.
if save_HOD or (save_all and hasattr(self, 'HOD_params')):
# Add the number density to the HOD parameters
self.HOD_params['nbar'] = self.nbar
path = output_dir / 'hod'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / hod_name, self.HOD_params)
if save_pos or (save_all and hasattr(self, 'cubic_dict')):
# Pass if the cubic dictionary has not been computed yet
if not hasattr(self, 'cubic_dict'):
warn('The cubic dictionary has not been computed yet. Run populate_halos first.', UserWarning)
pass
path = output_dir
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / cubic_name, self.cubic_dict)
if save_density or (save_all and hasattr(self, 'density')):
path = output_dir / 'ds' / 'density'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / (f'density_' + base_name), self.density)
if save_quantiles or (save_all and hasattr(self, 'quantiles')):
path = output_dir / 'ds' / 'quantiles'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / (f'quantiles_' + base_name), self.quantiles)
if save_xi or (save_all and hasattr(self, 'xi')):
path = output_dir / 'xi'
path.mkdir(parents=True, exist_ok=True)
np.save(path / (f'xi_' + base_name), self.xi)
if save_CF or (save_all and hasattr(self, 'CF')):
# First, we check that the provided los is expected
if los not in ['x', 'y', 'z', 'average']:
raise ValueError(f'The line of sight must be "x", "y", "z" or "average". Got {los}.')
# Check that the CF has been computed on the provided los
if los not in self.CF.keys():
raise ValueError(f'The {los} line of sight has not been computed yet. Run compute_2pcf, compute_auto_corr and/or compute_cross_corr first.', UserWarning)
# From now on, we only save the CFs that have been computed on the provided los
if '2PCF' in self.CF[los].keys():
tpcf_dict = {'s': self.CF[los]['s'], '2PCF': self.CF[los]['2PCF']}
path = output_dir / 'tpcf'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / (f'tpcf_' + base_name), tpcf_dict)
if 'Auto' in self.CF[los].keys():
auto_dict = {'s': self.CF[los]['s'], **self.CF[los]['Auto']}
path = output_dir / 'ds' / 'gaussian'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / (f'ds_auto_' + base_name), auto_dict)
if 'Cross' in self.CF[los].keys():
cross_dict = {'s': self.CF[los]['s'], **self.CF[los]['Cross']}
path = output_dir / 'ds' / 'gaussian'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
np.save(path / (f'ds_cross_' + base_name), cross_dict)
[docs] def run_all(self,
is_populated:bool = False,
exit_on_underpopulated:bool = False,
los_to_compute='average',
downsample_to:float = None,
remove_Ball:bool = False,
# Parameters for the DensitySplit
smooth_radius:float = 10,
cellsize:float = 5,
nquantiles:int = 10,
sampling:str = 'randoms',
filter_shape:str = 'Gaussian',
# Parameters for the 2PCF, autocorrelation and cross-correlations
edges = [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)],
mpicomm = None,
mpiroot = None,
nthread:int = 16,
# Parameters for saving the results
hod_indice:int = 0,
path:str = None,
**kwargs
):
"""
Run all the steps of the CorrHOD object. See tests/test_corrHOD.py for an example of how to use it and its contents.
Note that the **kwargs are used to set the parameters of the save function. If nothing is provided, nothing will be saved.
See the documentation of the save function for more details.
Times will be saved in the `times_dict` attribute of the CorrHOD object and displayed in the logs.
Parameters
----------
los_to_compute : str, optional
The line of sight along which to compute the 2PCF, the autocorrelation and cross-correlation of the quantiles.
If set to 'average', the 2PCF, the autocorrelation and cross-correlation of the quantiles will be averaged on the three lines of sight. Defaults to 'average'.
is_populated : bool, optional
If True, the halos have already been populated before running this method. Defaults to False.
exit_on_underpopulated : bool, optional
If True, the function will exit if the halos are underpopulated compared to the requested number density. Defaults to False.
downsample_to : float, optional
If provided, the galaxies will be randomly downsampled to the provided number density in h^3/Mpc^3 before computing the CFs.
This can be useful to reduce the computation time.
Defaults to None.
remove_Ball : bool, optional
If True, the Ball object (Dark matter simulation loaded from AbacusHOD) is removed after populating the halos.
This is useful to save memory. Defaults to False.
smooth_radius : float, optional
The radius of the Gaussian smoothing in Mpc/h used in the densitysplit.
See https://github.com/epaillas/densitysplit for more details. Defaults to 10.
cellsize : float, optional
The size of the cells in the mesh used in the densitysplit.
See https://github.com/epaillas/densitysplit for more details. Defaults to 10.
nquantiles : int, optional
The number of quantiles to define in the densitysplit.
see https://github.com/epaillas/densitysplit for more details. Defaults to 10.
sampling : str, optional
The type of sampling to use in the densitysplit.
See https://github.com/epaillas/densitysplit for more details. Defaults to 'randoms'.
filter_shape : str, optional
The shape of the smoothing filter to use in the densitysplit.
see https://github.com/epaillas/densitysplit for more details. Defaults to 'Gaussian'.
edges : list, optional
The edges of the s, mu bins to use for the 2PCF, autocorrelation and cross-correlations. Defaults to [np.linspace(0.1, 200, 50), np.linspace(-1, 1, 60)].
mpicomm : _type_, optional
The MPI communicator used in the 2PCF, autocorrelation and cross-correlations. Defaults to None.
mpiroot : _type_, optional
The MPI root used in the 2PCF, autocorrelation and cross-correlations. Defaults to None.
nthread : int, optional
The number of threads to use in the 2PCF, autocorrelation and cross-correlations. Defaults to 16.
hod_indice : int, optional
The indice of the set of HOD parameters provided to the CorrHOD Object. Defaults to 0.
Be careful to set the right indice if you want to save the HOD parameters !
path : str, optional
The path where to save the results. If None, the path is set to the output_dir of config file. Defaults to None.
**kwargs : dict, optional
The parameters to pass to the save function. See the documentation of the save function for more details.
If nothing is provided, nothing will be saved.
"""
logger = logging.getLogger('CorrHOD') # Get the logger for the CorrHOD class
# Get the MPI communicator and rank
if mpicomm is not None and mpiroot is not None:
size = mpicomm.size # Number of processes
rank = mpicomm.Get_rank() # Rank of the current process (root process has rank 0)
root = (rank == mpiroot) # True if the current process is the root process
else:
root = True
rank = 0
if los_to_compute=='average':
los_list = ['x', 'y', 'z']
else:
los_list = [los_to_compute]
# The Barrier and bcast method can only be called if the communicator is not None
if mpicomm is not None:
mpicomm.Barrier() # Wait for all the processes to reach this point before starting
start_time = time()
self.times_dict = {} # Saving the times taken by each step in a dict to call them later if needed
if root:
logger.info(f'Running CorrHOD with the following parameters ({hod_indice}) :')
logger.info(f"\t Simulation : {self.sim_params['sim_name']}")
for key in self.HOD_params.keys():
logger.info(f'\t {key} : {self.HOD_params[key]}')
if self.data_params is not None:
logger.info(f"\t Number density : {self.data_params['tracer_density_mean'][self.tracer]:.2e} h^3/Mpc^3")
logger.newline()
if root and not is_populated:
logger.info('Initializing and populating the halos ...')
self.initialize_halo() # Initialize the halo
self.times_dict['initialize_halo'] = time()-start_time
logger.info(f"Initialized the halos in {self.times_dict['initialize_halo']:.2f} s\n")
# Catch the UserWarning raised by the densitysplit if the halos are underpopulated
with catch_warnings():
filterwarnings("error")
try :
self.populate_halos(nthread=nthread, remove_Ball=remove_Ball) # Populate the halos
except UserWarning as w: # Catch the warning if needed
logger.warning('Warning : ' + str(w)) # Display the warning without triggering the error
if exit_on_underpopulated:
logger.info('The halos are underpopulated. Exiting the function as requested.\n')
return None # Exit the function to avoid crashing the code
logger.newline() # Just to add a space because populate_halos has a built-in print I can't remove
elif not is_populated:
self.cubic_dict = None
elif root:
logger.info('The halos have already been populated. Skipping the initialization and population steps.\n')
# Here, we run all the CF computations for each los given in los_to_compute.
# We will then average the results on the lines of sight if needed
for los in los_list:
los_time = time()
self.times_dict[los] = {} # Initialize the dict for the times taken by each step for this los
self.los = los # Reassign the los we will work on
if root:
logger.info(f'Computing along the {los} line of sight ...')
logger.newline()
self.get_tracer_positions() # Get the positions of the galaxies with RSD on the line of sight
# Compute the DensitySplit
logger.info('Computing the DensitySplit ...')
tmp_time = time()
self.compute_DensitySplit(smooth_radius=smooth_radius,
cellsize=cellsize,
nquantiles=nquantiles,
sampling=sampling,
filter_shape=filter_shape,
return_density=False,
nthread=nthread)
self.times_dict[los]['compute_DensitySplit'] = time()-tmp_time
logger.info(f"Computed the DensitySplit in {self.times_dict[los]['compute_DensitySplit']:.2f} s\n")
# Downsample the galaxies if needed
if downsample_to is not None:
self.downsample_data(new_n=downsample_to)
logger.info(f'Downsampled the galaxies to {downsample_to:.2e} h^3/Mpc^3\n')
else:
self.data_positions = None
self.density = None
self.quantiles = None
if mpicomm is not None:
self.data_positions = mpicomm.bcast(self.data_positions, root=mpiroot) # Broadcast the positions of the galaxies to all the processes
self.quantiles = mpicomm.bcast(self.quantiles, root=mpiroot) # Broadcast the quantiles to all the processes
if rank == 1:
logger.debug(f'(Broadcast test) Number of galaxies : {len(self.data_positions)} on rank {rank}')
logger.newline()
# Compute the 2PCF
if root :
logger.info('Computing the 2PCF ...')
tmp_time = time()
self.compute_2pcf(edges=edges, mpicomm=mpicomm, mpiroot=mpiroot, nthread=nthread)
self.times_dict[los]['compute_2pcf'] = time()-tmp_time
if root:
logger.info(f"Computed the 2PCF in {self.times_dict[los]['compute_2pcf']:.2f} s\n")
# For each quantile, compute the autocorrelation and cross-correlation
self.times_dict[los]['compute_auto_corr'] = {}
self.times_dict[los]['compute_cross_corr'] = {}
for quantile in range(nquantiles):
if root:
logger.info(f'Computing the auto-correlation and cross-correlation of quantile {quantile} ...')
tmp_time = time()
self.compute_auto_corr(quantile, edges=edges,mpicomm=mpicomm, mpiroot=mpiroot, nthread=nthread)
self.times_dict[los]['compute_auto_corr'][f'DS{quantile}'] = time()-tmp_time
if root:
logger.info(f"Computed the auto-correlation of quantile {quantile} in {self.times_dict[los]['compute_auto_corr'][f'DS{quantile}']:.2f} s")
tmp_time = time()
self.compute_cross_corr(quantile, edges=edges,mpicomm=mpicomm, mpiroot=mpiroot, nthread=nthread)
self.times_dict[los]['compute_cross_corr'][f'DS{quantile}'] = time()-tmp_time
if root:
logger.info(f"Computed the cross-correlation of quantile {quantile} in {self.times_dict[los]['compute_cross_corr'][f'DS{quantile}']:.2f} s")
self.times_dict[los]['run_los'] = time()-los_time
if root:
logger.newline()
logger.info(f"Ran los '{los}' in {self.times_dict[los]['run_los']:.2f} s\n")
if root:
# Average the 2PCF, autocorrelation and cross-correlation of the quantiles on the lines of sight
self.average_CF(average_on=los_list)
if mpicomm is not None:
mpicomm.Barrier() # Wait for all the processes to reach this point before ending the timer
self.times_dict['run_all'] = time()-start_time
if root:
logger.info(f"Run_all in {self.times_dict['run_all']:.2f} s")
# Here, we define the arguments to pass to the save function by default (nothing will be saved)
save_args = {
'hod_indice': hod_indice,
'path': path,
'save_HOD': False,
'save_pos': False,
'save_density': False,
'save_quantiles': False,
'save_CF': False,
'save_xi': False,
'los': 'average', # We save the averaged CFs by default
'save_all': False
}
# We update the arguments with the kwargs provided by the user
for key, value in kwargs.items():
if key in save_args.keys():
save_args[key] = value
else:
# If the argument is not an argument of the save function, we warn the user
warn(f'Unknown argument {key}={value} in run_all. It will be ignored.', UserWarning)
# Save the results
if root:
if save_args['los']=='all':
del save_args['los'] # We will manually save the results for each los
self.save(los='average', **save_args)
self.save(los='x', **save_args)
self.save(los='y', **save_args)
self.save(los='z', **save_args)
else:
self.save(**save_args)