Source code for CorrHOD.utils
import numpy as np
from os.path import exists
from pathlib import Path
from warnings import warn
[docs]def apply_rsd(data, boxsize, redshift, cosmo, tracer='LRG', los = 'z'):
"""Read positions and velocities from HOD dict and applies Redshift-Space Distortions (RSD).
Parameters
----------
data : dict
HOD dict containing the positions and velocities of the tracer (data[tracer])
It can also be a positional dataset, in which case it must contain the keys 'x', 'y', 'z', 'vx', 'vy', 'vz'
boxsize : float
Size of the simulation box in Mpc/h.
redshift : float
Redshift of the simulation snapshot.
cosmo : cosmology object (cosmoprimo)
Cosmology used for the simulation. It must contain an `efunc(z)` method.
tracer : str, optional
Tracer to use. Default is 'LRG'.
los : str, optional
Line-of-sight direction in which to apply the RSD. Default is 'z'.
Returns
-------
x, y, z : arrays
Redshift-space positions of the tracer. The los axis is replaced by the redshift-space position.
"""
try:
data = data[tracer] # Load the LRG data
except:
warn('The object is not a hod_dict. Trying to load it as a positional dataset.', UserWarning)
# Check that the data contains the right keys
if not all([key in data.keys() for key in ['x', 'y', 'z', 'vx', 'vy', 'vz']]):
raise ValueError('The object is not a hod_dict and does not contain the keys "x", "y", "z", "vx", "vy", "vz"')
# Get the scale factor and Hubble parameter at the redshift of the snapshot
az = 1 / (1 + redshift)
hubble = 100 * cosmo.efunc(redshift)
# Get the velocities
vx = data['vx']
vy = data['vy']
vz = data['vz']
# Get the positions
if np.min(data['x']) < 0 or np.min(data['y']) < 0 or np.min(data['z']) < 0:
# Add boxsize/2 to center the box at boxsize/2 if the box is centered on 0
x = data['x'] + boxsize / 2
y = data['y'] + boxsize / 2
z = data['z'] + boxsize / 2
else:
x = data['x']
y = data['y']
z = data['z']
# Get the redshift-space positions for each axis. That way, we can replace the los axis with the redshift-space position
x_rsd = x + vx / (hubble * az)
y_rsd = y + vy / (hubble * az)
z_rsd = z + vz / (hubble * az)
# Periodic boundary conditions
x_rsd = x_rsd % boxsize
y_rsd = y_rsd % boxsize
z_rsd = z_rsd % boxsize
# Replace the los axis with the redshift-space position
if los == 'x':
x = x_rsd
elif los == 'y':
y = y_rsd
elif los == 'z':
z = z_rsd
else:
raise ValueError('los must be x, y or z')
return x, y, z
[docs]def array_to_dict(array, is_log_sigma=False):
"""
Converts an array of values into a dictionary of HOD parameters
Parameters
----------
array: array_like
Array of values for the HOD parameters. The order of the parameters must be the following:
[logM_cut, logM1, sigma, alpha, kappa, alpha_c, alpha_s, Bcent, Bsat]
Returns
-------
hod_dict: dict
Dictionary of HOD parameters
"""
hod_dict = {
'logM_cut': array[0],
'logM1': array[1],
'sigma': array[2],
'alpha': array[3],
'kappa': array[4],
'alpha_c': array[5],
'alpha_s': array[6],
'Bcent': array[7],
'Bsat': array[8]
}
# Handle the case where sigma is in log10
if is_log_sigma:
hod_dict['sigma'] = 10 ** hod_dict['sigma']
return hod_dict
[docs]def dict_to_array(dic:dict):
"""
Converts a dictionary of HOD parameters into an array of values
Parameters
----------
dic: dict
Dictionary of HOD parameters. The keys must be the following:
['logM_cut', 'logM1', 'sigma', 'alpha', 'kappa', 'alpha_c', 'alpha_s', 'Bcent', 'Bsat']
Returns
-------
array: array_like
Array of values for the HOD parameters
"""
array = np.array([
dic['logM_cut'],
dic['logM1'],
dic['sigma'],
dic['alpha'],
dic['kappa'],
dic['alpha_c'],
dic['alpha_s'],
dic['Bcent'],
dic['Bsat']
])
return array
[docs]def format_HOD_CFs(path:str,
output_dir:str,
cosmo:int=0,
phase:int=0,
HOD_start:int=0,
HOD_number:int=1,
nquantiles=10,
smoothing_filter:str='gaussian',
smoothing_radius:float=10,
merge_2PCF:bool=True,
merge_DS_auto:bool=True,
merge_DS_cross:bool=True):
"""
Creates a dictionary of the CFs from the saved files.
This should bring all the files in the same format as the one used by Sunbird.
It will return for each CF, one file per cosmology and phase, with the following format:
* `s`: array of separation bins
* `multipoles`: array of shape (HOD_number, los_number(3), nquantiles, npoles(3), sep_bin_number)
The name of the file is :
* `tpcf_c{cosmo}_p{phase}.npy` for the 2PCF
* `ds_auto_zsplit_Rs{smoothing_radius}_c{cosmo}_p{phase}.npy` for the DS auto (zsplit because the density is split in redshift space)
Note that it is assumed that the files that we want to merge are all exist, and are respectively in a directory named `tpcf` and `ds/{smoothing_filter}` from the given path.
An error will be raised if any of the files is missing.
It is also assumed that the separation bins are the same for all the files (tpcf, cross and auto). A check is done to verify that.
Parameters
----------
path : str
Path to the directory containing the saved files
output_dir : str
Path to the directory where the formatted files will be saved
cosmo : int, optional
Cosmology number, same as the one used in AbacusSummit. Defaults to 0
phase : int, optional
Phase number, same as the one used in AbacusSummit. Defaults to 0
HOD_start : int, optional
HOD number to start from. Defaults to 0
HOD_number : int, optional
Number of HODs to merge. Defaults to 1
nquantiles : int, optional
Number of quantiles used to split the sample. Defaults to 10
smoothing_filter : str, optional
Smoothing filter used to compute the DensitySplit (see https://github.com/epaillas/densitysplit for more details).
Defaults to 'gaussian'
smoothing_radius : float, optional
Smoothing radius used to compute the DensitySplit (see https://github.com/epaillas/densitysplit for more details).
Defaults to 10
merge_2PCF : bool, optional
Whether to merge the 2PCF files. Defaults to True
merge_DS_auto : bool, optional
Whether to merge the quantiles auto-correlation files. Defaults to True
merge_DS_cross : bool, optional
Whether to merge the quantiles cross-correlation files. Defaults to True
"""
# Note : We consider that the files that we want to merge must all exist in the same directory
# We also consider than the separation bins are the same for all the files (tpcf, cross and auto) ! A check is done to verify that.
path = Path(path)
output_dir = Path(output_dir)
cosmo = f'{cosmo:03d}'
phase = f'{phase:03d}'
# Check that HOD_number is bigger than 0
if HOD_number < 1:
raise ValueError('HOD_number must be bigger than 0')
# Path to the directories containing the files
tpcf_path = path / 'tpcf'
ds_path = path / 'ds' / smoothing_filter
# Load the fisrt file to get the separation bins length
dic = np.load(tpcf_path / f'tpcf_hod{HOD_start:03d}_x_c{cosmo}_p{phase}.npy').item()
s = dic['s']
# The number of poles should be 3, but just to be sure :
npoles = dic['2PCF'].shape[0]
# Initialize the dictionaries with arrays of size (HOD_number, los_number, nquantiles, npoles, sep_bin_number) for the poles
tpcf_dict = {
's': s,
'multipoles':np.empty((HOD_number, 3, npoles, len(s))) # No nquantiles dimension here since it's the 2PCF
}
ds_auto_dict = {
's': s,
'multipoles':np.empty((HOD_number, 3, nquantiles, npoles, len(s)))
}
ds_cross_dict = {
's': s,
'multipoles':np.empty((HOD_number, 3, nquantiles, npoles, len(s)))
}
# Loop over the HODs
for i in range(HOD_start, HOD_start + HOD_number):
hod_indice = f'{i:03d}'
# Loop over the LOS
for los_indice, los in enumerate(['x', 'y', 'z']):
base_name = f'hod{hod_indice}_{los}_c{cosmo}_p{phase}.npy'
# Get the 2PCF path
filename = f'tpcf_' + base_name
if merge_2PCF:
dic = np.load(tpcf_path / filename).item() # Load the CF
# Check that the separation bins are the same
if not np.array_equal(tpcf_dict['s'], dic['s']):
raise ValueError(f'The separation bins are not equal. Impossible to merge the {i}th 2PCF.')
# Add the 2PCF to the dictionary at the right place
tpcf_dict['multipoles'][hod_indice, los_indice, :, :] = dic['2PCF']
# Get the DS auto path
filename = f'ds_auto_' + base_name
if merge_DS_auto:
dic = np.load(ds_path / filename).item()
# Check that the separation bins are the same
if not np.array_equal(ds_auto_dict['s'], dic['s']):
raise ValueError(f'The separation bins are not equal. Impossible to merge the {i}th DS auto.')
# Loop over the quantiles
for quantile_indice in range(nquantiles):
# Add the DS auto to the dictionary at the right place
ds_auto_dict['multipoles'][hod_indice, los_indice, quantile_indice, :, :] = dic[f'DS{quantile_indice}']
# Get the DS cross path
filename = f'ds_cross_' + base_name
if merge_DS_cross:
dic = np.load(ds_path / filename).item()
# Check that the separation bins are the same
if not np.array_equal(ds_cross_dict['s'], dic['s']):
raise ValueError(f'The separation bins are not equal. Impossible to merge the {i}th DS cross.')
# Loop over the quantiles
for quantile_indice in range(nquantiles):
# Add the DS cross to the dictionary at the right place
ds_cross_dict['multipoles'][hod_indice, los_indice, quantile_indice, :, :] = dic[f'DS{quantile_indice}']
# Save the dictionaries
path = output_dir / 'tpcf'
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
if merge_2PCF:
np.save(path / f'tpcf_c{cosmo}_p{phase}.npy', tpcf_dict)
base_name = f'zsplit_Rs{smoothing_radius}_c{cosmo}_p{phase}.npy'
path = output_dir / 'ds' / smoothing_filter
path.mkdir(parents=True, exist_ok=True) # Create the directory if it does not exist
if merge_DS_auto:
np.save(path / f'ds_auto_' + base_name, ds_auto_dict)
if merge_DS_cross:
np.save(path / f'ds_cross_' + base_name, ds_cross_dict)