Nodes

AggregationNode

class substrafl.nodes.aggregation_node.AggregationNode(organization_id: str)

Bases: organizations.organization.Node

Node that applies operations to the shared states received from TrainDataNode data operations. The result is sent to the TrainDataNode and/or TestDataNode data operations.

Parameters

organization_id (str) –

register_operations(*, client: substra.sdk.client.Client, permissions: substra.sdk.schemas.Permissions, cache: Dict[substrafl.remote.remote_struct.RemoteStruct, OperationKey], dependencies: substrafl.dependency.schemas.Dependency) Dict[substrafl.remote.remote_struct.RemoteStruct, OperationKey]

Define the functions for each operation and submit the aggregated task to substra.

Go through every operation in the computation graph, check which function it uses (identified by its RemoteStruct id), submit it to substra, and save the RemoteStruct : function_key pair into the cache (where function_key is the function key returned by substra). If two tasks depend on the same function, the function is not added to substra twice, because this method checks whether a function has already been submitted before adding it.

Parameters
  • client (substra.Client) – Substra defined client used to register the operation.

  • permissions (substra.sdk.schemas.Permissions) – Substra permissions attached to the registered operation.

  • cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.

  • dependencies (Dependency) – Dependencies of the given operation.

Returns

updated cache

Return type

Dict[RemoteStruct, OperationKey]
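The caching behaviour described above can be illustrated with a minimal, self-contained sketch. It does not use substra or substrafl: FakeClient, its submit method, and the string identifiers are hypothetical stand-ins for the real client, the function submission call, and the RemoteStruct ids.

```python
from typing import Dict, List


class FakeClient:
    """Hypothetical stand-in for substra.Client; it only records submissions."""

    def __init__(self) -> None:
        self.submitted: List[str] = []

    def submit(self, remote_struct_id: str) -> str:
        # Pretend the platform returns a key for the newly registered function.
        self.submitted.append(remote_struct_id)
        return f"function-key-{len(self.submitted)}"


def register_operations(
    client: FakeClient, operations: List[str], cache: Dict[str, str]
) -> Dict[str, str]:
    """Submit each distinct function once and return the updated cache."""
    for remote_struct_id in operations:
        # Skip functions that were already registered by an earlier task.
        if remote_struct_id not in cache:
            cache[remote_struct_id] = client.submit(remote_struct_id)
    return cache


client = FakeClient()
# Two tasks share the "aggregate" function, so it is submitted only once.
cache = register_operations(client, ["aggregate", "aggregate", "train"], {})
```

Passing the returned cache to the next node's registration call is what lets several nodes reuse functions that were already submitted.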

update_states(operation: substrafl.remote.operations.RemoteOperation, *, authorized_ids: Set[str], round_idx: Optional[int] = None, clean_models: bool = False) substrafl.nodes.references.shared_state.SharedStateRef

Adds an aggregated task to the list of operations to be executed by the node during the compute plan. This is done statically: nothing is submitted to substra. This is why the function key is a RemoteStruct (the substrafl local reference to the algorithm) and not a substra function_key.

Parameters
  • operation (RemoteOperation) – Automatically generated structure returned by the remote() decorator. It allows an operation to be registered now and executed later.

  • round_idx (int) – Round number, used for learning compute plans. It starts at 1. Defaults to None.

  • authorized_ids (Set[str]) – IDs of the organizations authorized to access the output model.

  • clean_models (bool) – Whether outputs of this operation are transient (deleted when they are not used anymore) or not. Defaults to False.

Raises

TypeError – operation must be a RemoteOperation; make sure to decorate the (user-defined) aggregate function of the strategy with @remote.

Returns

Identification for the result of this operation.

Return type

SharedStateRef
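As a rough illustration of "static" registration, the sketch below records a task locally, validates the operation type, and returns a reference to the future result. It is not the substrafl implementation: the classes are simplified hypothetical stand-ins, and the task dictionary layout is an assumption.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(frozen=True)
class RemoteOperation:
    """Hypothetical stand-in for the structure returned by remote()."""
    remote_struct_id: str


@dataclass(frozen=True)
class SharedStateRef:
    """Simplified stand-in: identifies the result of a registered operation."""
    key: str


@dataclass
class SketchAggregationNode:
    organization_id: str
    tasks: List[dict] = field(default_factory=list)

    def update_states(
        self,
        operation: RemoteOperation,
        *,
        authorized_ids: Set[str],
        round_idx: Optional[int] = None,
        clean_models: bool = False,
    ) -> SharedStateRef:
        if not isinstance(operation, RemoteOperation):
            raise TypeError("operation must be a RemoteOperation")
        ref = SharedStateRef(key=str(uuid.uuid4()))
        # Nothing is submitted here: the task is only queued locally, and the
        # function is still identified by its local id, not a platform key.
        self.tasks.append(
            {
                "key": ref.key,
                "function": operation.remote_struct_id,
                "authorized_ids": set(authorized_ids),
                "round_idx": round_idx,
                "transient": clean_models,
            }
        )
        return ref


node = SketchAggregationNode(organization_id="org-1")
ref = node.update_states(
    RemoteOperation("aggregate"), authorized_ids={"org-1", "org-2"}, round_idx=1
)
```

The returned reference can be handed to later operations before any task exists on the platform, which is what makes it possible to describe the whole compute plan up front.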

TrainDataNode

class substrafl.nodes.train_data_node.TrainDataNode(organization_id: str, data_manager_key: str, data_sample_keys: List[str])

Bases: organizations.organization.Node

A predefined structure that allows you to register operations on your train node in a static way before submitting them to substra.

Parameters
  • organization_id (str) – The substra organization ID (shared with other organizations if permissions are needed)

  • data_manager_key (str) – Substra data_manager_key opening data samples used by the strategy

  • data_sample_keys (List[str]) – Substra data_sample_keys used for the training on this node

register_operations(*, client: substra.sdk.client.Client, permissions: substra.sdk.schemas.Permissions, cache: Dict[substrafl.remote.remote_struct.RemoteStruct, OperationKey], dependencies: substrafl.dependency.schemas.Dependency) Dict[substrafl.remote.remote_struct.RemoteStruct, OperationKey]

Define the functions for each operation and submit the train task to substra.

Go through every operation in the computation graph, check which function it uses (identified by its RemoteStruct id), submit it to substra, and save the RemoteStruct : function_key pair into the cache (where function_key is the function key returned by substra). If two tasks depend on the same function, the function is not added to substra twice, because this method checks whether a function has already been submitted before adding it.

Parameters
  • client (substra.Client) – Substra client for the organization.

  • permissions (substra.sdk.schemas.Permissions) – Permissions for the function.

  • cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.

  • dependencies (Dependency) – Function dependencies.

Returns

updated cache

Return type

Dict[RemoteStruct, OperationKey]

summary() dict

Summary of the class to be exposed in the experiment summary file

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

update_states(operation: substrafl.remote.operations.RemoteDataOperation, *, authorized_ids: Set[str], round_idx: Optional[int] = None, aggregation_id: Optional[str] = None, clean_models: bool = False, local_state: Optional[substrafl.nodes.references.local_state.LocalStateRef] = None) Tuple[substrafl.nodes.references.local_state.LocalStateRef, substrafl.nodes.references.shared_state.SharedStateRef]

Adds a train task to the list of operations to be executed by the node during the compute plan. This is done statically: nothing is submitted to substra. This is why the function key is a RemoteStruct (the substrafl local reference to the algorithm) and not a substra function_key.

Parameters
  • operation (RemoteDataOperation) – Automatically generated structure returned by the remote_data() decorator. It allows an operation to be registered now and executed later.

  • authorized_ids (Set[str]) – IDs of the organizations authorized to access the output model.

  • round_idx (int) – Round number, used for learning compute plans. It starts at 1. In the case of a centralized strategy, it is preceded by an initialization round tagged 0. Defaults to None.

  • aggregation_id (str) – Aggregation node id to authorize access to the shared model. Defaults to None.

  • clean_models (bool) – Whether outputs of this operation are transient (deleted when they are not used anymore) or not. Defaults to False.

  • local_state (Optional[LocalStateRef]) – The parent task LocalStateRef. Defaults to None.

Raises

TypeError – operation must be a RemoteDataOperation; make sure to decorate the train and predict methods of your algorithm with @remote_data.

Returns

Identifications for the results of this operation.

Return type

Tuple[LocalStateRef, SharedStateRef]
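The local_state parameter is what links one round of training to the next on the same node. The sketch below shows that chaining with simplified, hypothetical stand-ins; in particular, reusing one generated key for both returned references and the "parent" field name are assumptions made for illustration.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class LocalStateRef:
    key: str
    init: bool = False


@dataclass(frozen=True)
class SharedStateRef:
    key: str


@dataclass
class SketchTrainDataNode:
    organization_id: str
    tasks: List[dict] = field(default_factory=list)

    def update_states(
        self,
        operation_name: str,
        *,
        round_idx: Optional[int] = None,
        local_state: Optional[LocalStateRef] = None,
    ) -> Tuple[LocalStateRef, SharedStateRef]:
        key = str(uuid.uuid4())
        # The parent local state, if any, becomes an input of the new task.
        self.tasks.append(
            {
                "key": key,
                "operation": operation_name,
                "round_idx": round_idx,
                "parent": local_state.key if local_state is not None else None,
            }
        )
        return LocalStateRef(key=key), SharedStateRef(key=key)


node = SketchTrainDataNode(organization_id="org-1")
# Round 1 has no parent; round 2 continues from the round 1 local state.
local_1, shared_1 = node.update_states("train", round_idx=1)
local_2, shared_2 = node.update_states("train", round_idx=2, local_state=local_1)
```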

TestDataNode

class substrafl.nodes.test_data_node.TestDataNode(organization_id: str, data_manager_key: str, test_data_sample_keys: List[str], metric_functions: Union[Dict[str, Callable], List[Callable], Callable])

Bases: organizations.organization.Node

A node on which you will test your algorithm.

Parameters
  • organization_id (str) – The substra organization ID (shared with other organizations if permissions are needed)

  • data_manager_key (str) – Substra data_manager_key opening data samples used by the strategy

  • test_data_sample_keys (List[str]) – Substra data_sample_keys used for testing on this node

  • metric_functions (Union[Dict[str, Callable], List[Callable], Callable]) – A function, a list of functions, or a dictionary of functions implementing the metrics. If a dictionary is given, its keys are used to register the result of the associated function. If a function or a list is given, function.__name__ is used to store the result.
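The naming rule for metric_functions can be sketched as a small normalization helper. This is an illustration only: normalize_metric_functions is a hypothetical name, and the accuracy function uses a simplified signature that is not necessarily the one substrafl expects for metrics.

```python
from typing import Callable, Dict, List, Union

MetricFunctions = Union[Dict[str, Callable], List[Callable], Callable]


def normalize_metric_functions(metric_functions: MetricFunctions) -> Dict[str, Callable]:
    """Return a name -> function mapping for any of the accepted input shapes."""
    if isinstance(metric_functions, dict):
        # Keys supplied by the user are kept as the result names.
        return dict(metric_functions)
    if callable(metric_functions):
        # A single function is treated as a one-element list.
        metric_functions = [metric_functions]
    # For a function or a list, the result name is function.__name__.
    return {function.__name__: function for function in metric_functions}


def accuracy(y_true, y_pred):
    # Hypothetical metric with a simplified signature, for illustration only.
    matches = sum(t == p for t, p in zip(y_true, y_pred))
    return matches / len(y_true)


from_function = normalize_metric_functions(accuracy)
from_list = normalize_metric_functions([accuracy])
from_dict = normalize_metric_functions({"acc": accuracy})
```

Passing a dictionary is the only way to choose the result name explicitly, which helps when two metric functions would otherwise share the same __name__.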

register_predict_operations(*, client: substra.sdk.client.Client, permissions: substra.sdk.schemas.Permissions, cache: Dict[substrafl.remote.remote_struct.RemoteStruct, OperationKey], dependencies: substrafl.dependency.schemas.Dependency) Dict[substrafl.remote.remote_struct.RemoteStruct, OperationKey]

Find the functions from the parent traintask of each predicttask and submit them with a dockerfile specifying the predict method as the --function-name to execute.

Go through every operation in the predict function cache, submit it to substra, and save the RemoteStruct : function_key pair into the cache (where function_key is the function key returned by substra). If two predicttasks depend on the same function, the function is not added to substra twice, because this method checks whether a function has already been submitted as a predicttask before adding it.

Parameters
  • client (substra.Client) – Substra client for the organization.

  • permissions (substra.sdk.schemas.Permissions) – Permissions for the function.

  • cache (Dict[RemoteStruct, OperationKey]) – Already registered function identifications. The key of each element is the RemoteStruct id (generated by substrafl) and the value is the key generated by substra.

  • dependencies (Dependency) – Function dependencies.

Returns

updated cache

Return type

Dict[RemoteStruct, OperationKey]

summary() dict

Summary of the class to be exposed in the experiment summary file

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

update_states(traintask_id: str, operation: substrafl.remote.operations.RemoteDataOperation, round_idx: Optional[int] = None)

Creates a test task based on the node's characteristics.

Parameters
  • traintask_id (str) – The substra parent id

  • operation (RemoteDataOperation) – Automatically generated structure returned by the remote_data() decorator. It allows an operation to be registered now and executed later.

  • round_idx (int) – Round number, used for learning compute plans. It starts at 1. Defaults to None.

Node

class substrafl.nodes.node.Node(organization_id: str)

Bases: object

Parameters

organization_id (str) –

summary() dict

Summary of the class to be exposed in the experiment summary file. For inherited classes, override this function and call super().summary().

Example

def summary(self):
    # Start from the parent summary, then add this class's own attributes.
    summary = super().summary()
    summary.update(
        {
            "attribute": self.attribute,
            # ... other attributes
        }
    )
    return summary

Returns

a json-serializable dict with the attributes the user wants to store

Return type

dict

References

class substrafl.nodes.node.OperationKey(x)

Bases:

class substrafl.nodes.references.local_state.LocalStateRef(key: str, init: bool = False)

Bases: object

Parameters

key (str) –

init (bool) –

Return type

None

class substrafl.nodes.references.shared_state.SharedStateRef(key: str)

Bases: object

Parameters

key (str) –

Return type

None