Federated Analytics on the diabetes dataset
This example demonstrates how to use the flexibility of the SubstraFL library and the base class ComputePlanBuilder to do Federated Analytics. It reproduces the diabetes example of the Substra SDK example section using SubstraFL. If you are new to SubstraFL, we recommend starting with the MNIST example to learn how to use the library in its simplest configuration first.
We use the Diabetes dataset available from the Scikit-Learn dataset module. This dataset contains medical information such as age, sex or blood pressure. The goal of this example is to compute some analytics such as the mean age, the standard deviation of blood pressure, or the sex percentage.
We simulate having two different data organizations, and a third organization which wants to compute aggregated analytics without having access to the raw data. The example here runs everything locally; however, only one parameter needs to be changed to run it on a real network.
Caution: This example is provided for illustration purposes only. In real life, you should be careful not to accidentally leak private information when doing Federated Analytics. For example, if a column contains very similar values, sharing its mean and its standard deviation is functionally equivalent to sharing the content of the column. It is strongly recommended to consider the potential security risks of your use case and to act accordingly. It is possible to use other privacy-preserving techniques, such as Differential Privacy, in addition to Substra. Because the focus of this example is Substra capabilities and for the sake of simplicity, such safeguards are not implemented here.
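As a purely illustrative sketch (not part of this example), Differential Privacy could for instance take the form of adding calibrated Laplace noise to a statistic before it leaves an organization. The clipping bounds and the epsilon value below are placeholder assumptions, not recommendations:

import numpy as np

def noisy_mean(values: np.ndarray, lower: float, upper: float, epsilon: float = 1.0) -> float:
    # Clip values so the sensitivity of the mean is bounded by (upper - lower) / n
    clipped = np.clip(values, lower, upper)
    sensitivity = (upper - lower) / len(clipped)
    # Laplace mechanism: noise scale is sensitivity / epsilon
    noise = np.random.laplace(loc=0.0, scale=sensitivity / epsilon)
    return float(clipped.mean() + noise)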
To run this example, you need to download and unzip the assets needed to run it into the same directory as this example.
Please ensure you have all the libraries installed. A requirements.txt file is included in the zip file; you can run the command pip install -r requirements.txt to install them.
Instantiating the Substra clients
We work with three different organizations. Two organizations provide data, and a third one performs Federated Analytics to compute aggregated statistics without having access to the raw datasets.
This example runs in local mode, simulating a federated learning experiment.
In the following code cell, we define the different organizations needed for our FL experiment.
[1]:
from substra import Client
# Choose the subprocess mode to locally simulate the FL process
N_CLIENTS = 3
client_0 = Client(client_name="org-1")
client_1 = Client(client_name="org-2")
client_2 = Client(client_name="org-3")
# Create a dictionary to easily access each client from its human-friendly id
clients = {
client_0.organization_info().organization_id: client_0,
client_1.organization_info().organization_id: client_1,
client_2.organization_info().organization_id: client_2,
}
# Store organization IDs
ORGS_ID = list(clients)
# The provider of the functions for computing analytics is defined as the first organization.
ANALYTICS_PROVIDER_ORG_ID = ORGS_ID[0]
# Data providers orgs are the two last organizations.
DATA_PROVIDER_ORGS_ID = ORGS_ID[1:]
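As mentioned above, only the client instantiation changes when running on a deployed network: each Client would point to its organization's backend instead of running in local mode. A hedged sketch of what this could look like (the URL and token below are placeholders, not real endpoints):

# Hypothetical remote-mode client for a deployed Substra network
remote_client = Client(
    backend_type="remote",
    url="https://org-1.example.com",  # placeholder URL of the organization's backend
    token="<org-1 API token>",        # placeholder authentication token
)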
Prepare the data
The function setup_diabetes downloads the diabetes dataset if needed, and splits it in two to simulate a federated setup. Each data organization has access to a chunk of the dataset.
[2]:
import pathlib
from diabetes_substrafl_assets.dataset.diabetes_substrafl_dataset import setup_diabetes
data_path = pathlib.Path.cwd() / "tmp" / "data_diabetes"
data_path.mkdir(parents=True, exist_ok=True)
setup_diabetes(data_path=data_path)
Registering data samples and dataset
Every asset is created according to predefined specifications previously imported from substra.sdk.schemas. To register assets, the schemas are first instantiated and the specs are then registered, which generates the real assets.
Permissions are defined when registering assets. In a nutshell:
Data cannot be seen once it’s registered on the platform.
Metadata are visible to all the users of a network.
Permissions allow you to execute a function on a certain dataset.
Next, we need to define the asset directory. You should have already downloaded the assets folder as stated above.
A dataset represents the data in Substra. It contains some metadata and an opener, a script used to load the data from files into memory. You can find more details about datasets in the API Reference DatasetSpec.
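The opener script itself is shipped in the downloaded assets and is not reproduced on this page. As a rough, hypothetical sketch of what such an opener could look like (the CSV file pattern and the cast of the sex column to a pandas category are assumptions, not the actual content of diabetes_substrafl_opener.py):

import pathlib
import pandas as pd
import substratools as tools

class DiabetesOpener(tools.Opener):
    def get_data(self, folders):
        # Concatenate every CSV found in the data sample folders into one dataframe;
        # the sex column is cast to a category so it can later be selected as categorical
        df = pd.concat(
            pd.read_csv(path)
            for folder in folders
            for path in pathlib.Path(folder).glob("*.csv")
        )
        return df.astype({"sex": "category"})

    def fake_data(self, n_samples=None):
        # Not needed for this example
        pass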
[3]:
from substra.sdk.schemas import DataSampleSpec
from substra.sdk.schemas import DatasetSpec
from substra.sdk.schemas import Permissions
assets_directory = pathlib.Path.cwd() / "diabetes_substrafl_assets"
assert assets_directory.is_dir(), """Did not find the asset directory,
a directory called 'diabetes_substrafl_assets' is expected in the same location as this file"""
permissions_dataset = Permissions(public=False, authorized_ids=[ANALYTICS_PROVIDER_ORG_ID])
dataset = DatasetSpec(
    name="Diabetes dataset",
data_opener=assets_directory / "dataset" / "diabetes_substrafl_opener.py",
description=data_path / "description.md",
permissions=permissions_dataset,
logs_permission=permissions_dataset,
)
# We register the dataset for each organization
dataset_keys = {client_id: clients[client_id].add_dataset(dataset) for client_id in DATA_PROVIDER_ORGS_ID}
for client_id, key in dataset_keys.items():
print(f"Dataset key for {client_id}: {key}")
Dataset key for MyOrg2MSP: 7b093128-c414-407d-9b72-c19784021e32
Dataset key for MyOrg3MSP: 0f2cc772-9a1e-420d-8e42-96722402410d
The dataset object itself is an empty shell. Data samples are needed in order to add actual data. A data sample is made of a folder containing a single data file (here a CSV) and the key identifying the dataset it is linked to.
[4]:
datasample_keys = {
org_id: clients[org_id].add_data_sample(
DataSampleSpec(
data_manager_keys=[dataset_keys[org_id]],
path=data_path / f"org_{i + 1}",
),
local=True,
)
for i, org_id in enumerate(DATA_PROVIDER_ORGS_ID)
}
The flexibility of the ComputePlanBuilder class
This example aims to explain how to use the Compute Plan Builder class, and how to take full advantage of the flexibility it provides.
Before starting, we need to keep in mind that a federated computation can be represented as a graph of tasks. Some of these tasks need data to be executed (training tasks), while others aggregate local results (aggregation tasks).
Substra does not store an explicit definition of this graph; instead, it gives the user full flexibility to define the compute plan (or computation graph) they need, by linking a task to its parents.
To create this graph of computations, SubstraFL provides the Node abstraction. A Node assigns tasks of a given type to an organization (aka a Client). The type of the Node depends on the type of tasks we want to run on this organization (training or aggregation tasks).
An organization (aka a Client) without data can host an Aggregation node. We will use the Aggregation node object to compute the aggregated analytics.
An organization (aka a Client) containing the data samples can host a Train data node. Each node will only have access to data from the organization hosting it. These data samples must be instantiated with the right permissions to be processed by the given Client.
[5]:
from substrafl.nodes import TrainDataNode
from substrafl.nodes import AggregationNode
aggregation_node = AggregationNode(ANALYTICS_PROVIDER_ORG_ID)
train_data_nodes = [
TrainDataNode(
organization_id=org_id,
data_manager_key=dataset_keys[org_id],
data_sample_keys=[datasample_keys[org_id]],
)
for org_id in DATA_PROVIDER_ORGS_ID
]
The Compute Plan Builder is an abstract class that asks the user to implement only three methods:
build_compute_plan(...)
load_local_state(...)
save_local_state(...)
The build_compute_plan method is essential to create the graph of the compute plan that will be executed on Substra. Using the different Nodes we created, we will update their states by applying user-defined methods. These methods are passed as arguments to the Node using its update_states method.
[6]:
import numpy as np
import pandas as pd
import json
from collections import defaultdict
import pandas as pd
from typing import List, Dict
from substrafl import ComputePlanBuilder
from substrafl.remote import remote_data, remote
class Analytics(ComputePlanBuilder):
def __init__(self):
super().__init__()
self.first_order_aggregated_state = {}
self.second_order_aggregated_state = {}
@remote_data
def local_first_order_computation(self, data_from_opener: pd.DataFrame, shared_state=None):
"""Compute from the data samples, expected to be a pandas dataframe,
        the means and counts of each column of the dataframe.
        The data_from_opener is the output of the ``get_data`` function defined
        in the ``diabetes_substrafl_opener.py`` file, available in the asset
        folder downloaded at the beginning of the example.
The signature of a function decorated by @remote_data must contain
the data_from_opener and the shared_state arguments.
Args:
data_from_opener (pd.DataFrame): Pandas dataframe provided by the opener.
            shared_state (None, optional): Unused here as this function only
                uses local information already present in the data.
                Defaults to None.
Returns:
            dict: dictionary containing the local information on means, counts
                and number of samples. This dict will be used as a state to be
                shared with an AggregationNode in order to compute the aggregation
                of the different analytics.
"""
df = data_from_opener
states = {
"n_samples": len(df),
"means": df.select_dtypes(include=np.number).sum().to_dict(),
"counts": {
name: series.value_counts().to_dict() for name, series in df.select_dtypes(include="category").items()
},
}
return states
@remote_data
def local_second_order_computation(self, data_from_opener: pd.DataFrame, shared_state: Dict):
"""This function will use the output of the ``aggregation`` function to compute
locally the standard deviation of the different columns.
Args:
data_from_opener (pd.DataFrame): Pandas dataframe provided by the opener.
shared_state (Dict): Output of a first order analytics computation,
that must contain the means.
Returns:
            Dict: dictionary containing the local information on standard deviation
                and number of samples. This dict will be used as a state to be shared
                with an AggregationNode in order to compute the aggregation of the
                different analytics.
"""
df = data_from_opener
means = pd.Series(shared_state["means"])
states = {
"n_samples": len(df),
"std": np.power(df.select_dtypes(include=np.number) - means, 2).sum(),
}
return states
@remote
    def aggregation(self, shared_states: List[Dict]):
        """Aggregation function that receives a list of locally computed analytics
        in order to aggregate them.
The aggregation will be a weighted average using "n_samples" as weight coefficient.
Args:
            shared_states (List[Dict]): list of dictionaries containing a field "n_samples",
                and the analytics to aggregate in separate fields.
Returns:
Dict: dictionary containing the aggregated analytics.
"""
total_len = 0
for state in shared_states:
total_len += state["n_samples"]
aggregated_values = defaultdict(lambda: defaultdict(float))
for state in shared_states:
for analytics_name, col_dict in state.items():
if analytics_name == "n_samples":
# already aggregated in total_len
continue
for col_name, v in col_dict.items():
if isinstance(v, dict):
# this column is categorical and v is a dict over
# the different modalities
if not aggregated_values[analytics_name][col_name]:
aggregated_values[analytics_name][col_name] = defaultdict(float)
for modality, vv in v.items():
aggregated_values[analytics_name][col_name][modality] += vv / total_len
else:
# this is a numerical column and v is numerical
aggregated_values[analytics_name][col_name] += v / total_len
# transform default_dict to regular dict
aggregated_values = json.loads(json.dumps(aggregated_values))
return aggregated_values
def build_compute_plan(
self,
train_data_nodes: List[TrainDataNode],
aggregation_node: AggregationNode,
num_rounds=None,
evaluation_strategy=None,
clean_models=False,
):
"""Method to build and link the different computations to execute with each other.
        We will use the ``update_states`` method of the nodes given as input to choose
        which method to apply.
For our example, we will only use TrainDataNodes and AggregationNodes.
Args:
train_data_nodes (List[TrainDataNode]): Nodes linked to the data samples on which
to compute analytics.
aggregation_node (AggregationNode): Node on which to compute the aggregation
of the analytics extracted from the train_data_nodes.
            num_rounds (Optional[int]): Number of rounds to be used to iterate on the
                recurrent part of the compute plan. Defaults to None.
            evaluation_strategy (Optional[substrafl.EvaluationStrategy]): Object storing the
                TestDataNode. Unused in this example. Defaults to None.
            clean_models (bool): Clean the intermediary models of this round on the
                Substra platform. Defaults to False.
"""
first_order_shared_states = []
local_states = {}
for node in train_data_nodes:
# Call local_first_order_computation on each train data node
next_local_state, next_shared_state = node.update_states(
self.local_first_order_computation(
node.data_sample_keys,
shared_state=None,
_algo_name=f"Computing first order means with {self.__class__.__name__}",
),
local_state=None,
round_idx=0,
authorized_ids=set([node.organization_id]),
aggregation_id=aggregation_node.organization_id,
clean_models=False,
)
            # All local analytics are stored in the first_order_shared_states,
            # given as input to the aggregation method.
first_order_shared_states.append(next_shared_state)
local_states[node.organization_id] = next_local_state
# Call the aggregation method on the first_order_shared_states
self.first_order_aggregated_state = aggregation_node.update_states(
self.aggregation(
shared_states=first_order_shared_states,
_algo_name="Aggregating first order",
),
round_idx=0,
authorized_ids=set([train_data_node.organization_id for train_data_node in train_data_nodes]),
clean_models=False,
)
second_order_shared_states = []
for node in train_data_nodes:
# Call local_second_order_computation on each train data node
_, next_shared_state = node.update_states(
self.local_second_order_computation(
node.data_sample_keys,
shared_state=self.first_order_aggregated_state,
_algo_name=f"Computing second order analytics with {self.__class__.__name__}",
),
local_state=local_states[node.organization_id],
round_idx=1,
authorized_ids=set([node.organization_id]),
aggregation_id=aggregation_node.organization_id,
clean_models=False,
)
            # All local analytics are stored in the second_order_shared_states,
            # given as input to the aggregation method.
second_order_shared_states.append(next_shared_state)
# Call the aggregation method on the second_order_shared_states
self.second_order_aggregated_state = aggregation_node.update_states(
self.aggregation(
shared_states=second_order_shared_states,
_algo_name="Aggregating second order",
),
round_idx=1,
authorized_ids=set([train_data_node.organization_id for train_data_node in train_data_nodes]),
clean_models=False,
)
    def save_local_state(self, path: pathlib.Path):
        """This function saves the important local state, to be retrieved after each new
        call to a train or test task.
Args:
path (pathlib.Path): Path where to save the local_state. Provided internally by
Substra.
"""
state_to_save = {
"first_order": self.first_order_aggregated_state,
"second_order": self.second_order_aggregated_state,
}
with open(path, "w") as f:
json.dump(state_to_save, f)
def load_local_state(self, path: pathlib.Path):
"""Mirror function to load the local_state from a file saved using
``save_local_state``.
Args:
path (pathlib.Path): Path where to load the local_state. Provided internally by
Substra.
Returns:
ComputePlanBuilder: return self with the updated local state.
"""
with open(path, "r") as f:
state_to_load = json.load(f)
self.first_order_aggregated_state = state_to_load["first_order"]
self.second_order_aggregated_state = state_to_load["second_order"]
return self
Now that we have seen the implementation of the custom Analytics class, we can add details to some of the previously introduced concepts.
The update_states method outputs the new state of the node, which can be passed as an argument to a following one. This succession of next_state objects passed to a new node.update_states call is how Substra builds the graph of the compute plan.
The load_local_state and save_local_state methods are used at each new iteration on a Node, in order to retrieve the previous local state that has not been shared with the other Nodes.
For instance, after updating a Train data node using its update_states method, we will have access to its next local state, which we will pass as an argument to the next update_states call applied to this Train data node.
To summarize, a Compute Plan Builder is composed of several decorated user-defined functions, which may need data (decorated with @remote_data) or not (decorated with @remote).
See Decorator for more information on these decorators.
These user-defined functions are used to create the graph of the compute plan through the build_compute_plan method and the update_states method of the different Nodes.
The save_local_state and load_local_state methods are needed to retrieve the local state a Train data node had at the end of its last update.
Running the experiment
As a last step before launching our experiment, we need to specify the third-party dependencies required to run it. The Dependency object is instantiated in order to install the right libraries in the Python environment of each organization.
We now have all the necessary objects to launch our experiment. Please see a summary below of all the objects we created so far:
A Client to add or retrieve the assets of our experiment, using their keys to identify them.
A Federated Strategy, to specify what compute plan we want to execute.
Train data nodes to indicate on which data to train.
An Evaluation Strategy, to define where and at which frequency we evaluate the model. Here this does not apply to our experiment. We set it to None.
An Aggregation Node, to specify the organization on which the aggregation operation will be computed.
An experiment folder to save a summary of the operation made.
The Dependency to define the libraries on which the experiment needs to run.
[7]:
from substrafl.dependency import Dependency
from substrafl.experiment import execute_experiment
dependencies = Dependency(pypi_dependencies=["numpy==2.1.1", "pandas==2.2.2"])
compute_plan = execute_experiment(
client=clients[ANALYTICS_PROVIDER_ORG_ID],
strategy=Analytics(),
train_data_nodes=train_data_nodes,
evaluation_strategy=None,
aggregation_node=aggregation_node,
experiment_folder=str(pathlib.Path.cwd() / "tmp" / "experiment_summaries"),
dependencies=dependencies,
clean_models=False,
name="Federated Analytics with SubstraFL documentation example",
)
2024-10-15 06:26:36,618 - INFO - Building the compute plan.
2024-10-15 06:26:36,620 - INFO - Registering the functions to Substra.
2024-10-15 06:26:36,647 - INFO - Registering the compute plan to Substra.
2024-10-15 06:26:36,648 - INFO - Experiment summary saved to /home/docs/checkouts/readthedocs.org/user_builds/substra-documentation/checkouts/stable/docs/source/examples/substrafl/go_further/tmp/experiment_summaries/2024_10_15_06_26_36_67235568-57cc-4797-8376-283ee5c76079.json
Compute plan progress: 100%|██████████| 6/6 [00:04<00:00, 1.24it/s]
2024-10-15 06:26:41,475 - INFO - The compute plan has been registered to Substra, its key is 67235568-57cc-4797-8376-283ee5c76079.
Results
The output of a task can be downloaded using utility functions provided by SubstraFL, such as download_algo_state, download_train_shared_state or download_aggregate_shared_state.
Given a Client and a compute_plan_key, these functions download the output of a given round_idx or rank_idx.
[8]:
from substrafl.model_loading import download_aggregate_shared_state
# The aggregated analytics are computed in the ANALYTICS_PROVIDER_ORG_ID client.
client_to_download_from = clients[ANALYTICS_PROVIDER_ORG_ID]
# The results will be available once the compute plan is completed
client_to_download_from.wait_compute_plan(compute_plan.key)
first_rank_analytics = download_aggregate_shared_state(
client=client_to_download_from,
compute_plan_key=compute_plan.key,
round_idx=0,
)
second_rank_analytics = download_aggregate_shared_state(
client=client_to_download_from,
compute_plan_key=compute_plan.key,
round_idx=1,
)
print(
f"""Age mean: {first_rank_analytics['means']['age']:.2f} years
Sex percentage:
Male: {100*first_rank_analytics['counts']['sex']['M']:.2f}%
Female: {100*first_rank_analytics['counts']['sex']['F']:.2f}%
Blood pressure std: {second_rank_analytics["std"]["bp"]:.2f} mm Hg
"""
)
Age mean: 48.52 years
Sex percentage:
Male: 53.17%
Female: 46.83%
Blood pressure std: 190.87 mm Hg
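As an optional sanity check outside of Substra, the same quantities can be recomputed on the pooled data. The sketch below assumes the two chunks were derived from scikit-learn's unscaled diabetes data (as set up at the beginning of the example) and mirrors the formulas used by the compute plan; it requires scikit-learn >= 1.1 for the scaled argument:

from sklearn.datasets import load_diabetes

# Pooled (non-federated) computation of the same statistics
pooled = load_diabetes(as_frame=True, scaled=False).frame
print(f"Age mean (pooled): {pooled['age'].mean():.2f} years")
# The compute plan reports, under "std", the average squared deviation from the pooled mean
bp = pooled["bp"]
print(f"Blood pressure second-order statistic (pooled): {((bp - bp.mean()) ** 2).mean():.2f}")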