import numpy as np
import scipy
import matplotlib.pyplot as plt
import emcee
import corner
import glob
from pathlib import Path
import time
import warnings
from matplotlib.ticker import AutoMinorLocator
from scipy import interpolate
from easyspec.extraction import extraction
from astropy import units as u
from scipy.signal import medfilt
import platform
import os
from scipy.integrate import quad
from astropy.constants import c
from astropy.cosmology import FlatLambdaCDM
from .line_models import *
from .aux_fit_lines import *
import re
import types
OS_name = platform.system()
plt.rcParams.update({'font.size': 12})
libpath = Path(__file__).parent.resolve() / Path("lines")
extraction = extraction()
[docs]class CombinedModel:
"""A picklable class that acts like a function representing one line model or combinations of line models"""
def __init__(self, model_names):
self.model_names = model_names
self.param_counts = {
'Gaussian': 3, # mean, amplitude, std
'Lorentz': 3, # mean, amplitude, fwhm
'Voigt': 4, # x_0, amplitude, fwhm_G, fwhm_L
'Skewedgaussian': 4, # peak location, amplitude, std, skewness
'Skewedlorentzian': 4 # peak location, amplitude, fwhm, skewness
}
def __call__(self, theta, x):
"""Make the class callable like a function. This is important for emcee"""
final_model = 0
param_index = 0
for name in self.model_names:
num_params = self.param_counts[name]
comp_theta = theta[param_index:param_index + num_params]
param_index += num_params
if name == "Gaussian":
final_model += model_Gauss(comp_theta, x)
elif name == "Lorentz":
final_model += model_Lorentz(comp_theta, x)
elif name == "Voigt":
final_model += model_Voigt(comp_theta, x)
elif name == "Skewedgaussian":
final_model += model_skewed_gaussian(comp_theta, x)
elif name == "Skewedlorentzian":
final_model += model_skewed_lorentzian(comp_theta, x)
return final_model
[docs]class analysis:
"""This class contains all the functions necessary to perform the analysis of calibrated spectral data."""
def __init__(self):
self.line_models = types.SimpleNamespace()
self.line_models.model_Gauss = model_Gauss
self.line_models.model_Lorentz = model_Lorentz
self.line_models.model_Voigt = model_Voigt
self.line_models.model_skewed_gaussian = model_skewed_gaussian
self.line_models.model_skewed_lorentzian = model_skewed_lorentzian
[docs] def continuum_fit(self,flux, wavelengths, continuum_regions, method = "powerlaw", pl_order=2, smooth_window=111):
"""
This function fits the continuum emission with an spline or a power law.
Parameters
----------
flux: array (float)
Array with the flux density to be fitted and (if spline is chosen) smoothed.
wavelengths: array (float)
Wavelength must be in absolute values, i.e. without units.
continuum_regions: list (float)
The continuum will be computed only within these wavelength regions and then extrapolated everywhere else. E.g.: continuum_regions = [[3000,6830],[6930,7500]].
If None, then all the wavelength range will be used.
method: string
This is the desired method to compute the continuum. Options are 'powerlaw' (or 'pl') and "median_filter".
The 'powerlaw' method is better in case you have large emission/absorption lines. The method "median_filter" is excellent
for extracting the continuum of a spectrum with narrow emission/absorption lines.
pl_order: int
Polynomial order for the power law fit. Used only if input variable method="powerlaw" (or "pl").
smooth_window: int
This is the size of the smooth window for the "median_filter" method. This number must be odd.
Returns
-------
continuum_selection: array (float)
Array with the continuum flux density.
continuum_std_deviation: float
The standard deviation for the continuum.
"""
if continuum_regions is None:
continuum_regions =[wavelengths[0],wavelengths[-1]]
if isinstance(continuum_regions[0],float) or isinstance(continuum_regions[0],int):
continuum_regions = [continuum_regions]
if smooth_window%2 == 0:
smooth_window = smooth_window - 1
print(f"The input parameter 'smooth_window' must be odd. We are resetting it to {smooth_window}.")
selection = np.asarray([])
index_std_deviation = []
for wavelength_region in continuum_regions:
# Here we select the indexes of the wavelengths inside the intervals given by continuum_regions:
index = np.where((wavelengths > wavelength_region[0]) & (wavelengths < wavelength_region[1]))[0]
selection = np.concatenate([selection,index])
index_std_deviation.append(index)
selection = selection.astype(int)
flux_continuum = flux[selection]
wavelengths_continuum = wavelengths[selection]
if method == "median_filter":
tck = interpolate.splrep(wavelengths_continuum, flux_continuum,k=1)
continuum_selection = interpolate.splev(wavelengths, tck)
continuum_selection = medfilt(continuum_selection, smooth_window)
elif method == "PL" or method == "powerlaw" or method == "pl" or method == "power-law":
z = np.polyfit(wavelengths_continuum, flux_continuum, pl_order)
continuum_selection = np.poly1d(z)
continuum_selection = continuum_selection(wavelengths)
else:
raise RuntimeError("The input options for the 'method' variable are 'powerlaw' or 'median_filter'.")
continuum_std_deviation = []
for sub_region_indexes in index_std_deviation:
continuum_std_deviation.append(np.std(flux[sub_region_indexes]-continuum_selection[sub_region_indexes]))
return continuum_selection, continuum_std_deviation
[docs] def load_calibrated_data(self, calibrated_spec_data, target_name = None, output_dir = "./", plot = True):
"""
This function loads the spectral calibrated data. Preferentially, you should use the file 'TARGETNAME_spec_X.dat' generated
with easyfermi function 'extraction.target_flux_calibration()'.
Parameters
----------
calibrated_spec_data: string
The path to the data '.dat' file containing the calibrated spectrum.
target_name: string
Optional. This name will be used in all subsequent plots.
output_dir: string
A string with the path to the output directory.
plot: boolean
If True, the spectrum will be shown.
Returns
-------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum
flux_density: numpy.ndarray (astropy.units erg/cm2/s/A)
The calibrated spectrum in flux density.
"""
self.output_dir = str(Path(output_dir))
if target_name is None:
target_name = calibrated_spec_data.split(".")[-1]
self.target_name = target_name
data = np.loadtxt(calibrated_spec_data)
try:
wavelengths, flux_density, wavelength_systematic_error, flux_density_sys_error = data[:,0]*u.angstrom, data[:,1]*u.erg / u.cm**2 / u.s / u.AA, data[:,2][0]*u.angstrom, data[:,3]*u.erg / u.cm**2 / u.s / u.AA # Angstrom, erg/cm2/s/Angstrom, Angstrom, erg/cm2/s/Angstrom
self.wavelength_systematic_error = wavelength_systematic_error
self.flux_systematic_error = flux_density_sys_error
except:
wavelengths, flux_density = data[:,0]*u.angstrom, data[:,1]*u.erg / u.cm**2 / u.s / u.AA # Angstrom, erg/cm2/s/Angstrom
self.wavelength_systematic_error = None
self.flux_systematic_error = None
if plot:
plt.figure(figsize=(12,5))
plt.minorticks_on()
plt.grid(which="both",linestyle=":")
plt.xlim(wavelengths.value.min(),wavelengths.value.max())
plt.ylim(0,flux_density.value.max()*1.2)
plt.title(f"Calibrated data - {target_name}")
plt.plot(wavelengths, flux_density, color='orange', label=target_name+" - calibrated data")
plt.legend()
plt.ylabel("F$_{\lambda}$ "+f"[{flux_density.unit}]",fontsize=12)
plt.xlabel(f"Observed $\lambda$ [${wavelengths.unit}$]",fontsize=12)
plt.show()
self.wavelengths = wavelengths
return wavelengths, flux_density
[docs] def find_lines(self, wavelengths, flux_density, line_significance_sigma = 5, peak_distance = 30, peak_width = 10, method = "median_filter", continuum_regions = None,
pl_order=2, smooth_window=111, plot_lines = True, plot_regions = True, save_plot = False):
"""
This function will find all emission/absorption lines with significance above 'line_significance_sigma' with respect to the local continuum.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
flux_density: numpy.ndarray (astropy.units erg/cm2/s/A)
The calibrated spectrum in flux density.
line_significance_sigma: float
Defines how many standard deviations above the continuum the line peak must be in order to be detected.
peak_distance: float
The minimal distance (data bins, not in Angstroms) between peaks (>=1).
peak_width: float
Minimum required width of peaks in data bins. The number of data bins is equal to the number of pixels in the reduced spectral image.
method: string
This is the desired method to compute the continuum. Options are 'powerlaw' (or 'pl') and "median_filter".
The 'powerlaw' method is better in case you have large emission/absorption lines. The method "median_filter" is excellent
for extracting the continuum of a spectrum with narrow emission/absorption lines.
continuum_regions: list (float)
The continuum, in Angstroms, will be computed only within these wavelength regions and then extrapolated everywhere else. E.g.: continuum_regions = [[3000,6830],[6930,7500]].
If None, then all the wavelength range will be used.
pl_order: int
Polynomial order for the power law fit. Used only if input variable method="powerlaw" (or "pl").
smooth_window: int
This is the size of the smooth window for the "median_filter" method. This number must be odd.
plot_lines: boolean
If True, the spectrum and all detected lines will be shown.
plot_regions: boolean
If True, the spectrum will be plotted in multiple regions (assuming continuum_regions is not None). For each one of these regions,
the noise is independently estimated from the local continuum.
save_plot: boolean
If True, the spectrum plot will be saved in the output directory defined in analysis.load_calibrated_data().
Returns
-------
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A.
line_std_deviation: numpy.ndarray (float)
The standard deviation for the local continuum. Line significance is calculated with respect to this value.
wavelength_peak_positions: numpy.ndarray (astropy.units Angstrom)
The position of each peak in Angstroms.
peak_heights: numpy.ndarray (astropy.units erg/cm2/s/A)
The height of each peak in erg/cm2/s/A
line_significance: array (floats)
The line significance with respect to the local continuum standard deviation.
"""
continuum_baseline, continuum_std_deviation = self.continuum_fit(flux_density.value, wavelengths.value, method = method, continuum_regions = continuum_regions, pl_order = pl_order, smooth_window = smooth_window)
peak_heights = np.asarray([])
peak_position_index = np.asarray([])
line_significance = np.asarray([])
line_std_deviation = np.asarray([])
line_position = []
if plot_regions:
plt.figure(figsize=(12,4))
plt.ylabel("F$_{\lambda}$ "+f"[{flux_density.unit}]",fontsize=12)
plt.xlabel(f"Observed $\lambda$ [${wavelengths.unit}$]",fontsize=12)
if continuum_regions is None:
plt.title("Using the full wavelength range to estimate the continuum")
else:
plt.title("The continuum noise is independently estimated in each one of the vertical strips")
plt.minorticks_on()
plt.grid(which="both",linestyle=":")
plt.xlim(wavelengths.value.min(),wavelengths.value.max())
plt.ylim(-1.2*np.abs(np.min(flux_density.value-continuum_baseline)),1.2*np.max(flux_density.value-continuum_baseline))
if continuum_regions is None:
continuum_regions = [[np.min(wavelengths.value),np.max(wavelengths.value)]]
# Below we do a loop over the values of standard deviation for the selected continuum regions. The line significance is estimated based on the standard deviation of the closest continuum region.
ylim_min = 0
for number,std_deviation in enumerate(continuum_std_deviation):
peak_height = line_significance_sigma*std_deviation
if number == 0:
if len(continuum_std_deviation) > 1:
index = np.where(wavelengths.value < (continuum_regions[0][1] + continuum_regions[1][0])/2)[0]
index = np.append(index,index[-1] + 1) # This step is to avoid gaps of 1 pixel between the continuum regions
else:
index = np.asarray(range(len(wavelengths.value)))
elif number != (len(continuum_std_deviation)-1):
index = np.where((wavelengths.value > (continuum_regions[number-1][1] + continuum_regions[number][0])/2 ) & (wavelengths.value < (continuum_regions[number][1] + continuum_regions[number+1][0])/2 ))[0]
index = np.append(index,index[-1] + 1) # This step is to avoid gaps of 1 pixel between the continuum regions
else:
index = np.where(wavelengths.value > (continuum_regions[number-1][1] + continuum_regions[number][0])/2)[0]
continuum_removed_flux = flux_density.value[index]-continuum_baseline[index]
if plot_regions:
plt.plot(wavelengths.value[index], continuum_removed_flux)
plt.fill_betweenx(np.linspace(-1.2*np.abs(np.min(flux_density.value-continuum_baseline)),1.2*np.max(flux_density.value-continuum_baseline),10), continuum_regions[number][0],continuum_regions[number][1],alpha=0.3)#,color="gray")
# Emission lines:
local_peak_position_index, local_peak_heights = scipy.signal.find_peaks(continuum_removed_flux,height=peak_height,distance = peak_distance, width = peak_width)
peak_heights = np.concatenate([peak_heights,local_peak_heights["peak_heights"]])
peak_position_index = np.concatenate([peak_position_index, local_peak_position_index + index.min()])
line_significance = np.concatenate([line_significance,local_peak_heights["peak_heights"]/std_deviation])
line_std_deviation = np.concatenate([line_std_deviation, std_deviation*np.ones(len(local_peak_position_index))])
if len(local_peak_position_index) > 0:
emission_line_position = ["up"]*len(local_peak_position_index)
line_position = line_position + emission_line_position
# Absorption lines:
local_peak_position_index, local_peak_heights = scipy.signal.find_peaks(-1*continuum_removed_flux,height=peak_height,distance = peak_distance, width = peak_width)
peak_heights = np.concatenate([peak_heights,-1*local_peak_heights["peak_heights"]])
peak_position_index = np.concatenate([peak_position_index, local_peak_position_index + index.min()])
line_significance = np.concatenate([line_significance,local_peak_heights["peak_heights"]/std_deviation])
line_std_deviation = np.concatenate([line_std_deviation, std_deviation*np.ones(len(local_peak_position_index))])
if len(local_peak_position_index) > 0:
local_ylim_min = -1.1*local_peak_heights["peak_heights"].max()
if local_ylim_min < ylim_min:
ylim_min = local_ylim_min
absorption_line_position = ["down"]*len(local_peak_position_index)
line_position = line_position + absorption_line_position
if len(peak_position_index) == 0:
raise RuntimeError("No significant emission or absorption line was found. Maybe you can try playing with the input parameters 'line_significance_sigma' and 'peak_width'.")
peak_position_index = peak_position_index.astype(int)
peak_heights = (peak_heights+continuum_baseline[peak_position_index])*flux_density.unit
wavelength_peak_positions = wavelengths[peak_position_index]
continuum_removed_flux = flux_density.value-continuum_baseline
if plot_lines:
plt.figure(figsize=(12,5))
if len(peak_heights) > 0:
for number, peak_wavelength in enumerate(wavelength_peak_positions.value):
if line_position[number] == "up":
plt.text(peak_wavelength, peak_heights.value[number] + 0.05*peak_heights.value.max(), str(round(peak_wavelength,3))+"$\AA$", color="C0",rotation=90,fontsize=10, horizontalalignment="center", verticalalignment="bottom")
else:
text_height = np.mean(np.abs(peak_heights.value))
plt.text(peak_wavelength, continuum_baseline[peak_position_index][number] + text_height,str(round(peak_wavelength,3))+"$\AA$", color="red",rotation=90,fontsize=10, horizontalalignment="center", verticalalignment="bottom")
plt.vlines(peak_wavelength, peak_heights.value[number], continuum_baseline[peak_position_index][number] + 0.95*text_height,color="red")
if method != "median_filter":
plt.plot(wavelengths,continuum_baseline,label="Power-law continuum")
else:
plt.plot(wavelengths,continuum_baseline,label="Median-filter continuum")
plt.plot(wavelengths, flux_density, color='orange')
plt.plot(wavelengths, continuum_removed_flux, alpha=0.15, color='black', label="Continuum-subtracted spec")
plt.minorticks_on()
plt.grid(which="both",linestyle=":")
plt.xlim(wavelengths.value.min(),wavelengths.value.max())
plt.ylim(ylim_min,flux_density.value.max()*1.5)
plt.ylabel("F$_{\lambda}$ "+f"[{flux_density.unit}]",fontsize=12)
plt.xlabel(f"Observed $\lambda$ [${wavelengths.unit}$]",fontsize=12)
plt.title(self.target_name)
plt.legend()
plt.tight_layout()
if save_plot:
plt.savefig(self.output_dir+f"/{self.target_name}_line_wavelengths.pdf",bbox_inches='tight')
if plot_lines or plot_regions:
plt.show()
ordered_indexes = np.argsort(wavelength_peak_positions)
peak_heights = peak_heights[ordered_indexes]
wavelength_peak_positions = wavelength_peak_positions[ordered_indexes]
line_significance = line_significance[ordered_indexes]
return continuum_baseline, line_std_deviation, wavelength_peak_positions, peak_heights, line_significance
[docs] def all_models(self, model_name):
"""
Returns a picklable CombinedModel instance
"""
model_name_list = re.findall('[A-Z][a-z_]*', model_name)
return CombinedModel(model_name_list)
[docs] def lnlike(self, theta, x, y, yerr, model):
"""
Here we compute the likelihood of the current model given the data.
"""
return -0.5 * np.sum(((y - model(theta, x)) / yerr) ** 2)
[docs] def lnprior(self, theta, priors):
"""
This function checks if the input parameters satisfy the prior conditions.
"""
if np.all((priors[:,0] < theta) & (theta < priors[:,1])):
return 0.0
else:
return -np.inf
[docs] def lnprob(self, theta, priors, x, y, yerr, model):
lp = self.lnprior(theta, priors)
if not np.isfinite(lp):
return -np.inf
return lp + self.lnlike(theta, x, y, yerr, model)
[docs] def line_MCMC(self, p0, priors, nwalkers, niter, initial, lnprob, data, model_name, custom_function=None, burn_in=100, show_progress=True):
"""This function runs a MCMC approach for one line (singular or blended) for a given a model."""
priors = tuple([priors]) # The priors are transformed into a tuple containing only one element
if model_name == "custom":
if not callable(custom_function):
raise Exception("Parameter custom_function is not callable. This parameter must be a function to work properly.")
adopted_model = self.all_models(model_name) # Here we choose a function for the fit
adopted_model = tuple([adopted_model]) # The adopted model is transformed into a tuple containing only one element
metadata = priors + data + adopted_model
sampler = emcee.EnsembleSampler(nwalkers, len(initial), lnprob, args=metadata)
start = time.time()
print("Running burn-in...")
p0, _, _ = sampler.run_mcmc(p0, burn_in, progress=show_progress)
sampler.reset()
print("Running production...")
pos, prob, state = sampler.run_mcmc(p0, niter, progress=show_progress)
end = time.time()
serial_time = end - start
print("Single-core processing took {0:.1f} seconds".format(serial_time))
return sampler, pos, prob, state
[docs] def plotter(self, sampler, model_name, x, color="grey", normalization = 1):
"""In this function we plot the 'hairs' (i.e. alternative models) around the maximum likelihood model estimated with the MCMC method."""
samples = sampler.flatchain
adopted_model = self.all_models(model_name)
x_plot = np.linspace(x.min(),x.max(),1000)
for theta in samples[np.random.randint(len(samples), size=100)]:
plt.plot(x_plot, adopted_model(theta, x_plot)*normalization, color=color, zorder=0, alpha=0.1) # plotting with parameters in the posterior destribution
plt.ticklabel_format(style="sci", axis="x", scilimits=(0, 0))
plt.grid(linestyle=":")
plt.legend()
[docs] def MCMC_spread(self, x, samples, model_name, nsamples=100, custom_function=None):
"""
Function to compute the median MCMC model and its standard deviation.
"""
if model_name == "custom":
if not callable(custom_function):
raise Exception("Parameter custom_function is not callable. This parameter must be a function to work properly.")
else:
if custom_function is not None:
custom_function = None
models = []
draw = np.floor(np.random.uniform(0,len(samples),size=nsamples)).astype(int)
thetas = samples[draw] # Each element of thetas contain the N parameters of the assumed model
adopted_model = self.all_models(model_name)
for theta in thetas:
mod = adopted_model(theta,x)
models.append(mod)
spread = np.std(models,axis=0)
median_model = np.median(models,axis=0)
return median_model,spread
[docs] def data_window_selection(self, wavelengths, spec_flux, spec_flux_err, line_region_min,line_region_max):
"""
Function to slice the data into a wavelength window.
Parameters
----------
wavelengths: numpy.array
This variable must be in absolute values, i.e. without units.
spec_flux: numpy.array
The spectral density array.
spec_flux_err: numpy.array
The spectral density array.
line_region_min: float
Inferior window limit.
line_region_max: float
Superior window limit.
Returns
-------
x, y, yerr: numpy.arrays
New arrays containing data within the selected window.
"""
selection = (wavelengths > line_region_min) & (wavelengths < line_region_max)
x=wavelengths[selection]
y=spec_flux[selection]
yerr=spec_flux_err[selection]
return x, y, yerr
[docs] def redshift_calculator(self, q_16,q_50,q_84,air_wavelength_line):
z = (q_50 - air_wavelength_line) / air_wavelength_line
zerror_down = (q_50-q_16)/air_wavelength_line
zerror_up = (q_84-q_50)/air_wavelength_line
return z, zerror_down, zerror_up
[docs] def parameter_estimation(self, samples, air_wavelength_line=None, quantiles=[0.16, 0.5, 0.84], normalization = 1, parlabels=None, line_names="", output_dir=".", savefile=True):
"""
In this function we estimate the parameter values and corresponding errors based on the 16%, 50%, and 84% quantiles of the
MCMC posterior distributions of parameters.
Parameters
----------
samples: list
A list containing the MCMC posterior distributions.
air_wavelength_line: float
The line rest-frame wavelength used to compute the redshift.
quantiles: list
The quantiles adpted to compute the parameter values and corresponding errors. Default is quantiles=[0.16, 0.5, 0.84].
normalization: float
The plot normalization.
parlabels: list
This is a list with the names of the free parameters in the adopted model. If you use a model with two peaks,
then parlabels needs two input lists, e.g.: parlabels=[['a','b'],['c','d']]. If it has three peaks, parlabels
must be parlabels=[['a','b'],['c','d'],['e','f']] and so on.
line_names: string or list of strings
The line names.
outputdir: string
The output directory.
savefile: boolean
If True, the data will be saved in the output directory.
Returns
-------
par_values: list
A list with the parameter values recovered from the 50% quantiles of the MCMC posterior distributions of parameters.
par_values_errors: list
A list with the parameter errors recovered from the 16% and 84% quantiles of the MCMC posterior distributions of parameters.
par_names: list
A list with the parameter names.
"""
if isinstance(parlabels[0],str):
parlabels = [parlabels]
output_dir = str(Path(output_dir))
# If air_wavelength_line is a number, we transform it into a list with a single element:
if air_wavelength_line is not None:
if isinstance(air_wavelength_line,float) or isinstance(air_wavelength_line,int):
air_wavelength_line = [air_wavelength_line]
elif isinstance(air_wavelength_line,np.ndarray) or isinstance(air_wavelength_line,list):
pass
else:
raise Exception("The input value for air_wavelength_line must be a float, an integer, a list, or a numpy array.")
else:
air_wavelength_line = [None] * len(parlabels)
# Checking line_names:
if isinstance(line_names,str):
line_names = [line_names]
elif isinstance(line_names,np.ndarray) or isinstance(line_names,list):
pass
else:
raise Exception("The input value for line_names must be a string, a list, or a numpy array.")
previous_j = 0
par_values = []
par_values_errors = []
par_names = []
for i in range(len(air_wavelength_line)):
if savefile:
f = open(f'{output_dir}/{self.target_name}_{line_names[i]}_line_fit_results.csv','w')
f.write("# Parameter name, value, error_down, error_up\n")
ndim = len(parlabels[i]) # number of dimensions/parameters
parlabels[i] = list(parlabels[i])
for j in range(ndim): # must be done once per variable
q_16, q_50, q_84 = corner.quantile(samples[:,j+previous_j], quantiles)
dx_down, dx_up = q_50-q_16, q_84-q_50
# Computing the redshift of the line:
if parlabels[i][j] == "Location" and air_wavelength_line[i] is not None:
z, zerror_down, zerror_up = self.redshift_calculator(q_16,q_50,q_84,air_wavelength_line[i])
if savefile:
f.write(f"z, {z}, {zerror_down}, {zerror_up}\n")
par_values.append(z)
par_values_errors.append([zerror_down, zerror_up])
par_names.append("redshift")
if parlabels[i][j][0:9] == "Amplitude": # If the variable is the amplitude, then we have to normalize the data
par_values.append(q_50*normalization)
par_values_errors.append([dx_down*normalization, dx_up*normalization])
elif parlabels[i][j][0:3] == "std": # For Gaussian fits, we convert the std to FWHM
parlabels[i][j] = "fwhm_Gauss"
fwhm = q_50*2*np.sqrt(2 * np.log(2))
fwhm_error_down = dx_down*2*np.sqrt(2 * np.log(2))
fwhm_error_up = dx_up*2*np.sqrt(2 * np.log(2))
par_values.append(fwhm)
par_values_errors.append([fwhm_error_down, fwhm_error_up])
else:
par_values.append(q_50)
par_values_errors.append([dx_down, dx_up])
par_names.append(parlabels[i][j])
if savefile:
f.write(f"{parlabels[i][j]}, {par_values[-1]}, {par_values_errors[-1][0]}, {par_values_errors[-1][1]}\n")
previous_j = j + previous_j + 1
if savefile:
f.close()
return par_values, par_values_errors, par_names
[docs] def quick_plot(self, x, y, model_name, parlabels, sampler=None, best_fit_model=None, theta_max=None, normalization = 1, hair_color="grey", title="",xlabel="Observed $\lambda$ [$\AA$]",ylabel="F$_{\lambda}$ [erg cm$^{-2}$ s$^{-1}$ $\AA^{-1}$ ]",overplot_median_model=True,savefig=True, outputdir="./"):
"""
In this function we plot the data window together with the maximum likelihood and/or the median model.
Parameters
----------
x, y: numpy.arrays
Arrays containing the data within the selected window.
model_name: string
The current model adopted in the fit.
parlabels: list
A list with the parameter names for the adopted model.
sampler:
The MCMC sampler.
best_fit_model: numpy.array
Array with the best-fit model.
theta_max: numpy.ndarray
Array with the best-fit parameters.
normalization: float
The plot normalization.
hair_color: string
The color adopted for the MCMC hairs.
title: string
The title of the plot.
xlabel,ylabel: strings
The axes labels.
overplot_median_model: boolean
If True, the median model and model standard deviation will be overploted.
savefig: boolean
If True, the figure will be saved in the output directory.
outputdir: string
The output directory.
Returns
-------
"""
f = plt.figure(figsize=(10,8))
ax = f.add_subplot(1,1,1)
ax.xaxis.set_minor_locator(AutoMinorLocator(2))
ax.yaxis.set_minor_locator(AutoMinorLocator(2))
ax.tick_params(which='major', length=5, direction='in')
ax.tick_params(which='minor', length=2.5, direction='in',bottom=True, top=True, left=True, right=True)
ax.tick_params(bottom=True, top=True, left=True, right=True)
plt.plot(x,y*normalization,color="orange", label="Data after cont. subtraction")
if sampler is not None:
self.plotter(sampler,model_name,x,color=hair_color, normalization = normalization)
if best_fit_model is not None:
x_plot = np.linspace(x.min(),x.max(),len(best_fit_model))
plt.plot(x_plot,best_fit_model*normalization, color="black", label="Highest likelihood model")
if overplot_median_model:
samples = sampler.flatchain
median_model,spread = self.MCMC_spread(x, samples, model_name, nsamples=200)
x_plot = np.linspace(x.min(),x.max(),len(median_model))
plt.plot(x_plot,median_model*normalization, color="C0", ls="--", label="Median model")
plt.fill_between(x_plot,(median_model+spread)*normalization,(median_model-spread)*normalization,color="C0",alpha=0.3,label="$1\sigma$")
plt.xlabel(xlabel,fontsize=12)
plt.ylabel(ylabel,fontsize=12)
plt.title(title)
plt.grid(which="both", linestyle=":")
plt.ticklabel_format(scilimits=(-5, 8))
if normalization < 0:
plt.ylim(1.1*y.max()*normalization,(y.min()-0.1*y.max())*normalization)
else:
plt.ylim((y.min()-0.1*y.max())*normalization,1.1*y.max()*normalization)
if isinstance(parlabels,list):
parlabels = np.asarray(parlabels)
peak_indexes = np.where(parlabels=="Location")[0]
for i in peak_indexes:
plt.vlines(theta_max[i],y.min()-0.1*y.max(),10000, colors="k", linewidth=0.5)
plt.legend()
if savefig:
plt.savefig(outputdir+"/"+title+"_line.png")
return
[docs] def insert_rows(self, array, indices, row):
"""Insert one or more copies of 'row' into 'array' at given indices (axis=0)."""
array = np.asarray(array)
row = np.asarray(row)
# Ensure indices are sorted descending
indices = np.sort(indices)[::-1]
# Clamp indices that are out of range
indices = np.clip(indices, 0, len(array))
for idx in indices:
array = np.insert(array, idx, row, axis=0)
return array
[docs] def merge_fit_results(self, target_name, list_of_files=None, output_dir="./"):
"""
This function merges the individual data files for each line into a single merged data file.
"""
output_dir = str(Path(output_dir))
if list_of_files is None:
list_of_files = glob.glob(output_dir+"/*_line_fit_results.csv")
else:
list_of_files = np.genfromtxt(list_of_files,dtype=str)
f = open(f'{output_dir}/'+target_name+"_lines.csv","w")
data_list = []
line_names = []
maximum_number_of_pars = 0
index_maximum_number_of_pars = 0
# Finding the line data file with the largest number of parameters:
for n,line_file in enumerate(list_of_files):
if line_file[-16:] == "custom_model.csv":
print(f"Skipping {line_file}.")
continue
line_data = np.genfromtxt(line_file,delimiter=",",dtype=str)
data_list.append(line_data)
line_names.append(line_file.split("/")[-1][:-21])
if len(line_data) > maximum_number_of_pars:
maximum_number_of_pars = len(line_data)
index_maximum_number_of_pars = n
# Write header:
parameter_names = np.asarray(data_list[index_maximum_number_of_pars][:,0])
minimum_list_of_parameter_names = np.asarray(['z','Location','Amplitude','fwhm_Lorentz','fwhm_Gauss','skewness'])
if len(parameter_names) < len(minimum_list_of_parameter_names):
parameter_names = minimum_list_of_parameter_names
f.write("# Line name")
for parameter_name in parameter_names:
if parameter_name == "Location":
f.write(", obs_"+parameter_name+" [Ang], obs_"+parameter_name+"_error_down, obs_"+parameter_name+"_error_up")
elif parameter_name == "Amplitude":
f.write(", obs_"+parameter_name+" (flux_dens - continuum) [erg cm-2 s-1 Ang-1], obs_"+parameter_name+"_error_down, obs_"+parameter_name+"_error_up")
elif parameter_name[0:4] == "fwhm":
f.write(", obs_"+parameter_name+" [Ang], obs_"+parameter_name+"_error_down, obs_"+parameter_name+"_error_up")
elif parameter_name == "skewness":
f.write(", obs_"+parameter_name+", obs_"+parameter_name+"_error_down, obs_"+parameter_name+"_error_up")
else:
f.write(", "+parameter_name+", "+parameter_name+"_error_down, "+parameter_name+"_error_up")
if self.wavelength_systematic_error is not None:
f.write(", Systematic wavelength error [Ang]")
if self.flux_systematic_error is not None:
f.write(", Systematic amplitude error (erg/cm2/s/Angstrom)")
# Writing down the parameters and filling the empty spaces with zeros:
for i in range(len(data_list)):
f.write("\n"+line_names[i])
line_array = np.asarray(data_list[i])
mask_in_par_names = np.isin(parameter_names, line_array[:,0])
# Invert mask to get elements missing in parameter_names
mask_not_in_par_names = ~mask_in_par_names
# Indices in parameter_names of elements NOT in line_array[:,0]
idx_missing = np.where(mask_not_in_par_names)[0]
# Insert element 3 at index 2
new_row = np.array(["missing","0","0","0"])
# Repeat the new row
# Sort indices descending to avoid shifting
line_array = self.insert_rows(line_array, idx_missing, new_row)
for parameter in line_array:
f.write(", "+parameter[1]+", "+str(np.abs(float(parameter[2])))+", "+str(np.abs(float(parameter[3]))))
if self.wavelength_systematic_error is not None:
f.write(f", {self.wavelength_systematic_error.value}")
index = extraction.find_nearest(self.wavelengths.value,float(line_array[1][1]))
if self.flux_systematic_error is not None:
f.write(f", {self.flux_systematic_error.value[index]}")
f.close()
return
[docs] def parameter_time_series(self, initial, sampler, labels):
"""
This function plots the time series of the parameters running in the MCMC.
"""
fig, axes = plt.subplots(len(initial), figsize=(10, 2*len(initial)), sharex=True)
samples = sampler.get_chain()
for i in range(len(initial)):
ax = axes[i]
ax.plot(samples[:, :, i], "k", alpha=0.3)
ax.set_xlim(0, len(samples))
ax.set_ylabel(labels[i])
ax.yaxis.set_label_coords(-0.1, 0.5)
return
[docs] def automatic_priors(self, which_model, observed_wavelength, peak_height, line_region_min, line_region_max):
"""
In this function we automatically set the priors, initial parameter values and select the line model.
Parameters
----------
which_models: string
The models to be applied to the line. Options are "Gaussian", "Lorentz" and "Voigt".
observed_wavelength: float
The wavelength corresponding to the peak position.
peak_height: flaot
The height of the peak with respect to the continuum emission.
line_region_min: float
The starting wavelength around the studied line.
line_region_max: float
The final wavelength around the studied line.
Returns
-------
initial: numpy.array
An array with the automatic initial parameter values for the MCMC.
priors: numpy.array
An array containing three or four lists of two elements, i.e. one list for each parameter given in 'initial'.
labels: list
A list with the names of each parameter.
adopted_model: method
A function with the adopted model, i.e., a "Gaussian", "Lorentz" or "Voigt".
"""
if which_model == "Gaussian":
initial = np.array([observed_wavelength, peak_height, 10])
if peak_height > 0: # This step is necessay because the priors must always go fro mthe smallest value up to the largest.
priors = np.array([[line_region_min,line_region_max],[0.1*peak_height, 10*peak_height],[0.1,150]])
else:
priors = np.array([[line_region_min,line_region_max],[10*peak_height,0.1*peak_height],[0.1,150]])
labels = ["Location", "Amplitude", "std"]
adopted_model = model_Gauss
elif which_model == "Lorentz":
initial = np.array([observed_wavelength, peak_height, 10])
if peak_height > 0:
priors = np.array([[line_region_min,line_region_max],[0.1*peak_height, 10*peak_height],[0.1,150]])
else:
priors = np.array([[line_region_min,line_region_max],[10*peak_height,0.1*peak_height],[0.1,150]])
labels = ["Location", "Amplitude", "fwhm_Lorentz"]
adopted_model = model_Lorentz
elif which_model == "Voigt":
initial = np.array([observed_wavelength, peak_height, 10, 10])
if peak_height > 0:
priors = np.array([[line_region_min,line_region_max],[0.1*peak_height, 10*peak_height],[0.1,150],[0.1,150]])
else:
priors = np.array([[line_region_min,line_region_max],[10*peak_height,0.1*peak_height],[0.1,150],[0.1,150]])
labels = ["Location", "Amplitude", "fwhm_Lorentz", "fwhm_Gauss"]
adopted_model = model_Voigt
elif which_model == "Skewedgaussian":
initial = np.array([observed_wavelength, peak_height, 10, 0])
if peak_height > 0:
priors = np.array([[line_region_min,line_region_max],[0.1*peak_height, 10*peak_height],[0.1,150],[-50,50]])
else:
priors = np.array([[line_region_min,line_region_max],[10*peak_height,0.1*peak_height],[0.1,150],[-50,50]])
labels = ["Location", "Amplitude", "std", "skewness"]
adopted_model = model_skewed_gaussian
elif which_model == "Skewedlorentzian":
initial = np.array([observed_wavelength, peak_height, 10, 0])
if peak_height > 0:
priors = np.array([[line_region_min,line_region_max],[0.1*peak_height, 10*peak_height],[0.1,150],[-50,50]])
else:
priors = np.array([[line_region_min,line_region_max],[10*peak_height,0.1*peak_height],[0.1,150],[-50,50]])
labels = ["Location", "Amplitude", "fwhm_Lorentz", "skewness"]
adopted_model = model_skewed_lorentzian
return initial, priors, labels, adopted_model
[docs] def estimate_norm_height(self, flux_density, continuum_baseline, wavelength_peak_position, peak_height):
"""
Funtion to estimate the normalized height of a peak to set the priors for the MCMC.
Parameters
----------
flux_density: numpy.ndarray (astropy.units erg/cm2/s/A)
The calibrated spectrum in flux density.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
wavelength_peak_position: astropy.units Angstrom
The position of the peak in the wavelength axis in Angstroms.
peak_height: astropy.units erg/cm2/s/A
The height of the peak in erg/cm2/s/A.
Returns
-------
norm_height: float
The estimated normalized height of the input peak. This value can be set to estimate priors for the MCMC.
"""
if isinstance(wavelength_peak_position, u.quantity.Quantity):
wavelength_peak_position = wavelength_peak_position.value
local_normalization = 10**round(np.log10(np.median(flux_density.value - 0.9 * continuum_baseline)))
norm_height = peak_height/local_normalization
return norm_height
[docs] def calculate_continuum_subtracted_heights(self,wavelengths, peak_positions, peak_heights, continuum_baseline):
"""Calculate peak heights relative to local continuum."""
# Find indices closest to each peak position
continuum_indices = [
extraction.find_nearest(wavelengths.value, pos)
for pos in peak_positions
]
# Subtract continuum baseline from peak heights
continuum_values = continuum_baseline[continuum_indices]
return peak_heights - continuum_values
[docs] def fit_lines(self, wavelengths, flux_density, continuum_baseline, wavelength_peak_positions, rest_frame_line_wavelengths, peak_heights, line_std_deviation,
blended_line_min_separation = 50, which_models="Lorentz", line_names = None, overplot_archival_lines = ["H"], priors = None, MCMC_walkers = 100,
MCMC_iterations = 200, MCMC_burn_in=100, MCMC_show_progress=True, plot_spec = True, plot_MCMC = False, overplot_median_model = False, save_results = True):
"""
Perform Markov Chain Monte Carlo (MCMC) fitting of spectral lines to estimate
line parameters and their uncertainties.
This function fits Gaussian, Lorentzian, or Voigt profiles to spectral lines
using MCMC methods, with support for blended lines and parallel processing
for Voigt profile calculations.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
flux_density: numpy.ndarray (astropy.units erg/cm2/s/A)
The calibrated spectrum in flux density.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
wavelength_peak_positions: numpy.ndarray (astropy.units Angstrom)
The position of each peak in Angstroms found with the function analysis.find_lines().
rest_frame_line_wavelengths: list
A list with the rest frame wavelength values for each one of the input lines.
peak_heights: numpy.ndarray (astropy.units erg/cm2/s/A)
The height of each peak in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
line_std_deviation: numpy.ndarray (float)
The standard deviation for the local continuum. This variable is an output of the function analysis.find_lines().
blended_line_min_separation: float
The minimum separation between blended line peaks in Angstrom. If the line peaks are closer than this value, a blended model will be adopted.
which_models: string or list of strings
A list containing the models to be applied to each line, e.g.: if you are trying to model 2 lines, you can use which_models = ["Lorentz","Gaussian"].
If you wish to use the same model for all lines, you can use which_models = ["Gaussian","Gaussian"] or simply which_models = "Gaussian". Options are
"Gaussian", "Lorentz" and "Voigt".
line_names: list
Optional. A list with line names, e.g.: if you are trying to model 2 lines, you can use line_names = ["Hbeta","Halpha"].
If the names are not provided, easyspec will call the lines line_0, line_1... and so on.
overplot_archival_lines: list
List with the strings corresponding to different elements. E.g.: ["H","He"] will overplot all the Hydrogen and Helium lines redshifted based on the average
redshift of the lines given in wavelength_peak_positions. If you don't want to overplot lines, use overplot_archival_lines = None. If you use too many lines
as input, they will very likely overlap in the plot. Be aware that this feature is meant only for guiding the user! There are several lines which are not
included in our database, but we tried to select the most commonly seen lines in galaxy, quasar and stellar spectra.
priors: list or list of lists
This parameter is complicated. It is better if you leave it as "None". This parameter controls the priors used in the MCMC in the estimation of the
line parameters. The initial parameters for the MCMC are always defined as wavelength_peak_positions (for the position of the line peak) and
peak_heights (for the height of the peak). If priors = None, the priors are set to wavelength_peak_positions +- 100 Angstroms (or to half the distance
to the closest line if this line is closer than 100 Angstroms), 0.1*peak_heights up to 10*peak_heights (normalized based on the continuum),
and the std or fwhm are confined within 0.1 to 150. If you are e.g. analysing 4 lines with the Lorentz model and want to change the priors of the third
line, you can set priors = [None, None,[[7496,7696],[0.1, 150],[2,150]], None], where the list of three ranges here represents the allowed sampling
intervals, i.e. the position of the peak, the peak heigh in terms of the continuum level and the fwhm. For the Voigt model, the input variable would
be priors = [None, None,[[7496,7696],[0.1, 150],[2,150],[1,150]], None]. Of course you can set up the ranges for all lines.
MCMC_walkers: int
This is the number of walkers for the MCMC. Default = 100.
MCMC_iterations: int
This is the number of iterations for the MCMC. Default = 200.
MCMC_burn_in: int
The MCMC burn-in is the initial phase of the simulation where the early iterations are discarded. Default = 100.
MCMC_show_progress: boolean
If True, the progess bars for the MCMC are shown. Default = True.
plot_spec: boolean
If True, a plot of the spectrum with the lines requested in the input variable overplot_archival_lines will be shown.
plot_MCMC: boolean
If True, a series of diagnostic plots for the MCMC will be shown, as the corner plot, the evolution of the parameters over time, and the line fitted to
the data.
overplot_median_model: boolean
If True, the median model and model standard deviation will be overploted in the diagnostic plots.
save_results: boolean
If True, the plots and fit information will be saved in the output directory defined in the function analysis.load_calibrated_data()
Returns
-------
par_values_list: list
This is a list containing sublists with the best-fit values for each line model.
par_values_errors_list: list
A list with the asymmetrical errors for each parameter listed in par_values_list.
par_names_list: list
A list with the names of all the parameters used in each line model.
samples_list: list
A list containing the MCMC posterior distributions.
line_windows: list
A list containing the wavelength intervals around each line.
XXXXXXX_lines.csv:
Optional. This file contains all the best-fit parameters for each line model and is saved in the output directory defined in the
function analysis.load_calibrated_data(). "XXXXXXX" here stands for the target's name.
"""
# Validate and preprocess input parameters
aux_validate_line_names(line_names)
# Convert all inputs to consistent numpy arrays with proper shape handling
wavelength_peak_positions = aux_ensure_numpy_array(wavelength_peak_positions, 'wavelength_peak_positions')
peak_heights = aux_ensure_numpy_array(peak_heights, 'peak_heights')
line_std_deviation = aux_ensure_numpy_array(line_std_deviation, 'line_std_deviation')
rest_frame_line_wavelengths = aux_ensure_numpy_array(rest_frame_line_wavelengths, 'rest_frame_line_wavelengths')
# Validate and expand model specifications
which_models = aux_validate_models(which_models, n_lines = len(rest_frame_line_wavelengths))
copy_which_models = which_models.copy()
# Generate default line names if not provided
line_names = aux_generate_line_names(line_names, len(rest_frame_line_wavelengths))
self.line_names = line_names
# Calculate continuum-subtracted peak heights
peak_heights = self.calculate_continuum_subtracted_heights(wavelengths, wavelength_peak_positions, peak_heights, continuum_baseline)
# Calculate normalization factor for the spectrum
local_normalization = 10**round(np.log10(np.median(flux_density.value - 0.9 * continuum_baseline)))
# Initialize default priors if none provided
if priors is None:
priors = [None] * len(wavelength_peak_positions)
# Here we identify the blended lines:
peak_distances = np.diff(wavelength_peak_positions)
blended_line = [False] # The first line will never be blended with a precedent line
for distance in peak_distances:
if distance < blended_line_min_separation:
blended_line.append(True)
else:
blended_line.append(False)
blended_line.append(False) # We append a last element to the blended_line list to guarantee that our list can go up to number+1 and the last line can be fitted.
par_values_list, par_values_errors_list, par_names_list, samples_list, line_windows = [], [], [], [], []
line_region_min_cache, initial_cache, p0_cache, priors_cache, labels_cache = [], [], [], [], []
for number, peak_height in enumerate(peak_heights):
continuum_subtracted_flux = (flux_density.value - continuum_baseline)/local_normalization
peak_height = peak_height/local_normalization
local_line_std_deviation = line_std_deviation[number]/local_normalization
if priors is None or priors[number] is None:
# Automatic priors: calculate line region from peak positions
line_region_min, line_region_max = aux_calculate_line_region(wavelength_peak_positions, number, len(rest_frame_line_wavelengths))
line_region_min_cache.append(line_region_min)
initial, local_priors, labels, adopted_model = self.automatic_priors(copy_which_models[number], wavelength_peak_positions[number],
peak_height, line_region_min, line_region_max)
else:
# In the case of a single-line analysis, if the user inputs priors=[[7500,7700],[0.1],[2,50]] instead of priors=[ [[7500,7700],[0.1],[2,50]] ], the analysis will work anyway.
if np.isscalar(priors[number][0]) or isinstance(priors[number][0], (int, float)):
priors = [priors]
# User-provided priors - get basic values first
initial, local_priors, line_region_min, line_region_max = aux_parse_custom_priors(priors[number], wavelength_peak_positions[number], peak_height)
# Then call automatic_priors separately
_, _, labels, adopted_model = self.automatic_priors(copy_which_models[number], wavelength_peak_positions[number], peak_height,
line_region_min=None, line_region_max=None)
# Reseting wavelength windows for blended lines:
line_region_min, line_region_max = aux_resetting_wavelength_windows(number,rest_frame_line_wavelengths,priors,blended_line,wavelength_peak_positions,line_region_min_cache, line_region_min, line_region_max)
line_windows.append([line_region_min, line_region_max])
x,y,yerr = self.data_window_selection(wavelengths.value, continuum_subtracted_flux, local_line_std_deviation*np.ones(len(wavelengths.value)), line_region_min, line_region_max)
data = (x, y, yerr)
p0 = [np.array(initial) + 0.01 * np.random.randn(len(initial)) for i in range(MCMC_walkers)] # p0 is the methodology of stepping from one place on a grid to the next.
initial_cache.append(initial)
p0_cache.append(p0)
priors_cache.append(local_priors)
labels_cache.append(labels)
if blended_line[number] is False:
blended_line_names = []
blended_rest_frame_line_wavelengths = []
blended_labels = []
if blended_line[number+1] is False:
sampler, _, _, _ = self.line_MCMC(p0, local_priors, MCMC_walkers, MCMC_iterations, initial, self.lnprob, data, copy_which_models[number], burn_in=MCMC_burn_in, show_progress=MCMC_show_progress)
samples = sampler.flatchain
samples_list.append(samples)
theta_max = samples[np.argmax(sampler.flatlnprobability)]
par_values, par_values_errors, par_names = self.parameter_estimation(samples, air_wavelength_line = rest_frame_line_wavelengths[number],
normalization=local_normalization, parlabels = list.copy(labels), line_names=line_names[number],
output_dir = self.output_dir, savefile=save_results)
par_values_list.append(par_values)
par_values_errors_list.append(par_values_errors)
par_names_list.append(par_names)
x_best_fit = np.linspace(x.min(),x.max(),1000)
best_fit_model = adopted_model(theta_max, x_best_fit)
if plot_MCMC:
corner.corner(
samples,
show_titles=True,
labels=labels,
plot_datapoints=True,
quantiles=[0.16, 0.5, 0.84],
)
plt.suptitle(line_names[number]+"\n(normalized amplitude)", x=0.7)
self.parameter_time_series(initial, sampler, labels)
self.quick_plot(x,y,model_name=copy_which_models[number], parlabels=labels, sampler=sampler,best_fit_model=best_fit_model,theta_max=theta_max, normalization=local_normalization,
hair_color="grey",title=self.target_name+" - "+line_names[number], ylabel="F$_{\lambda} - F_{continuum}$ ["+f"{flux_density.unit}]", overplot_median_model=overplot_median_model, outputdir = self.output_dir)
else:
blended_line_names.append(line_names[number])
blended_rest_frame_line_wavelengths.append(rest_frame_line_wavelengths[number])
blended_labels.append(labels_cache[number])
continue # If the next line is blended, we skip this step of the loop
elif blended_line[number] is True:
copy_which_models[number] = copy_which_models[number-1] + copy_which_models[number]
initial_cache[number] = np.concatenate([initial_cache[number-1], initial_cache[number]])
p0_cache[number] = np.concatenate([p0_cache[number-1], p0_cache[number]],axis=1)
priors_cache[number] = np.concatenate([priors_cache[number-1], priors_cache[number]])
blended_labels.append(labels_cache[number]) # = np.concatenate([[labels_cache[number-1]], [labels_cache[number]]])
blended_rest_frame_line_wavelengths.append(rest_frame_line_wavelengths[number])
blended_line_names.append(line_names[number])
if blended_line[number+1] is False:
p0 = p0_cache[number]
initial = initial_cache[number]
local_priors = priors_cache[number]
labels = blended_labels #list(labels_cache[number])
sampler, _, _, _ = self.line_MCMC(p0, local_priors, MCMC_walkers, MCMC_iterations, initial, self.lnprob, data, copy_which_models[number], burn_in=MCMC_burn_in, show_progress=MCMC_show_progress)
samples = sampler.flatchain
samples_list.append(samples)
theta_max = samples[np.argmax(sampler.flatlnprobability)]
par_values, par_values_errors, par_names = self.parameter_estimation(samples, air_wavelength_line = blended_rest_frame_line_wavelengths,
normalization=local_normalization, parlabels = list.copy(labels), line_names=blended_line_names,
output_dir = self.output_dir, savefile=save_results)
par_names.append("redshift") # This is just to facilitate the loop below.
redshift_index = np.where(np.asarray(par_names)=="redshift")[0]
for i in range(len(redshift_index)-1):
par_values_list.append(par_values[redshift_index[i]:redshift_index[i+1]])
par_values_errors_list.append(par_values_errors[redshift_index[i]:redshift_index[i+1]])
par_names_list.append(par_names[redshift_index[i]:redshift_index[i+1]])
x_best_fit = np.linspace(x.min(),x.max(),1000)
adopted_model = self.all_models(copy_which_models[number])
best_fit_model = adopted_model(theta_max, x_best_fit)
labels_corner = np.asarray([])
line_names_corner = ""
for i,j in zip(labels,blended_line_names):
labels_corner = np.concatenate([labels_corner,i])
if line_names_corner == "":
line_names_corner = line_names_corner+j
else:
line_names_corner = line_names_corner+"+"+j
if plot_MCMC:
corner.corner(
samples,
show_titles=True,
labels=labels_corner,
plot_datapoints=True,
quantiles=[0.16, 0.5, 0.84],
)
plt.suptitle(line_names_corner+"\n(normalized amplitude)", x=0.7)
self.parameter_time_series(initial, sampler, labels_corner)
self.quick_plot(x,y,model_name=copy_which_models[number], parlabels = labels_corner,sampler=sampler,best_fit_model=best_fit_model,theta_max=theta_max, normalization=local_normalization,
hair_color="grey",title=self.target_name+" - "+line_names_corner, ylabel="F$_{\lambda} - F_{continuum}$ ["+f"{flux_density.unit}]",overplot_median_model=overplot_median_model, outputdir = self.output_dir)
else:
continue
redshifts = []
for parameters in par_values_list:
redshifts.append(parameters[0])
redshifts = np.asarray(redshifts)
average_redshift = np.average(redshifts)
std_redshift = np.std(redshifts)
if plot_spec:
if overplot_archival_lines is not None:
print("Archival lines are taken from NIST Atomic Spectra database: https://www.nist.gov/pml/atomic-spectra-database")
print("We adopt vacuum wavelengths for lines with wavelengths < 2000 Angstroms and air wavelengths for lines with wavelengths > 2000 Angstroms.")
print("If you use these lines in your research, please cite the NIST Atomic Spectra database appropriately.")
archival_lines = np.loadtxt(str(libpath)+"/astro_lines.dat",dtype=str,delimiter=",")
archival_wavelengths = archival_lines[:,0].astype(float)
archival_wavelengths = archival_wavelengths*average_redshift + archival_wavelengths # correcting for the redshift
archival_line_names = archival_lines[:,1]
index = np.where((archival_wavelengths > wavelengths.value.min()) & (archival_wavelengths < wavelengths.value.max()))[0] # Index to select the lines within our wavelength range
archival_wavelengths = archival_wavelengths[index]
archival_line_names = archival_line_names[index]
index = [] # Index to select only the desired elements
for n,element in enumerate(archival_line_names):
element_string = element.split("_")[0]
if element_string[0] == "[":
element_string = element_string[1:]
if element_string in overplot_archival_lines:
split_name = archival_line_names[n].split("_")
if len(split_name) == 3:
archival_line_names[n] = split_name[0]+" "+split_name[1]+fr"$\{split_name[2]}$"
else:
archival_line_names[n] = split_name[0]+split_name[1]
index.append(n)
archival_wavelengths = archival_wavelengths[index]
archival_line_names = archival_line_names[index]
plt.figure(figsize=(12,5))
if overplot_archival_lines is not None:
for number, line in enumerate(archival_wavelengths):
text_line_index = extraction.find_nearest(wavelengths.value, line)
if archival_line_names[number][0:2] == "H " or archival_line_names[number][0:2] == "He":
color = "C0"
step = 1.1
elif archival_line_names[number][0] == "O" or archival_line_names[number][0:2] == "[O":
color = "C2"
step = 1.22
elif archival_line_names[number][0] == "N" or archival_line_names[number][0:2] == "[N":
color = "C3"
step = 1.4
elif archival_line_names[number][0] == "S" or archival_line_names[number][0:2] == "[S":
color = "C4"
step = 1.6
else:
color = "black"
step = 1.1
plt.text(line, step*flux_density.value.max(), archival_line_names[number],rotation=90,fontsize=10,color=color, horizontalalignment="center", verticalalignment="bottom")
plt.vlines(line, flux_density.value[text_line_index], 0.98*step*flux_density.value.max(), color=color, linewidth=0.8,alpha=0.5)
plt.plot(wavelengths,continuum_baseline,label="Continuum")
plt.plot(wavelengths, flux_density, color='orange')
plt.minorticks_on()
plt.grid(which="both",linestyle=":")
plt.xlim(wavelengths.value.min(),wavelengths.value.max())
plt.ylim(0,flux_density.value.max()*2)
plt.ylabel("F$_{\lambda}$ "+f"[{flux_density.unit}]",fontsize=12)
plt.xlabel(f"Observed $\lambda$ [${wavelengths.unit}$]",fontsize=12)
plt.title(self.target_name+" - $z_{av} = $"+f"{round(average_redshift,7)}, $\sigma_z = {round(std_redshift,7)}$")
plt.legend()
plt.tight_layout()
if save_results:
plt.savefig(self.output_dir+f"/{self.target_name}_spec.pdf",bbox_inches='tight')
if save_results:
self.merge_fit_results(self.target_name,list_of_files=None,output_dir=self.output_dir)
list_of_files = glob.glob(self.output_dir+"/*_line_fit_results.csv")
for file in list_of_files:
os.remove(file)
if plot_spec or plot_MCMC:
plt.show()
return par_values_list, par_values_errors_list, par_names_list, samples_list, line_windows
[docs] def line_dispersion_and_equiv_width(self, wavelengths_window, flux_density_window, continuum_baseline_window, line_fit_parameters, line_std_deviation, line_name = None, plot = True):
"""
This function estimates the line dispersion (aka the rms width of the line), the profile equivalent width, and the FWHM. All measures are independent
on the line model (i.e. Gaussian, Lorentz, Voigt, or skewed models) and dependent on the interpolated line profile.
Parameters
----------
wavelengths_window: numpy.ndarray
The wavelength window for the given line.
flux_density_window: numpy.ndarray
The flux density window for the given line.
continuum_baseline_window: numpy.ndarray
An array with the continuum density flux window for the given line. Standard easyspec units are in erg/cm2/s/A.
line_fit_parameters: list
This is the list with the best-fit values for a given line model. We use these fitted values here just to estimate the integration limits.
line_std_deviation: numpy.ndarray (float)
The standard deviation for the local continuum. This variable is an output of the function analysis.find_lines().
line_name: string
The line name.
plot: boolean
If True, the line and corresponding centroid and dispersion will be showed.
Returns
-------
line_dispersion: float
The line dispersion is well defined for arbitrary line profiles. See Eqs. 4 and 5 in Peterson et al. 2004 for some guidance.
You should avoid using the line dispersion in case of blended lines. The result here is given in the observed frame.
profile_equiv_width: float
This is a model-independent estimate of the equivalent width and can be used with arbitrary line profiles.
This is the integral of (Fc - F)/Fc over the wavelengths, where Fc is the continuum flux density and F is the interpolated flux density.
The result is given in the observed frame.
profile_FWHM: float
The model-independent FWHM.
line_disp_error: float
The error on the line dispersion computed with a Monte Carlo simulation.
equiv_width_error: float
The error on the equivalent width computed with a Monte Carlo simulation.
fwhm_error: float
The error on the FWHM computed with a Monte Carlo simulation.
"""
warnings.filterwarnings('ignore')
def lambda_P(wavelengths_window,tck):
return interpolate.splev(wavelengths_window, tck)*wavelengths_window
def lambda2_P(wavelengths_window,tck):
return interpolate.splev(wavelengths_window, tck)*(wavelengths_window-first_moment)**2
def P(wavelengths_window,tck):
return interpolate.splev(wavelengths_window, tck)
def equiv_width(wavelengths_window,tck2):
return interpolate.splev(wavelengths_window, tck2)
tck = interpolate.splrep(wavelengths_window, (flux_density_window-continuum_baseline_window), k=1)
integrand_EQW = (continuum_baseline_window - flux_density_window)/continuum_baseline_window
tck2 = interpolate.splrep(wavelengths_window, integrand_EQW, k=1)
line_function = interpolate.splev(wavelengths_window, tck)
line_function_positive = np.sqrt(line_function**2) # We square the profile and take the squareroot such that we can also work with absorption lines
integration_limit_0 = line_fit_parameters[1]-line_fit_parameters[3]/2 # The first integration limit is the fitted line peak minus the FWHM/2.
index_limit_0 = extraction.find_nearest(wavelengths_window, integration_limit_0)
for n,flux_bin in enumerate(np.minimum.accumulate(np.flip(line_function_positive[:index_limit_0]))): # np.minimum.accumulate gives us the accumulated minimum value for each element: the next element will always be equal or smaller than the previous one.
if flux_bin > 2*line_std_deviation and flux_bin <= line_function_positive[index_limit_0]:
if n < (index_limit_0-2): # This condition is necessary such that we don't take e.g. wavelengths_window[-1] (negative index!)
try:
integration_limit_0 = wavelengths_window[index_limit_0-n-2]
except:
break
else:
break
else:
break
integration_limit_1 = line_fit_parameters[1]+line_fit_parameters[3]/2 # The first integration limit is the fitted line peak plus the FWHM/2.
index_limit_1 = extraction.find_nearest(wavelengths_window, integration_limit_1)
for n,flux_bin in enumerate(np.minimum.accumulate(line_function_positive[index_limit_1:])): # np.minimum.accumulate gives us the accumulated minimum value for each element: the next element will always be equal or smaller than the previous one.
if flux_bin > 2*line_std_deviation and flux_bin <= line_function_positive[index_limit_1]:
try:
integration_limit_1 = wavelengths_window[index_limit_1+n+2]
except:
break
else:
break
self.integration_limits_dispersion = [integration_limit_0,integration_limit_1]
numerator = quad(lambda_P, integration_limit_0, integration_limit_1,args=(tck,))[0]
denominator = quad(P, integration_limit_0, integration_limit_1,args=(tck,))[0]
first_moment = numerator/denominator
second_moment = quad(lambda2_P, integration_limit_0, integration_limit_1,args=(tck,))[0]/denominator
line_dispersion = np.sqrt(second_moment)
profile_equiv_width = quad(equiv_width, integration_limit_0, integration_limit_1,args=(tck2,))[0]
interpol_wavelenghts = np.linspace(integration_limit_0,integration_limit_1,1000)
interpol_density_flux = interpolate.splev(interpol_wavelenghts, tck)
if numerator > 0:
line_peak_value = interpol_density_flux.max()
else:
line_peak_value = interpol_density_flux.min() # in case of absorption lines.
max_index = extraction.find_nearest(interpol_density_flux, line_peak_value)
max_index_for_error = extraction.find_nearest((flux_density_window-continuum_baseline_window), line_peak_value) # This is used in the loop a few lines below.
interpol_density_flux_positive = np.sqrt(interpol_density_flux**2)
lambda_0_index = extraction.find_nearest(np.minimum.accumulate(np.flip(interpol_density_flux_positive[0:max_index])), line_peak_value/2)
lambda_0_index = max_index - lambda_0_index
lambda_1_index = extraction.find_nearest(np.minimum.accumulate(interpol_density_flux_positive[max_index:]), line_peak_value/2) + max_index
profile_FWHM = np.abs(interpol_wavelenghts[lambda_1_index]-interpol_wavelenghts[lambda_0_index])
# Error estimate:
equiv_width_error = []
line_disp_error = []
fwhm_error = []
median_profile = medfilt(flux_density_window-continuum_baseline_window, 7)
std_line = np.std((median_profile-P(wavelengths_window,tck))[:int(len(median_profile)/4)] + (median_profile-P(wavelengths_window,tck))[-int(len(median_profile)/4):])
for n in range(100):
noise = np.random.uniform(size=len(flux_density_window))*std_line
noisy_density_flux = flux_density_window + noise
integrand_EQW = (continuum_baseline_window - noisy_density_flux)/continuum_baseline_window
tck3 = interpolate.splrep(wavelengths_window, integrand_EQW, k=1)
local_EQW = quad(equiv_width, integration_limit_0, integration_limit_1,args=(tck3,))[0]
equiv_width_error.append(local_EQW)
tck4 = interpolate.splrep(wavelengths_window, (noisy_density_flux - continuum_baseline_window), k=1)
line_disp_error.append(np.sqrt(quad(lambda2_P, integration_limit_0, integration_limit_1,args=(tck4,))[0]/denominator))
noisy_density_flux_positive = np.sqrt(noisy_density_flux**2)
noisy_density_flux_cumulative0 = np.minimum.accumulate(np.flip(noisy_density_flux_positive[0:max_index_for_error]-continuum_baseline_window[0:max_index_for_error]))
noisy_density_flux_cumulative1 = np.minimum.accumulate(noisy_density_flux_positive[max_index_for_error:]-continuum_baseline_window[max_index_for_error:])
lambda_0_index_error = extraction.find_nearest(noisy_density_flux_cumulative0, line_peak_value/2)
lambda_0_index_error = max_index_for_error - lambda_0_index_error
lambda_1_index_error = extraction.find_nearest(noisy_density_flux_cumulative1, line_peak_value/2) + max_index_for_error
fwhm_error.append(np.abs(wavelengths_window[lambda_1_index_error]-wavelengths_window[lambda_0_index_error]))
equiv_width_error = np.std(np.asarray(equiv_width_error))
line_disp_error = np.std(np.asarray(line_disp_error))
fwhm_error = np.std(np.asarray(fwhm_error))
if plot:
plt.figure(figsize=(12,4))
_, ax = plt.subplots(figsize=(12,4))
plt.plot(wavelengths_window, interpolate.splev(wavelengths_window, tck), color = "C1")
plt.plot(wavelengths_window,np.zeros(len(wavelengths_window)),color="black")
plt.ylabel(r"Flux density - continuum [erg/cm$^2$/s/$\AA$]")
plt.xlabel(r"Observed $\lambda$ [$\AA$]")
plt.grid(which="both",linestyle="dotted")
if numerator > 0:
plt.vlines(first_moment,0,(flux_density_window-continuum_baseline_window).max(),colors="C0",linestyles=":",label="centroid")
try:
plt.fill_betweenx([0,(flux_density_window-continuum_baseline_window).max()],first_moment-line_dispersion,first_moment+line_dispersion, color="C0", alpha=0.3, label=r"$\lambda_0 \pm \sigma_{rms}$")
except:
print(f"Plotting error: first or second line moment is probably NaN. First moment: {first_moment}, second moment: {line_dispersion}")
plt.plot([interpol_wavelenghts[lambda_0_index],interpol_wavelenghts[lambda_1_index]],[interpol_density_flux.max()/2,interpol_density_flux.max()/2],color="black")
plt.text((interpol_wavelenghts[lambda_0_index]+interpol_wavelenghts[lambda_1_index])/2,1.05*interpol_density_flux.max()/2,"FWHM",horizontalalignment='center',)
else:
plt.vlines(first_moment,0,(flux_density_window-continuum_baseline_window).min(),colors="C0",linestyles=":",label="centroid")
try:
plt.fill_betweenx([0,(flux_density_window-continuum_baseline_window).min()],first_moment-line_dispersion,first_moment+line_dispersion, color="C0", alpha=0.3, label=r"$\lambda_0 \pm \sigma_{rms}$")
except:
print(f"Plotting error: first or second line moment is probably NaN. First moment: {first_moment}, second moment: {line_dispersion}")
plt.plot([interpol_wavelenghts[lambda_0_index],interpol_wavelenghts[lambda_1_index]],[interpol_density_flux.min()/2,interpol_density_flux.min()/2],color="black")
plt.text((interpol_wavelenghts[lambda_0_index]+interpol_wavelenghts[lambda_1_index])/2,0.90*interpol_density_flux.min()/2,"FWHM",horizontalalignment='center',)
plt.scatter(interpol_wavelenghts[lambda_0_index],interpol_density_flux[lambda_0_index],color="black")
plt.scatter(interpol_wavelenghts[lambda_1_index],interpol_density_flux[lambda_1_index],color="black")
plt.text(0.05,0.8,"Not intended for\nblended lines!",color="red",transform = ax.transAxes)
plt.legend()
if line_name is not None:
plt.title(line_name+" - Model-independent estimate for $\sigma_{rms}$ (observed frame)")
return line_dispersion, profile_equiv_width, profile_FWHM, line_disp_error, equiv_width_error, fwhm_error
[docs] def line_dispersion_skewed_models(self, par_values, theta_error, model="Lorentz"):
"""
This function estimates the line dispersion (aka the rms width of the line) and the FWHM for the skewed line models.
Parameters
----------
par_values: list
Array with the best-fit line parameters.
theta_error: numpy.ndarray
Array with the asymmetrical errors for each parameter.
model: string
The type of skewed function. Options are "Gauss" or "Lorentz".
Returns
-------
line_dispersion: float
The line dispersion is well defined for skewed line profiles. The result here is given in the observed frame.
model_FWHM: float
The skewed model FWHM.
line_disp_error: float
The error on the line dispersion computed with a Monte Carlo approach.
fwhm_error: float
The error on the FWHM computed with a Monte Carlo approach.
"""
theta = np.asarray(par_values[1:])
if model == "Lorentz":
#theta[2] = theta[2]/2
#theta_error[2] = theta_error[2]/2
def lambda_P(wavelengths_window,theta):
return model_skewed_lorentzian(theta,wavelengths_window)*wavelengths_window
def lambda2_P(wavelengths_window,theta):
return model_skewed_lorentzian(theta,wavelengths_window)*(wavelengths_window-first_moment)**2
def Intensity(wavelengths_window,theta):
return model_skewed_lorentzian(theta,wavelengths_window)
else:
#theta[2] = theta[2]/(2*np.sqrt(2 * np.log(2))) # Gaussian component FWHM to sigma
#theta_error[2] = theta_error[2]/(2*np.sqrt(2 * np.log(2)))
def lambda_P(wavelengths_window,theta):
return model_skewed_gaussian(theta,wavelengths_window)*wavelengths_window
def lambda2_P(wavelengths_window,theta):
return model_skewed_gaussian(theta,wavelengths_window)*(wavelengths_window-first_moment)**2
def Intensity(wavelengths_window,theta):
return model_skewed_gaussian(theta,wavelengths_window)
Normalization_factor_for_integration = 10**round(np.log10(theta[1]))
theta[1] = theta[1]/Normalization_factor_for_integration
x = np.linspace(self.integration_limits_dispersion[0],self.integration_limits_dispersion[1]) # We use the same integration limits adopted in the function line_dispersion_and_equiv_width()
numerator = np.trapz(lambda_P(x,theta), x)*Normalization_factor_for_integration
denominator = np.trapz(Intensity(x,theta), x)*Normalization_factor_for_integration
first_moment = numerator/denominator
second_moment = np.trapz(lambda2_P(x,theta), x)*Normalization_factor_for_integration/denominator
line_dispersion = np.sqrt(second_moment)
wavelength_range_0 = np.linspace(theta[0]-10*theta[2],theta[0],1000)
wavelength_range_1 = np.linspace(theta[0],theta[0]+10*theta[2],1000)
lambda_0_index = extraction.find_nearest(Intensity(wavelength_range_0,theta), theta[1]/2)
lambda_1_index = extraction.find_nearest(Intensity(wavelength_range_1,theta), theta[1]/2)
model_FWHM = np.abs(wavelength_range_1[lambda_1_index]-wavelength_range_0[lambda_0_index])
# Error estimate:
theta_error = np.mean(theta_error,axis=1)
line_disp_error = []
fwhm_error = []
for n in range(100):
noise = np.random.normal(scale=theta_error,size=len(theta))
noisy_theta = theta+noise
wavelength_range_0 = np.linspace(noisy_theta[0]-10*noisy_theta[2],noisy_theta[0],1000)
wavelength_range_1 = np.linspace(noisy_theta[0],noisy_theta[0]+10*noisy_theta[2],1000)
lambda_0_index = extraction.find_nearest(Intensity(wavelength_range_0,noisy_theta), noisy_theta[1]/2)
lambda_1_index = extraction.find_nearest(Intensity(wavelength_range_1,noisy_theta), noisy_theta[1]/2)
fwhm_error.append(np.abs(wavelength_range_1[lambda_1_index]-wavelength_range_0[lambda_0_index]))
denominator = np.trapz(Intensity(x,noisy_theta), x)*Normalization_factor_for_integration
second_moment = np.trapz(lambda2_P(x,noisy_theta), x)*Normalization_factor_for_integration/denominator
line_disp_error.append(np.sqrt(second_moment))
line_disp_error = np.nanstd(np.asarray(line_disp_error))
fwhm_error = np.nanstd(np.asarray(fwhm_error))
return line_dispersion, model_FWHM, line_disp_error, fwhm_error
[docs] def error_propagation_voigt(self,FWHM_Lorentz,FWHM_Gauss,error_lorentz,error_gauss):
"""
In this function we propagate the FWHM error for the Voigt model assuming independent variables.
Parameters
----------
FWHM_Lorentz: float
The Lorentzian component FWHM.
FWHM_Gauss: float
The Gaussian component FWHM.
error_lorentz: float
The associated error to the Lorentzian FWHM.
error_gauss: float
The associated error to the Gaussian FWHM.
Returns
-------
fwhm_error_voigt: float
The propagated uncertainty in FWHM for the Voigt model.
"""
fwhm_error_voigt = np.sqrt( (0.5346 + 0.5*0.2166*2*FWHM_Lorentz/np.sqrt(0.2166*FWHM_Lorentz**2 + FWHM_Gauss**2))*error_lorentz**2 + (FWHM_Gauss*error_gauss/np.sqrt(0.2166*FWHM_Lorentz**2 + FWHM_Gauss**2))**2 )
return fwhm_error_voigt
[docs] def equiv_width_error(self,integrand,integration_limits,line_parameters,line_par_errors, n_samples=100, n_points=300):
"""
In this function, we estimate the modeled equivalent width error for a line based on a Monte Carlo simulation.
Parameters
----------
integrand: function
The function to be integrated. See the details here: https://en.wikipedia.org/wiki/Equivalent_width
integration_limits: list
A list containing the wavelength limits around the line.
line_parameters: list
A list with the line best-fit parameters obtained with the MCMC method.
line_par_errors: list
A list with the asymmetrical parameter errors.
n_samples: int
Number of MC iterations.
n_points: int
Resolution of the wavelength axis.
Returns
-------
EQW_error_down: float
The equivalent width lower error for a modeled line.
EQW_error_up: float
The equivalent width upper error for a modeled line.
"""
line_parameters = np.asarray(line_parameters)
line_par_errors = np.asarray(line_par_errors)
# Create wavelength grid
lam = np.linspace(integration_limits[0], integration_limits[1], n_points)
# Generate random samples for lower and upper error bounds
rand = np.random.uniform(size=(n_samples, len(line_parameters)))
params_down = line_parameters + rand * line_par_errors[:, 0]
params_up = line_parameters + rand * line_par_errors[:, 1]
# Evaluate integrand and integrate using the trapezoidal rule
eqw_down = np.array([
np.trapz(integrand(lam, p), lam)
for p in params_down
])
eqw_up = np.array([
np.trapz(integrand(lam, p), lam)
for p in params_up
])
# Return standard deviations as uncertainties
EQW_error_down = float(np.std(eqw_down))
EQW_error_up = float(np.std(eqw_up))
return EQW_error_down, EQW_error_up
[docs] def line_physics(self, wavelengths, flux_density, continuum_baseline, par_values_list, par_values_errors_list, par_names_list, line_windows, line_std_deviation, sistemic_redshift = None, plot = True, save_file = True):
"""
In this function, we compute several physical properties for the fitted lines.
OBS 1: For the Voigt model, we assume independent variables when propagating the FWHM error. If this is not the case (you can check this in the MCMC
corner plots), we recommend that you use the Gaussian or Lorentzian models.
OBS 2: The integrated flux is computed by taking the equivalent width and multiplying it by the continuum value at the line center.
OBS 3: The line dispersion is well defined for arbitrary line profiles. See e.g. Eqs. 4 and 5 in Peterson et al. 2004. This method is not recommended in case of blended lines.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
flux_density: numpy.ndarray (astropy.units erg/cm2/s/A)
The calibrated spectrum in flux density.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
par_values_list: list
This is a list containing sublists with the best-fit values for each line model.
par_values_errors_list: list
A list with the asymmetrical errors for each parameter listed in par_values_list.
par_names_list: list
A list with the names of all the parameters used in each line model.
line_windows: list
A list containing the wavelength intervals around each line.
line_std_deviation: numpy.ndarray (float)
The standard deviation for the local continuum. This variable is an output of the function analysis.find_lines().
sistemic_redshift: float
If the sistemic redshift is given, it will be used to calculate all the output of this function. Otherwise, the function will use the
redshift of each individual line. Default value is sistemic_redshift = None.
plot: boolean
If True, the line and corresponding centroid and dispersion will be showed.
save_file: boolean
If True, the results of this function will be saved in a .csv file in the output directory defined at analysis.load_calibrated_data().
Returns
-------
profile_equiv_width_rest_frame: list (astropy.units Angstrom)
A list with the equivalent widths (and respective errors) for each line and their respective errors in the rest frame. The values here are obtained with the
interpolation of the line profile and are therefore independent on the line model (i.e. Gaussian, Lorentzian or Voigt).
modeled_equiv_width_rest_frame: list (astropy.units Angstrom)
A list with the model-dependent equivalent widths (and respective errors) for each line and their respective errors in the rest frame.
profile_integrated_flux: list (astropy.units erg/cm2/s)
A list with the model-independent line fluxes and their respective errors.
modeled_integrated_flux: list (astropy.units erg/cm2/s)
A list with the model-dependent line fluxes and their respective errors.
profile_rest_frame_disp_velocity: list (astropy.units km/s)
A list with the model-independent dispersion velocities and their respective errors in the rest frame.
modeled_rest_frame_disp_velocity: list (astropy.units km/s)
A list with the model-dependent dispersion velocities and their respective errors in the rest frame.
profile_line_dispersion_rest_frame: list (astropy.units Angstrom)
A list with the line dispersions (from the line second moments) and their respective errors in the rest frame.
profile_rest_frame_fwhm: list (astropy.units Angstrom)
A list with the model-independent line FWHMs and their respective errors in the rest frame.
modeled_rest_frame_fhwm: list (astropy.units Angstrom)
A list with the model-dependent line FWHMs and their respective errors in the rest frame.
XXXXXXX_line_physics.csv
Optional. This file contains all physics parameters for each modeled line and is saved in the output directory defined in the
function analysis.load_calibrated_data(). "XXXXXXX" here stands for the target's name.
"""
try:
line_names = self.line_names
except:
line_names = [""]*len(par_values_list)
modeled_equiv_width_rest_frame = []
profile_equiv_width_rest_frame = []
profile_line_dispersion_rest_frame = []
modeled_integrated_flux = []
profile_integrated_flux = []
modeled_rest_frame_disp_velocity = []
profile_rest_frame_disp_velocity = []
modeled_rest_frame_fhwm = []
profile_fwhm = []
for number,line_parameters in enumerate(par_values_list):
# taking the redshift and local continuum:
if sistemic_redshift is None:
z = line_parameters[0]
else:
z = sistemic_redshift
peak_position = line_parameters[1]
continuum_index = extraction.find_nearest(wavelengths.value, peak_position)
# Computing the equivalent width directly from the data:
window_indexes = np.where((wavelengths.value > line_windows[number][0]) & (wavelengths.value < line_windows[number][1]))[0]
flux_density_window = flux_density[window_indexes]
continuum_baseline_window = continuum_baseline[window_indexes]
wavelengths_window = wavelengths[window_indexes]
line_dispersion, profile_equiv_width, profile_FWHM, line_disp_error, equiv_width_error, fwhm_error = self.line_dispersion_and_equiv_width(wavelengths_window.value, flux_density_window.value, continuum_baseline_window, line_fit_parameters = par_values_list[number], line_std_deviation = line_std_deviation[number], line_name = line_names[number], plot = plot)
profile_disp_vel_error_down = c.to("km/s").value*np.sqrt( line_disp_error*2/(peak_position**2) + (line_dispersion*par_values_errors_list[number][1][0]/(peak_position**2))**2 )
profile_disp_vel_error_up = c.to("km/s").value*np.sqrt( line_disp_error*2/(peak_position**2) + (line_dispersion*par_values_errors_list[number][1][1]/(peak_position**2))**2 )
profile_rest_frame_disp_velocity.append([c.to("km/s").value*line_dispersion/peak_position, profile_disp_vel_error_down, profile_disp_vel_error_up])
profile_integrated_flux.append([-profile_equiv_width*continuum_baseline[continuum_index], equiv_width_error*continuum_baseline[continuum_index], equiv_width_error*continuum_baseline[continuum_index]])
line_dispersion_err_down = np.sqrt( (line_disp_error/(1+z))**2 + (line_dispersion*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
line_dispersion_err_up = np.sqrt( (line_disp_error/(1+z))**2 + (line_dispersion*par_values_errors_list[number][0][1]/((1+z)**2))**2 )
line_dispersion = line_dispersion/(1+z)
profile_equiv_width_err_down = np.sqrt( (equiv_width_error/(1+z))**2 + (profile_equiv_width*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
profile_equiv_width_err_up = np.sqrt( (equiv_width_error/(1+z))**2 + (profile_equiv_width*par_values_errors_list[number][0][1]/((1+z)**2))**2 )
profile_equiv_width = profile_equiv_width/(1+z)
profile_FWHM_error_down = np.sqrt( (fwhm_error/(1+z))**2 + (profile_FWHM*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
profile_FWHM_error_up = np.sqrt( (fwhm_error/(1+z))**2 + (profile_FWHM*par_values_errors_list[number][0][1]/((1+z)**2))**2 )
profile_FWHM = profile_FWHM/(1+z)
profile_equiv_width_rest_frame.append([profile_equiv_width,profile_equiv_width_err_down, profile_equiv_width_err_up])
profile_line_dispersion_rest_frame.append([line_dispersion,line_dispersion_err_down, line_dispersion_err_up])
profile_fwhm.append([profile_FWHM, profile_FWHM_error_down, profile_FWHM_error_up])
integration_limits = self.integration_limits_dispersion
# Computing the equivalent width from best-fit line models:
if len(line_parameters) == 5:
if par_names_list[number][4] == 'fwhm_Gauss':
# Voigt case:
def integrand(x,theta):
return -1*model_Voigt(theta,x)/continuum_baseline[continuum_index] # The "-1" here is because our spectrum is already continuum-subtracted.
equivalent_width = quad(integrand,integration_limits[0],integration_limits[1],args=line_parameters[1:])
EQW_error_down, EQW_error_up = self.equiv_width_error(integrand,integration_limits,line_parameters[1:],par_values_errors_list[number][1:])
FWHM_Voigt = 0.5346*line_parameters[3] + np.sqrt(0.2166*line_parameters[3]**2 + line_parameters[4]**2)
disp_velocity = c.to("km/s").value*FWHM_Voigt/(peak_position*2*np.sqrt(2 * np.log(2)))
fwhm_error_down = self.error_propagation_voigt(par_values_list[number][3],par_values_list[number][4],par_values_errors_list[number][3][0],par_values_errors_list[number][4][0])
fwhm_error_up = self.error_propagation_voigt(par_values_list[number][3],par_values_list[number][4],par_values_errors_list[number][3][1],par_values_errors_list[number][4][1])
disp_velocity_err_down = c.to("km/s").value*fwhm_error_down/(peak_position*2*np.sqrt(2 * np.log(2)))
disp_velocity_err_up = c.to("km/s").value*fwhm_error_up/(peak_position*2*np.sqrt(2 * np.log(2)))
fwhm_error_down = np.sqrt( (fwhm_error_down/(1+z))**2 + (FWHM_Voigt*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
fwhm_error_up = np.sqrt( (fwhm_error_up/(1+z))**2 + (FWHM_Voigt*par_values_errors_list[number][0][1]/((1+z)**2))**2 )
fwhm_rest_frame = FWHM_Voigt/(1+z)
elif par_names_list[number][3] == 'fwhm_Gauss':
# Skewed Gaussian case:
def integrand(x,theta):
return -1*model_skewed_gaussian(theta,x)/continuum_baseline[continuum_index] # The "-1" here is because our spectrum is already continuum-subtracted.
skewed_gaussian_parameters = line_parameters[1:]
#skewed_gaussian_parameters[2] = skewed_gaussian_parameters[2]/(2*np.sqrt(2 * np.log(2))) # This is necessary because the last Gaussian parameter saved in par_values_list is the FWHM and not the standard deviation.
equivalent_width = quad(integrand,integration_limits[0],integration_limits[1],args=skewed_gaussian_parameters)
skewed_gaussian_parameters_errors = np.asarray(par_values_errors_list[number][1:])
#skewed_gaussian_parameters_errors[2] = skewed_gaussian_parameters_errors[2]/(2*np.sqrt(2 * np.log(2)))
EQW_error_down, EQW_error_up = self.equiv_width_error(integrand,integration_limits,skewed_gaussian_parameters,skewed_gaussian_parameters_errors)
line_dispersion, model_FWHM, line_disp_error, fwhm_error = self.line_dispersion_skewed_models(par_values_list[number], skewed_gaussian_parameters_errors, model="Gauss")
disp_velocity = c.to("km/s").value*line_dispersion/peak_position
fwhm_rest_frame = model_FWHM/(1+z)
disp_velocity_err_down = c.to("km/s").value*line_disp_error/peak_position
disp_velocity_err_up = disp_velocity_err_down
fwhm_error_down = fwhm_error/(1+z)
fwhm_error_up = fwhm_error_down
elif par_names_list[number][3] == 'fwhm_Lorentz':
# Skewed Lorentzian case:
def integrand(x,theta):
return -1*model_skewed_lorentzian(theta,x)/continuum_baseline[continuum_index] # The "-1" here is because our spectrum is already continuum-subtracted.
skewed_lorentzian_parameters = line_parameters[1:]
#skewed_lorentzian_parameters[2] = skewed_lorentzian_parameters[2]/2 # This is necessary because the last Lorentzian parameter saved in par_values_list is the FWHM and not the HWHM.
equivalent_width = quad(integrand,integration_limits[0],integration_limits[1],args=skewed_lorentzian_parameters)
skewed_lorentzian_parameters_errors = np.asarray(par_values_errors_list[number][1:])
#skewed_lorentzian_parameters_errors[2] = skewed_lorentzian_parameters_errors[2]/2
EQW_error_down, EQW_error_up = self.equiv_width_error(integrand,integration_limits,skewed_lorentzian_parameters,skewed_lorentzian_parameters_errors)
line_dispersion, model_FWHM, line_disp_error, fwhm_error = self.line_dispersion_skewed_models(par_values_list[number], skewed_lorentzian_parameters_errors, model="Lorentz")
disp_velocity = c.to("km/s").value*line_dispersion/peak_position
fwhm_rest_frame = model_FWHM/(1+z)
disp_velocity_err_down = c.to("km/s").value*line_disp_error/peak_position
disp_velocity_err_up = disp_velocity_err_down
fwhm_error_down = fwhm_error/(1+z)
fwhm_error_up = fwhm_error_down
elif par_names_list[number][3] == 'fwhm_Lorentz':
# Lorentzian case:
def integrand(x,theta):
return -1*model_Lorentz(theta,x)/continuum_baseline[continuum_index] # The "-1" here is because our spectrum is already continuum-subtracted.
equivalent_width = quad(integrand,integration_limits[0],integration_limits[1],args=line_parameters[1:])
EQW_error_down, EQW_error_up = self.equiv_width_error(integrand,integration_limits,line_parameters[1:],par_values_errors_list[number][1:])
disp_velocity = c.to("km/s").value*line_parameters[3]/(2*peak_position) # The dispersion velocity for a Lorentzian is the half of its FWHM.
disp_velocity_err_down = c.to("km/s").value*par_values_errors_list[number][3][0]/(peak_position*2)
disp_velocity_err_up = c.to("km/s").value*par_values_errors_list[number][3][1]/(peak_position*2)
fwhm_error_down = np.sqrt( (par_values_errors_list[number][3][0]/(1+z))**2 + (line_parameters[3]*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
fwhm_error_up = np.sqrt( (par_values_errors_list[number][3][1]/(1+z))**2 + (line_parameters[3]*par_values_errors_list[number][0][1]/((1+z)**2))**2 )
fwhm_rest_frame = line_parameters[3]/(1+z)
else:
# Gaussian case:
def integrand(x,theta):
return -1*model_Gauss(theta,x)/continuum_baseline[continuum_index] # The "-1" here is because our spectrum is already continuum-subtracted.
gaussian_parameters = line_parameters[1:]
#gaussian_parameters[2] = gaussian_parameters[2]/(2*np.sqrt(2 * np.log(2))) # This is necessary because the last Gaussian parameter saved in par_values_list is the FWHM and not the standard deviation.
equivalent_width = quad(integrand,integration_limits[0],integration_limits[1],args=gaussian_parameters)
gaussian_parameters_errors = par_values_errors_list[number][1:]
#gaussian_parameters_errors[2] = gaussian_parameters_errors[2]/(2*np.sqrt(2 * np.log(2)))
EQW_error_down, EQW_error_up = self.equiv_width_error(integrand,integration_limits,gaussian_parameters,gaussian_parameters_errors)
disp_velocity = c.to("km/s").value*line_parameters[3]/(peak_position*2*np.sqrt(2 * np.log(2))) # The dispersion velocity for a Gaussian is its standard deviation given in velocity units.
disp_velocity_err_down = c.to("km/s").value*par_values_errors_list[number][3][0]/(peak_position*2*np.sqrt(2 * np.log(2)))
disp_velocity_err_up = c.to("km/s").value*par_values_errors_list[number][3][1]/(peak_position*2*np.sqrt(2 * np.log(2)))
fwhm_error_down = np.sqrt( (par_values_errors_list[number][3][0]/(1+z))**2 + (line_parameters[3]*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
fwhm_error_up = np.sqrt( (par_values_errors_list[number][3][1]/(1+z))**2 + (line_parameters[3]*par_values_errors_list[number][0][1]/((1+z)**2))**2 )
fwhm_rest_frame = line_parameters[3]/(1+z)
modeled_integrated_flux.append([-equivalent_width[0]*continuum_baseline[continuum_index], EQW_error_down*continuum_baseline[continuum_index] , EQW_error_up*continuum_baseline[continuum_index]])
EQW_error_down = np.sqrt( (EQW_error_down/(1+z))**2 + (equivalent_width[0]*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
EQW_error_up = np.sqrt( (EQW_error_up/(1+z))**2 + (equivalent_width[0]*par_values_errors_list[number][0][0]/((1+z)**2))**2 )
modeled_rest_frame_disp_velocity.append([disp_velocity,disp_velocity_err_down,disp_velocity_err_up])
modeled_rest_frame_fhwm.append([fwhm_rest_frame,fwhm_error_down,fwhm_error_up])
modeled_equiv_width_rest_frame.append([equivalent_width[0]/(1+z), EQW_error_down, EQW_error_up]) # We divide the measured equivalent width by 1+z to get the rest-frame equiv width.
modeled_rest_frame_fhwm = modeled_rest_frame_fhwm*u.AA
modeled_rest_frame_disp_velocity = modeled_rest_frame_disp_velocity*u.km/u.s
profile_rest_frame_disp_velocity = profile_rest_frame_disp_velocity*u.km/u.s
profile_equiv_width_rest_frame = profile_equiv_width_rest_frame*u.AA
profile_line_dispersion_rest_frame = profile_line_dispersion_rest_frame*u.AA
modeled_equiv_width_rest_frame = modeled_equiv_width_rest_frame*u.AA
modeled_integrated_flux = modeled_integrated_flux*u.erg/u.cm**2/u.s
profile_integrated_flux = profile_integrated_flux*u.erg/u.cm**2/u.s
profile_rest_frame_fwhm = profile_fwhm*u.AA
if plot:
plt.show()
if save_file:
f = open(f'{self.output_dir}/'+self.target_name+"_line_physics.csv","w")
f.write("Line name, profile_equiv_width_rest_frame [Angstrom], err_down, err_up,")
f.write("modeled_equiv_width_rest_frame [Angstrom], err_down, err_up,")
f.write("profile_integrated_flux [erg/cm2/s], err_down, err_up,")
f.write("modeled_integrated_flux [erg/cm2/s], err_down, err_up,")
f.write("profile_rest_frame_disp_velocity [km/s], err_down, err_up,")
f.write("modeled_rest_frame_disp_velocity [km/s], err_down, err_up,")
f.write("profile_line_dispersion_rest_frame [Angstrom], err_down, err_up,")
f.write("profile_rest_frame_fwhm [Angstrom], err_down, err_up,")
f.write("modeled_rest_frame_fhwm [Angstrom], err_down, err_up\n")
for number in range(len(profile_equiv_width_rest_frame)):
f.write(self.line_names[number])
f.write(","+str(profile_equiv_width_rest_frame[number][0].value)+","+str(profile_equiv_width_rest_frame[number][1].value)+","+str(profile_equiv_width_rest_frame[number][2].value))
f.write(","+str(modeled_equiv_width_rest_frame[number][0].value)+","+str(modeled_equiv_width_rest_frame[number][1].value)+","+str(modeled_equiv_width_rest_frame[number][2].value))
f.write(","+str(profile_integrated_flux[number][0].value)+","+str(profile_integrated_flux[number][1].value)+","+str(profile_integrated_flux[number][2].value))
f.write(","+str(modeled_integrated_flux[number][0].value)+","+str(modeled_integrated_flux[number][1].value)+","+str(modeled_integrated_flux[number][2].value))
f.write(","+str(profile_rest_frame_disp_velocity[number][0].value)+","+str(profile_rest_frame_disp_velocity[number][1].value)+","+str(profile_rest_frame_disp_velocity[number][2].value))
f.write(","+str(modeled_rest_frame_disp_velocity[number][0].value)+","+str(modeled_rest_frame_disp_velocity[number][1].value)+","+str(modeled_rest_frame_disp_velocity[number][2].value))
f.write(","+str(profile_line_dispersion_rest_frame[number][0].value)+","+str(profile_line_dispersion_rest_frame[number][1].value)+","+str(profile_line_dispersion_rest_frame[number][2].value))
f.write(","+str(profile_rest_frame_fwhm[number][0].value)+","+str(profile_rest_frame_fwhm[number][1].value)+","+str(profile_rest_frame_fwhm[number][2].value))
f.write(","+str(modeled_rest_frame_fhwm[number][0].value)+","+str(modeled_rest_frame_fhwm[number][1].value)+","+str(modeled_rest_frame_fhwm[number][2].value))
f.write("\n")
f.close()
return profile_equiv_width_rest_frame, modeled_equiv_width_rest_frame, profile_integrated_flux, modeled_integrated_flux, profile_rest_frame_disp_velocity, modeled_rest_frame_disp_velocity, profile_line_dispersion_rest_frame, profile_rest_frame_fwhm, modeled_rest_frame_fhwm
[docs] def BH_mass_Hbeta_VP2006(self, wavelengths, continuum_baseline, FWHM_Hbeta, sistemic_redshift, integrated_flux_Hbeta, Hbeta_rest_wavelength = 4861.333, H0=70):
"""
This function estimates the black hole mass based on Vestergaard & Peterson, 2006, ApJ, 641, "DETERMINING CENTRAL BLACK HOLE MASSES IN DISTANT ACTIVE
GALAXIES AND QUASARS. II. IMPROVED OPTICAL AND UV SCALING RELATIONSHIPS". As stated in this work, here we assume a cosmology with H0 = 70 km/s/Mpc, Omega_Lambda = 0.7,
and Omega_matter = 0.3, although we allow the user to change the value of the Hubble constant (H0).
We assume a systematic error of 0.43 dex for the estimated black hole masses. This value is reported in Vestergaard & Peterson, 2006. Since this error is
much higher than the errors in FWHM and integrated flux, we simply ignore the measured errors in these parameters.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
FWHM_Hbeta: float
The FWHM for the Hbeta line in Angstrom, in the rest frame.
sistemic_redshift: float
The sistemic redshift of the AGN.
integrated_flux_Hbeta: float
The integrated flux for the Hbeta line in erg/cm2/s.
Hbeta_rest_wavelength: float
The rest wavelength of the Hbeta line in Angstrom.
H0: float
This is the Hubble constant value. Default is 70 km/s/Mpc.
Returns
-------
log10_BH_mass_continuum: float
The black hole mass in log10 scale and its corresponding error in dex computed based on Eq. 5 from Vestergaard & Peterson, 2006, ApJ, 641.
log10_BH_mass_line_lum: float
The black hole mass in log10 scale and its corresponding error in dex computed based on Eq. 6 from Vestergaard & Peterson, 2006, ApJ, 641.
"""
if isinstance(FWHM_Hbeta, u.Quantity):
FWHM_Hbeta = FWHM_Hbeta.value
if isinstance(integrated_flux_Hbeta, u.Quantity):
integrated_flux_Hbeta = integrated_flux_Hbeta.value
systematic_error = 0.43 # dex. Result from Vestergaard & Peterson, 2006.
FWHM_Hbeta_velocity = c.to("km/s").value*FWHM_Hbeta/Hbeta_rest_wavelength
cosmo = FlatLambdaCDM(H0=H0, Om0=0.3, Tcmb0 = 2.7) # Omega lambda is implicitly 0.7
Distance = cosmo.luminosity_distance(sistemic_redshift).value*3.086e24 # Luminosity distance. The constant converts Mpc to cm.
Lum_Hbeta = integrated_flux_Hbeta*4*np.pi*(Distance**2)
continuum_index = extraction.find_nearest(wavelengths.value, 5100*(1+sistemic_redshift))
Lum_5100 = continuum_baseline[continuum_index]*wavelengths.value[continuum_index]*4*np.pi*(Distance**2)
log10_BH_mass_continuum = np.log10( ((FWHM_Hbeta_velocity/1000)**2) * np.sqrt((Lum_5100/(10**44))) ) + 6.91
log10_BH_mass_line_lum = np.log10( ((FWHM_Hbeta_velocity/1000)**2) * ((Lum_Hbeta/(10**42)))**0.63 ) + 6.67
log10_BH_mass_continuum = [log10_BH_mass_continuum,systematic_error]
log10_BH_mass_line_lum = [log10_BH_mass_line_lum,systematic_error]
return log10_BH_mass_continuum, log10_BH_mass_line_lum
[docs] def BH_mass_CIV_VP2006(self, wavelengths, continuum_baseline, FWHM_CIV, sistemic_redshift, CIV_rest_wavelength = 1549.4795, H0=70):
"""
This function estimates the black hole mass based on Vestergaard & Peterson, 2006, ApJ, 641, "DETERMINING CENTRAL BLACK HOLE
MASSES IN DISTANT ACTIVE GALAXIES AND QUASARS. II. IMPROVED OPTICAL AND UV SCALING RELATIONSHIPS". As stated in this work,
here we assume a cosmology with H0 = 70 km/s/Mpc, Omega_Lambda = 0.7, and Omega_matter = 0.3, although we allow the user to
change the value of the Hubble constant (H0).
We assume a systematic error of 0.36 dex for the estimated black hole masses. This value is reported in Vestergaard & Peterson, 2006. Since this error is
much higher than the errors in FWHM and integrated flux, we simply ignore the measured errors in these parameters.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
FWHM_CIV: float
The FWHM for the CIV line in Angstrom, in the rest frame.
sistemic_redshift: float
The sistemic redshift of the AGN.
CIV_rest_wavelength: float
The rest wavelength of the CIV line in Angstrom.
H0: float
This is the Hubble constant value. Default is 70 km/s/Mpc.
Returns
-------
log10_BH_mass_CIV: float
The black hole mass in log10 scale and its corresponding error in dex computed based on Eq. 8 from Vestergaard & Peterson, 2006, ApJ, 641.
"""
if isinstance(FWHM_CIV, u.Quantity):
FWHM_CIV = FWHM_CIV.value
systematic_error_FWHM = 0.36 # dex. Result from Vestergaard & Peterson, 2006.
FWHM_CIV_velocity = c.to("km/s").value*FWHM_CIV/CIV_rest_wavelength
cosmo = FlatLambdaCDM(H0=H0, Om0=0.3, Tcmb0 = 2.7) # Omega lambda is implicitly 0.7
Distance = cosmo.luminosity_distance(sistemic_redshift).value*3.086e24 # Luminosity distance. The constant converts Mpc to cm.
continuum_index = extraction.find_nearest(wavelengths.value, 1350*(1+sistemic_redshift))
Lum_1350 = continuum_baseline[continuum_index]*wavelengths.value[continuum_index]*4*np.pi*(Distance**2)
log10_BH_mass_CIV = np.log10( ((FWHM_CIV_velocity/1000)**2) * (Lum_1350/(10**44))**0.53 ) + 6.66
log10_BH_mass_CIV = [log10_BH_mass_CIV,systematic_error_FWHM]
return log10_BH_mass_CIV
[docs] def BH_mass_CIV_Coatman2017(self, wavelengths, continuum_baseline, FWHM_CIV, CIV_redshift, sistemic_redshift, CIV_rest_wavelength = 1549.4795, H0=73):
"""
This function estimates the black hole mass based on Coatman et al., 2017, MNRAS, 465, "Correcting C IV-based virial black hole masses".
As stated in that work, we assume a cosmology with H0 = 71 km/s/Mpc, Omega_Lambda = 0.73, and Omega_matter = 0.27, although we allow the user to
change the value of the Hubble constant (H0).
We assume a systematic error of 0.36 dex, like in Vestergaard & Peterson, 2006, since the estimated errors are not given in Coatman et al. 2017.
Since this error is much higher than the errors in FWHM and integrated flux, we simply ignore the measured errors in these parameters.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
FWHM_CIV: float
The measured FWHM for the CIV line in Angstrom, in the rest frame.
CIV_redshift: float
The redshift of the CIV line, in km/s.
sistemic_redshift: float
The sistemic redshift of the AGN.
CIV_rest_wavelength: float
The rest wavelength of the CIV line in Angstrom.
H0: float
This is the Hubble constant value. Default is 73 km/s/Mpc.
Returns
-------
log10_BH_mass_CIV: float
The black hole mass in log10 scale and its estimated error.
"""
if isinstance(FWHM_CIV, u.Quantity):
FWHM_CIV = FWHM_CIV.value
systematic_error_FWHM = 0.36 # dex. Result from Vestergaard & Peterson, 2006.
FWHM_CIV_velocity = c.to("km/s").value*FWHM_CIV/CIV_rest_wavelength
vel_outflow = c.to("km/s").value * (CIV_redshift - sistemic_redshift) / (1 + sistemic_redshift)
corrected_FWHM = FWHM_CIV_velocity/(0.41*(vel_outflow/1000) + 0.62)
cosmo = FlatLambdaCDM(H0=H0, Om0=0.27, Tcmb0 = 2.7) # Omega lambda is implicitly 0.73
Distance = cosmo.luminosity_distance(sistemic_redshift).value*3.086e24 # Luminosity distance. The constant converts Mpc to cm.
continuum_index = extraction.find_nearest(wavelengths.value, 1350*(1+sistemic_redshift))
Lum_1350 = continuum_baseline[continuum_index]*wavelengths.value[continuum_index]*4*np.pi*(Distance**2)
log10_BH_mass_CIV = np.log10( ((corrected_FWHM/1000)**2) * (Lum_1350/(10**44))**0.53 ) + 6.71
log10_BH_mass_CIV = [log10_BH_mass_CIV,systematic_error_FWHM]
return log10_BH_mass_CIV
[docs] def BH_mass_MgII_VO2009(self, wavelengths, continuum_baseline, FWHM_MgII, sistemic_redshift, MgII_rest_wavelength = 2798, H0=70):
"""
This function estimates the black hole mass based on Vestergaard & Osmer, 2009, ApJ, 699, "MASS FUNCTIONS OF THE ACTIVE BLACK HOLES
IN DISTANT QUASARS FROM THE LARGE BRIGHT QUASAR SURVEY, THE BRIGHT QUASAR SURVEY, AND THE COLOR-SELECTED SAMPLE OF THE SDSS FALL
EQUATORIAL STRIPE". As stated in this work, here we assume a cosmology with H0 = 70 km/s/Mpc, Omega_Lambda = 0.7,
and Omega_matter = 0.3, although we allow the user to change the value of the Hubble constant (H0).
We assume a systematic error of 0.55 dex for the estimated black hole masses. This value is reported in Vestergaard & Osmer, 2009.
Since this error is much higher than the errors in FWHM and integrated flux, we simply ignore the measured errors in these parameters.
Parameters
----------
wavelengths: numpy.ndarray (astropy.units Angstrom)
The wavelength solution for the given spectrum.
continuum_baseline: numpy.ndarray (float)
An array with the continuum density flux. Standard easyspec units are in erg/cm2/s/A. This variable is an output of the function analysis.find_lines().
FWHM_MgII: float
The FWHM for the MgII line in Angstrom, in the rest frame.
sistemic_redshift: float
The sistemic redshift of the AGN.
MgII_rest_wavelength: float
The rest wavelength of the MgII line in Angstrom.
H0: float
This is the Hubble constant value. Default is 70 km/s/Mpc.
Returns
-------
log10_BH_mass_MgII: float
The black hole mass in log10 scale and its corresponding error in dex computed based on Eq. 1 from Vestergaard & Osmer, 2009, ApJ, 699,
taking the continuum luminosity at 3000 Angstroms. If the continuum at 3000 Angstroms is not available, it automatically uses the
continuum at 2100 Angstroms.
"""
if isinstance(FWHM_MgII, u.Quantity):
FWHM_MgII = FWHM_MgII.value
systematic_error_FWHM = 0.55 # dex. Result from Vestergaard & Osmer, 2009.
FWHM_MgII_velocity = c.to("km/s").value*FWHM_MgII/MgII_rest_wavelength
cosmo = FlatLambdaCDM(H0=H0, Om0=0.3, Tcmb0 = 2.7) # Omega lambda is implicitly 0.7
Distance = cosmo.luminosity_distance(sistemic_redshift).value*3.086e24 # Luminosity distance. The constant converts Mpc to cm.
continuum_index = extraction.find_nearest(wavelengths.value, 3000*(1+sistemic_redshift))
if wavelengths.value[continuum_index] < 0.9*3000*(1+sistemic_redshift):
continuum_index = extraction.find_nearest(wavelengths.value, 2100*(1+sistemic_redshift))
Lum_2100 = continuum_baseline[continuum_index]*wavelengths.value[continuum_index]*4*np.pi*(Distance**2)
log10_BH_mass_MgII = np.log10( ((FWHM_MgII_velocity/1000)**2) * (Lum_2100/(10**44))**0.5 ) + 6.79
else:
Lum_3000 = continuum_baseline[continuum_index]*wavelengths.value[continuum_index]*4*np.pi*(Distance**2)
log10_BH_mass_MgII = np.log10( ((FWHM_MgII_velocity/1000)**2) * (Lum_3000/(10**44))**0.5 ) + 6.86
log10_BH_mass_MgII = [log10_BH_mass_MgII,systematic_error_FWHM]
return log10_BH_mass_MgII
[docs] def BH_mass_Halpha_Shen2011(self, FWHM_Halpha, sistemic_redshift, integrated_flux_Halpha, Halpha_rest_wavelength = 6562.819, H0=70):
"""
This function estimates the black hole mass based on Shen at al. 2011, ApJS, 194:45, "A CATALOG OF QUASAR PROPERTIES FROM
SLOAN DIGITAL SKY SURVEY DATA RELEASE 7". As stated in this work, here we assume a cosmology with H0 = 70 km/s/Mpc, Omega_Lambda = 0.7,
and Omega_matter = 0.3, although we allow the user to change the value of the Hubble constant (H0).
We assume a systematic error of 0.18 dex for the estimated black hole masses. This value is reported in Shen et al. 2011.
Since this error is much higher than the errors in FWHM and integrated flux, we simply ignore the measured errors in these parameters.
Parameters
----------
FWHM_Halpha: float
The FWHM for the Halpha line in Angstrom, in the rest frame.
sistemic_redshift: float
The sistemic redshift of the AGN.
integrated_flux_Halpha: float
The integrated flux for the Halpha line in erg/cm2/s.
Halpha_rest_wavelength: float
The rest wavelength of the Halpha line in Angstrom.
H0: float
This is the Hubble constant value. Default is 70 km/s/Mpc.
Returns
-------
log10_BH_mass_Halpha: float
The black hole mass in log10 scale and its corresponding error in dex computed based on Eq. 10 from Shen et al. 2011.
"""
if isinstance(FWHM_Halpha, u.Quantity):
FWHM_Halpha = FWHM_Halpha.value
if isinstance(integrated_flux_Halpha, u.Quantity):
integrated_flux_Halpha = integrated_flux_Halpha.value
systematic_error_FWHM = 0.18
FWHM_Halpha_velocity = c.to("km/s").value*FWHM_Halpha/Halpha_rest_wavelength
cosmo = FlatLambdaCDM(H0=H0, Om0=0.3, Tcmb0 = 2.7) # Omega lambda is implicitly 0.7
Distance = cosmo.luminosity_distance(sistemic_redshift).value*3.086e24 # Luminosity distance. The constant converts Mpc to cm.
Lum_Halpha = integrated_flux_Halpha*4*np.pi*(Distance**2)
log10_BH_mass_Halpha = np.log10( (FWHM_Halpha_velocity**2.1) * ((Lum_Halpha/(10**42)))**0.43 ) + 0.379
log10_BH_mass_Halpha = [log10_BH_mass_Halpha,systematic_error_FWHM]
return log10_BH_mass_Halpha
[docs] def get_highest_resolution(self, specs_dict):
"""
This function gets the spectrum with the highest resolution in specs_dict.
Parameters:
-----------
specs_dict: dictionary
A dictionary where each key is a spectrum name and each value is a list containing [lambda0, lambdaf, R].
Returns:
--------
highest_R_spec: string
The name of the spectrum with the highest resolution.
highest_R: float
The highest resolution value found in specs_dict.
"""
highest_R = 0
highest_R_spec = None
for spec in specs_dict:
resolution = specs_dict[spec][2] # retrieves the list for the current 'spec' and gets the element at index [2], which is 'R'
if resolution > highest_R:
highest_R = resolution
highest_R_spec = spec
return highest_R_spec, highest_R
[docs] def create_common_grid(self, specs_dict, over_resolution_factor):
"""
This function creates a common wavelength grid for all spectra to be used for the final stacked spectrum.
Parameters:
-----------
specs_dict: dictionary
A dictionary where each key is a spectrum and each value is a list containing [lambda0, lambdaf, R].
over_resolution_factor: int
This factor increases the number of points in the stacked spectrum wavelength grid. This value must be greater than 1.
Returns:
--------
wavelengths_stacked: numpy.ndarray
An array containing the interpolated wavelength values for the stacked spectrum.
"""
highest_R = self.get_highest_resolution(specs_dict)[1] # stores the 2° value of the return of the function(the resolution) in "highest_R"
all_lambda0 = [value[0] for value in specs_dict.values()] # stores lambda0 for all spectra (spect_dict values are [lambda0, lambdaf, R])
all_lambdaf = [value[1] for value in specs_dict.values()] # stores lambdaf for all spectra
lambda_min = min(all_lambda0) # smallest value of lambda0 among all spectra
lambda_max = max(all_lambdaf) # largest value of lambdaf among all spectra
N_points = int(over_resolution_factor * highest_R * (lambda_max - lambda_min)) # number of points for the new wavelength array, using 5x highest_R
wavelengths_stacked = np.linspace(lambda_min,lambda_max,N_points) # array with the wavelength values for the stacked spectrum
return wavelengths_stacked
[docs] def interpolate_spectra(self, list_of_wavelengths, list_of_fluxes, wavelengths_stacked):
"""
This function interpolates all spectra to the common wavelength grid of the stacked spectrum,
fills the grid with NaN's where there is no data and creates a matrix where each row is an interpolated spectrum.
Parameters:
-----------
list_of_wavelengths: list
A list where each element is a numpy array containing the wavelength values of each spectrum.
list_of_fluxes: list
A list where each element is a numpy array containing the flux values of each spectrum.
wavelengths_stacked: numpy.ndarray
An array containing the wavelength values of the stacked spectrum in Å. These are not the measured wavelengths but the values of the common grid created for stacking.
Returns:
--------
spectra_interp_matrix: numpy.ndarray
A matrix where each row is an interpolated spectrum with the size of the common wavelength grid.
"""
spectra_interp_list = []
for wavelength, flux in zip(list_of_wavelengths, list_of_fluxes):
# Creates the splines and interpolates
tck = interpolate.splrep(wavelength, flux, k=3)
flux_interp = interpolate.splev(wavelengths_stacked, tck)
# Creates an array of NaN's with the same shape as wavelengths_stacked
filled_flux = np.full_like(wavelengths_stacked, np.nan, dtype=float)
# Creates a mask for the valid wavelength range of each spectrum
mask = (wavelengths_stacked >= wavelength.min()) & (wavelengths_stacked <= wavelength.max())
# Fills the valid range in filled_flux with the interpolated flux values
filled_flux[mask] = flux_interp[mask]
spectra_interp_list.append(filled_flux)
# Creates the matrix where each row is an interpolated spectrum
spectra_interp_matrix = np.vstack(spectra_interp_list)
return spectra_interp_matrix
[docs] def calculate_stack(self, spectra_interp_matrix, method):
"""
This function calculates the stacked spectrum using the specified method (median or mean).
Parameters:
-----------
spectra_interp_matrix: numpy.ndarray
A matrix where each row is an interpolated spectrum with the size of the common wavelength grid.
method: string
The stacking method to use, either "median" or "mean".
Returns:
--------
stacked_flux: numpy.ndarray
An array containing the interpolated flux values of the stacked spectrum.
"""
if method == "median":
stacked_flux = np.nanmedian(spectra_interp_matrix, axis=0)
elif method == "mean":
stacked_flux = np.nanmean(spectra_interp_matrix, axis=0)
else:
raise ValueError("Invalid stacking method. Use 'median' or 'mean'.")
return stacked_flux
[docs] def stack_calib_spectra(self, input_data, method="median", target_name=None, output_dir="./", over_resolution_factor=5, save_file=False, plot=True, plot_overlayed_spectra=True):
"""
This function stacks calibrated .dat spectra from a list of file paths or a directory containing .dat spectra.
Parameters:
-----------
input_data: list or string
A list of ".dat" file paths containing calibrated spectra or a directory path containing ".dat" spectra.
method: string
The stacking method to use, either "median" or "mean".
target_name: string
Optional. This will be the title of the plot.
output_dir: string
The directory where the stacked spectrum file will be saved if save_file is True.
over_resolution_factor: int
This factor increases the number of points in the stacked spectrum wavelength grid. This value must be greater than 1. Default value = 5.
save_file: bool
If True, saves the stacked spectrum to a .dat file.
plot: bool
If True, plots the stacked spectrum.
plot_overlayed_spectra: bool
If True, plots all individual spectra overlaid with the stacked spectrum.
Returns:
--------
wavelengths_stacked: numpy.ndarray (astropy.units Å)
An array containing the wavelength values of the stacked spectrum in Å. These are not the measured wavelengths but the values of the common grid created for stacking.
stacked_flux: numpy.ndarray (astropy.units. erg / (Å cm² s))
An array containing the interpolated flux values of the stacked spectrum in erg / (Å cm² s).
"""
files_list = []
if isinstance(input_data, list): # Checks if input_data is a list
for file_path in input_data:
if not file_path.endswith(".dat"): # Test if all elements are .dat files
raise RuntimeError("The input_data variable must be a list of data paths or a directory filled with .dat spectra.")
files_list = input_data
elif isinstance(input_data,str) and Path(input_data).is_dir(): # Checks if input_data is a directory path
files_list = [str(path) for path in Path(input_data).glob("*.dat")]
else:
raise RuntimeError("The input_data variable must be a list of data paths or a directory filled with .dat spectra.")
list_of_wavelengths = []
list_of_fluxes = []
for file in files_list:
wavelength, flux = self.load_calibrated_data(file, target_name=target_name, output_dir="./",plot=False)
list_of_wavelengths.append(wavelength.value)
list_of_fluxes.append(flux.value)
names = [Path(file_path).stem for file_path in files_list] # fills the names list with the name of each file(.stem removes the .dat extension)
lambda0_list = []
lambdaf_list = []
specs_dict = {}
for name, wavelength in zip(names, list_of_wavelengths): # iterates through both lists simultaneously
lambda0 = wavelength[0] # first wavelength value
lambdaf = wavelength[-1] # last wavelength value
lambda0_list.append(lambda0) # adds all lambda0 values to lambda0_list
lambdaf_list.append(lambdaf) # adds all lambdaf values to lambdaf_list
delta_lambda = lambdaf - lambda0
resolution = len(wavelength)/(delta_lambda) # points per wavelength
specs_dict[name] = [lambda0, lambdaf, resolution] # assigns each name to its corresponding values to fill the dictionary
wavelengths_stacked = self.create_common_grid(specs_dict, over_resolution_factor) # Creates the wavelength interval for the stacked spectrum
spectra_interp_matrix = self.interpolate_spectra(list_of_wavelengths, list_of_fluxes, wavelengths_stacked) # Interpolates all spectra to the common wavelength grid
stacked_flux = self.calculate_stack(spectra_interp_matrix, method) # Calculates the stacked flux
if plot and not plot_overlayed_spectra:
plt.figure(figsize=(12, 4), dpi=150)
plt.plot(wavelengths_stacked, stacked_flux, color='orange', linewidth=0.5)
plt.xlabel('Observed ${\lambda}$[$\AA$]')
plt.ylabel(r'$F_{\lambda}$ [erg / ($\AA$ cm² s)]', fontsize=12)
if target_name is not None:
plt.title('Stacked Spectrum of '+ target_name, fontsize=12)
else:
plt.title('Stacked Spectrum', fontsize=12)
plt.grid(True, which='both', linewidth=0.1, linestyle='--', color='gray')
plt.minorticks_on()
plt.legend()
plt.show()
elif plot and plot_overlayed_spectra: # Plots all individual spectra overlaid with the stacked spectrum
plt.figure(figsize=(12, 4), dpi=150)
for i in range(spectra_interp_matrix.shape[0]):
plt.plot(wavelengths_stacked, spectra_interp_matrix[i], alpha=0.8, linewidth=0.5)
plt.plot(wavelengths_stacked, stacked_flux, label='Stacked Spectrum', color='black', linewidth=0.5)
plt.xlabel('Observed ${\lambda}$[$\AA$]')
plt.ylabel(r'$F_{\lambda}$ [erg / ($\AA$ cm² s)]', fontsize=12)
if target_name is not None:
plt.title('Stacked Spectrum of '+ target_name + ' overlayed with original spectra', fontsize=12)
else:
plt.title('Stacked Spectrum overlayed with original spectra', fontsize=12)
plt.grid(True, which='both', linewidth=0.1, linestyle='--', color='gray')
plt.minorticks_on()
plt.legend()
plt.show()
if save_file:
output_path = f"{str(Path(output_dir))}/stacked_spectrum_{method}.dat"
np.savetxt(output_path, np.column_stack((wavelengths_stacked, stacked_flux)))
return wavelengths_stacked*u.AA, stacked_flux*u.erg/(u.AA*u.cm**2*u.s)