Strategies

Federated Averaging

class substrafl.strategies.FedAvg(algo: Algo, metric_functions: Dict[str, Callable] | List[Callable] | Callable | None = None)

Bases: Strategy

Federated averaging strategy.

Federated averaging is the simplest federated strategy. A round consists of performing a predefined number of forward/backward passes on each client, aggregating the updates by computing their mean, and distributing the consensus update to all clients. FedAvg is a centralized strategy: a single server (AggregationNodeProtocol) communicates with a number of clients (TrainDataNodeProtocol and TestDataNodeProtocol).

Formally, if \(w_t\) denotes the parameters of the model at round \(t\), a single round consists in the following steps:

\[\begin{aligned}
\Delta w_t^{k} &= \mathcal{O}^k_t(w_t| X_t^k, y_t^k, m) \\
\Delta w_t &= \sum_{k=1}^K \frac{n_k}{n} \Delta w_t^k \\
w_{t + 1} &= w_t + \Delta w_t
\end{aligned}\]

where \(\mathcal{O}^k_t\) is the local optimizer algorithm of client \(k\) taking as argument the averaged weights as well as the \(t\)-th batch of data for local worker \(k\) and the number of local updates \(m\) to perform, and where \(n_k\) is the number of samples for worker \(k\), \(n = \sum_{k=1}^K n_k\) is the total number of samples.
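As a rough illustration of the three steps of a round, the NumPy sketch below runs FedAvg on a toy least-squares problem. The `local_update` function, the learning rate and the synthetic data are assumptions made for this example: they stand in for \(\mathcal{O}^k_t\) and are not part of SubstraFL.

```python
import numpy as np


def local_update(w, X, y, m, lr=0.1):
    """Hypothetical local optimizer: m gradient steps on a least-squares loss.

    Returns the local update delta_w^k = w_local - w, not the new weights.
    """
    w_local = w.copy()
    for _ in range(m):
        grad = X.T @ (X @ w_local - y) / len(y)
        w_local -= lr * grad
    return w_local - w


def fedavg_round(w, clients, m):
    """One round: local updates, sample-weighted mean, then w_{t+1} = w_t + delta_w_t."""
    n = sum(len(y) for _, y in clients)
    deltas = [local_update(w, X, y, m) for X, y in clients]
    delta_w = sum((len(y) / n) * d for (_, y), d in zip(clients, deltas))
    return w + delta_w


# Two clients with n_k = 20 and n_k = 40 noise-free samples of the same linear model.
rng = np.random.default_rng(0)
true_w = np.array([1.0, -2.0])
clients = []
for n_k in (20, 40):
    X = rng.normal(size=(n_k, 2))
    clients.append((X, X @ true_w))

w = np.zeros(2)
for t in range(100):
    w = fedavg_round(w, clients, m=5)
```

Because both toy clients share the same underlying model, the averaged weights approach `true_w`; with heterogeneous data the consensus would instead settle on a compromise between the clients.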

Parameters:
  • algo (Algo) – The algorithm your strategy will execute (i.e. train and test on all the specified nodes)

  • metric_functions (Optional[Union[Dict[str, Callable], List[Callable], Callable]]) – list of Functions that implement the different metrics. If a Dict is given, the keys will be used to register the result of the associated function. If a Function or a List is given, function.__name__ will be used to store the result.
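The naming rule for metric_functions can be pictured with the small sketch below. `name_metric_functions` is a hypothetical helper written for this illustration, not a SubstraFL function, and the toy metrics take `(y_true, y_pred)` only for simplicity.

```python
from typing import Callable, Dict, List, Optional, Union

MetricInput = Optional[Union[Dict[str, Callable], List[Callable], Callable]]


def name_metric_functions(metric_functions: MetricInput) -> Dict[str, Callable]:
    """Hypothetical helper: map each metric function to the key its result is stored under."""
    if metric_functions is None:
        return {}
    if isinstance(metric_functions, dict):
        # Dict input: the keys are used as given.
        return dict(metric_functions)
    if callable(metric_functions):
        # A single function is treated like a one-element list.
        metric_functions = [metric_functions]
    # List input: each result is stored under function.__name__.
    return {fn.__name__: fn for fn in metric_functions}


def accuracy(y_true, y_pred):
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def error_rate(y_true, y_pred):
    return 1 - accuracy(y_true, y_pred)


from_dict = name_metric_functions({"acc": accuracy})
from_list = name_metric_functions([accuracy, error_rate])
from_single = name_metric_functions(accuracy)
```

Passing a Dict is the only way to choose the stored name yourself; with a List or a single Function, two functions sharing the same `__name__` would collide in this sketch.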

avg_shared_states(shared_states: List[FedAvgSharedState] | None) FedAvgAveragedState

Compute the weighted average of all elements returned by the train methods of the user-defined algorithm. The average is weighted by the proportion of the number of samples.

Example

shared_states = [
    {"weights": [3, 3, 3], "gradient": [4, 4, 4], "n_samples": 20},
    {"weights": [6, 6, 6], "gradient": [1, 1, 1], "n_samples": 40},
]
result = {"weights": [5, 5, 5], "gradient": [2, 2, 2]}

Parameters:

shared_states (List[FedAvgSharedState]) – The list of the shared_state returned by the train method of the algorithm for each organization.

Raises:
  • EmptySharedStatesError – The train method of your algorithm must return a shared_state

  • TypeError – Each shared_state must contain the key n_samples

  • TypeError – Each shared_state must contain at least one element to average

  • TypeError – All the elements of shared_states must be similar (same keys)

  • TypeError – All elements to average must be of type np.ndarray

Returns:

A dict containing the weighted average of each input parameter, without the passed key “n_samples”.

Return type:

FedAvgAveragedState
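The weighted average documented for avg_shared_states can be reproduced with a few lines of NumPy. This is a sketch on plain dicts, not the library's implementation: `weighted_average` is a hypothetical name, and a plain ValueError stands in for EmptySharedStatesError.

```python
import numpy as np


def weighted_average(shared_states):
    """Sketch: sample-weighted mean of every key except "n_samples"."""
    if not shared_states:
        # Stand-in for the EmptySharedStatesError raised by the real method.
        raise ValueError("at least one shared state is required")
    n_total = sum(state["n_samples"] for state in shared_states)
    keys = [key for key in shared_states[0] if key != "n_samples"]
    return {
        key: sum(
            (state["n_samples"] / n_total) * np.asarray(state[key], dtype=float)
            for state in shared_states
        )
        for key in keys
    }


shared_states = [
    {"weights": [3, 3, 3], "gradient": [4, 4, 4], "n_samples": 20},
    {"weights": [6, 6, 6], "gradient": [1, 1, 1], "n_samples": 40},
]
result = weighted_average(shared_states)
```

With 20 and 40 samples the weights are 1/3 and 2/3, which gives the `[5, 5, 5]` and `[2, 2, 2]` of the documented example.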

property name: StrategyName

The name of the strategy

Returns:

Name of the strategy

Return type:

StrategyName

perform_evaluation(test_data_nodes: List[TestDataNodeProtocol], train_data_nodes: List[TrainDataNodeProtocol], round_idx: int)

Perform evaluation on test_data_nodes.

Parameters:
  • test_data_nodes (List[TestDataNodeProtocol]) – test data nodes to perform the prediction from the algo on.

  • train_data_nodes (List[TrainDataNodeProtocol]) – train data nodes the model has been trained on.

  • round_idx (int) – round index.

perform_round(*, train_data_nodes: List[TrainDataNodeProtocol], aggregation_node: AggregationNodeProtocol, round_idx: int, clean_models: bool, additional_orgs_permissions: set | None = None)

One round of the Federated Averaging strategy consists of:
  • if round_idx==0: initialize the strategy by performing a local update (train on n mini-batches) of the models on each train data node

  • aggregate the model shared_states

  • set the model weights to the aggregated weights on each train data node

  • perform a local update (train on n mini-batches) of the models on each train data node

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – List of the nodes on which to perform local updates.

  • aggregation_node (AggregationNodeProtocol) – Node without data, used to perform operations on the shared states of the models

  • round_idx (int) – Round number, it starts at 0.

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization.

Scaffold

class substrafl.strategies.Scaffold(algo: Algo, aggregation_lr: float = 1, metric_functions: Dict[str, Callable] | List[Callable] | Callable | None = None)

Bases: Strategy

Scaffold strategy. Paper: https://arxiv.org/pdf/1910.06378.pdf

Scaffold is Federated Averaging with control variates. By adding auxiliary variables on the clients and the server, the authors of the paper prove better convergence bounds under certain hypotheses, in particular with non-iid data.

A round consists of performing a predefined number of forward/backward passes on each client, aggregating the updates by computing their mean, and distributing the consensus update to all clients. Scaffold is a centralized strategy: a single server (AggregationNodeProtocol) communicates with a number of clients (TrainDataNodeProtocol and TestDataNodeProtocol).

Parameters:
  • algo (Algo) – The algorithm your strategy will execute (i.e. train and test on all the specified nodes)

  • aggregation_lr (float, Optional) – Global aggregation rate applied on the averaged weight updates (eta_g in the paper). Defaults to 1. Must be >=0.

  • metric_functions (Optional[Union[Dict[str, Callable], List[Callable], Callable]]) – list of Functions that implement the different metrics. If a Dict is given, the keys will be used to register the result of the associated function. If a Function or a List is given, function.__name__ will be used to store the result.

avg_shared_states(shared_states: List[ScaffoldSharedState] | None) ScaffoldAveragedStates

Performs the aggregation of the shared states returned by the train methods of the user-defined algorithm, according to the server operations of the Scaffold Algo.

  1. Computes the weighted average of the weight updates and applies the aggregation learning rate

  2. Updates the server control variate with the weighted average of the control variate updates

The average is weighted by the proportion of the number of samples.
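The two server operations can be sketched as follows. The key names `weight_update` and `control_variate_update`, the function name, and the choice to simply add the averaged control variate update to the server control variate are assumptions made for this illustration; the actual ScaffoldSharedState fields may differ.

```python
import numpy as np


def scaffold_server_step(shared_states, server_control_variate, aggregation_lr=1.0):
    """Sketch of the two server operations, with sample-proportional weights.

    Each shared state is assumed to hold "weight_update",
    "control_variate_update" and "n_samples" (hypothetical key names).
    """
    n_total = sum(s["n_samples"] for s in shared_states)
    avg_weight_update = sum(
        (s["n_samples"] / n_total) * s["weight_update"] for s in shared_states
    )
    avg_control_update = sum(
        (s["n_samples"] / n_total) * s["control_variate_update"] for s in shared_states
    )
    # 1. Scale the averaged weight update by the aggregation learning rate.
    scaled_weight_update = aggregation_lr * avg_weight_update
    # 2. Move the server control variate by the averaged control variate update
    #    (simple addition is an assumption of this sketch).
    new_server_control_variate = server_control_variate + avg_control_update
    return scaled_weight_update, new_server_control_variate


shared_states = [
    {
        "weight_update": np.array([3.0, 3.0]),
        "control_variate_update": np.array([0.3, 0.3]),
        "n_samples": 20,
    },
    {
        "weight_update": np.array([6.0, 6.0]),
        "control_variate_update": np.array([0.0, 0.0]),
        "n_samples": 40,
    },
]
update, control = scaffold_server_step(shared_states, np.zeros(2), aggregation_lr=0.5)
```

With aggregation_lr=1 the first operation reduces to the plain weighted average used by FedAvg.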

Parameters:

shared_states (List[ScaffoldSharedState]) – Shared state returned by the train method of the algorithm for each client (e.g. algorithms.pytorch.scaffold.train)

Returns:

averaged weight updates and updated server control variate

Return type:

ScaffoldAveragedStates

property name: StrategyName

The name of the strategy

Returns:

Name of the strategy

Return type:

StrategyName

perform_evaluation(test_data_nodes: List[TestDataNodeProtocol], train_data_nodes: List[TrainDataNodeProtocol], round_idx: int)

Perform evaluation on test_data_nodes.

Parameters:
  • test_data_nodes (List[TestDataNodeProtocol]) – test data nodes to perform the prediction from the algo on.

  • train_data_nodes (List[TrainDataNodeProtocol]) – train data nodes the model has been trained on.

  • round_idx (int) – round index.

perform_round(*, train_data_nodes: List[TrainDataNodeProtocol], aggregation_node: AggregationNodeProtocol, round_idx: int, clean_models: bool, additional_orgs_permissions: set | None = None)

One round of the Scaffold strategy consists of:
  • if round_idx==0: initialize the strategy by performing a local update (train on n mini-batches) of the models on each train data node

  • aggregate the model shared_states

  • set the model weights to the aggregated weights on each train data node

  • perform a local update (train on n mini-batches) of the models on each train data node

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – List of the organizations on which to perform local updates.

  • aggregation_node (AggregationNodeProtocol) – Node without data, used to perform operations on the shared states of the models

  • round_idx (int) – Round number, it starts at 0.

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization.


Newton Raphson

class substrafl.strategies.NewtonRaphson(algo: Algo, damping_factor: float, metric_functions: Dict[str, Callable] | List[Callable] | Callable | None = None)

Bases: Strategy

Newton-Raphson strategy.

The Newton-Raphson strategy is based on the distributed Newton-Raphson method. It converges faster than a standard FedAvg strategy, but it can only be used on convex problems.

See https://en.wikipedia.org/wiki/Newton%27s_method_in_optimization for more details.

In a local step, the first-order derivative (gradient) and the second-order derivative (Hessian matrix) of the loss with respect to the weights are calculated on each node.

Hessians and gradients are averaged on the aggregation node.

Updates of the weights are then calculated using the formula:

\[update = -\eta * H^{-1}_\theta.\Delta_\theta\]

Where \(H\) is the Hessian of the loss with respect to \(\theta\), \(\Delta_\theta\) is the gradient of the loss with respect to \(\theta\), and \(0 < \eta \leq 1\) is the damping factor.

Parameters:
  • algo (Algo) – The algorithm your strategy will execute (i.e. train and test on all the specified nodes)

  • damping_factor (float) – Must be between 0 and 1. Multiplicative coefficient of the parameters update. Smaller value for \(\eta\) will increase the stability but decrease the speed of convergence of the gradient descent. Recommended value: damping_factor=0.8.

  • metric_functions (Optional[Union[Dict[str, Callable], List[Callable], Callable]]) – list of Functions that implement the different metrics. If a Dict is given, the keys will be used to register the result of the associated function. If a Function or a List is given, function.__name__ will be used to store the result.

compute_averaged_states(shared_states: List[NewtonRaphsonSharedState] | None) NewtonRaphsonAveragedStates

Given the gradients and the Hessian (the second order derivative of loss with respect to weights), updates of the weights are calculated using the formula:

\[update = -\eta * H^{-1}_\theta.\Delta_\theta\]

Where \(H\) is the Hessian of the loss with respect to \(\theta\), \(\Delta_\theta\) is the gradient of the loss with respect to \(\theta\), and \(0 < \eta \leq 1\) is the damping factor.

The average is weighted by the number of samples.

Example

shared_states = [
    {"gradients": [1, 1, 1], "hessian": [[1, 0, 0], [0, 1, 0], [0, 0, 1]], "n_samples": 2},
    {"gradients": [2, 2, 2], "hessian": [[2, 0, 0], [0, 2, 0], [0, 0, 2]], "n_samples": 1}
]
damping_factor = 1

average = {"gradients": [4/3, 4/3, 4/3], "hessian": [[4/3, 0, 0],[0, 4/3, 0], [0, 0, 4/3]]}

Parameters:

shared_states (List[NewtonRaphsonSharedState]) – The list of the shared_state returned by the train method of the algorithm for each node.

Raises:
  • EmptySharedStatesError – The train method of your algorithm must return a shared_state

  • TypeError – Each shared_state must contain the keys n_samples, gradients and hessian

  • TypeError – Each shared_state must contain at least one element to average

  • TypeError – All the elements of shared_states must be similar (same keys)

  • TypeError – All elements to average must be of type np.array

Returns:

A dict containing the parameter updates for each input parameter.

Return type:

NewtonRaphsonAveragedStates
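To connect the averaged states of the example with the update formula, the sketch below averages gradients and Hessians by sample count and then applies the damped Newton step. It works on plain dicts and uses `np.linalg.solve` rather than an explicit inverse; the function name is hypothetical and this is not the library's code.

```python
import numpy as np


def newton_raphson_update(shared_states, damping_factor):
    """Sketch: average gradients and Hessians by sample count, then solve H x = g."""
    n_total = sum(s["n_samples"] for s in shared_states)
    avg_gradients = sum(
        (s["n_samples"] / n_total) * np.asarray(s["gradients"], dtype=float)
        for s in shared_states
    )
    avg_hessian = sum(
        (s["n_samples"] / n_total) * np.asarray(s["hessian"], dtype=float)
        for s in shared_states
    )
    # Solving the linear system avoids forming the inverse Hessian explicitly.
    return -damping_factor * np.linalg.solve(avg_hessian, avg_gradients)


shared_states = [
    {"gradients": [1, 1, 1], "hessian": [[1, 0, 0], [0, 1, 0], [0, 0, 1]], "n_samples": 2},
    {"gradients": [2, 2, 2], "hessian": [[2, 0, 0], [0, 2, 0], [0, 0, 2]], "n_samples": 1},
]
update = newton_raphson_update(shared_states, damping_factor=1)
```

Here the averaged gradient is `[4/3, 4/3, 4/3]` and the averaged Hessian is `4/3` times the identity, so the update with a damping factor of 1 is `[-1, -1, -1]`.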

property name: StrategyName

The name of the strategy

Returns:

Name of the strategy

Return type:

StrategyName

perform_evaluation(test_data_nodes: List[TestDataNodeProtocol], train_data_nodes: List[TrainDataNodeProtocol], round_idx: int)

Perform evaluation on test_data_nodes.

Parameters:
  • test_data_nodes (List[TestDataNodeProtocol]) – test data nodes to perform the prediction from the algo on.

  • train_data_nodes (List[TrainDataNodeProtocol]) – train data nodes the model has been trained on.

  • round_idx (int) – round index.

perform_round(*, train_data_nodes: List[TrainDataNodeProtocol], aggregation_node: AggregationNodeProtocol, round_idx: int, clean_models: bool, additional_orgs_permissions: set | None = None)

One round of the Newton-Raphson strategy consists of:

  • if round_idx==0: initialize the strategy by performing a local update of the models on each train data node

  • aggregate the model shared_states

  • set the model weights to the aggregated weights on each train data node

  • perform a local update of the models on each train data node

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – List of the nodes on which to perform local updates

  • aggregation_node (AggregationNodeProtocol) – node without data, used to perform operations on the shared states of the models

  • round_idx (int) – Round number, it starts at 0.

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization.

Fed PCA

class substrafl.strategies.FedPCA(algo: Algo, metric_functions: Dict[str, Callable] | List[Callable] | Callable | None = None)

Bases: Strategy

Federated Principal Component Analysis strategy.

The goal of this strategy is to perform a principal component analysis (PCA) by computing the eigenvectors with the highest eigenvalues of the covariance matrix of the provided data.

We assume we have clients indexed by \(j\), with \(n_j\) samples each.

We denote by \(N = \sum_j n_j\) the total number of samples, by \(D\) the dimension of the data, and by \(K\) the number of eigenvectors computed.

This strategy implementation is based on the federated iteration algorithm described by:

Anne Hartebrodt, Richard Röttger, Federated horizontally partitioned principal component analysis for biomedical applications, Bioinformatics Advances, Volume 2, Issue 1, 2022, vbac026, https://doi.org/10.1093/bioadv/vbac026 (algorithm 3 of the paper)

During the process, the local covariance matrices of each center are not shared. However, the column-wise mean of each dataset is shared across the centers.

The federated iteration is divided in three steps.

Step 1:
  • For \(d = 1, ..., D\), each center computes the mean of the \(d\)-th component of its dataset and shares it with the central aggregator. The central aggregator averages the local means and sends the global column-wise means of the data to all the clients.

Step 2:
  • Each center normalizes its dataset by subtracting the global mean column-wise and computes the covariance matrix of its local data after mean subtraction. We denote by \(C_j\) the local covariance matrices.

We initialize \(eig^0\): a matrix of size \(D \times K\) corresponding to the \(K\) eigenvectors we want to compute.

Step 3, for a given number of rounds (rounds are labeled by \(r\)) we perform the following:
Step 3.1:
  • Each center \(j\) computes \(eig^r_j = C_j.eig^{r-1}_j\)

Step 3.2:
  • The aggregator computes \(eig^r = \frac{1}{N}\sum_j n_j eig^r_j\)

Step 3.3:
  • The aggregator performs a QR decomposition: \(eig^r \mapsto QR(eig^r)\) and shares \(eig^r\) with all the clients

\(eig^r\) will converge to the \(K\) eigenvectors of the global covariance matrix with the highest eigenvalues.
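The three steps can be simulated in a single process with NumPy, as in the sketch below. It is an illustration under assumptions, not the SubstraFL implementation: the function name, the random orthonormal initialization, the normalization of \(C_j\) by \(n_j\), and sharing only the Q factor after the QR decomposition are choices made for this example.

```python
import numpy as np


def federated_pca(datasets, n_components, n_rounds, seed=0):
    """Sketch of the three steps: global mean, local covariances, federated iteration."""
    n_samples = [len(X) for X in datasets]
    n_total = sum(n_samples)

    # Step 1: clients share column-wise means; the aggregator averages them.
    global_mean = sum(n * X.mean(axis=0) for n, X in zip(n_samples, datasets)) / n_total

    # Step 2: each client centers its data and keeps its covariance C_j private.
    covariances = [
        (X - global_mean).T @ (X - global_mean) / n
        for n, X in zip(n_samples, datasets)
    ]

    # Initial D x K matrix (random orthonormal start; this choice is an assumption).
    rng = np.random.default_rng(seed)
    dim = datasets[0].shape[1]
    eig, _ = np.linalg.qr(rng.normal(size=(dim, n_components)))

    for _ in range(n_rounds):
        # Step 3.1: every client multiplies its covariance with the shared matrix.
        local_eigs = [C @ eig for C in covariances]
        # Step 3.2: the aggregator takes the sample-weighted average.
        averaged = sum(n * e for n, e in zip(n_samples, local_eigs)) / n_total
        # Step 3.3: QR decomposition; only the orthonormal factor Q is shared back.
        eig, _ = np.linalg.qr(averaged)
    return eig, global_mean


# Two clients drawing from the same distribution with well-separated variances.
rng = np.random.default_rng(42)
scales = np.array([5.0, 3.0, 1.0, 0.5, 0.2])
datasets = [rng.normal(size=(n, 5)) * scales for n in (200, 300)]
components, mean = federated_pca(datasets, n_components=2, n_rounds=50)
```

The columns of `components` span the same subspace as the top two eigenvectors of the pooled covariance matrix, although no client ever shares its raw data or its local covariance matrix.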

Parameters:
  • algo (Algo) – The algorithm your strategy will execute (i.e. train and test on all the specified nodes)

  • metric_functions (Optional[Union[Dict[str, Callable], List[Callable], Callable]]) – list of Functions that implement the different metrics. If a Dict is given, the keys will be used to register the result of the associated function. If a Function or a List is given, function.__name__ will be used to store the result.

avg_shared_states(shared_states: List[FedPCASharedState] | None) FedPCAAveragedState

Compute the weighted average of all elements returned by the train methods of the user-defined algorithm. The average is weighted by the proportion of the number of samples.

Example

shared_states = [
    {"parameters_update": [3, 6, 1], "n_samples": 20},
    {"parameters_update": [6, 3, 1], "n_samples": 40},
]
result = {"parameters_update": [5, 4, 1]}

Parameters:

shared_states (List[FedPCASharedState]) – The list of the shared_state returned by the train method of the algorithm for each organization.

Raises:
  • EmptySharedStatesError – The train method of your algorithm must return a shared_state

  • TypeError – Each shared_state must contain the key n_samples

  • TypeError – Each shared_state must contain at least one element to average

  • TypeError – All the elements of shared_states must be similar (same keys)

  • TypeError – All elements to average must be of type np.ndarray

Returns:

A dict containing the weighted average of each input parameter, without the passed key “n_samples”.

Return type:

FedPCAAveragedState

avg_shared_states_with_qr(shared_states: List[FedPCASharedState] | None) FedPCAAveragedState

Compute the weighted average of all elements returned by the train methods of the user-defined algorithm and factorize the obtained matrix with a QR decomposition, where Q is orthonormal and R is upper-triangular.

The returned FedPCAAveragedState contains the Q matrix only.

Parameters:

shared_states (List[FedPCASharedState]) – The list of the shared_state returned by the train method of the algorithm for each organization.

Raises:

EmptySharedStatesError – The train method of your algorithm must return a shared_state

Returns:

The Q matrix as a FedPCAAveragedState.

Return type:

FedPCAAveragedState

property name: StrategyName

The name of the strategy

Returns:

Name of the strategy

Return type:

StrategyName

perform_evaluation(test_data_nodes: List[TestDataNodeProtocol], train_data_nodes: List[TrainDataNodeProtocol], round_idx: int) None

Perform evaluation on test_data_nodes. Predictions requested before round 3 are not taken into account, because the objects needed to compute predictions are not all initialized until the second round has completed.

Parameters:
  • test_data_nodes (List[TestDataNodeProtocol]) – test data nodes to perform the prediction from the algo on.

  • train_data_nodes (List[TrainDataNodeProtocol]) – train data nodes the model has been trained on.

  • round_idx (int) – round index.

Return type:

None

perform_round(train_data_nodes: List[TrainDataNodeProtocol], aggregation_node: AggregationNodeProtocol, round_idx: int, clean_models: bool, additional_orgs_permissions: set | None = None) None

The Federated Principal Component Analysis strategy uses the first two rounds as pre-processing rounds.

Three types of rounds:
  • Compute the average mean on all centers at round 1.

  • Compute the local covariance matrix of each center at round 2.

  • Use the local covariance matrices to compute the orthogonal matrix in every subsequent round.

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – List of the nodes on which to perform local updates.

  • aggregation_node (AggregationNodeProtocol) – Node without data, used to perform operations on the shared states of the models

  • round_idx (int) – Round number, it starts at 0.

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization.

Return type:

None

Single Organization

class substrafl.strategies.SingleOrganization(algo: Algo, metric_functions: Dict[str, Callable] | List[Callable] | Callable | None = None)

Bases: Strategy

Single organization strategy.

Single organization is not a real federated strategy; it is mainly used for testing, as it is faster than the other ‘real’ strategies. Training and prediction are performed on a single node. However, the number of passes on that node (num_rounds) is still defined, in order to test the actual federated setting. In the SingleOrganization strategy, a single client (TrainDataNodeProtocol and TestDataNodeProtocol) performs all the model execution.

Parameters:
  • algo (Algo) – The algorithm your strategy will execute (i.e. train and test on all the specified nodes)

  • metric_functions (Optional[Union[Dict[str, Callable], List[Callable], Callable]]) – list of Functions that implement the different metrics. If a Dict is given, the keys will be used to register the result of the associated function. If a Function or a List is given, function.__name__ will be used to store the result.

initialization_round(*, train_data_nodes: List[TrainDataNodeProtocol], clean_models: bool, round_idx: int | None = 0, additional_orgs_permissions: set | None = None)

Call the initialize function of the algo on each train node.

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – list of the train organizations

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • round_idx (Optional[int]) – index of the round. Defaults to 0.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization. Defaults to None.

property name: StrategyName

The name of the strategy

Returns:

Name of the strategy

Return type:

StrategyName

perform_evaluation(test_data_nodes: List[TestDataNodeProtocol], train_data_nodes: List[TrainDataNodeProtocol], round_idx: int)

Perform evaluation on test_data_nodes.

Parameters:
  • test_data_nodes (List[TestDataNodeProtocol]) – test data nodes to perform the prediction from the algo on.

  • train_data_nodes (List[TrainDataNodeProtocol]) – train data nodes the model has been trained on.

  • round_idx (int) – round index.

perform_round(*, train_data_nodes: List[TrainDataNodeProtocol], round_idx: int, clean_models: bool, aggregation_node: AggregationNodeProtocol | None = None, additional_orgs_permissions: set | None = None)

One round of the SingleOrganization strategy: perform a local update (train on n mini-batches) of the models on a given data node

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – List of the nodes on which to perform local updates, there should be exactly one item in the list.

  • aggregation_node (AggregationNodeProtocol) – Should be None; any other value is ignored.

  • round_idx (int) – Round number, it starts at 0.

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization.

Strategies Base Class

class substrafl.strategies.strategy.Strategy(algo: Algo, metric_functions: Dict[str, Callable] | List[Callable] | Callable | None = None, *args, **kwargs)

Bases: ComputePlanBuilder

Base strategy to be inherited by SubstraFL strategies.

All child class arguments need to be passed to it through its args and kwargs in order to use them when instantiating it as a RemoteStruct in each process.

Example

class MyStrat(Strategy):
    def __init__(self, algo, my_custom_arg):
        super().__init__(algo=algo, my_custom_arg=my_custom_arg)

Parameters:
  • algo (Algo) – The algorithm your strategy will execute (i.e. train and test on all the specified nodes)

  • metric_functions (Optional[Union[Dict[str, Callable], List[Callable], Callable]]) – list of Functions that implement the different metrics. If a Dict is given, the keys will be used to register the result of the associated function. If a Function or a List is given, function.__name__ will be used to store the result.

Raises:

exceptions.IncompatibleAlgoStrategyError – Raise an error if the strategy name is not in algo.strategies.

build_compute_plan(train_data_nodes: List[TrainDataNodeProtocol], aggregation_node: List[AggregationNodeProtocol] | None, evaluation_strategy: EvaluationStrategy | None, num_rounds: int, clean_models: bool | None = True) None

Build the compute plan of the strategy. The built graph is stored by side effect in the given train_data_nodes, aggregation_nodes and evaluation_strategy. This function creates the graph by first calling the initialization_round method of the strategy at round 0, and then calling the perform_round method for each new round. If the current round is part of the evaluation strategy, the perform_evaluation method is called to complete the graph.

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – list of the train organizations

  • aggregation_node (Optional[AggregationNodeProtocol]) – aggregation node, necessary for centralized strategy, unused otherwise

  • evaluation_strategy (Optional[EvaluationStrategy]) – evaluation strategy to follow for testing models.

  • num_rounds (int) – Number of times to repeat the compute plan sub-graph (defined in perform_round).

  • clean_models (bool) – Clean the intermediary models on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed. Defaults to True.

Returns:

None

Return type:

None
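The call order described for build_compute_plan can be pictured with the toy sketch below. `RecordingStrategy` and `build_plan` are hypothetical stand-ins, the set `eval_rounds` plays the role of the evaluation strategy, and numbering the training rounds from 1 to num_rounds after the initialization at round 0 is an assumption of this example.

```python
class RecordingStrategy:
    """Hypothetical stand-in that records which method is called for which round."""

    def __init__(self):
        self.calls = []

    def initialization_round(self, round_idx=0):
        self.calls.append(("initialization_round", round_idx))

    def perform_round(self, round_idx):
        self.calls.append(("perform_round", round_idx))

    def perform_evaluation(self, round_idx):
        self.calls.append(("perform_evaluation", round_idx))


def build_plan(strategy, num_rounds, eval_rounds):
    """Sketch of the call order; eval_rounds plays the role of the evaluation strategy."""
    strategy.initialization_round(round_idx=0)
    if 0 in eval_rounds:
        strategy.perform_evaluation(round_idx=0)
    for round_idx in range(1, num_rounds + 1):
        strategy.perform_round(round_idx=round_idx)
        if round_idx in eval_rounds:
            strategy.perform_evaluation(round_idx=round_idx)
    return strategy.calls


calls = build_plan(RecordingStrategy(), num_rounds=3, eval_rounds={1, 3})
```

In this sketch nothing is executed: each call only appends to a list, much as the real methods only add tasks to the graph stored in the nodes.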

evaluate(data_from_opener: Any, shared_state: Any | None = None) Dict[str, float]

Executed for each test data organization.

Parameters:
  • data_from_opener (Any) – The output of the get_data method of the opener.

  • shared_state (Any) – None for the first round of the computation graph, then the object returned by the previous organization of the computation graph.

Returns:

Keys of the dict are the metric names, and values are the computed performances.

Return type:

Dict[str, float]

initialization_round(*, train_data_nodes: List[TrainDataNodeProtocol], clean_models: bool, round_idx: int | None = 0, additional_orgs_permissions: set | None = None)

Call the initialize function of the algo on each train node.

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – list of the train organizations

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • round_idx (Optional[int]) – index of the round. Defaults to 0.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization. Defaults to None.

load_local_state(path: Path) Any

Executed at the beginning of each step of the computation graph to load on each organization the previously saved local state.

Parameters:

path (pathlib.Path) – The path where the previous local state has been saved.

Returns:

The loaded element.

Return type:

Any

abstract property name: StrategyName

The name of the strategy

Returns:

Name of the strategy

Return type:

StrategyName

abstract perform_evaluation(test_data_nodes: List[TestDataNodeProtocol], train_data_nodes: List[TrainDataNodeProtocol], round_idx: int)

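Perform the evaluation of the algo on each test node. Gets the model from a train organization and evaluates it on the test nodes.

Parameters:
  • test_data_nodes (List[TestDataNodeProtocol]) – test data nodes to perform the prediction from the algo on.

  • train_data_nodes (List[TrainDataNodeProtocol]) – train data nodes the model has been trained on.

  • round_idx (int) – round index.

abstract perform_round(*, train_data_nodes: List[TrainDataNodeProtocol], aggregation_node: AggregationNodeProtocol | None, round_idx: int, clean_models: bool, additional_orgs_permissions: set | None = None)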

Perform one round of the strategy

Parameters:
  • train_data_nodes (List[TrainDataNodeProtocol]) – list of the train organizations

  • aggregation_node (Optional[AggregationNodeProtocol]) – aggregation node, necessary for centralized strategy, unused otherwise

  • round_idx (int) – index of the round

  • clean_models (bool) – Clean the intermediary models of this round on the Substra platform. Set it to False if you want to download or re-use intermediary models; doing so fills the disk space quickly, so keep it set to True unless needed.

  • additional_orgs_permissions (Optional[set]) – Additional permissions to give to the model outputs after training, in order to test the model on another organization.

save_local_state(path: Path) None

Executed at the end of each step of the computation graph to save the local state locally on each organization.

Parameters:

path (pathlib.Path) – The path where the local state is to be saved.

Returns:

None

Return type:

None
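The save and load hooks form a round trip between two steps of the computation graph. The sketch below shows that round trip with a hypothetical class whose state is a plain dict; using pickle as the storage format is an assumption of this example and says nothing about how SubstraFL serializes a strategy.

```python
import pickle
import tempfile
from pathlib import Path


class PickledState:
    """Hypothetical object whose local state is a plain dict stored with pickle."""

    def __init__(self):
        self.state = {"round_idx": 0}

    def save_local_state(self, path: Path) -> None:
        # Called at the end of a step: write the current state to disk.
        with path.open("wb") as f:
            pickle.dump(self.state, f)

    def load_local_state(self, path: Path) -> "PickledState":
        # Called at the beginning of the next step: restore the saved state.
        with path.open("rb") as f:
            self.state = pickle.load(f)
        return self


with tempfile.TemporaryDirectory() as tmp:
    state_path = Path(tmp) / "local_state.pkl"
    first = PickledState()
    first.state["round_idx"] = 3
    first.save_local_state(state_path)
    restored = PickledState().load_local_state(state_path)
```

Returning the loaded object from load_local_state mirrors the documented return value ("The loaded element").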

Schemas

Schemas used in the strategies.

class substrafl.strategies.schemas.FedAvgAveragedState(*, avg_parameters_update: List[ndarray])

Bases: _Model

Shared state sent by the aggregate_organization in the federated averaging strategy.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

avg_parameters_update (List[ndarray]) –

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

model_fields: ClassVar[dict[str, FieldInfo]] = {'avg_parameters_update': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model, mapping of field names to FieldInfo (pydantic.fields.FieldInfo).

This replaces Model.__fields__ from Pydantic V1.

class substrafl.strategies.schemas.FedAvgSharedState(*, n_samples: int, parameters_update: List[ndarray])

Bases: _Model

Shared state returned by the train method of the algorithm for each client, received by the aggregate function in the federated averaging strategy.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • n_samples (int) –

  • parameters_update (List[ndarray]) –

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

model_fields: ClassVar[dict[str, FieldInfo]] = {'n_samples': FieldInfo(annotation=int, required=True), 'parameters_update': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.
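The sample-weighted averaging that turns a list of these shared states into an averaged state can be sketched as follows. This is a minimal illustration, not the library's implementation: SharedState is a hypothetical stand-in for FedAvgSharedState, plain Python lists replace ndarray, and weighted_average is an illustrative helper name.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SharedState:
    """Hypothetical stand-in for FedAvgSharedState (plain lists replace ndarrays)."""

    n_samples: int
    parameters_update: List[List[float]]


def weighted_average(states: List[SharedState]) -> List[List[float]]:
    """Average each parameter array across clients, weighted by n_samples / n."""
    if not states:
        raise ValueError("at least one shared state is required")
    n_total = sum(state.n_samples for state in states)
    # Start from zero-filled arrays shaped like the first client's update.
    averaged = [[0.0] * len(layer) for layer in states[0].parameters_update]
    for state in states:
        weight = state.n_samples / n_total
        for i, layer in enumerate(state.parameters_update):
            for j, value in enumerate(layer):
                averaged[i][j] += weight * value
    return averaged


states = [
    SharedState(n_samples=20, parameters_update=[[3.0, 3.0, 3.0]]),
    SharedState(n_samples=60, parameters_update=[[1.0, 1.0, 1.0]]),
]
avg_parameters_update = weighted_average(states)
print(avg_parameters_update)  # [[1.5, 1.5, 1.5]]
```

The client holding 60 of the 80 samples contributes three quarters of the result, which is why the average lands closer to its update.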

class substrafl.strategies.schemas.FedPCAAveragedState(*, avg_parameters_update: List[ndarray])

Bases: _Model

Shared state sent by the aggregate_organization in the federated PCA strategy.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

avg_parameters_update (List[ndarray]) –

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'avg_parameters_update': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class substrafl.strategies.schemas.FedPCASharedState(*, n_samples: int, parameters_update: List[ndarray])

Bases: _Model

Shared state returned by the train method of the algorithm for each client, received by the aggregate function in the federated PCA strategy.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • n_samples (int) –

  • parameters_update (List[ndarray]) –

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'n_samples': FieldInfo(annotation=int, required=True), 'parameters_update': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class substrafl.strategies.schemas.NewtonRaphsonAveragedStates(*, parameters_update: List[ndarray])

Bases: _Model

Shared state sent by the aggregate_organization in the Newton Raphson strategy.

Parameters:

parameters_update (List[numpy.ndarray]) – the new parameters_update sent to the clients

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'parameters_update': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class substrafl.strategies.schemas.NewtonRaphsonSharedState(*, n_samples: int, gradients: List[ndarray], hessian: ndarray)

Bases: _Model

Shared state returned by the train method of the algorithm for each client, received by the aggregate function in the Newton Raphson strategy.

Parameters:
  • n_samples (int) – number of samples of the client dataset.

  • gradients (List[numpy.ndarray]) – gradients of the loss function with respect to the model parameters \(\theta\).

  • hessian (numpy.ndarray) – second derivative of the loss function with respect to the model parameters \(\theta\).

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'gradients': FieldInfo(annotation=List[ndarray], required=True), 'hessian': FieldInfo(annotation=ndarray, required=True), 'n_samples': FieldInfo(annotation=int, required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.
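To illustrate how the three fields of this shared state could feed an aggregation step, here is a minimal sketch of a textbook Newton-Raphson update on a two-parameter model. It assumes the gradients and Hessians are averaged with weights n_samples / n and that the update is \(-H^{-1} g\). Whether the library scales the step (for example with a damping factor) is not covered here. NRSharedState and newton_step are hypothetical names, and plain lists replace ndarray.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class NRSharedState:
    """Hypothetical stand-in for NewtonRaphsonSharedState (two parameters)."""

    n_samples: int
    gradients: List[float]       # gradient of the loss, length 2
    hessian: List[List[float]]   # second derivative of the loss, 2 x 2


def newton_step(states: List[NRSharedState]) -> List[float]:
    """Return -H^{-1} g for the sample-weighted average gradient g and Hessian H."""
    n_total = sum(state.n_samples for state in states)
    grad = [0.0, 0.0]
    hess = [[0.0, 0.0], [0.0, 0.0]]
    for state in states:
        weight = state.n_samples / n_total
        for i in range(2):
            grad[i] += weight * state.gradients[i]
            for j in range(2):
                hess[i][j] += weight * state.hessian[i][j]
    det = hess[0][0] * hess[1][1] - hess[0][1] * hess[1][0]
    if det == 0:
        raise ValueError("averaged Hessian is singular")
    # Closed-form inverse of a 2 x 2 matrix applied to the gradient, then negated.
    return [
        -(hess[1][1] * grad[0] - hess[0][1] * grad[1]) / det,
        -(-hess[1][0] * grad[0] + hess[0][0] * grad[1]) / det,
    ]


states = [
    NRSharedState(n_samples=10, gradients=[8.0, 0.0], hessian=[[2.0, 0.0], [0.0, 4.0]]),
    NRSharedState(n_samples=30, gradients=[0.0, 8.0], hessian=[[2.0, 0.0], [0.0, 4.0]]),
]
parameters_update = newton_step(states)
print(parameters_update)  # [-1.0, -1.5]
```

The averaged gradient is [2.0, 6.0] and the averaged Hessian is diag(2, 4), so each component of the step is the negated gradient divided by the matching curvature.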

class substrafl.strategies.schemas.ScaffoldAveragedStates(*, server_control_variate: List[ndarray], avg_parameters_update: List[ndarray])

Bases: _Model

Shared state sent by the aggregate_organization (returned by the function strategies.scaffold.avg_shared_states).

Parameters:
  • server_control_variate (List[numpy.ndarray]) – the new server_control_variate sent to the clients

  • avg_parameters_update (List[numpy.ndarray]) – the weighted average of the parameters_update from each client

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'avg_parameters_update': FieldInfo(annotation=List[ndarray], required=True), 'server_control_variate': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class substrafl.strategies.schemas.ScaffoldSharedState(*, parameters_update: List[ndarray], control_variate_update: List[ndarray], n_samples: int, server_control_variate: List[ndarray])

Bases: _Model

Shared state returned by the train method of the algorithm for each client (e.g. algorithms.pytorch.scaffold.train)

Parameters:
  • parameters_update (List[numpy.ndarray]) – the weight update of the client (delta between fine-tuned weights and previous weights)

  • control_variate_update (List[numpy.ndarray]) – the control_variate update of the client

  • n_samples (int) – the number of samples of the client

  • server_control_variate (List[numpy.ndarray]) – the server control variate (c in the algorithm of the Scaffold paper). Every client sends it because the aggregation node has no persistent state. It should be identical across clients, since the client algorithm must not modify it.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'control_variate_update': FieldInfo(annotation=List[ndarray], required=True), 'n_samples': FieldInfo(annotation=int, required=True), 'parameters_update': FieldInfo(annotation=List[ndarray], required=True), 'server_control_variate': FieldInfo(annotation=List[ndarray], required=True)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.
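The sketch below shows one way an aggregation step could consume these fields. It checks that every client sent the same server_control_variate, averages parameters_update with weights n_samples / n, and updates the server control variate. The control variate rule used here, c plus the plain mean of the clients' control_variate_update, is an assumption taken from the Scaffold paper with all clients participating and may differ from the library's code. ClientState and aggregate are hypothetical names, and each field is a single flat list instead of a list of ndarray.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ClientState:
    """Hypothetical stand-in for ScaffoldSharedState (one flat array per field)."""

    parameters_update: List[float]
    control_variate_update: List[float]
    n_samples: int
    server_control_variate: List[float]


def aggregate(states: List[ClientState]) -> Dict[str, List[float]]:
    if not states:
        raise ValueError("at least one shared state is required")
    server_c = states[0].server_control_variate
    # The aggregation node is stateless, so c arrives from the clients and must agree.
    if any(state.server_control_variate != server_c for state in states):
        raise ValueError("clients disagree on server_control_variate")
    n_total = sum(state.n_samples for state in states)
    dim = len(server_c)
    avg_parameters_update = [
        sum(state.n_samples / n_total * state.parameters_update[i] for state in states)
        for i in range(dim)
    ]
    # Assumed rule: c <- c + mean over clients of control_variate_update.
    new_server_c = [
        server_c[i] + sum(state.control_variate_update[i] for state in states) / len(states)
        for i in range(dim)
    ]
    return {
        "server_control_variate": new_server_c,
        "avg_parameters_update": avg_parameters_update,
    }


states = [
    ClientState([2.0, 4.0], [0.5, 0.0], n_samples=10, server_control_variate=[1.0, 1.0]),
    ClientState([6.0, 0.0], [0.0, 1.0], n_samples=30, server_control_variate=[1.0, 1.0]),
]
averaged = aggregate(states)
print(averaged["avg_parameters_update"])   # [5.0, 1.0]
print(averaged["server_control_variate"])  # [1.25, 1.5]
```

The returned dictionary uses the two field names of ScaffoldAveragedStates so that the shape of the result matches the schema documented above.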

class substrafl.strategies.schemas.StrategyName(value)

Bases: str, Enum

An enumeration.

_generate_next_value_(start, count, last_values)

Generate the next value when not given.

  • name: the name of the member
  • start: the initial start value or None
  • count: the number of existing members
  • last_value: the last value assigned or None

_member_type_

alias of str
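Because the enumeration derives from both str and Enum, its members behave like ordinary strings. The sketch below demonstrates this with a hypothetical DemoStrategyName; its member names and values are illustrative only and are not the members of StrategyName.

```python
from enum import Enum


class DemoStrategyName(str, Enum):
    """Hypothetical enum; member names and values are illustrative only."""

    FIRST = "first strategy"
    SECOND = "second strategy"


# Look a member up by its value, as one would when reading a stored strategy name.
member = DemoStrategyName("second strategy")

print(member is DemoStrategyName.SECOND)      # True
print(member == "second strategy")            # True: members compare equal to plain strings
print(DemoStrategyName._member_type_ is str)  # True: the mixed-in data type
print(member.value.upper())                   # SECOND STRATEGY
```

The str mixin is what allows a member to be compared with, or passed in place of, a plain string without an explicit .value access.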

class substrafl.strategies.schemas._Model

Bases: BaseModel

Base model configuration

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.