Algorithms

Torch Algorithms

TorchFedAvgAlgo

class substrafl.algorithms.pytorch.TorchFedAvgAlgo(model: torch.nn.modules.module.Module, criterion: torch.nn.modules.loss._Loss, optimizer: torch.optim.optimizer.Optimizer, index_generator: substrafl.index_generator.base.BaseIndexGenerator, dataset: torch.utils.data.dataset.Dataset, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, with_batch_norm_parameters: bool = False, seed: Optional[int] = None, use_gpu: bool = True, *args, **kwargs)

Bases: substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

To be inherited. Wraps the necessary operations so a torch model can be trained in the Federated Averaging strategy.

The train method:

  • updates the weights of the model with the aggregated weights,

  • initializes or loads the index generator,

  • calls the _local_train() method to do the local training

  • then gets the weight updates from the models and sends them to the aggregator.

The predict method generates the predictions.

The child class can override the _local_train() and _local_predict() methods, or other methods if necessary.

To add a custom parameter to the __init__ of the class, also add it to the call to super().__init__ as shown in the example with my_custom_extra_parameter. Only primitive types (str, int, …) are supported for extra parameters.

Example

class MyAlgo(TorchFedAvgAlgo):
    def __init__(
        self,
        my_custom_extra_parameter,
    ):
        super().__init__(
            model=perceptron,
            criterion=torch.nn.MSELoss(),
            optimizer=optimizer,
            index_generator=NpIndexGenerator(
                num_updates=100,
                batch_size=32,
            ),
            dataset=MyDataset,
            my_custom_extra_parameter=my_custom_extra_parameter,
        )

    def _local_train(
        self,
        train_dataset: torch.utils.data.Dataset,
    ):

        # Create the torch dataloader from the automatically instantiated dataset.
        # ``train_dataset = self._dataset(datasamples=datasamples, is_inference=False)`` is executed
        # prior to the execution of this function.
        train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

        for x_batch, y_batch in train_data_loader:

            # Forward pass
            y_pred = self._model(x_batch)

            # Compute Loss
            loss = self._criterion(y_pred, y_batch)
            self._optimizer.zero_grad()
            loss.backward()
            self._optimizer.step()

            if self._scheduler is not None:
                self._scheduler.step()

The __init__ function is called at each call of the train() or predict() function. For round>=2, some attributes are then overwritten with their previous states by the load() function, before the train() or predict() function is run.
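
As a rough illustration of the lifecycle described above, the sketch below re-creates an object at every round and restores its state from disk from the second round on. ToyAlgo, run_rounds and the JSON state file are hypothetical names chosen for this example; they are not part of substrafl, and the actual save() and load() implementations may work differently.

```python
import json
import tempfile
from pathlib import Path


class ToyAlgo:
    """Hypothetical stand-in for an algo class; not the substrafl implementation."""

    def __init__(self, learning_rate: float):
        # Runs again at every round, so the state always starts from scratch here.
        self.learning_rate = learning_rate
        self.weight = 0.0
        self.updates_done = 0

    def train_one_round(self) -> None:
        self.weight += self.learning_rate
        self.updates_done += 1

    def save(self, path: Path) -> None:
        state = {"weight": self.weight, "updates_done": self.updates_done}
        path.write_text(json.dumps(state))

    def load(self, path: Path) -> "ToyAlgo":
        # Overwrite the freshly initialized attributes with the saved ones.
        state = json.loads(path.read_text())
        self.weight = state["weight"]
        self.updates_done = state["updates_done"]
        return self


def run_rounds(num_rounds: int) -> ToyAlgo:
    with tempfile.TemporaryDirectory() as tmp:
        state_path = Path(tmp) / "state.json"
        algo = None
        for round_idx in range(1, num_rounds + 1):
            algo = ToyAlgo(learning_rate=0.5)  # fresh instance at each round
            if round_idx >= 2:
                algo.load(state_path)  # restore the previous round's state
            algo.train_one_round()
            algo.save(state_path)
        return algo


final_algo = run_rounds(3)
```

Without the load() step, every round would restart from the values set in __init__, which is why stateful attributes have to be restored before training continues.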

Parameters
  • model (torch.nn.modules.module.Module) – A torch model.

  • criterion (torch.nn.modules.loss._Loss) – A torch criterion (loss).

  • optimizer (torch.optim.Optimizer) – A torch optimizer linked to the model.

  • index_generator (BaseIndexGenerator) – A stateful index generator. Must inherit from BaseIndexGenerator. The __next__ method shall return a python object (batch_index) which is used for selecting each batch from the output of the get_data method of the opener during training in this way: x[batch_index], y[batch_index]. If overridden, the generator class must be defined either as part of a package or in a different file than the one from which the execute_experiment function is called. This generator is used as the stateful batch_sampler of the data loader created from the given dataset.

  • dataset (torch.utils.data.Dataset) – An instantiable dataset class whose __init__ arguments are x, y and is_inference. The torch datasets used for both training and inference will be instantiated from it prior to the _local_train execution and within the predict method. The __getitem__ methods of those generated datasets must return both x (training data) and y (target values) when is_inference is set to False and only x (testing data) when is_inference is set to True. This behavior can be changed by rewriting the _local_train or predict methods.

  • scheduler (torch.optim.lr_scheduler._LRScheduler, Optional) – A torch scheduler that will be called at every batch. If None, no scheduler will be used. Defaults to None.

  • with_batch_norm_parameters (bool) – Whether to include the batch norm layer parameters in the fed avg strategy. Defaults to False.

  • seed (Optional[int]) – Seed set at the algo initialization on each organization. Defaults to None.

  • use_gpu (bool) – Whether to use the GPUs if they are available. Defaults to True.
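
To make the idea of a stateful index generator more concrete, here is a minimal pure-Python sketch. ToyIndexGenerator and its reset_counter method are hypothetical and written only for this illustration; this is not the code of BaseIndexGenerator or NpIndexGenerator. It assumes that the generator yields num_updates batches per round and remembers its position in the shuffled data between rounds.

```python
import random
from typing import Iterator, List


class ToyIndexGenerator:
    """Hypothetical stateful batch-index generator (illustration only)."""

    def __init__(self, n_samples: int, batch_size: int, num_updates: int, seed: int = 0):
        self.batch_size = batch_size
        self.num_updates = num_updates
        self._rng = random.Random(seed)
        self._order = list(range(n_samples))
        self._rng.shuffle(self._order)
        self._position = 0  # position in the current epoch, kept across rounds
        self._counter = 0   # number of batches yielded in the current round

    def __iter__(self) -> Iterator[List[int]]:
        return self

    def __next__(self) -> List[int]:
        if self._counter == self.num_updates:
            raise StopIteration
        if self._position >= len(self._order):
            # The epoch is finished: reshuffle and start a new one.
            self._rng.shuffle(self._order)
            self._position = 0
        batch = self._order[self._position:self._position + self.batch_size]
        self._position += self.batch_size
        self._counter += 1
        return batch

    def reset_counter(self) -> None:
        # Called between rounds; the position in the epoch is kept.
        self._counter = 0


x = [10 * i for i in range(10)]
generator = ToyIndexGenerator(n_samples=len(x), batch_size=4, num_updates=2)

round_1 = list(generator)   # two batches of four indices
generator.reset_counter()
round_2 = list(generator)   # resumes where round 1 stopped

# Selecting a batch from the data, in the spirit of x[batch_index]
x_batch = [x[i] for i in round_1[0]]
```

Because the position is preserved, the first batch of the second round contains the two samples that the first round did not reach, so every sample is seen once before any sample is repeated.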

_local_predict(predict_dataset: torch.utils.data.dataset.Dataset, predictions_path)

Executes the following operations:

  • Creates the torch dataloader using the index generator batch size.

  • Sets the model to eval mode.

  • Saves the predictions using the _save_predictions() function.

Parameters

predict_dataset (torch.utils.data.Dataset) – predict_dataset built from the x returned by the opener.

Important

The onus is on the user to save the computed predictions. Substrafl provides the _save_predictions() method to do so. The user can load those predictions from a metric file with the command: y_pred = np.load(inputs['predictions']).

Raises

BatchSizeNotFoundError – No default batch size has been found to perform local prediction. Please override the predict function of your algorithm.

_local_train(train_dataset: torch.utils.data.dataset.Dataset)

Local train method. Contains the local training loop.

Train the model on num_updates minibatches, using the self._index_generator generator as batch sampler for the torch dataset.

Parameters

train_dataset (torch.utils.data.Dataset) – train_dataset built from the x and y returned by the opener.

Important

You must draw the batches from self._index_generator, either by passing it as the batch sampler or by calling next(self._index_generator) at each minibatch, to ensure that the batches stay consistent between two rounds of the federated learning strategy.

Example

# Create torch dataloader
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

for x_batch, y_batch in train_data_loader:

    # Forward pass
    y_pred = self._model(x_batch)

    # Compute Loss
    loss = self._criterion(y_pred, y_batch)
    self._optimizer.zero_grad()
    loss.backward()
    self._optimizer.step()

    if self._scheduler is not None:
        self._scheduler.step()

_save_predictions(predictions: torch.Tensor, predictions_path: os.PathLike)

Saves the predictions in the NumPy format.

Parameters
  • predictions (torch.Tensor) – predictions to save.

  • predictions_path (os.PathLike) – destination file to save predictions.

load(path: pathlib.Path) → substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

Load the stateful arguments of this class. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – The path where the class has been saved.

Returns

The class with the loaded elements.

Return type

TorchAlgo

property model: torch.nn.modules.module.Module

Model exposed when the user downloads the model

Returns

model

Return type

torch.nn.Module

predict(datasamples: Any, shared_state: Optional[Any], predictions_path: os.PathLike = None) → Any

Executes the following operations:

  • Create the test torch dataset.

  • Execute and return the results of the self._local_predict method

Parameters
  • datasamples (Any) – Input data

  • shared_state (Any) – Latest train task shared state (output of the train method)

  • predictions_path (os.PathLike) – Destination file to save predictions

Return type

Any

save(path: pathlib.Path)

Saves all the stateful elements of the class to the specified path. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – A path where to save the class.

property strategies: List[substrafl.schemas.StrategyName]

List of compatible strategies

Returns

typing.List[StrategyName]

Return type

List

summary()

Summary of the class to be exposed in the experiment summary file

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

train(datasamples: Any, shared_state: Optional[substrafl.schemas.FedAvgAveragedState] = None) → substrafl.schemas.FedAvgSharedState

Train method of the fed avg strategy implemented with torch. This method will execute the following operations:

  • instantiates the provided (or default) batch indexer,

  • if a shared state is passed, sets the parameters of the model to the provided shared state,

  • trains the model for num_updates updates,

  • computes the weight update.

Parameters
  • datasamples (Any) – Input data returned by the get_data method from the opener.

  • shared_state (FedAvgAveragedState, Optional) – Dict containing torch parameters that will be set to the model. Defaults to None.

Returns

weight update (delta between fine-tuned weights and previous weights)

Return type

FedAvgSharedState
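
The weight update returned by train can be pictured with plain Python lists. The sketch below is only an illustration under stated assumptions: the helper names (weight_update, average_updates, apply_update) are hypothetical, weights are represented as dictionaries of float lists instead of torch tensors, and the aggregation is assumed to be an average weighted by the number of samples of each organization.

```python
from typing import Dict, List

Weights = Dict[str, List[float]]


def weight_update(previous: Weights, trained: Weights) -> Weights:
    """Delta between the locally trained weights and the weights received."""
    return {
        name: [new - old for new, old in zip(trained[name], previous[name])]
        for name in previous
    }


def average_updates(updates: List[Weights], n_samples: List[int]) -> Weights:
    """Average the updates, weighting each organization by its sample count (assumption)."""
    total = sum(n_samples)
    return {
        name: [
            sum(update[name][i] * n for update, n in zip(updates, n_samples)) / total
            for i in range(len(updates[0][name]))
        ]
        for name in updates[0]
    }


def apply_update(weights: Weights, update: Weights) -> Weights:
    """Add the averaged update to the weights shared at the start of the round."""
    return {
        name: [w + u for w, u in zip(weights[name], update[name])]
        for name in weights
    }


shared = {"w": [1.0, 2.0]}
update_a = weight_update(shared, {"w": [2.0, 2.0]})  # organization A, 1 sample
update_b = weight_update(shared, {"w": [1.0, 6.0]})  # organization B, 3 samples

averaged = average_updates([update_a, update_b], n_samples=[1, 3])
next_shared = apply_update(shared, averaged)
```

Sending deltas instead of full weights lets the aggregator combine the contributions without knowing the absolute values held by each organization before the round.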

TorchScaffoldAlgo

class substrafl.algorithms.pytorch.torch_scaffold_algo.CUpdateRule(value)

Bases: enum.IntEnum

The rule used to update the client control variate

Values:

  • STABLE (1): The stable rule, I in the Scaffold paper (not implemented)

  • FAST (2): The fast rule, II in the Scaffold paper
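
For orientation, rule II of the Scaffold paper sets the new client control variate to c_i - c + (x - y_i) / (K * lr), where x denotes the server weights received at the start of the round, y_i the local weights after K local updates, and lr the local learning rate. The sketch below applies this formula to plain lists; the function name and argument names are hypothetical, and it does not claim to reproduce how TorchScaffoldAlgo computes the update internally.

```python
from typing import List


def fast_control_variate_update(
    client_c: List[float],
    server_c: List[float],
    server_weights: List[float],
    local_weights: List[float],
    num_updates: int,
    learning_rate: float,
) -> List[float]:
    """Rule II of the Scaffold paper: c_i - c + (x - y_i) / (K * lr)."""
    scale = 1.0 / (num_updates * learning_rate)
    return [
        c_i - c + scale * (x - y)
        for c_i, c, x, y in zip(client_c, server_c, server_weights, local_weights)
    ]


new_client_c = fast_control_variate_update(
    client_c=[0.5, 0.0],
    server_c=[0.25, 0.0],
    server_weights=[1.0, 2.0],
    local_weights=[0.5, 3.0],
    num_updates=4,
    learning_rate=0.25,
)
```

The fast rule only reuses quantities that are already available after local training, so it needs no extra pass over the local data.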

class substrafl.algorithms.pytorch.torch_scaffold_algo.TorchScaffoldAlgo(model: torch.nn.modules.module.Module, criterion: torch.nn.modules.loss._Loss, optimizer: torch.optim.optimizer.Optimizer, index_generator: substrafl.index_generator.base.BaseIndexGenerator, dataset: torch.utils.data.dataset.Dataset, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, with_batch_norm_parameters: bool = False, c_update_rule: substrafl.algorithms.pytorch.torch_scaffold_algo.CUpdateRule = CUpdateRule.FAST, seed: Optional[int] = None, use_gpu: bool = True, *args, **kwargs)

Bases: substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

To be inherited. Wraps the necessary operations so a torch model can be trained in the Scaffold strategy.

The train method:

  • updates the weights of the model with the aggregated weights,

  • initializes or loads the index generator,

  • calls the _local_train() method to do the local training

  • then gets the weight updates from the models and sends them to the aggregator.

The predict method generates the predictions.

The child class can override the _local_train() and _local_predict() methods, or other methods if necessary.

To add a custom parameter to the __init__ of the class, also add it to the call to super().__init__ as shown in the example with my_custom_extra_parameter. Only primitive types (str, int, …) are supported for extra parameters.

Example

class MyAlgo(TorchScaffoldAlgo):
    def __init__(
        self,
        my_custom_extra_parameter,
    ):
        super().__init__(
            model=perceptron,
            criterion=torch.nn.MSELoss(),
            optimizer=optimizer,
            index_generator=NpIndexGenerator(
                num_updates=10,
                batch_size=32,
            ),
            dataset=MyDataset,
            my_custom_extra_parameter=my_custom_extra_parameter,
        )

    def _local_train(
        self,
        train_dataset: torch.utils.data.Dataset,
    ):
        # Create the torch dataloader.
        # ``train_dataset = self._dataset(datasamples=datasamples, is_inference=False)`` is executed
        # prior to the execution of this function.
        train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

        for x_batch, y_batch in train_data_loader:

            # Forward pass
            y_pred = self._model(x_batch)
            # Compute Loss
            loss = self._criterion(y_pred, y_batch)
            self._optimizer.zero_grad()
            # Backward pass: compute the gradients
            loss.backward()
            # Optimizer step: update the weights
            self._optimizer.step()

            # Scaffold specific: call between _optimizer.step() and _scheduler.step().
            # Combining a scheduler with the Scaffold strategy is not scientifically validated,
            # so using one is not recommended. If one is used, _scheduler.step() must be called
            # after _scaffold_parameters_update().
            self._scaffold_parameters_update()
            if self._scheduler is not None:
                self._scheduler.step()

The __init__ function is called at each call of the train() or predict() function. For round>=2, some attributes are then overwritten with their previous states by the load() function, before the train() or predict() function is run.

Parameters
  • model (torch.nn.modules.module.Module) – A torch model.

  • criterion (torch.nn.modules.loss._Loss) – A torch criterion (loss).

  • optimizer (torch.optim.Optimizer) – A torch optimizer linked to the model.

  • index_generator (BaseIndexGenerator) – A stateful index generator. Must inherit from BaseIndexGenerator. The __next__ method shall return a python object (batch_index) which is used for selecting each batch from the output of the get_data method of the opener during training in this way: x[batch_index], y[batch_index]. If overridden, the generator class must be defined either as part of a package or in a different file than the one from which the execute_experiment function is called. This generator is used as the stateful batch_sampler of the data loader created from the given dataset.

  • dataset (torch.utils.data.Dataset) – An instantiable dataset class whose __init__ arguments are x, y and is_inference. The torch datasets used for both training and inference will be instantiated from it prior to the _local_train execution and within the predict method. The __getitem__ methods of those generated datasets must return both x (training data) and y (target values) when is_inference is set to False and only x (testing data) when is_inference is set to True. This behavior can be changed by rewriting the _local_train or predict methods.

  • scheduler (torch.optim.lr_scheduler._LRScheduler, Optional) – A torch scheduler that will be called at every batch. If None, no scheduler will be used. Defaults to None.

  • with_batch_norm_parameters (bool) – Whether to include the batch norm layer parameters in the Scaffold strategy. Defaults to False.

  • c_update_rule (CUpdateRule) – The rule used to update the client control variate. Defaults to CUpdateRule.FAST.

  • seed (Optional[int]) – Seed set at the algo initialization on each organization. Defaults to None.

  • use_gpu (bool) – Whether to use the GPUs if they are available. Defaults to True.

Raises

NumUpdatesValueError – If num_updates is less than or equal to zero.

_local_predict(predict_dataset: torch.utils.data.dataset.Dataset, predictions_path)

Executes the following operations:

  • Creates the torch dataloader using the index generator batch size.

  • Sets the model to eval mode.

  • Saves the predictions using the _save_predictions() function.

Parameters

predict_dataset (torch.utils.data.Dataset) – predict_dataset built from the x returned by the opener.

Important

The onus is on the user to save the computed predictions. Substrafl provides the _save_predictions() method to do so. The user can load those predictions from a metric file with the command: y_pred = np.load(inputs['predictions']).

Raises

BatchSizeNotFoundError – No default batch size has been found to perform local prediction. Please override the predict function of your algorithm.

_local_train(train_dataset: torch.utils.data.dataset.Dataset)

Local train method. Contains the local training loop.

Train the model on num_updates minibatches, using the self._index_generator generator as batch sampler for the torch dataset.

Parameters

train_dataset (torch.utils.data.Dataset) – train_dataset built from the x and y returned by the opener.

Important

You must use next(self._index_generator) at each minibatch, to ensure that the batches you are using stay consistent between two rounds of the federated learning strategy.

Example

# Create torch dataloader
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

for x_batch, y_batch in train_data_loader:

    # Forward pass
    y_pred = self._model(x_batch)

    # Compute Loss
    loss = self._criterion(y_pred, y_batch)
    self._optimizer.zero_grad()
    loss.backward()
    self._optimizer.step()

    # Scaffold specific function to call between self._optimizer.step() and self._scheduler.step()
    self._scaffold_parameters_update()

    if self._scheduler is not None:
        self._scheduler.step()

_save_predictions(predictions: torch.Tensor, predictions_path: os.PathLike)

Saves the predictions in the NumPy format.

Parameters
  • predictions (torch.Tensor) – predictions to save.

  • predictions_path (os.PathLike) – destination file to save predictions.

_scaffold_parameters_update()

Must be called for each update after the optimizer.step() operation.

load(path: pathlib.Path) → substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

Load the stateful arguments of this class. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – The path where the class has been saved.

Returns

The class with the loaded elements.

Return type

TorchAlgo

property model: torch.nn.modules.module.Module

Model exposed when the user downloads the model

Returns

model

Return type

torch.nn.Module

predict(datasamples: Any, shared_state: Optional[Any], predictions_path: os.PathLike = None) → Any

Executes the following operations:

  • Create the test torch dataset.

  • Execute and return the results of the self._local_predict method

Parameters
  • datasamples (Any) – Input data

  • shared_state (Any) – Latest train task shared state (output of the train method)

  • predictions_path (os.PathLike) – Destination file to save predictions

Return type

Any

save(path: pathlib.Path)

Saves all the stateful elements of the class to the specified path. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – A path where to save the class.

property strategies: List[substrafl.schemas.StrategyName]

List of compatible strategies

Returns

typing.List[StrategyName]

Return type

List

summary()

Summary of the class to be exposed in the experiment summary file

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

train(datasamples: Any, shared_state: Optional[substrafl.schemas.ScaffoldAveragedStates] = None) → substrafl.schemas.ScaffoldSharedState

Train method of the Scaffold strategy implemented with torch. This method will execute the following operations:

  • instantiates the provided (or default) batch indexer,

  • if a shared state is passed, sets the parameters of the model to the provided shared state,

  • trains the model for num_updates updates,

  • computes the weight update and the control variate update.

Parameters
  • datasamples (Any) – Input data returned by the get_data method from the opener.

  • shared_state (Optional[ScaffoldAveragedStates]) – Shared state sent by the aggregate_organization (returned by the func strategies.scaffold.avg_shared_states). Defaults to None.

Returns

the shared states of the Algo

Return type

ScaffoldSharedState

TorchNewtonRaphsonAlgo

class substrafl.algorithms.pytorch.torch_newton_raphson_algo.TorchNewtonRaphsonAlgo(model: torch.nn.modules.module.Module, criterion: torch.nn.modules.loss._Loss, batch_size: Optional[int], dataset: torch.utils.data.dataset.Dataset, l2_coeff: float = 0, with_batch_norm_parameters: bool = False, seed: Optional[int] = None, use_gpu: bool = True, *args, **kwargs)

Bases: substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

To be inherited. Wraps the necessary operations so a torch model can be trained in the Newton-Raphson strategy.

The train method:

  • updates the weights of the model with the calculated weight updates

  • creates and initializes the index generator with the given batch size

  • calls the _local_train() method to compute the local gradients and Hessian and sends them to the aggregator.

  • an L2 regularization can be applied to the loss by setting l2_coeff to a nonzero value (the default is zero). L2 regularization adds numerical stability when inverting the Hessian.

The child class can override _local_train() and _local_predict(), or other methods if necessary.

To add a custom parameter to the __init__ of the class, also add it to the call to super().__init__.

The __init__ function is called at each call of the train or predict function.

For round>=2, some attributes are then overwritten with their previous states by the load() function, before the train or predict function is run.

TorchNewtonRaphsonAlgo computes its NewtonRaphsonSharedState (gradients and Hessian matrix) on all the samples of the dataset. Data might be split into mini-batches to prevent loading too much data at once.
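
The following sketch illustrates why splitting the data into mini-batches does not change the result: each batch contributes its gradient and Hessian weighted by its share of the samples. It uses a one-parameter least-squares model in pure Python rather than torch, and the function name, the form of the L2 term (l2_coeff * weight ** 2) and the weighting scheme are assumptions made for this illustration, not a description of _update_gradients_and_hessian.

```python
from typing import List, Tuple


def local_gradient_and_hessian(
    xs: List[float],
    ys: List[float],
    weight: float,
    batch_size: int,
    l2_coeff: float = 0.0,
) -> Tuple[float, float, int]:
    """Accumulate the gradient and Hessian of mean((weight * x - y) ** 2) batch by batch."""
    n_samples = len(xs)
    gradient = 0.0
    hessian = 0.0
    for start in range(0, n_samples, batch_size):
        x_batch = xs[start:start + batch_size]
        y_batch = ys[start:start + batch_size]
        current_batch_size = len(x_batch)

        # Derivatives of the batch loss, including the (assumed) L2 term.
        batch_gradient = (
            sum(2 * x * (weight * x - y) for x, y in zip(x_batch, y_batch)) / current_batch_size
            + 2 * l2_coeff * weight
        )
        batch_hessian = sum(2 * x * x for x in x_batch) / current_batch_size + 2 * l2_coeff

        # Weight each batch by its share of the samples.
        share = current_batch_size / n_samples
        gradient += share * batch_gradient
        hessian += share * batch_hessian
    return gradient, hessian, n_samples


xs = [1.0, 2.0, 3.0]
ys = [2.0, 4.0, 6.0]

full = local_gradient_and_hessian(xs, ys, weight=0.0, batch_size=3)
batched = local_gradient_and_hessian(xs, ys, weight=0.0, batch_size=2)

# One Newton-Raphson step: weight - gradient / hessian
newton_weight = 0.0 - batched[0] / batched[1]
```

In this toy problem the loss is quadratic, so a single Newton-Raphson step lands on the least-squares solution, whichever batch size is used.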

Parameters
  • model (torch.nn.modules.module.Module) – A torch model.

  • criterion (torch.nn.modules.loss._Loss) – A torch criterion (loss).

  • batch_size (int) – The size of the batch. If set to None, it is set to the number of samples in the dataset. Note that the data is divided into batches only to avoid memory issues; the weights are updated only at the end of the epoch.

  • dataset (torch.utils.data.Dataset) – An instantiable dataset class whose __init__ arguments are x, y and is_inference. The torch datasets used for both training and inference will be instantiated from it prior to the _local_train execution and within the predict method. The __getitem__ methods of those generated datasets must return both x (training data) and y (target values) when is_inference is set to False and only x (testing data) when is_inference is set to True. This behavior can be changed by rewriting the _local_train or predict methods.

  • l2_coeff (float) – L2 regularization coefficient. The larger l2_coeff is, the more stable the inversion of the Hessian matrix, although the convergence might be slower. Defaults to 0.

  • with_batch_norm_parameters (bool) – Whether to include the batch norm layer parameters in the Newton-Raphson strategy. Defaults to False.

  • seed (Optional[int]) – Seed set at the algo initialization on each organization. Defaults to None.

  • use_gpu (bool) – Whether to use the GPUs if they are available. Defaults to True.

_local_predict(predict_dataset: torch.utils.data.dataset.Dataset, predictions_path)

Executes the following operations:

  • Creates the torch dataloader using the batch size given at the __init__ of the class.

  • Sets the model to eval mode.

  • Returns the predictions.

Parameters

predict_dataset (torch.utils.data.Dataset) – predict_dataset built from the x returned by the opener.

_local_train(train_dataset: torch.utils.data.dataset.Dataset)

Local train method. Contains the local training loop.

Train the model on num_updates minibatches, using the self._index_generator generator as batch sampler for the torch dataset.

Parameters

train_dataset (torch.utils.data.Dataset) – train_dataset built from the x and y returned by the opener.

Important

You must use next(self._index_generator) at each minibatch, to ensure that the batches you are using stay consistent between two rounds of the Newton-Raphson strategy.

Important

Call the function self._update_gradients_and_hessian(loss, current_batch_size) after computing the loss and the current_batch_size.

Example

# As the parameters of the model don't change during the loop, the l2 regularization is constant and
# can be calculated only once for all the batches.
l2_reg = self._l2_reg()

# Create torch dataloader
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

for x_batch, y_batch in train_data_loader:

    # Forward pass
    y_pred = self._model(x_batch)

    # Compute Loss
    loss = self._criterion(y_pred, y_batch)

    # L2 regularization
    loss += l2_reg

    current_batch_size = len(x_batch)

    # Newton-Raphson specific function, to call after computing the loss and the current_batch_size.
    self._update_gradients_and_hessian(loss, current_batch_size)

_save_predictions(predictions: torch.Tensor, predictions_path: os.PathLike)

Saves the predictions in the NumPy format.

Parameters
  • predictions (torch.Tensor) – predictions to save.

  • predictions_path (os.PathLike) – destination file to save predictions.

_update_gradients_and_hessian(loss: torch.Tensor, current_batch_size: int)

Updates the gradients and hessian matrices.

Parameters
  • loss (torch.Tensor) – the loss to compute the gradients and hessian from.

  • current_batch_size (int) – The length of the batch used to compute the given loss.

load(path: pathlib.Path) → substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

Load the stateful arguments of this class. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – The path where the class has been saved.

Returns

The class with the loaded elements.

Return type

TorchAlgo

property model: torch.nn.modules.module.Module

Model exposed when the user downloads the model

Returns

model

Return type

torch.nn.Module

predict(datasamples: Any, shared_state: Optional[Any], predictions_path: os.PathLike = None) → Any

Executes the following operations:

  • Create the test torch dataset.

  • Execute and return the results of the self._local_predict method

Parameters
  • datasamples (Any) – Input data

  • shared_state (Any) – Latest train task shared state (output of the train method)

  • predictions_path (os.PathLike) – Destination file to save predictions

Return type

Any

save(path: pathlib.Path)

Saves all the stateful elements of the class to the specified path. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – A path where to save the class.

property strategies: List[substrafl.schemas.StrategyName]

List of compatible strategies

Returns

typing.List[StrategyName]

Return type

List

summary()

Summary of the class to be exposed in the experiment summary file

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

train(datasamples: Any, shared_state: Optional[substrafl.schemas.NewtonRaphsonAveragedStates] = None) → substrafl.schemas.NewtonRaphsonSharedState

Train method of the Newton Raphson strategy implemented with torch. This method will execute the following operations:

  • creates and initializes the index generator,

  • if a shared state is passed, sets the weights of the model to the provided shared state weights,

  • initializes the Hessian and the gradients,

  • calls the _local_train() method to compute the local gradients and Hessian and sends them to the aggregator,

  • an L2 regularization can be applied to the loss by setting l2_coeff to a nonzero value (the default is zero).

Parameters
  • datasamples (Any) – Input data returned by the get_data method from the opener.

  • shared_state (NewtonRaphsonAveragedStates, Optional) – Dict containing torch parameters that will be set to the model. Defaults to None.

Returns

local gradients, local Hessian and the number of samples they were computed from.

Return type

NewtonRaphsonSharedState

Raises

NegativeHessianMatrixError – Hessian matrix must be positive semi-definite to correspond to a convex problem.
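
To give an intuition for the positive semi-definite condition, the sketch below tests it on a symmetric 2x2 matrix, for which the condition reduces to a non-negative trace and a non-negative determinant. The function name and the tolerance are hypothetical, and the way the library itself performs this check is not described here.

```python
from typing import List


def is_positive_semi_definite_2x2(hessian: List[List[float]], tolerance: float = 1e-12) -> bool:
    """Check a symmetric 2x2 matrix: both eigenvalues are >= 0 iff trace >= 0 and det >= 0."""
    (a, b), (c, d) = hessian
    if abs(b - c) > tolerance:
        raise ValueError("expected a symmetric matrix")
    trace = a + d              # sum of the two eigenvalues
    determinant = a * d - b * c  # product of the two eigenvalues
    return trace >= -tolerance and determinant >= -tolerance


convex_case = is_positive_semi_definite_2x2([[2.0, 0.0], [0.0, 1.0]])
saddle_case = is_positive_semi_definite_2x2([[1.0, 2.0], [2.0, 1.0]])
concave_case = is_positive_semi_definite_2x2([[-1.0, 0.0], [0.0, -1.0]])
```

A Hessian that fails this kind of test indicates a direction of negative curvature, in which case a Newton-Raphson step is not guaranteed to decrease the loss.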

TorchSingleOrganizationAlgo

class substrafl.algorithms.pytorch.TorchSingleOrganizationAlgo(model: torch.nn.modules.module.Module, criterion: torch.nn.modules.loss._Loss, optimizer: torch.optim.optimizer.Optimizer, index_generator: substrafl.index_generator.base.BaseIndexGenerator, dataset: torch.utils.data.dataset.Dataset, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, seed: Optional[int] = None, use_gpu: bool = True, *args, **kwargs)

Bases: substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

To be inherited. Wraps the necessary operations so a torch model can be trained in the Single organization strategy.

The train method:

  • initializes or loads the index generator,

  • calls the _local_train() method to do the local training

The predict method generates the predictions.

The child class can override the _local_train() and _local_predict() methods, or other methods if necessary.

To add a custom parameter to the __init__ of the class, also add it to the call to super().__init__ as shown in the example with my_custom_extra_parameter. Only primitive types (str, int, …) are supported for extra parameters.

Example

class MyAlgo(TorchSingleOrganizationAlgo):
    def __init__(
        self,
        my_custom_extra_parameter,
    ):
        super().__init__(
            model=perceptron,
            criterion=torch.nn.MSELoss(),
            optimizer=optimizer,
            index_generator=NpIndexGenerator(
                num_updates=100,
                batch_size=32,
            ),
            dataset=MyDataset,
            my_custom_extra_parameter=my_custom_extra_parameter,
        )

    def _local_train(
        self,
        train_dataset: torch.utils.data.Dataset,
    ):
        # Create the torch dataloader.
        # ``train_dataset = self._dataset(datasamples=datasamples, is_inference=False)`` is executed
        # prior to the execution of this function.
        train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

        for x_batch, y_batch in train_data_loader:

            # Forward pass
            y_pred = self._model(x_batch)
            # Compute Loss
            loss = self._criterion(y_pred, y_batch)
            self._optimizer.zero_grad()
            # Backward pass: compute the gradients
            loss.backward()
            # Optimizer step: update the weights
            self._optimizer.step()

            if self._scheduler is not None:
                self._scheduler.step()

The __init__ function is called at each call of the train() or predict() function. For round>=2, some attributes are then overwritten with their previous states by the load() function, before the train() or predict() function is run.

Parameters
  • model (torch.nn.modules.module.Module) – A torch model.

  • criterion (torch.nn.modules.loss._Loss) – A torch criterion (loss).

  • optimizer (torch.optim.Optimizer) – A torch optimizer linked to the model.

  • index_generator (BaseIndexGenerator) – A stateful index generator. Must inherit from BaseIndexGenerator. The __next__ method shall return a python object (batch_index) which is used for selecting each batch from the output of the get_data method of the opener during training in this way: x[batch_index], y[batch_index]. If overridden, the generator class must be defined either as part of a package or in a different file than the one from which the execute_experiment function is called. This generator is used as the stateful batch_sampler of the data loader created from the given dataset.

  • dataset (torch.utils.data.Dataset) – An instantiable dataset class whose __init__ arguments are x, y and is_inference. The torch datasets used for both training and inference will be instantiated from it prior to the _local_train execution and within the predict method. The __getitem__ methods of those generated datasets must return both x (training data) and y (target values) when is_inference is set to False and only x (testing data) when is_inference is set to True. This behavior can be changed by rewriting the _local_train or predict methods.

  • scheduler (torch.optim.lr_scheduler._LRScheduler, Optional) – A torch scheduler that will be called at every batch. If None, no scheduler will be used. Defaults to None.

  • seed (Optional[int]) – Seed set at the algo initialization on each organization. Defaults to None.

  • use_gpu (bool) – Whether to use the GPUs if they are available. Defaults to True.
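As a rough illustration of the dataset contract described in the parameter list, the sketch below defines a hypothetical ToyDataset. It is an assumption-laden stand-in: in real use the class would subclass torch.utils.data.Dataset and return tensors, whereas plain lists are used here so the example runs without torch. Passing y=None at inference time is also an assumption.

```python
from typing import List, Optional, Sequence, Tuple, Union

Sample = List[float]


class ToyDataset:
    """Hypothetical map-style dataset following the x, y, is_inference contract."""

    def __init__(
        self,
        x: Sequence[Sample],
        y: Optional[Sequence[float]],
        is_inference: bool = False,
    ):
        self.x = x
        self.y = y
        self.is_inference = is_inference

    def __len__(self) -> int:
        return len(self.x)

    def __getitem__(self, idx: int) -> Union[Sample, Tuple[Sample, float]]:
        if self.is_inference:
            # Inference: only the features are returned.
            return self.x[idx]
        # Training: features and target are returned together.
        return self.x[idx], self.y[idx]


features = [[0.0, 1.0], [2.0, 3.0]]
targets = [0.0, 1.0]

train_item = ToyDataset(features, targets, is_inference=False)[1]
predict_item = ToyDataset(features, None, is_inference=True)[1]
```

The same class serves both phases; only the is_inference flag changes what __getitem__ returns.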

_local_predict(predict_dataset: torch.utils.data.dataset.Dataset, predictions_path)

Executes the following operations:

  • Creates the torch dataloader using the index generator batch size.

  • Sets the model to eval mode.

  • Saves the predictions using the _save_predictions() function.

Parameters

predict_dataset (torch.utils.data.Dataset) – predict_dataset built from the x returned by the opener.

Important

The onus is on the user to save the computed predictions. Substrafl provides the _save_predictions() function to do so. The user can load those predictions from a metric file with the command: y_pred = np.load(inputs['predictions']).

Raises

BatchSizeNotFoundError – No default batch size has been found to perform local prediction. Please override the predict function of your algorithm.

_local_train(train_dataset: torch.utils.data.dataset.Dataset)

Local train method. Contains the local training loop.

Train the model on num_updates minibatches, using the self._index_generator generator as batch sampler for the torch dataset.

Parameters

train_dataset (torch.utils.data.Dataset) – train_dataset built from the x and y returned by the opener.

Important

You must use next(self._index_generator) as batch sampler, to ensure that the batches you are using are correct between 2 rounds of the federated learning strategy.

Example

# Create torch dataloader
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

for x_batch, y_batch in train_data_loader:

    # Forward pass
    y_pred = self._model(x_batch)

    # Compute Loss
    loss = self._criterion(y_pred, y_batch)
    self._optimizer.zero_grad()
    loss.backward()
    self._optimizer.step()

    if self._scheduler is not None:
        self._scheduler.step()
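
To illustrate why the batch sampler has to be stateful, here is a minimal sketch of a hypothetical ToyIndexGenerator. It does not inherit from BaseIndexGenerator and its attribute names are assumptions; it only shows the idea of yielding num_updates batches per round while keeping its position in the data from one round to the next.

```python
from typing import List


class ToyIndexGenerator:
    """Hypothetical stateful batch sampler; not substrafl's BaseIndexGenerator."""

    def __init__(self, n_samples: int, batch_size: int, num_updates: int):
        self.n_samples = n_samples
        self.batch_size = batch_size
        self.num_updates = num_updates
        self._position = 0  # kept between rounds: this is the state
        self._counter = 0  # batches yielded in the current round

    def __iter__(self) -> "ToyIndexGenerator":
        return self

    def __next__(self) -> List[int]:
        if self._counter == self.num_updates:
            raise StopIteration
        # Wrap around the end of the data so every batch has batch_size indices.
        batch = [(self._position + i) % self.n_samples for i in range(self.batch_size)]
        self._position = (self._position + self.batch_size) % self.n_samples
        self._counter += 1
        return batch

    def reset_counter(self) -> None:
        """Start a new round without rewinding the position in the data."""
        self._counter = 0


generator = ToyIndexGenerator(n_samples=5, batch_size=2, num_updates=2)
round_1 = list(generator)  # [[0, 1], [2, 3]]
generator.reset_counter()
round_2 = list(generator)  # [[4, 0], [1, 2]]
```

Because the position survives between rounds, the second round continues where the first one stopped instead of replaying the same samples.
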
_save_predictions(predictions: torch.Tensor, predictions_path: os.PathLike)

Save the predictions under the numpy format.

Parameters
  • predictions (torch.Tensor) – predictions to save.

  • predictions_path (os.PathLike) – destination file to save predictions.

load(path: pathlib.Path) substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

Load the stateful arguments of this class. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – The path where the class has been saved.

Returns

The class with the loaded elements.

Return type

TorchAlgo

property model: torch.nn.modules.module.Module

Model exposed when the user downloads the model

Returns

model

Return type

torch.nn.Module

predict(datasamples: Any, shared_state: Optional[Any], predictions_path: os.PathLike = None) Any

Executes the following operations:

  • Create the test torch dataset.

  • Execute and return the results of the self._local_predict method

Parameters
  • datasamples (Any) – Input data

  • shared_state (Any) – Latest train task shared state (output of the train method)

  • predictions_path (os.PathLike) – Destination file to save predictions

Return type

Any

save(path: pathlib.Path)

Saves all the stateful elements of the class to the specified path. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – A path where to save the class.

property strategies: List[substrafl.schemas.StrategyName]

List of compatible strategies

Returns

typing.List[StrategyName]

Return type

List

summary()

Summary of the class to be exposed in the experiment summary file. Implement this function in the child class to add strategy-specific variables. The variables must be json-serializable.

Example

def summary(self):
    summary = super().summary()
    summary.update(
        {
            "strategy_specific_variable": self._strategy_specific_variable,
        }
    )
    return summary

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

train(datasamples: Any, shared_state=None) Dict[str, numpy.ndarray]

Train method of the SingleOrganization strategy implemented with torch.

Parameters
  • datasamples (Any) – Input data and targets, as returned by the get_data method of the opener.

  • shared_state (Dict[str, numpy.ndarray], Optional) – Kept for consistency, setting this parameter won’t have any effect. Defaults to None.

Returns

An empty weight update. The SingleOrganization strategy does not use a shared state; this return value is kept only for consistency.

Return type

Dict[str, numpy.ndarray]

Torch Base Class

class substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo(model: torch.nn.modules.module.Module, criterion: torch.nn.modules.loss._Loss, index_generator: Optional[substrafl.index_generator.base.BaseIndexGenerator], dataset: torch.utils.data.dataset.Dataset, optimizer: Optional[torch.optim.optimizer.Optimizer] = None, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, seed: Optional[int] = None, use_gpu: bool = True, *args, **kwargs)

Bases: algorithms.algo.Algo

Base TorchAlgo class, all the torch algo classes inherit from it.

To implement a new strategy:

  • Add the strategy-specific parameters in the __init__.

  • Implement the train() function: it must use the _local_train() function, which can be overridden by the user and must contain as little strategy-specific code as possible.

  • Reimplement the _update_from_checkpoint() and _get_state_to_save() functions to add strategy-specific variables to the local state.

The __init__ function is called at each call of the train() or predict() function. For round >= 2, some attributes are then overwritten by their previous states in the load() function, before the train() or predict() function is run.

_get_state_to_save() dict

Create the algo checkpoint: a dictionary saved with torch.save. In this algo, it contains the state to save for every strategy. Reimplement in the child class to add strategy-specific variables.

Example

def _get_state_to_save(self) -> dict:
    local_state = super()._get_state_to_save()
    local_state.update({
        "strategy_specific_variable": self._strategy_specific_variable,
    })
    return local_state
Returns

checkpoint to save

Return type

dict

_get_torch_device(use_gpu: bool) torch.device

Get the torch device, CPU or GPU, depending on availability and user input.

Parameters

use_gpu (bool) – whether to use GPUs if available or not.

Returns

Torch device

Return type

torch.device

_local_predict(predict_dataset: torch.utils.data.dataset.Dataset, predictions_path)

Executes the following operations:

  • Creates the torch dataloader using the index generator batch size.

  • Sets the model to eval mode.

  • Saves the predictions using the _save_predictions() function.

Parameters

predict_dataset (torch.utils.data.Dataset) – predict_dataset built from the x returned by the opener.

Important

The onus is on the user to save the computed predictions. Substrafl provides the _save_predictions() function to do so. The user can load those predictions from a metric file with the command: y_pred = np.load(inputs['predictions']).

Raises

BatchSizeNotFoundError – No default batch size has been found to perform local prediction. Please override the predict function of your algorithm.

_local_train(train_dataset: torch.utils.data.dataset.Dataset)

Local train method. Contains the local training loop.

Train the model on num_updates minibatches, using the self._index_generator generator as batch sampler for the torch dataset.

Parameters

train_dataset (torch.utils.data.Dataset) – train_dataset built from the x and y returned by the opener.

Important

You must use next(self._index_generator) as batch sampler, to ensure that the batches you are using are correct between 2 rounds of the federated learning strategy.

Example

# Create torch dataloader
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=self._index_generator)

for x_batch, y_batch in train_data_loader:

    # Forward pass
    y_pred = self._model(x_batch)

    # Compute Loss
    loss = self._criterion(y_pred, y_batch)
    self._optimizer.zero_grad()
    loss.backward()
    self._optimizer.step()

    if self._scheduler is not None:
        self._scheduler.step()
_save_predictions(predictions: torch.Tensor, predictions_path: os.PathLike)

Save the predictions under the numpy format.

Parameters
  • predictions (torch.Tensor) – predictions to save.

  • predictions_path (os.PathLike) – destination file to save predictions.

_update_from_checkpoint(path: pathlib.Path) dict

Load the checkpoint and update the internal state from it. Pop the values from the checkpoint so that we can ensure that it is empty at the end, i.e. that all the values have been used.

Parameters

path (pathlib.Path) – path where the checkpoint is saved

Returns

checkpoint

Return type

dict

Example

def _update_from_checkpoint(self, path: Path) -> dict:
    checkpoint = super()._update_from_checkpoint(path=path)
    self._strategy_specific_variable = checkpoint.pop("strategy_specific_variable")
    return checkpoint
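
The pop-then-check idea can be sketched in plain Python. BaseState and StrategyState are hypothetical stand-ins, a dict replaces the checkpoint file read from path, and the emptiness check in load() is an assumption about how the "empty at the end" guarantee could be enforced.

```python
class BaseState:
    """Hypothetical stand-in for the base class; a dict replaces the checkpoint file."""

    def __init__(self):
        self._model_weights = None

    def _update_from_checkpoint(self, checkpoint: dict) -> dict:
        # pop() removes the key, so every restored value leaves the checkpoint.
        self._model_weights = checkpoint.pop("model_weights")
        return checkpoint

    def load(self, checkpoint: dict) -> "BaseState":
        # Work on a copy so the caller's dict is left untouched.
        remaining = self._update_from_checkpoint(dict(checkpoint))
        # Any leftover key was saved but never restored.
        if remaining:
            raise ValueError(f"Unused checkpoint keys: {sorted(remaining)}")
        return self


class StrategyState(BaseState):
    def __init__(self):
        super().__init__()
        self._strategy_specific_variable = None

    def _update_from_checkpoint(self, checkpoint: dict) -> dict:
        checkpoint = super()._update_from_checkpoint(checkpoint)
        self._strategy_specific_variable = checkpoint.pop("strategy_specific_variable")
        return checkpoint


saved = {"model_weights": [0.1, 0.2], "strategy_specific_variable": 3}
restored = StrategyState().load(saved)

try:
    BaseState().load(saved)  # the base class alone leaves one key unused
    leftover_detected = False
except ValueError:
    leftover_detected = True
```

A child class that saves a new variable but forgets to pop it on load is caught immediately rather than silently losing state.
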
load(path: pathlib.Path) substrafl.algorithms.pytorch.torch_base_algo.TorchAlgo

Load the stateful arguments of this class. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – The path where the class has been saved.

Returns

The class with the loaded elements.

Return type

TorchAlgo

property model: torch.nn.modules.module.Module

Model exposed when the user downloads the model

Returns

model

Return type

torch.nn.Module

predict(datasamples: Any, shared_state: Optional[Any], predictions_path: os.PathLike = None) Any

Executes the following operations:

  • Create the test torch dataset.

  • Execute and return the results of the self._local_predict method

Parameters
  • datasamples (Any) – Input data

  • shared_state (Any) – Latest train task shared state (output of the train method)

  • predictions_path (os.PathLike) – Destination file to save predictions

Return type

Any

save(path: pathlib.Path)

Saves all the stateful elements of the class to the specified path. Child classes do not need to override that function.

Parameters

path (pathlib.Path) – A path where to save the class.

abstract property strategies: List[substrafl.schemas.StrategyName]

List of compatible strategies

Returns

typing.List[StrategyName]

Return type

List

summary()

Summary of the class to be exposed in the experiment summary file. Implement this function in the child class to add strategy-specific variables. The variables must be json-serializable.

Example

def summary(self):
    summary = super().summary()
    summary.update(
        {
            "strategy_specific_variable": self._strategy_specific_variable,
        }
    )
    return summary

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

abstract train(datasamples: Any, shared_state: Optional[Any] = None) Any

Executed for each TrainDataOrganization. This function takes the output of the get_data method from the opener, plus the shared state from the aggregator if there is one, and returns a shared state (state to send to the aggregator). Any variable that needs to be saved and updated from one round to another should be an attribute of self (e.g. self._my_local_state_variable), and be saved and loaded in the save() and load() functions.

Parameters
  • datasamples (Any) – The output of the get_data method of the opener.

  • shared_state (Any) – None for the first round of the computation graph then the returned object from the previous organization of the computation graph.

Raises

NotImplementedError

Returns

The object passed to the next organization of the computation graph.

Return type

Any

Torch functions

substrafl.algorithms.pytorch.weight_manager.add_parameters(parameters: List[torch.nn.parameter.Parameter], parameters_to_add: List[torch.nn.parameter.Parameter]) List[torch.nn.parameter.Parameter]

Add the given lists of torch parameters, i.e. parameters + parameters_to_add. Those elements can be extracted from a model thanks to the get_parameters() function.

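Parameters
  • parameters (List[torch.nn.parameter.Parameter]) – A list of torch parameters.

  • parameters_to_add (List[torch.nn.parameter.Parameter]) – The list of torch parameters to add.
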
Returns

The addition of the given parameters.

Return type

List[torch.nn.parameter.Parameter]

substrafl.algorithms.pytorch.weight_manager.batch_norm_param(model: torch.nn.modules.module.Module) Generator[torch.nn.parameter.Parameter, None, None]

Generator of the internal parameters of the batch norm layers of the model. It yields references, hence all modifications made to the yielded objects are applied to the input model.

The internal parameters of a batch norm layer include the running mean and the running variance.

Parameters

model (torch.nn.Module) – A torch model.

Yields

torch.nn.parameter.Parameter – The running mean and running variance of each batch norm layer of the given model.

Return type

Generator[torch.nn.parameter.Parameter, None, None]

substrafl.algorithms.pytorch.weight_manager.get_parameters(model: torch.nn.modules.module.Module, with_batch_norm_parameters: bool) List[torch.nn.parameter.Parameter]

Model parameters from the provided torch model. This function returns a copy not a reference. If with_batch_norm_parameters is set to True, the running mean and the running variance of the batch norm layers will be added after the “classic” parameters of the model.

Parameters
  • model (torch.nn.Module) – A torch model.

  • with_batch_norm_parameters (bool) – If set to True, the running mean and the running variance of each batch norm layer will be added after the “classic” parameters.

Returns

The list of torch parameters of the provided model.

Return type

List[torch.nn.parameter.Parameter]

substrafl.algorithms.pytorch.weight_manager.increment_parameters(model: torch.nn.modules.module.Module, updates: List[torch.nn.parameter.Parameter], with_batch_norm_parameters: bool, updates_multiplier: float = 1.0)

Add the given update to the model parameters. If with_batch_norm_parameters is set to True, the operation will include the running mean and the running variance of the batch norm layers (in this case, they must be included in the given update). This function modifies the given model internally and therefore returns nothing.

Parameters
  • model (torch.nn.Module) – The torch model to modify.

  • updates (List[torch.nn.parameter.Parameter]) – A list of torch parameters to add to the model, as ordered by the standard iterators. The trainable parameters should come first, followed by the batch norm parameters if with_batch_norm_parameters is set to True. If the type is np.ndarray, it is converted to a torch.Tensor.

  • with_batch_norm_parameters (bool) – If set to True, the running mean and the running variance of each batch norm layer will be included, after the trainable layers, in the model parameters to modify.

  • updates_multiplier (float, Optional) – The coefficient which multiplies the updates before being added to the model. Defaults to 1.0.
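
The arithmetic performed on each parameter can be sketched as follows. This is not the library implementation: increment_in_place is a hypothetical helper, and plain lists of floats stand in for torch tensors so the example runs without torch.

```python
from typing import List

Parameter = List[float]  # stand-in for a tensor (assumption made for this sketch)


def increment_in_place(
    parameters: List[Parameter],
    updates: List[Parameter],
    updates_multiplier: float = 1.0,
) -> None:
    """Add updates_multiplier * updates to parameters, modifying them in place."""
    if len(parameters) != len(updates):
        raise ValueError("parameters and updates must have the same length")
    for parameter, update in zip(parameters, updates):
        for i, value in enumerate(update):
            parameter[i] += updates_multiplier * value


weights = [[1.0, 2.0], [3.0]]
result = increment_in_place(weights, updates=[[0.5, 0.5], [1.0]], updates_multiplier=2.0)
# weights is now [[2.0, 3.0], [5.0]] and result is None
```

As in the documented function, the model side is modified directly and nothing is returned.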

substrafl.algorithms.pytorch.weight_manager.is_batchnorm_layer(layer: torch.nn.modules.module.Module) bool

Checks if the provided layer is a Batch Norm layer (either 1D, 2D or 3D).

Parameters

layer (torch.nn.Module) – Pytorch module.

Returns

Whether the given module is a batch norm one.

Return type

bool

substrafl.algorithms.pytorch.weight_manager.model_parameters(model: torch.nn.modules.module.Module, with_batch_norm_parameters: bool) torch.nn.parameter.Parameter

A generator of the given model parameters. The returned generator yields references, hence all modifications made to the yielded objects are applied to the input model. If with_batch_norm_parameters is set to True, the running mean and the running variance of each batch norm layer will be added after the “classic” parameters.

Parameters
  • model (torch.nn.Module) – A torch model.

  • with_batch_norm_parameters (bool) – If set to True, the running mean and the running variance of each batch norm layer will be added after the “classic” parameters.

Returns

A python generator of torch parameters.

Return type

Generator[torch.nn.parameter.Parameter, Any, Any]

substrafl.algorithms.pytorch.weight_manager.set_parameters(model: torch.nn.modules.module.Module, parameters: List[torch.nn.parameter.Parameter], with_batch_norm_parameters: bool)

Sets the parameters of a pytorch model to the provided parameters. If with_batch_norm_parameters is set to True, the operation will include the running mean and the running variance of the batch norm layers (in this case, they must be included in the given parameters). This function modifies the given model internally and therefore returns nothing.

Parameters
  • model (torch.nn.Module) – The torch model to modify.

  • parameters (List[torch.nn.parameter.Parameter]) – Model parameters, as ordered by the standard iterators.

  • with_batch_norm_parameters (bool) – Whether the batch norm layers’ internal parameters are provided and need to be included in the operation.

substrafl.algorithms.pytorch.weight_manager.subtract_parameters(parameters: List[torch.nn.parameter.Parameter], parameters_to_subtract: List[torch.nn.parameter.Parameter]) List[torch.nn.parameter.Parameter]

Subtract the given lists of torch parameters, i.e. parameters - parameters_to_subtract. Those elements can be extracted from a model thanks to the get_parameters() function.

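Parameters
  • parameters (List[torch.nn.parameter.Parameter]) – A list of torch parameters.

  • parameters_to_subtract (List[torch.nn.parameter.Parameter]) – The list of torch parameters to subtract.
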
Returns

The subtraction of the given parameters.

Return type

List[torch.nn.parameter.Parameter]

substrafl.algorithms.pytorch.weight_manager.weighted_sum_parameters(parameters_list: List[List[torch.Tensor]], coefficient_list: List[float]) List[torch.Tensor]

Do a weighted sum of the given lists of torch parameters. Those elements can be extracted from a model thanks to the get_parameters() function.

Parameters
  • parameters_list (List[List[torch.Tensor]]) – A list of lists of torch parameters.

  • coefficient_list (List[float]) – A list of coefficients which will be applied to each list of parameters.

Returns

The weighted sum of the given list of torch parameters.

Return type

List[torch.nn.parameter.Parameter]
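
A minimal sketch of the weighted sum, again with plain lists of floats standing in for tensors. The weighted_sum helper is hypothetical, and the coefficients (for instance the share of samples held by each organization) are illustrative values.

```python
from typing import List

Parameter = List[float]  # stand-in for a tensor (assumption made for this sketch)


def weighted_sum(
    parameters_list: List[List[Parameter]],
    coefficient_list: List[float],
) -> List[Parameter]:
    """Return sum_k coefficient_list[k] * parameters_list[k], parameter by parameter."""
    if len(parameters_list) != len(coefficient_list):
        raise ValueError("one coefficient is required per list of parameters")
    result = []
    # zip(*parameters_list) groups the i-th parameter of every list together.
    for same_parameter in zip(*parameters_list):
        summed = [
            sum(coeff * values[j] for coeff, values in zip(coefficient_list, same_parameter))
            for j in range(len(same_parameter[0]))
        ]
        result.append(summed)
    return result


org_a = [[1.0, 2.0], [4.0]]
org_b = [[3.0, 6.0], [8.0]]
averaged = weighted_sum([org_a, org_b], [0.25, 0.75])
# averaged is [[2.5, 5.0], [7.0]]
```

When the coefficients sum to 1, the result is a weighted average of the inputs.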

substrafl.algorithms.pytorch.weight_manager.zeros_like_parameters(model: torch.nn.modules.module.Module, with_batch_norm_parameters: bool, device: torch.device) List[torch.Tensor]

Copies the model parameters from the provided torch model and sets their values to zero. If with_batch_norm_parameters is set to True, the running mean and the running variance of the batch norm layers will be added after the “classic” parameters of the model.

Parameters
  • model (torch.nn.Module) – A torch model.

  • with_batch_norm_parameters (bool) – If set to True, the running mean and the running variance of each batch norm layer will be added after the “classic” parameters.

  • device (torch.device) – torch device on which to save the parameters

Returns

The list of torch parameters of the provided model with values set to zero.

Return type

List[torch.nn.parameter.Parameter]

Base Class

class substrafl.algorithms.algo.Algo(*args, **kwargs)

Bases: abc.ABC

The base class to be inherited for substrafl algorithms.

abstract load(path: pathlib.Path) Any

Executed at the beginning of each step of the computation graph, so that each organization can retrieve its previous local state at every step.

Parameters

path (pathlib.Path) – The path where the previous local state has been saved.

Raises

NotImplementedError

Returns

The loaded element.

Return type

Any

abstract property model: Any

Model exposed when the user downloads the model

Returns

model

Return type

Any

abstract predict(datasamples: Any, shared_state: Any, predictions_path: Optional[pathlib.Path] = None) Any

Executed for each TestDataOrganization. The predictions are saved to predictions_path, then loaded and used to calculate the metric.

Parameters
  • datasamples (Any) – The output of the get_data method of the opener.

  • shared_state (Any) – None for the first round of the computation graph then the returned object from the previous organization of the computation graph.

  • predictions_path (pathlib.Path) – Destination file to save predictions.

Raises

NotImplementedError

Return type

Any

abstract save(path: pathlib.Path)

Executed at the end of each step of the computation graph, so that each organization can save its local state.

Parameters

path (pathlib.Path) – The path where the local state will be saved.

Raises

NotImplementedError

abstract property strategies: List[substrafl.schemas.StrategyName]

List of compatible strategies

Returns

typing.List[StrategyName]

Return type

List

summary() dict

Summary of the class to be exposed in the experiment summary file. In child classes, override this function and call super().summary().

Example

def summary(self):

    summary = super().summary()
    summary.update(
        {
            "attribute": self.attribute,
            ...
        }
    )
    return summary
Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

abstract train(datasamples, shared_state: Any) Any

Executed for each TrainDataOrganization. This function takes the output of the get_data method from the opener, plus the shared state from the aggregator if there is one, and returns a shared state (state to send to the aggregator). Any variable that needs to be saved and updated from one round to another should be an attribute of self (e.g. self._my_local_state_variable), and be saved and loaded in the save() and load() functions.

Parameters
  • datasamples (Any) – The output of the get_data method of the opener.

  • shared_state (Any) – None for the first round of the computation graph then the returned object from the previous organization of the computation graph.

Raises

NotImplementedError

Returns

The object passed to the next organization of the computation graph.

Return type

Any