Nodes

AggregationNode

class substrafl.nodes.aggregation_node.AggregationNode(organization_id: str)

Bases: AggregationNodeProtocol

The node that applies operations to the shared states received from TrainDataNode data operations. The result is sent to TrainDataNode and/or TestDataNode data operations.

Parameters:

organization_id (str) –

register_operations(*, client: Client, permissions: Permissions, cache: Dict[RemoteStruct, OperationKey], dependencies: Dependency) Dict[RemoteStruct, OperationKey]

Define the functions for each operation and submit the aggregated task to substra.

Goes through every operation in the computation graph, checks which function each one uses (identified by its RemoteStruct id), submits it to substra and saves the RemoteStruct : function_key pair into the cache (where function_key is the function key returned by substra). If two tasks depend on the same function, the function is not added twice to substra, as this method checks whether the function has already been submitted before adding it.

Parameters:
  • client (substra.Client) – Substra defined client used to register the operation.

  • permissions (substra.sdk.schemas.Permissions) – Substra permissions attached to the registered operation.

  • cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.

  • dependencies (Dependency) – Dependencies of the given operation.

Returns:

updated cache

Return type:

Dict[RemoteStruct, OperationKey]
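The deduplication behavior described above can be sketched with plain dictionaries. The names below (`register_function`, the string ids, the generated keys) are illustrative stand-ins, not the real substrafl internals:

```python
from typing import Dict

def register_function(remote_struct_id: str, cache: Dict[str, str]) -> Dict[str, str]:
    """Submit a function only if its RemoteStruct id is not already cached."""
    if remote_struct_id not in cache:
        # Stand-in for the function key substra would return on registration.
        cache[remote_struct_id] = f"function_key_{len(cache)}"
    return cache

cache: Dict[str, str] = {}
# Two tasks depend on the same underlying function ("agg_v1"):
register_function("agg_v1", cache)
register_function("agg_v1", cache)    # already cached, not re-submitted
register_function("train_v1", cache)  # a second, distinct function
```

After the three calls the cache holds exactly two entries: each function is registered once, no matter how many tasks reference it.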

summary() dict

Summary of the class to be exposed in the experiment summary file

Returns:

a json-serializable dict with the attributes the user wants to store

Return type:

dict

update_states(operation: RemoteOperation, *, authorized_ids: Set[str], round_idx: int | None = None, clean_models: bool = False) SharedStateRef

Adds an aggregation task to the list of operations to be executed by the node during the compute plan. This is done statically: nothing is submitted to substra at this point. This is why the function key is a RemoteStruct (substrafl's local reference to the algorithm) and not a substra function_key, as nothing has been submitted yet.

Parameters:
  • operation (RemoteOperation) – Automatically generated structure returned by the remote() decorator. It allows an operation to be registered and executed later on.

  • round_idx (int) – Round number, used for iterative compute plans; it starts at 1. Defaults to None.

  • authorized_ids (Set[str]) – Authorized org to access the output model.

  • clean_models (bool) – Whether outputs of this operation are transient (deleted when they are not used anymore) or not. Defaults to False.

Raises:

TypeError – operation must be a RemoteOperation; make sure to decorate the (user-defined) aggregate function of your strategy with @remote.

Returns:

Identification for the result of this operation.

Return type:

SharedStateRef
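The static graph-building behavior of update_states can be sketched as follows: the operation is only appended to the plan, and the caller immediately gets back a reference identifying the future output. All class and field names below are toy analogues, not the substrafl types:

```python
from dataclasses import dataclass, field
from typing import Any, List
import uuid

@dataclass
class SharedStateRefSketch:
    """Stand-in for SharedStateRef: just an identifier for a future result."""
    key: str

@dataclass
class AggregationNodeSketch:
    """Records operations statically; nothing is executed or submitted here."""
    tasks: List[dict] = field(default_factory=list)

    def update_states(self, operation: Any, round_idx: int) -> SharedStateRefSketch:
        ref = SharedStateRefSketch(key=str(uuid.uuid4()))
        # The operation is appended to the plan, mirroring the static
        # registration described above; execution happens elsewhere, later.
        self.tasks.append({"op": operation, "round": round_idx, "output": ref.key})
        return ref

node = AggregationNodeSketch()
ref = node.update_states(operation="avg_shared_states", round_idx=1)
# one task recorded; `ref` identifies the output that does not exist yet
```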

TrainDataNode

class substrafl.nodes.train_data_node.TrainDataNode(organization_id: str, data_manager_key: str, data_sample_keys: List[str])

Bases: TrainDataNodeProtocol

A predefined structure that allows you to register operations on your train node in a static way before submitting them to substra.

Parameters:
  • organization_id (str) – The substra organization ID (shared with other organizations if permissions are needed)

  • data_manager_key (str) – Substra data_manager_key opening data samples used by the strategy

  • data_sample_keys (List[str]) – Substra data_sample_keys used for the training on this node

register_operations(*, client: Client, permissions: Permissions, cache: Dict[RemoteStruct, OperationKey], dependencies: Dependency) Dict[RemoteStruct, OperationKey]

Define the functions for each operation and submit the train task to substra.

Goes through every operation in the computation graph, checks which function each one uses (identified by its RemoteStruct id), submits it to substra and saves the RemoteStruct : function_key pair into the cache (where function_key is the function key returned by substra). If two tasks depend on the same function, the function is not added twice to substra, as this method checks whether the function has already been submitted before adding it.

Parameters:
  • client (substra.Client) – Substra client for the organization.

  • permissions (substra.sdk.schemas.Permissions) – Permissions for the function.

  • cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.

  • dependencies (Dependency) – Function dependencies.

Returns:

updated cache

Return type:

Dict[RemoteStruct, OperationKey]

summary() dict

Summary of the class to be exposed in the experiment summary file

Returns:

a json-serializable dict with the attributes the user wants to store

Return type:

dict

update_states(operation: RemoteDataOperation, *, authorized_ids: Set[str], round_idx: int | None = None, aggregation_id: str | None = None, clean_models: bool = False, local_state: LocalStateRef | None = None) Tuple[LocalStateRef, SharedStateRef]

Adds a train task to the list of operations to be executed by the node during the compute plan. This is done statically: nothing is submitted to substra at this point. This is why the function key is a RemoteStruct (substrafl's local reference to the algorithm) and not a substra function_key, as nothing has been submitted yet.

Parameters:
  • operation (RemoteDataOperation) – Automatically generated structure returned by the remote_data() decorator. It allows an operation to be registered and executed later on.

  • authorized_ids (Set[str]) – Authorized org to access the output model.

  • round_idx (int) – Round number, used for iterative compute plans; it starts at 1. In the case of a centralized strategy, it is preceded by an initialization round tagged 0. Defaults to None.

  • aggregation_id (str) – Aggregation node id, used to authorize access to the shared model. Defaults to None.

  • clean_models (bool) – Whether outputs of this operation are transient (deleted when they are not used anymore) or not. Defaults to False.

  • local_state (Optional[LocalStateRef]) – The parent task LocalStateRef. Defaults to None.

Raises:

TypeError – operation must be a RemoteDataOperation; make sure to decorate the train and predict methods of your algorithm with @remote_data.

Returns:

Identifications for the results of this operation.

Return type:

Tuple[LocalStateRef, SharedStateRef]
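The role of the returned pair can be sketched with a toy round loop: each round's train task takes the previous round's local state as parent, while the shared state reference is what would be handed to the aggregation node. Names and key formats here are illustrative, not substrafl's:

```python
from typing import List, Optional, Tuple

def update_states_sketch(plan: List[dict], round_idx: int,
                         local_state: Optional[str]) -> Tuple[str, str]:
    """Append a train task; return (local_state_ref, shared_state_ref) keys."""
    local_ref = f"local_r{round_idx}"
    shared_ref = f"shared_r{round_idx}"
    plan.append({"round": round_idx, "parent": local_state,
                 "outputs": (local_ref, shared_ref)})
    return local_ref, shared_ref

plan: List[dict] = []
local: Optional[str] = None  # the first round has no parent local state
for round_idx in range(1, 4):
    # The local state returned by one round is fed to the next as parent.
    local, shared = update_states_sketch(plan, round_idx, local)
```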

TestDataNode

class substrafl.nodes.test_data_node.TestDataNode(organization_id: str, data_manager_key: str, data_sample_keys: List[str])

Bases: TestDataNodeProtocol

A node on which you will test your algorithm.

Parameters:
  • organization_id (str) – The substra organization ID (shared with other organizations if permissions are needed)

  • data_manager_key (str) – Substra data_manager_key opening data samples used by the strategy

  • data_sample_keys (List[str]) – Substra data_sample_keys used for testing on this node

summary() dict

Summary of the class to be exposed in the experiment summary file

Returns:

a json-serializable dict with the attributes the user wants to store

Return type:

dict

update_states(traintask_id: str, operation: RemoteDataOperation, round_idx: int | None = None)

Creates a test task based on the node characteristics.

Parameters:
  • traintask_id (str) – The substra parent id

  • operation (RemoteDataOperation) – Automatically generated structure returned by the remote_data() decorator. It allows an operation to be registered and executed later on.

  • round_idx (int) – Round number, used for iterative compute plans; it starts at 1. Defaults to None.

Protocols

class substrafl.nodes.protocol.TrainDataNodeProtocol(*args, **kwargs)

Bases: Protocol

class substrafl.nodes.protocol.TestDataNodeProtocol(*args, **kwargs)

Bases: Protocol

class substrafl.nodes.protocol.AggregationNodeProtocol(*args, **kwargs)

Bases: Protocol

References

class substrafl.nodes.schemas.OperationKey

NewType creates simple unique types with almost zero runtime overhead. NewType(name, tp) is considered a subtype of tp by static type checkers. At runtime, NewType(name, tp) returns a dummy callable that simply returns its argument. Usage:

UserId = NewType('UserId', int)

def name_by_id(user_id: UserId) -> str:
    ...

UserId('user')          # Fails type check

name_by_id(42)          # Fails type check
name_by_id(UserId(42))  # OK

num = UserId(5) + 1     # type: int

alias of str
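Since OperationKey is a NewType alias of str, the pattern above applies directly: the alias exists only for static type checkers, and at runtime an OperationKey is the plain string. A stdlib-only demonstration (the key value is made up):

```python
from typing import Dict, NewType

# Mirrors the definition above: a distinct static type layered over str.
OperationKey = NewType("OperationKey", str)

key = OperationKey("a1b2c3")  # at runtime this is just the string "a1b2c3"
cache: Dict[str, OperationKey] = {"my_remote_struct_id": key}
```

Because NewType adds no runtime wrapper, `isinstance(key, str)` is True and the key can be used anywhere a str is expected.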

class substrafl.nodes.schemas._PrettyJsonBaseModel

Bases: BaseModel

Base model configuration for pretty representation

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class substrafl.nodes.schemas.InputIdentifiers(value)

Bases: str, Enum

An enumeration.

class substrafl.nodes.schemas.OutputIdentifiers(value)

Bases: str, Enum

An enumeration.

class substrafl.nodes.schemas.SimuStatesMemory(*, worker: List[str] = [], round_idx: List[int] = [], state: List[Any] = [])

Bases: _PrettyJsonBaseModel

Intermediate states of a simulated experiment

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • worker (List[str]) –

  • round_idx (List[int]) –

  • state (List[Any]) –

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'round_idx': FieldInfo(annotation=List[int], required=False, default=[]), 'state': FieldInfo(annotation=List[Any], required=False, default=[]), 'worker': FieldInfo(annotation=List[str], required=False, default=[])}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class substrafl.nodes.schemas.SimuPerformancesMemory(*, worker: List[str] = [], round_idx: List[int] = [], identifier: List[str] = [], performance: List[float] = [])

Bases: _PrettyJsonBaseModel

Performances of a simulated experiment

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
  • worker (List[str]) –

  • round_idx (List[int]) –

  • identifier (List[str]) –

  • performance (List[float]) –

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'identifier': FieldInfo(annotation=List[str], required=False, default=[]), 'performance': FieldInfo(annotation=List[float], required=False, default=[]), 'round_idx': FieldInfo(annotation=List[int], required=False, default=[]), 'worker': FieldInfo(annotation=List[str], required=False, default=[])}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.
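The parallel-list layout of SimuPerformancesMemory (one entry per recorded measurement across worker, round_idx, identifier and performance) can be sketched with a stdlib dataclass analogue; the class, its `record` helper, and the sample values are all illustrative, not the pydantic model itself:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class SimuPerformancesSketch:
    """Stdlib analogue of SimuPerformancesMemory: four parallel lists,
    one entry per recorded performance measurement."""
    worker: List[str] = field(default_factory=list)
    round_idx: List[int] = field(default_factory=list)
    identifier: List[str] = field(default_factory=list)
    performance: List[float] = field(default_factory=list)

    def record(self, worker: str, round_idx: int,
               identifier: str, performance: float) -> None:
        # Append one measurement; index i of each list belongs to the same row.
        self.worker.append(worker)
        self.round_idx.append(round_idx)
        self.identifier.append(identifier)
        self.performance.append(performance)

mem = SimuPerformancesSketch()
mem.record("org-1", 1, "accuracy", 0.71)
mem.record("org-2", 1, "accuracy", 0.69)
```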

class substrafl.nodes.references.local_state.LocalStateRef(key: str, init: bool = False)

Bases: object

Parameters:
  • key (str) –

  • init (bool) –

class substrafl.nodes.references.shared_state.SharedStateRef(key: str)

Bases: object

Parameters:

key (str) –