Source code for gaiasupdate.solver

#   Copyright (c) European Space Agency, 2026.
#
#   This file is subject to the terms and conditions defined in file "LICENSE.txt", which
#   is part of this source code package. No part of the package, including
#   this file, may be copied, modified, propagated, or distributed except according to
#   the terms contained in the file "LICENSE.txt".

"""
Module for solving systems of linear equations.

These methods are equivalent to the Java implementation in the Gaia AGIS processing software.

Note
----
credit authors that contributed to this code:
Sahlmann, Bombrun, Lindegren

"""
from collections import OrderedDict
import copy
from dataclasses import dataclass
import logging
from typing import Tuple

import numpy as np
from scipy.linalg import cho_factor, cho_solve

from .metrics import SolutionStatistic, chi_squared


[docs] def decay_downweight(z: np.ndarray, scale_factor: float = 1.) -> np.ndarray: """Return weight. See gaia.cu3.agistools.algo.observations.DecayDownweight.downweight(double, double). Parameters ---------- z : ndarray The input data array. scale_factor : float The scale factor Returns ------- ndarray Array of decay downweights """ a = 1.77373519609519 b = 1.14161463726663 x = np.abs(scale_factor * np.array(z)) return np.piecewise(x, [x <= 2, (2 < x) & (x < 3), x >= 3], [1, lambda x: 1 - a * (x - 2) ** 2 + b * (x - 2) ** 3, lambda x: np.exp(-x / 3)])
[docs] def huber_downweight(z: np.ndarray, scale_factor: float = 1.) -> np.ndarray: """Return weight. See gaia.cu3.agistools.algo.observations.HuberDownweight Parameters ---------- z : ndarray The input data array. scale_factor : float The scale factor Returns ------- ndarray Array of Huber downweights """ c = 2.0 x = np.abs(scale_factor * np.array(z)) return np.piecewise(x, [x <= c, x > c], [1, lambda x: c * (2 * x - c) / x ** 2])
# Parameters of the AGIS solver # cf. gaia.cu3.agistools.algo.gis.source.SourceUpdateStatistics.Options AGIS_SOLVER_OPTIONS = {} AGIS_SOLVER_OPTIONS['downweighter'] = decay_downweight AGIS_SOLVER_OPTIONS['downweightLimit'] = 0.2 AGIS_SOLVER_OPTIONS['tolEsvExcess'] = 1e-6 AGIS_SOLVER_OPTIONS['maxIterExcess'] = 10 AGIS_SOLVER_OPTIONS['dfWeight'] = 1.0 AGIS_SOLVER_OPTIONS['maxIterRobust'] = 30 AGIS_SOLVER_OPTIONS['tolEsvRobust'] = 1e-6
[docs] def estimate_excess_noise(residuals, sigmas, weights, nu): """Estimate the excess noise from given residuals. See Equation 64 of Lindegren at al. 2012, A&A, 538, A78. Parameters ---------- residuals: narray Array of residuals. sigmas: narray Array of sigmas. weights: narray Array of weights. nu: int Number of degrees of freedom. Returns ------- narray Array containing the excess noise estimation. """ y = 0 sy = np.sum(weights * residuals ** 2 / (sigmas ** 2)) if sy < nu: return 0 else: for i in range(0, 3): dsdy = - np.sum(weights * residuals ** 2 / (sigmas ** 2 + y) ** 2) dy = sy / dsdy * (1 - sy / nu) y = y + dy sy = np.sum(weights * residuals ** 2 / (sigmas ** 2 + y)) return np.sqrt(y)
[docs] def robust_dispersion_estimate(residuals): """Return half the intersextile range according to procedure P3 of LL83 (Eq. 22). Parameters ---------- residuals : ndarray Array of residuals. Returns ------- float The dispersion estimate. """ quantiles = np.quantile(residuals, [1 / 6, 3 / 6, 5 / 6]) return 0.5 * (quantiles[2] - quantiles[0]) + np.abs(quantiles[1])
[docs] def solve_linear_equations(design_matrix_coefficients: np.ndarray, dependent_variable: np.ndarray, dependent_variable_error: np.ndarray, weights: np.ndarray = None, excess_noise: float = 0): """Use matrix inversion to solve the linear equations using least-squares. Note ---- Weights and excess noise can be accounted for as described in procedure 4 of GAIA-C3-TN-LU-LL-083. Parameters ---------- design_matrix_coefficients: ndarray The normal matrix. dependent_variable: ndarray The measurements. dependent_variable_error: ndarray The measurement uncertainties. weights: ndarray, optionl The measurement weights. excess_noise: float, optional The excess noise, a term that is added in quadrature to the measurement uncertainty. Returns ------- ndarray The solution parameters. ndarray The residuals, i.e. observed minus calculated values. ndarray Inverse of the solution parameter covariance matrix. """ D = design_matrix_coefficients h = dependent_variable if weights is None: weights = np.ones_like(dependent_variable) total_variance = dependent_variable_error ** 2 + excess_noise ** 2 weight_parameters = weights / total_variance # obsWeight in BasicSourceUpdateCalculator logging.debug(f'solve_linear_equations: \t excess_noise={excess_noise}') logging.debug(f'solve_linear_equations: \t weights={weights}') logging.debug(f'solve_linear_equations: \t total_variance={total_variance}') logging.debug(f'solve_linear_equations: \t weight_parameters (obsWeight)={weight_parameters}') WD = np.sqrt(weight_parameters).reshape(-1, 1) * D wh = np.sqrt(weight_parameters) * h parameter_covariance_matrix_formal_inverse = WD.T @ WD # normalsFull in BasicSourceUpdateCalculator r = WD.T @ wh logging.debug(f'solve_linear_equations: \t r={r}') logging.debug(f'solve_linear_equations: \t parameter_covariance_matrix_formal_inverse ' f'(normalsFull)={parameter_covariance_matrix_formal_inverse}') parameters = np.linalg.solve(parameter_covariance_matrix_formal_inverse, r) residuals = dependent_variable - design_matrix_coefficients @ parameters return parameters, residuals, parameter_covariance_matrix_formal_inverse
[docs] def get_sextiles(x): """Return sextiles of input array. Note ---- Emulate gaia.cu3.agistools.algo.gis.source.RobustSourceUpdateCalculator.getSextiles(double[]) The sextiles are returned as an array of length 5 (say, double[] sext), with sext[0] = the 1st (lowest) sextile and sext[4] = the 5th (highest) sextile. sext[2] is the median, and (sext[4]-sext[0])/2 is half the intersextile range, which can be used as a robust estimate of the standard deviation. If the length of the input array x is <= 3, then sext[0] and sext[4] equal the minimum and maximum x values, respectively. If the length is 1, then all sextiles equal the single x value. Parameters ---------- x : ndarray An array containg the input values. Returns ------- ndarray Array containing the sextiles. """ m = 6 n = len(x) xs = copy.deepcopy(x) xs = np.sort(xs) sext = np.zeros(m - 1) for i in range(m - 1): p = (i + 1) / m lo = int(np.floor(p * n - 0.5)) if lo < 0: sext[i] = xs[0] elif (lo > n - 2): sext[i] = xs[n - 1] else: h = p * n - (lo + 0.5) sext[i] = (1.0 - h) * xs[lo] + h * xs[lo + 1] return sext
[docs] @dataclass class DesignEquation: """Class to handle and solve linear equation systems like AGIS does.""" _required_parameters = ['design_matrix_coefficients', 'dependent_variable', 'dependent_variable_error'] _default_array_parameters = {'excess_attitude_noise': 0., 'weight': 1., 'weight_prior': 1.} _default_parameters = {'n_outliers': 0, 'excess_source_variance': 0., 'n_solve': 0, 'gaussian_priors': None} def __init__(self, parameters: dict): """Initialise the object. Parameters ---------- parameters: dict Dictionary of inputs for constructing the design equation. The dictionary must have at least three entries as indicated by the `_required_parameters` attribute. """ self.design_matrix_coefficients: np.ndarray self.dependent_variable: np.ndarray self.dependent_variable_error: np.ndarray self.excess_attitude_noise: np.ndarray self.weight: np.ndarray self.weight_prior: np.ndarray self.n_outliers: int self.excess_source_variance: float self.n_solve: int self.observationVariance: np.ndarray for key, item in parameters.items(): setattr(self, key, item) for key in self._required_parameters: assert getattr(self, key) is not None # assert hasattr(self, key) for key, value in self._default_array_parameters.items(): if hasattr(self, key) is False: setattr(self, key, np.ones(len(getattr(self, self._required_parameters[0]))) * value) for key, value in self._default_parameters.items(): if hasattr(self, key) is False: setattr(self, key, value) # compute excess_attitude_noise (epsilonA) when necessary if hasattr(self, 'observationVariance'): if np.all(self.observationVariance != self.observation_variances): logging.debug("DesignEquation: excess_attitude_noise is not zero; " "recomputing and setting it.") self.excess_attitude_noise = np.sqrt(self.observationVariance - self.dependent_variable_error ** 2) np.testing.assert_allclose(self.observationVariance, self.observation_variances) if hasattr(self, 'model') is False: self.model = 'unnamed_model' @property def observation_variances(self): """Return the observation variances. Note ---- This is the sum of observation uncertainties and axcess attitude/observation noise, see Eq. 62ff of `Lindegren at al. 2012 <http://adsabs.harvard.edu/abs/2012A%26A...538A..78L>`__: sigmaTilde_l^2 = sigma_l^2 + epsilon_a^2(t_l), where obsSigma = the sigmas of the observations epsilonA = Any contribution from the excess attitude noise and excess calibration noise (but not the excess source noise). See Java method setObservationVariances(double[] obsSigma, double[] epsilonA) Returns ------- ndarray The observation variances. """ obs_variances = self.dependent_variable_error**2 + self.excess_attitude_noise**2 return obs_variances @property def n_measurements(self): """Return current number of measurements.""" return len(self.dependent_variable) @property def n_parameters(self): """Return current number of parameters.""" return self.design_matrix_coefficients.shape[1] @property def n_degrees_of_freedom(self): """Return current degrees of freedom.""" return self.n_measurements - self.n_parameters @property def n_effective_degrees_of_freedom(self): """Return current effective degrees of freedom.""" return self.n_degrees_of_freedom - self.n_outliers @property def excess_source_noise(self): """Return current excess source noise.""" return np.sqrt(self.excess_source_variance)
[docs] def solve(self, solver='agis', **kwargs): """Solve the equations and return results. Parameters ---------- solver : str the solver to be used, default is 'agis' kwargs : dict keyword arguments passed on to the solver code Returns ------- dict Solution parameters and auxiliary information """ if solver == 'least_squares': return self.solve_least_squares(**kwargs) elif solver == 'agis': return self.solve_like_agis(**kwargs) else: raise NotImplementedError
[docs] def basic_source_update_calculator_solve(self, cholesky: bool = False, total_variance: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Use matrix inversion to solve the linear equations using least-squares. Note ---- Modelled after gaia.cu3.agistools.algo.gis.source.BasicSourceUpdateCalculator.solve() See also solve_linear_equations function. Parameters ---------- cholesky: boolean, optional Cholesky decomposition. Default: False. total_variance: narray, optional Total variance array. Default: None. Returns ------- narray normals_full, which is the same as parameter_covariance_matrix_formal_inverse. narray rhs, right-hand side of the normal equations. narray Array of parameters. narray Array of residuals. """ normals_full, rhs = self.construct_full_normals(total_variance=total_variance) if cholesky is False: parameters = np.linalg.solve(normals_full, rhs) else: logging.debug("basic_source_update_calculator_solve using cho_solve") logging.debug(f"basic_source_update_calculator_solve normals_full = {normals_full[0:5]}...") c, low = cho_factor(normals_full) logging.debug(f"basic_source_update_calculator_solve c = {c}") logging.debug(f"basic_source_update_calculator_solve low = {low}") parameters = cho_solve((c, low), rhs) logging.debug(f"basic_source_update_calculator_solve parameters = {parameters}") residuals = self.compute_residuals(parameters) # update counter self.n_solve += 1 return normals_full, rhs, parameters, residuals
[docs] def compute_residuals(self, parameters): """Compute the residuals given the input parameters. Parameters ---------- parameters: narray Array of parameters corresponding to a solution of the system of equations. Returns ------- narray The computed residuals. """ return self.dependent_variable - self.design_matrix_coefficients @ parameters
[docs] def get_postfit_residuals(self): """Return post-fit residuals. Note ---- See Java method gaia.cu3.agistools.algo.gis.source.BasicSourceUpdateCalculator.getPostfitResiduals() """ return self.basic_source_update_calculator_solve()[-1]
[docs] def get_updates(self): """Return the updates from the latest call to solve(). Note ---- See Java method gaia.cu3.agistools.algo.gis.source.BasicSourceUpdateCalculator.getUpdates() """ return self.basic_source_update_calculator_solve()[-2]
[docs] def get_prefit_residuals(self): """Return pre-fit residuals. Note ---- See Java method gaia.cu3.agistools.algo.gis.source.BasicSourceUpdateCalculator.getPrefitResiduals() """ return self.dependent_variable
[docs] @staticmethod def get_initial_dispersion_estimate(data): """Return initial dispersion estimate of the input dataset. Note ---- See gaia.cu3.agistools.algo.gis.source.RobustSourceUpdateCalculator.getInitialDispersionEstimate(double[]) Parameters ---------- data : ndarray Input dataset. Returns ------- ndarray Initial dispersion estimate of the input. """ quantile_values = np.quantile(data, [1 / 6, 2 / 6, 3 / 6, 4 / 6, 5 / 6]) return 0.5 * (quantile_values[4] - quantile_values[0]) + np.abs(quantile_values[2])
[docs] def compute_n_outliers(self): """Compute the number of outliers based on weights and threshold. Note ---- See gaia.cu3.agistools.algo.gis.source.SourceUpdateStatistics.getNOut() Returns ------- narray n_outliers """ downweights = self.get_downweights(True) # return downweight * downweight_prior n_outliers = np.sum(downweights < AGIS_SOLVER_OPTIONS['downweightLimit']) return n_outliers
[docs] def get_index_keep(self): """Return the indices of the measurements that are not outliers.""" downweights = self.get_downweights(True) index_keep = np.where(downweights >= AGIS_SOLVER_OPTIONS['downweightLimit'])[0] if (len(index_keep) + self.compute_n_outliers() != len(self.weight)): raise ValueError("Number of outliers is inconsistent.") return index_keep
[docs] def calculate_estimate(self): """Calculate the solution estimate. Note ---- See gaia.cu3.agistools.algo.gis.source.RobustSourceUpdateCalculator.calculateEstimate() Returns ------- narray Excess source noise. narray Excess source noise significance. """ # try first with excess source variance = 0 excess_source_noise = 0 self.excess_source_variance = excess_source_noise**2 # gaia.cu3.agistools.algo.gis.source.RobustSourceUpdateCalculator.setDefaultDownweights() self.weight = np.ones(self.n_measurements) iterRobust = 0 ssr, ssr_derivative = self.get_weighted_ssr_and_derivative() sMin0 = ssr dof = self.n_effective_degrees_of_freedom logging.debug(f"initial dof = {dof}") # iter_excess = 0 excess_source_noise_significance = 0. # if sMin0 <= dof we are done; otherwise try to set a reasonable excess source # noise to eliminate gross outliers if (sMin0 > dof): excess_source_variance_old = self.excess_source_variance excess_source_noise_improved = self.get_initial_dispersion_estimate(self.get_prefit_residuals()) logging.debug(f"self.get_initial_dispersion_estimate(self.get_prefit_residuals()) = \ {self.get_initial_dispersion_estimate(self.get_prefit_residuals())}") excess_source_variance_improved = excess_source_noise_improved**2 self.excess_source_variance = excess_source_variance_improved logging.debug(f"self.excess_source_variance = {self.excess_source_variance}") logging.debug(f"calculate_estimate self.get_prefit_residuals() = {self.get_prefit_residuals()[0:4]}...") logging.debug(f"self.observation_variances = {self.observation_variances[0:4]}") downweights = self.calculate_downweights(self.get_prefit_residuals()) logging.debug(f"downweights!=1 = {downweights[np.where(downweights != 1)[0]]}") self.weight = downweights self.n_outliers = self.compute_n_outliers() dof = self.n_effective_degrees_of_freedom logging.debug(f"updated dof = {dof}") excess_source_variance_delta = excess_source_variance_improved - excess_source_variance_old logging.debug(f"Before robust loop: excess_source_variance_delta = \ {excess_source_variance_delta}, excess_source_variance={self.excess_source_variance}") while (np.abs(excess_source_variance_delta) > AGIS_SOLVER_OPTIONS['tolEsvRobust'] * self.excess_source_variance) \ and (iterRobust < AGIS_SOLVER_OPTIONS['maxIterRobust']): logging.debug(f"Robust estimator iteration {iterRobust}: excess_source_variance={self.excess_source_variance}") iterRobust += 1 excess_source_variance_old = self.excess_source_variance excess_source_variance_improved = self.get_improved_estimate_of_excess_source_variance(dof) logging.debug(f"excess_source_variance_improved = {excess_source_variance_improved}") logging.debug(f"Robust estimator iteration {iterRobust}: \ self.get_improved_estimate_of_excess_source_variance(dof) = {self.get_improved_estimate_of_excess_source_variance(dof)}") self.excess_source_variance = excess_source_variance_improved self.weight = self.calculate_downweights(self.get_postfit_residuals()) self.n_outliers = self.compute_n_outliers() dof = self.n_effective_degrees_of_freedom logging.debug(f"iterRobust={iterRobust}: dof = {dof}") excess_source_variance_delta = excess_source_variance_improved - excess_source_variance_old logging.debug(f"Robust estimator iteration {iterRobust}: excess_source_variance = \ {self.excess_source_variance}; excess_source_variance_delta = {excess_source_variance_delta}") excess_source_noise, excess_source_noise_significance = self.calculate_excess_source_noise() return excess_source_noise, excess_source_noise_significance
[docs] def solve_like_agis(self, total_variance: np.ndarray = None) -> dict: """Solve linear equations like AGIS. Parameters ---------- total_variance: float Total variance. Returns ------- dict Dictionary with information on the solution. """ if total_variance is None: excess_source_noise, excess_source_noise_significance = self.calculate_estimate() logging.debug(f"solve_like_agis: excess_source_noise = {excess_source_noise}") else: logging.debug('Computing direct solution using fixed weights and external total variance (no inner iterations).') excess_source_noise = None excess_source_noise_significance = None # get the best-fit parameters of the model parameter_covariance_matrix_formal_inverse, rhs, parameters, residuals = self.basic_source_update_calculator_solve(total_variance=total_variance) results = OrderedDict() results['model'] = self.model results['solver'] = 'agis' results['parameters'] = parameters # keep only ‘good’ (i.e. not strongly downweighted) observations for fit metric calculations # this is consistent with documentation: # https://gea.esac.esa.int/archive/documentation/GEDR3/Gaia_archive/chap_datamodel/sec_dm_main_tables/ssec_dm_gaia_source.html index_keep = self.get_index_keep() results['residuals'] = residuals[index_keep] results['index_keep'] = index_keep # optional: specify timestamps of used observations if hasattr(self, 'timestamps'): results['timestamps_of_used_observations'] = self.timestamps.reset_index(drop=True).iloc[index_keep] results['weights'] = self.weight[index_keep] results['n_data_total'] = self.n_measurements # number of measurements/constraints, i.e. number of equations results['n_measurements'] = self.n_measurements - self.compute_n_outliers() results['parameter_covariance_matrix_formal_inverse'] = parameter_covariance_matrix_formal_inverse results['parameter_covariance_matrix_formal'] = np.linalg.inv(results['parameter_covariance_matrix_formal_inverse']) # formal uncertainty of the solution parameters results['parameters_formal_uncertainty'] = np.sqrt(np.diag(results['parameter_covariance_matrix_formal'])) # number free parameters results['n_parameters'] = len(results['parameters']) results['excess_noise'] = excess_source_noise results['n_outliers'] = self.compute_n_outliers() results['significance'] = excess_source_noise_significance # check setting of total variance, since this goes into ln likelyhood and BIC # total variance of measurements # results['total_variance'] = self.dependent_variable_error[index_keep]**2 + excess_source_noise**2 if total_variance is None: results['total_variance'] = self.observation_variances[index_keep] + excess_source_noise**2 else: results['total_variance'] = total_variance # results['measurement_variance'] = self.dependent_variable_error[index_keep]**2 results['measurement_variance'] = self.observation_variances[index_keep] # parameter covariance matrix normalised to yield chi2=1 chi2 = chi_squared(results['residuals'], results['measurement_variance']) results['parameter_covariance_matrix_normalised'] = results['parameter_covariance_matrix_formal'] * chi2 / self.n_effective_degrees_of_freedom # normalised uncertainty of the solution parameters results['parameters_normalised_uncertainty'] = np.sqrt(np.diag(results['parameter_covariance_matrix_normalised'])) keys_for_stats = SolutionStatistic._required_parameters + ['model', 'solver', 'excess_noise', 'n_outliers'] results['solution_statistic'] = SolutionStatistic({key: results[key] for key in keys_for_stats}) logging.debug('solve_like_agis result: {}/{} measurements are considered outliers'.format(results['n_outliers'], results['n_data_total'])) return results
[docs] def solve_least_squares(self): """Solve equations with a standard least-square solver. Returns ------- dict Dictionary containing information about the solution. """ # solve the equations parameters, residuals, N = solve_linear_equations(self.design_matrix_coefficients, self.dependent_variable, self.dependent_variable_error) results = OrderedDict() results['solver'] = 'least-squares' results['model'] = self.model results['parameters'] = parameters results['residuals'] = residuals results['parameter_covariance_matrix_formal_inverse'] = N results['parameter_covariance_matrix_formal'] = np.linalg.inv(results['parameter_covariance_matrix_formal_inverse']) # formal uncertainty of the solution parameters results['parameters_formal_uncertainty'] = np.sqrt(np.diag(results['parameter_covariance_matrix_formal'])) # number of measurements/constraints, i.e. number of equations results['n_measurements'] = len(self.dependent_variable) # number of free parameters results['n_parameters'] = len(results['parameters']) # total variance of measurements results['total_variance'] = self.dependent_variable_error ** 2 results['measurement_variance'] = results['total_variance'] # parameter covariance matrix normalised to yield chi2=1 chi2 = chi_squared(results['residuals'], results['measurement_variance']) results['parameter_covariance_matrix_normalised'] = results['parameter_covariance_matrix_formal'] * chi2 / self.n_effective_degrees_of_freedom # normalised uncertainty of the solution parameters results['parameters_normalised_uncertainty'] = np.sqrt(np.diag(results['parameter_covariance_matrix_normalised'])) keys_for_stats = SolutionStatistic._required_parameters + ['model', 'solver'] results['solution_statistic'] = SolutionStatistic({key: results[key] for key in keys_for_stats}) return results
[docs] def construct_full_normals(self, total_variance=None): """Replicate Java method constructFullNormals. Note ---- gaia.cu3.agistools.algo.gis.source.BasicSourceUpdateCalculator.constructFullNormals() Parameters ---------- total_variance: narray, optional Array of total variances. Returns ------- narray Parameter covariance matrix formal inverse. narray rhs, right-hand side of the normal equations. """ weights = self.weight * self.weight_prior logging.debug(f"construct_full_normals: \t self.excess_source_variance = {self.excess_source_variance}") if total_variance is None: total_variance = self.observation_variances + self.excess_source_variance weight_parameters = weights / total_variance # obsWeight in BasicSourceUpdateCalculator logging.debug(f'construct_full_normals: \t weight_parameters (obsWeight) = {weight_parameters[0:4]}...') logging.debug(f'construct_full_normals: \t design_matrix_coefficients = {self.design_matrix_coefficients}...') WD = np.sqrt(weight_parameters).reshape(-1, 1) * self.design_matrix_coefficients # == Math.sqrt(obsWeight) * obs.partialDerivatives wh = np.sqrt(weight_parameters) * self.dependent_variable # == Math.sqrt(obsWeight) * obs.prefitResidual logging.debug(f'construct_full_normals: \t Math.sqrt(obsWeight) * obs.partialDerivatives=(normalsLhs)=(WD)={WD[0:4]}...') logging.debug(f'construct_full_normals: \t Math.sqrt(obsWeight) * obs.prefitResidual=(wh)={wh[0:4]}...') rhs = WD.T @ wh # right-hand side of the normal equations parameter_covariance_matrix_formal_inverse = WD.T @ WD # normalsFull in BasicSourceUpdateCalculator # add a prior when specified if self.gaussian_priors is not None: logging.debug(self.gaussian_priors) # add Gaussian prior on the matrix diagonal diagonal_indices = np.diag_indices_from(parameter_covariance_matrix_formal_inverse) for ii, prior in enumerate(self.gaussian_priors): if prior is not None: parameter_covariance_matrix_formal_inverse[diagonal_indices[0][ii], diagonal_indices[1][ii]] += 1 / (float(prior) ** 2) logging.debug(f'construct_full_normals: \t parameter_covariance_matrix_formal_inverse (normalsFull) = \ {parameter_covariance_matrix_formal_inverse}') logging.debug(f'construct_full_normals: \t rhs (rhs)={rhs}') return parameter_covariance_matrix_formal_inverse, rhs
[docs] def get_weighted_ssr_and_derivative(self): # , excess_source_variance=0): """Return weighted sum of squared residuals and its derivative.""" normals_full, rhs = self.construct_full_normals() parameters = np.linalg.solve(normals_full, rhs) residuals = self.dependent_variable - self.design_matrix_coefficients @ parameters logging.debug(f"get_weighted_ssr_and_derivative PostfitResiduals = {residuals[0:4]}...") ssr = np.sum(residuals ** 2 * self.weight * 1 / (self.observation_variances + self.excess_source_variance)) ssr_derivative = -1 * np.sum(residuals ** 2 * self.weight * (1 / (self.observation_variances + self.excess_source_variance)) ** 2) return ssr, ssr_derivative
[docs] def get_improved_estimate_of_excess_source_variance(self, dof): """Return an improved estimate of the excess source noise. Note ---- This uses the currently set data, variances and downweights. It implements a single iteration of algorithm P1 in GAIA-C3-TN-LU-LL-083. See the Java method getImprovedEstimateOfExcessSourceVariance(int dof). Parameters ---------- dof : int degrees of freedom Returns ------- float Applied change in excess source variance. """ s = self.get_weighted_ssr_and_derivative() logging.debug(f"get_improved_estimate_of_excess_source_variance self.get_weighted_ssr_and_derivative() \ = {self.get_weighted_ssr_and_derivative()}") deltaVar = -(s[0] / s[1]) * (s[0] / dof - 1.0) return np.max(np.array([self.excess_source_variance + deltaVar, 0.0]))
[docs] def calculate_excess_source_noise(self): """Compute and return excess source noise. Note ---- Calculates and sets the excess source noise and significance for the currently set (fixed) variances and downweights. This implements algorithm P1 in GAIA-C3-TN-LU-LL-083. See corresponding Java code: gaia.cu3.agistools.algo.gis.source.RobustSourceUpdateCalculator.calculateExcessSourceNoise() Returns ------- float The source excess noise. float The source excess noise significance. """ self.excess_source_variance = 0 excess_source_noise_significance = 0. ssr, ssr_derivative = self.get_weighted_ssr_and_derivative() sMin0 = ssr dof = self.n_effective_degrees_of_freedom logging.debug(f"calculate_excess_source_noise: dof = {dof}") iter_excess = 0 if (sMin0 > dof): excess_source_variance_old = self.excess_source_variance excess_source_variance_improved = self.get_improved_estimate_of_excess_source_variance(dof) self.excess_source_variance = excess_source_variance_improved excess_source_variance_delta = excess_source_variance_improved - excess_source_variance_old while (np.abs(excess_source_variance_delta) > AGIS_SOLVER_OPTIONS['tolEsvExcess'] * self.excess_source_variance) \ and (iter_excess < AGIS_SOLVER_OPTIONS['maxIterExcess']): logging.debug(f"calculate_excess_source_noise: Excess noise iteration {iter_excess}: \ excess_source_variance = {self.excess_source_variance}") iter_excess += 1 excess_source_variance_old = self.excess_source_variance excess_source_variance_improved = self.get_improved_estimate_of_excess_source_variance(dof) self.excess_source_variance = excess_source_variance_improved excess_source_variance_delta = excess_source_variance_improved - excess_source_variance_old logging.debug(f"calculate_excess_source_noise: Excess noise iteration {iter_excess}: \ excess_source_variance = {self.excess_source_variance}") # // Eq.(10) excess_source_noise_significance = (sMin0 - dof) / np.sqrt(2.0 * dof) return self.excess_source_noise, excess_source_noise_significance
[docs] def get_downweights(self, multiply_by_prior: bool): """Return array of the currently defined downweights. Returns ------- ndarray Array of downweights. """ downweights = self.weight if multiply_by_prior: downweights *= self.weight_prior return downweights
[docs] def calculate_downweights(self, residuals): """Return downweights for a given array of residuals. Note ---- See gaia.cu3.agistools.algo.gis.source.RobustSourceUpdateCalculator.calculateDownweights. Calculates downweights for a given array of residuals, using the current variances and excess source noise. This implements algorithm P2, i.e. Eq. (20) in GAIA-C3-TN-LU-LL-083. Parameters ---------- residuals : ndarray Array of residuals. Returns ------- ndarray Array of downweights. """ downweights = self.get_downweights(False) total_variance = self.observation_variances + self.excess_source_variance normalised_residuals = residuals / np.sqrt(total_variance) downweights_new = AGIS_SOLVER_OPTIONS['downweighter'](normalised_residuals, AGIS_SOLVER_OPTIONS['dfWeight']) # Only update the downweights if they are being used index_used = np.where(self.weight_prior == 1)[0] downweights[index_used] = downweights_new[index_used] return downweights
[docs] def agis_weight_function(z): """Evaluate AGIS weight function as defined in Eq. 66 of Lindegren et al. 2012. Parameters ---------- z : ndarray Input data array Returns ------- ndarray An array on weights """ x = np.abs(z) return np.piecewise(x, [x <= 2, (2 < x) & (x < 3), x >= 3], [1, lambda x: 1 - 1.773735 * (x - 2) ** 2 + 1.141615 * (x - 2) ** 3, lambda x: np.exp(-x / 3)])
[docs] def agis_weights(residuals, uncertainties, excess_noise, weight_threshold=0.2): """Return the residual down weight factors and identify outliers. Note ---- See procedure P2 in GAIA-C3-TN-LU-LL-083 (Eq. 21) from the `Archive documentation <https://gea.esac.esa.int/archive/documentation/GEDR3/Gaia_archive/chap_datamodel/sec_dm_main_tables/ssec_dm_gaia_source.html>`__. astrometric_n_good_obs_al : Number of good observations AL (short) Number of AL observations (= CCD transits) that were not strongly downweighted in the astrometric solution of the source. Strongly downweighted observations (with downweighting factor 𝑤<0.2) are instead counted in astrometric_n_bad_obs_al. The sum of astrometric_n_good_obs_al and astrometric_n_bad_obs_al equals astrometric_n_obs_al, the total number of AL observations used in the astrometric solution of the source. Parameters ---------- residuals: narray Array of residuals. uncertainties: narray Array of uncertainties. excess_noise: narray. Array of excess noises. weight_threshold: float Cutoff below which measurements are considered to be outliers. Returns ------- ndarray Array of weights. int Number of outliers. ndarray Array of the good indices. """ if np.any(np.sqrt(uncertainties ** 2 + excess_noise ** 2) == 0): logging.debug(f'excess_noise={excess_noise}') logging.warning('agis_weights was given uncertainties with at least one zero entry.') argument = residuals / np.sqrt(uncertainties ** 2 + excess_noise ** 2) # normalizedResidual in calculateDownweights # compute weights weights = agis_weight_function(argument) # outlier_index = np.where(weights < weight_threshold)[0] # index of ‘good’ (i.e. not strongly downweighted) observations good_index = np.where(weights >= weight_threshold)[0] # compute number of identified outliers n_outliers = len(residuals) - len(good_index) return weights, n_outliers, good_index