Nodes
AggregationNode
- class substrafl.nodes.aggregation_node.AggregationNode(organization_id: str)
Bases:
AggregationNodeProtocol
The node which applies operations to the shared states received from the TrainDataNode data operations. The result is sent to the TrainDataNode and/or TestDataNode data operations.
- Parameters:
organization_id (str) –
- register_operations(*, client: Client, permissions: Permissions, cache: Dict[RemoteStruct, OperationKey], dependencies: Dependency) Dict[RemoteStruct, OperationKey]
Define the functions for each operation and submit the aggregated task to substra.
Go through every operation in the computation graph, check which function each one uses (identified by its RemoteStruct id), submit it to substra, and save RemoteStruct : function_key into the cache (where function_key is the function key returned by substra). If two tasks depend on the same function, the function is not added twice to substra, as this method checks whether a function has already been submitted before adding it.
- Parameters:
client (substra.Client) – Substra defined client used to register the operation.
permissions (substra.sdk.schemas.Permissions) – Substra permissions attached to the registered operation.
cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.
dependencies (Dependency) – Dependencies of the given operation.
- Returns:
The updated cache.
- Return type:
Dict[RemoteStruct, OperationKey]
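The deduplication performed by register_operations can be sketched in plain Python. This is a hypothetical, simplified stand-in, not the actual substrafl implementation: remote_struct_id plays the role of the RemoteStruct id and submit stands in for the substra registration call.

```python
from typing import Callable, Dict

def register_function(remote_struct_id: str,
                      cache: Dict[str, str],
                      submit: Callable[[str], str]) -> Dict[str, str]:
    # Only submit if this RemoteStruct id has not been registered yet.
    if remote_struct_id not in cache:
        cache[remote_struct_id] = submit(remote_struct_id)
    return cache

calls = []

def fake_submit(rs_id: str) -> str:
    # Stand-in for the actual substra registration call.
    calls.append(rs_id)
    return f"function_key_{rs_id}"

cache: Dict[str, str] = {}
register_function("agg_fn", cache, fake_submit)
register_function("agg_fn", cache, fake_submit)  # cache hit: not submitted again
```

Two tasks sharing the same function trigger only a single submission, which is the behaviour described above.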
- summary() dict
Summary of the class to be exposed in the experiment summary file
- Returns:
A json-serializable dict with the attributes the user wants to store.
- Return type:
dict
- update_states(operation: RemoteOperation, *, authorized_ids: Set[str], round_idx: int | None = None, clean_models: bool = False) SharedStateRef
Adds an aggregation task to the list of operations to be executed by the node during the compute plan. This is done statically: nothing is submitted to substra. This is why the function key is a RemoteStruct (substrafl's local reference to the algorithm) and not a substra function_key, as nothing has been submitted yet.
- Parameters:
operation (RemoteOperation) – Automatically generated structure returned by the remote() decorator. This allows an operation to be registered and executed later on.
round_idx (int) – Used in the case of learning compute plans. Round number; it starts at 1. Defaults to None.
authorized_ids (Set[str]) – Organizations authorized to access the output model.
clean_models (bool) – Whether the outputs of this operation are transient (deleted when they are no longer used). Defaults to False.
- Raises:
TypeError – operation must be a RemoteOperation; make sure to decorate your (user-defined) aggregate function of the strategy with @remote.
- Returns:
Identification for the result of this operation.
- Return type:
SharedStateRef
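The static registration pattern described above can be illustrated with a toy class. This is a hypothetical sketch, not substrafl's implementation: update_states only appends a task description to a local list and hands back a reference; nothing is submitted anywhere.

```python
from typing import Any, Dict, List

class SharedStateRef:
    """Minimal stand-in for substrafl's SharedStateRef (illustrative only)."""
    def __init__(self, key: str):
        self.key = key

class ToyAggregationNode:
    """Sketch of static registration: tasks accumulate locally."""
    def __init__(self, organization_id: str):
        self.organization_id = organization_id
        self.tasks: List[Dict[str, Any]] = []

    def update_states(self, operation, *, authorized_ids, round_idx=None,
                      clean_models=False) -> SharedStateRef:
        # Record the task description; `operation` is a local reference
        # (RemoteStruct-like), not a substra function_key.
        task_id = f"{self.organization_id}-{len(self.tasks)}"
        self.tasks.append({
            "operation": operation,
            "authorized_ids": set(authorized_ids),
            "round_idx": round_idx,
            "transient": clean_models,
        })
        return SharedStateRef(task_id)

node = ToyAggregationNode("org-1")
ref = node.update_states("aggregate_op", authorized_ids={"org-1", "org-2"},
                         round_idx=1)
```

The returned reference is what downstream operations use to point at this task's output once the graph is eventually submitted.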
TrainDataNode
- class substrafl.nodes.train_data_node.TrainDataNode(organization_id: str, data_manager_key: str, data_sample_keys: List[str])
Bases:
TrainDataNodeProtocol
A predefined structure that allows you to register operations on your train node in a static way before submitting them to substra.
- Parameters:
organization_id (str) –
data_manager_key (str) –
data_sample_keys (List[str]) –
- register_operations(*, client: Client, permissions: Permissions, cache: Dict[RemoteStruct, OperationKey], dependencies: Dependency) Dict[RemoteStruct, OperationKey]
Define the functions for each operation and submit the train task to substra.
Go through every operation in the computation graph, check which function each one uses (identified by its RemoteStruct id), submit it to substra, and save RemoteStruct : function_key into the cache (where function_key is the function key returned by substra). If two tasks depend on the same function, the function is not added twice to substra, as this method checks whether a function has already been submitted before adding it.
- Parameters:
client (substra.Client) – Substra client for the organization.
permissions (substra.sdk.schemas.Permissions) – Permissions for the function.
cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.
dependencies (Dependency) – Function dependencies.
- Returns:
The updated cache.
- Return type:
Dict[RemoteStruct, OperationKey]
- summary() dict
Summary of the class to be exposed in the experiment summary file
- Returns:
A json-serializable dict with the attributes the user wants to store.
- Return type:
dict
- update_states(operation: RemoteDataOperation, *, authorized_ids: Set[str], round_idx: int | None = None, aggregation_id: str | None = None, clean_models: bool = False, local_state: LocalStateRef | None = None) Tuple[LocalStateRef, SharedStateRef]
Adds a train task to the list of operations to be executed by the node during the compute plan. This is done statically: nothing is submitted to substra. This is why the function key is a RemoteStruct (substrafl's local reference to the algorithm) and not a substra function_key, as nothing has been submitted yet.
- Parameters:
operation (RemoteDataOperation) – Automatically generated structure returned by the remote_data() decorator. This allows an operation to be registered and executed later on.
authorized_ids (Set[str]) – Organizations authorized to access the output model.
round_idx (int) – Used in the case of learning compute plans. Round number; it starts at 1. In the case of a centralized strategy, it is preceded by an initialization round tagged 0. Defaults to None.
aggregation_id (str) – Aggregation node id, used to authorize access to the shared model. Defaults to None.
clean_models (bool) – Whether the outputs of this operation are transient (deleted when they are no longer used). Defaults to False.
local_state (Optional[LocalStateRef]) – The parent task's LocalStateRef. Defaults to None.
- Raises:
TypeError – operation must be a RemoteDataOperation; make sure to decorate the train and predict methods of your algorithm with @remote_data.
- Returns:
Identifications for the results of this operation.
- Return type:
Tuple[LocalStateRef, SharedStateRef]
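Because each call returns both a local and a shared state reference, a strategy can thread the LocalStateRef of one round into the next as the parent task. The sketch below is hypothetical and simplified (class and key names are illustrative, not substrafl's internals); it only shows the chaining pattern.

```python
from typing import List, Optional, Tuple

class LocalStateRef:
    def __init__(self, key: str):
        self.key = key

class SharedStateRef:
    def __init__(self, key: str):
        self.key = key

class ToyTrainDataNode:
    """Sketch: each call appends one train task and returns references
    that the caller threads into the next round."""
    def __init__(self, organization_id: str):
        self.organization_id = organization_id
        self.tasks: List[dict] = []

    def update_states(self, operation, *, authorized_ids, round_idx=None,
                      local_state: Optional[LocalStateRef] = None,
                      ) -> Tuple[LocalStateRef, SharedStateRef]:
        task_id = f"{self.organization_id}-{len(self.tasks)}"
        self.tasks.append({
            "operation": operation,
            "round_idx": round_idx,
            # The parent is the previous round's local state, if any.
            "parent": local_state.key if local_state else None,
        })
        return LocalStateRef(task_id), SharedStateRef(task_id)

# Thread local_state from round to round, starting with no parent.
node = ToyTrainDataNode("org-1")
local_state = None
for round_idx in range(1, 3):
    local_state, shared = node.update_states(
        "train_op", authorized_ids={"org-1"},
        round_idx=round_idx, local_state=local_state)
```

In a real compute plan the SharedStateRef would additionally feed the aggregation node between rounds.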
TestDataNode
- class substrafl.nodes.test_data_node.TestDataNode(organization_id: str, data_manager_key: str, data_sample_keys: List[str])
Bases:
TestDataNodeProtocol
A node on which you will test your algorithm.
- Parameters:
organization_id (str) –
data_manager_key (str) –
data_sample_keys (List[str]) –
- summary() dict
Summary of the class to be exposed in the experiment summary file
- Returns:
A json-serializable dict with the attributes the user wants to store.
- Return type:
dict
- update_states(traintask_id: str, operation: RemoteDataOperation, round_idx: int | None = None)
Creates a test task based on the node's characteristics.
- Parameters:
traintask_id (str) – The substra id of the parent train task.
operation (RemoteDataOperation) – Automatically generated structure returned by the remote_data() decorator. This allows an operation to be registered and executed later on.
round_idx (int) – Used in the case of learning compute plans. Round number; it starts at 1. Defaults to None.
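A test task is linked to the train task that produced the model it evaluates via traintask_id. The following is a hypothetical sketch of that linkage (names are illustrative, not substrafl's internals):

```python
from typing import List

class ToyTestDataNode:
    """Sketch: a test task records the parent train task it evaluates."""
    def __init__(self, organization_id: str):
        self.organization_id = organization_id
        self.testtasks: List[dict] = []

    def update_states(self, traintask_id: str, operation, round_idx=None):
        self.testtasks.append({
            "traintask_id": traintask_id,  # parent train task in substra
            "operation": operation,
            "round_idx": round_idx,
        })

test_node = ToyTestDataNode("org-2")
test_node.update_states("train-task-42", "predict_op", round_idx=1)
```

Evaluating after each round therefore means calling update_states once per round with the corresponding round's train task id.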
Protocols
References
- class substrafl.nodes.schemas.OperationKey
NewType creates simple unique types with almost zero runtime overhead. NewType(name, tp) is considered a subtype of tp by static type checkers. At runtime, NewType(name, tp) returns a dummy callable that simply returns its argument. Usage:
UserId = NewType('UserId', int)

def name_by_id(user_id: UserId) -> str: ...

UserId('user')          # Fails type check
name_by_id(42)          # Fails type check
name_by_id(UserId(42))  # OK

num = UserId(5) + 1     # type: int
alias of
str
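Since OperationKey is an alias of str, the NewType usage above can be run as ordinary Python: static type checkers reject the flagged lines, but at runtime the wrapper simply returns its argument. The key value below is made up for illustration.

```python
from typing import NewType

# Mirrors substrafl's OperationKey, which is an alias of str.
OperationKey = NewType("OperationKey", str)

key = OperationKey("a1b2c3")
# At runtime NewType is a pass-through: the value is still a plain str.
assert isinstance(key, str)
assert key == "a1b2c3"
```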
- class substrafl.nodes.schemas._PrettyJsonBaseModel
Bases:
BaseModel
Base model configuration for pretty representation
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class substrafl.nodes.schemas.SimuStatesMemory(*, worker: List[str] = [], round_idx: List[int] = [], state: List[Any] = [])
Bases:
_PrettyJsonBaseModel
Intermediate states of a simulated experiment
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'round_idx': FieldInfo(annotation=List[int], required=False, default=[]), 'state': FieldInfo(annotation=List[Any], required=False, default=[]), 'worker': FieldInfo(annotation=List[str], required=False, default=[])}
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class substrafl.nodes.schemas.SimuPerformancesMemory(*, worker: List[str] = [], round_idx: List[int] = [], identifier: List[str] = [], performance: List[float] = [])
Bases:
_PrettyJsonBaseModel
Performances of a simulated experiment
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
worker (List[str]) –
round_idx (List[int]) –
identifier (List[str]) –
performance (List[float]) –
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'identifier': FieldInfo(annotation=List[str], required=False, default=[]), 'performance': FieldInfo(annotation=List[float], required=False, default=[]), 'round_idx': FieldInfo(annotation=List[int], required=False, default=[]), 'worker': FieldInfo(annotation=List[str], required=False, default=[])}
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
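Both memory models are column-aligned record holders: each field is a list, and entry i of every list describes the same measurement. A rough dataclass analogue (illustrative only; the actual classes are pydantic models, which handle validation and mutable defaults themselves) could look like:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class SimuPerformancesMemoryLike:
    """Dataclass analogue of SimuPerformancesMemory (not the real class)."""
    worker: List[str] = field(default_factory=list)
    round_idx: List[int] = field(default_factory=list)
    identifier: List[str] = field(default_factory=list)
    performance: List[float] = field(default_factory=list)

    def record(self, worker: str, round_idx: int,
               identifier: str, performance: float) -> None:
        # Append one performance point; columns stay aligned by index.
        self.worker.append(worker)
        self.round_idx.append(round_idx)
        self.identifier.append(identifier)
        self.performance.append(performance)

mem = SimuPerformancesMemoryLike()
mem.record("org-1", 1, "accuracy", 0.82)
mem.record("org-2", 1, "accuracy", 0.79)
```

The record method here is hypothetical; with the real pydantic models you would populate the list fields directly.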