{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Using Torch FedAvg on MNIST dataset\n\n\nThis example illustrates the basic usage of SubstraFL and proposes Federated Learning using the Federated Averaging strategy\non the `MNIST Dataset of handwritten digits `__ using PyTorch.\nIn this example, we work on 28x28 pixel sized grayscale images. This is a classification problem\naiming to recognize the number written on each image.\n\nSubstraFL can be used with any machine learning framework (PyTorch, Tensorflow, Scikit-Learn, etc). \n\nHowever a specific interface has been developed for PyTorch which makes writing PyTorch code simpler than with other frameworks. This example here uses the specific PyTorch interface.\n\nThis example does not use a deployed platform of Substra and runs in local mode.\n\nTo run this example, you have two options:\n\n- **Recommended option**: use a hosted Jupyter notebook. With this option you don't have to install anything, just run the notebook.\n To access the hosted notebook, scroll down at the bottom of this page and click on the **Launch Binder** button.\n- **Run the example locally**. To do that you need to download and unzip the assets needed to run it in the same\n directory as used this example.\n\n .. only:: builder_html or readthedocs\n\n :download:`assets required to run this example <../../../../../tmp/torch_fedavg_assets.zip>`\n\n * Please ensure to have all the libraries installed. A *requirements.txt* file is included in the zip file, where you can run the command ``pip install -r requirements.txt`` to install them.\n * **Substra** and **SubstraFL** should already be installed. If not follow the instructions described here: `substrafl_doc/substrafl_overview:Installation`.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Setup\n*****\n\nThis examples runs with three organizations. Two organizations provide datasets, while a third\none provides the algorithm.\n\nIn the following code cell, we define the different organizations needed for our FL experiment.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substra import Client\n\nN_CLIENTS = 3\n\n# Every computation will run in `subprocess` mode, where everything runs locally in Python\n# subprocesses.\n# Ohers backend_types are:\n# \"docker\" mode where computations run locally in docker containers\n# \"remote\" mode where computations run remotely (you need to have a deployed platform for that)\nclient_0 = Client(backend_type=\"subprocess\")\nclient_1 = Client(backend_type=\"subprocess\")\nclient_2 = Client(backend_type=\"subprocess\")\n# To run in remote mode you have to also use the function `Client.login(username, password)`\n\nclients = {\n client_0.organization_info().organization_id: client_0,\n client_1.organization_info().organization_id: client_1,\n client_2.organization_info().organization_id: client_2,\n}\n\n\n# Store organization IDs\nORGS_ID = list(clients.keys())\nALGO_ORG_ID = ORGS_ID[0] # Algo provider is defined as the first organization.\nDATA_PROVIDER_ORGS_ID = ORGS_ID[1:] # Data providers orgs are the two last organizations."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Data and metrics\n****************\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Data preparation\n================\n\nThis section downloads (if needed) the **MNIST dataset** using the `torchvision library\n`__.\nIt extracts the images from the raw files and locally creates a folder for each\norganization.\n\nEach organization will have access to half the training data and half the test data (which\ncorresponds to **30,000**\nimages for training and **5,000** for testing each).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import pathlib\nfrom torch_fedavg_assets.dataset.mnist_dataset import setup_mnist\n\n\n# Create the temporary directory for generated data\n(pathlib.Path.cwd() / \"tmp\").mkdir(exist_ok=True)\ndata_path = pathlib.Path.cwd() / \"tmp\" / \"data_mnist\"\n\nsetup_mnist(data_path, len(DATA_PROVIDER_ORGS_ID))"
]
},
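{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional sanity check, we can list the per-organization folders created above. The `org_*/train` and `org_*/test` layout is the one the registration step below relies on; treat this listing as an illustration rather than a required step.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Optional sanity check: list the per-organization data folders generated above.\n# The org_*/train and org_*/test layout matches the paths used at registration time.\nfor folder in sorted(p for p in data_path.glob(\"org_*/*\") if p.is_dir()):\n    print(folder.relative_to(data_path))"
]
},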
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dataset registration\n====================\n\nA `documentation/concepts:Dataset` is composed of an **opener**, which is a Python script that can load\nthe data from the files in memory and a description markdown file.\nThe `documentation/concepts:Dataset` object itself does not contain the data. The proper asset that contains the\ndata is the **datasample asset**.\n\nA **datasample** contains a local path to the data. A datasample can be linked to a dataset in order to add data to a\ndataset.\n\nData privacy is a key concept for Federated Learning experiments. That is why we set\n`documentation/concepts:Permissions` for `documentation/concepts:Assets` to determine how each organization can access a specific asset.\n\nNote that metadata such as the assets' creation date and the asset owner are visible to all the organizations of a\nnetwork.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substra.sdk.schemas import DatasetSpec\nfrom substra.sdk.schemas import Permissions\nfrom substra.sdk.schemas import DataSampleSpec\n\nassets_directory = pathlib.Path.cwd() / \"torch_fedavg_assets\"\ndataset_keys = {}\ntrain_datasample_keys = {}\ntest_datasample_keys = {}\n\nfor i, org_id in enumerate(DATA_PROVIDER_ORGS_ID):\n\n client = clients[org_id]\n\n permissions_dataset = Permissions(public=False, authorized_ids=[ALGO_ORG_ID])\n\n # DatasetSpec is the specification of a dataset. It makes sure every field\n # is well defined, and that our dataset is ready to be registered.\n # The real dataset object is created in the add_dataset method.\n\n dataset = DatasetSpec(\n name=\"MNIST\",\n type=\"npy\",\n data_opener=assets_directory / \"dataset\" / \"mnist_opener.py\",\n description=assets_directory / \"dataset\" / \"description.md\",\n permissions=permissions_dataset,\n logs_permission=permissions_dataset,\n )\n dataset_keys[org_id] = client.add_dataset(dataset)\n assert dataset_keys[org_id], \"Missing dataset key\"\n\n # Add the training data on each organization.\n data_sample = DataSampleSpec(\n data_manager_keys=[dataset_keys[org_id]],\n test_only=False,\n path=data_path / f\"org_{i+1}\" / \"train\",\n )\n train_datasample_keys[org_id] = client.add_data_sample(data_sample)\n\n # Add the testing data on each organization.\n data_sample = DataSampleSpec(\n data_manager_keys=[dataset_keys[org_id]],\n test_only=True,\n path=data_path / f\"org_{i+1}\" / \"test\",\n )\n test_datasample_keys[org_id] = client.add_data_sample(data_sample)"
]
},
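{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, we can inspect the keys Substra assigned to the registered assets. These keys are how assets are identified throughout the rest of the experiment.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Each registered asset is identified by a key; these keys are used everywhere below.\nprint(dataset_keys)\nprint(train_datasample_keys)\nprint(test_datasample_keys)"
]
},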
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Metric registration\n===================\n\nA metric is a function used to evaluate the performance of your model on one or several\n**datasamples**.\n\nTo add a metric, you need to define a function that computes and returns a performance\nfrom the datasamples (as returned by the opener) and the predictions_path (to be loaded within the function).\n\nWhen using a Torch SubstraFL algorithm, the predictions are saved in the `predict` function in numpy format\nso that you can simply load them using `np.load`.\n\nAfter defining the metrics, dependencies, and permissions, we use the `add_metric` function to register the metric.\nThis metric will be used on the test datasamples to evaluate the model performances.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from sklearn.metrics import accuracy_score\nimport numpy as np\n\nfrom substrafl.dependency import Dependency\nfrom substrafl.remote.register import add_metric\n\npermissions_metric = Permissions(\n public=False, authorized_ids=[ALGO_ORG_ID] + DATA_PROVIDER_ORGS_ID\n)\n\n# The Dependency object is instantiated in order to install the right libraries in\n# the Python environment of each organization.\nmetric_deps = Dependency(pypi_dependencies=[\"numpy==1.23.1\", \"scikit-learn==1.1.1\"])\n\n\ndef accuracy(datasamples, predictions_path):\n y_true = datasamples[\"labels\"]\n y_pred = np.load(predictions_path)\n\n return accuracy_score(y_true, np.argmax(y_pred, axis=1))\n\n\nmetric_key = add_metric(\n client=clients[ALGO_ORG_ID],\n metric_function=accuracy,\n permissions=permissions_metric,\n dependencies=metric_deps,\n)"
]
},
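{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before relying on the registered metric, it can help to exercise the `accuracy` function locally. The cell below is a minimal sketch using made-up labels and predictions; the `fake_*` names are illustrative only and not part of the Substra workflow.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Minimal local check of the metric function on made-up data (illustrative only).\nimport tempfile\n\nrng = np.random.default_rng(seed=0)\nfake_labels = rng.integers(0, 10, size=8)\nfake_predictions = rng.random((8, 10))  # one row of scores per sample\n\nwith tempfile.TemporaryDirectory() as tmp_dir:\n    fake_predictions_path = pathlib.Path(tmp_dir) / \"predictions.npy\"\n    np.save(fake_predictions_path, fake_predictions)\n    print(accuracy({\"labels\": fake_labels}, fake_predictions_path))"
]
},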
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Specify the machine learning components\n***************************************\nThis section uses the PyTorch based SubstraFL API to simplify the definition of machine learning components.\nHowever, SubstraFL is compatible with any machine learning framework.\n\n\nIn this section, you will:\n\n- Register a model and its dependencies\n- Specify the federated learning strategy\n- Specify the training and aggregation nodes\n- Specify the test nodes\n- Actually run the computations\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Model definition\n================\n\nWe choose to use a classic torch CNN as the model to train. The model structure is defined by the user independently\nof SubstraFL.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import torch\nfrom torch import nn\nimport torch.nn.functional as F\n\nseed = 42\ntorch.manual_seed(seed)\n\n\nclass CNN(nn.Module):\n def __init__(self):\n super(CNN, self).__init__()\n self.conv1 = nn.Conv2d(1, 32, kernel_size=5)\n self.conv2 = nn.Conv2d(32, 32, kernel_size=5)\n self.conv3 = nn.Conv2d(32, 64, kernel_size=5)\n self.fc1 = nn.Linear(3 * 3 * 64, 256)\n self.fc2 = nn.Linear(256, 10)\n\n def forward(self, x, eval=False):\n x = F.relu(self.conv1(x))\n x = F.relu(F.max_pool2d(self.conv2(x), 2))\n x = F.dropout(x, p=0.5, training=not eval)\n x = F.relu(F.max_pool2d(self.conv3(x), 2))\n x = F.dropout(x, p=0.5, training=not eval)\n x = x.view(-1, 3 * 3 * 64)\n x = F.relu(self.fc1(x))\n x = F.dropout(x, p=0.5, training=not eval)\n x = self.fc2(x)\n return F.log_softmax(x, dim=1)\n\n\nmodel = CNN()\noptimizer = torch.optim.Adam(model.parameters(), lr=0.001)\ncriterion = torch.nn.CrossEntropyLoss()"
]
},
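{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick optional check (not part of the SubstraFL workflow), we can pass a dummy batch through the model: two 28x28 grayscale images should yield two rows of 10 log-probabilities.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Optional shape check on a dummy batch of two 28x28 grayscale images.\ndummy_batch = torch.zeros(2, 1, 28, 28)\nwith torch.no_grad():\n    out = model(dummy_batch, eval=True)\nprint(out.shape)  # expected: torch.Size([2, 10])"
]
},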
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Specifying on how much data to train\n====================================\n\nTo specify on how much data to train at each round, we use the `index_generator` object.\nWe specify the batch size and the number of batches to consider for each round (called `num_updates`).\nSee `substrafl_doc/substrafl_overview:Index Generator` for more details.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.index_generator import NpIndexGenerator\n\n# Number of model updates between each FL strategy aggregation.\nNUM_UPDATES = 100\n\n# Number of samples per update.\nBATCH_SIZE = 32\n\nindex_generator = NpIndexGenerator(\n batch_size=BATCH_SIZE,\n num_updates=NUM_UPDATES,\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Torch Dataset definition\n==========================\n\nThis torch Dataset is used to preprocess the data using the `__getitem__` function.\n\nThis torch Dataset needs to have a specific `__init__` signature, that must contain (self, datasamples, is_inference).\n\nThe `__getitem__` function is expected to return (inputs, outputs) if `is_inference` is `False`, else only the inputs.\nThis behavior can be changed by re-writing the `_local_train` or `predict` methods.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class TorchDataset(torch.utils.data.Dataset):\n def __init__(self, datasamples, is_inference: bool):\n self.x = datasamples[\"images\"]\n self.y = datasamples[\"labels\"]\n self.is_inference = is_inference\n\n def __getitem__(self, idx):\n\n if self.is_inference:\n x = torch.FloatTensor(self.x[idx][None, ...]) / 255\n return x\n\n else:\n x = torch.FloatTensor(self.x[idx][None, ...]) / 255\n\n y = torch.tensor(self.y[idx]).type(torch.int64)\n y = F.one_hot(y, 10)\n y = y.type(torch.float32)\n\n return x, y\n\n def __len__(self):\n return len(self.x)"
]
},
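{
"cell_type": "markdown",
"metadata": {},
"source": [
"To illustrate the expected behavior, the dataset can be instantiated locally on made-up datasamples. In the real experiment the datasamples are produced by the opener on each data provider organization; the `fake_datasamples` dict below is illustrative only.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative check of the dataset on made-up datasamples.\nfake_datasamples = {\n    \"images\": np.zeros((4, 28, 28), dtype=np.uint8),\n    \"labels\": np.array([0, 1, 2, 3]),\n}\n\ntrain_dataset = TorchDataset(fake_datasamples, is_inference=False)\nx, y = train_dataset[0]\nprint(x.shape, y.shape)  # torch.Size([1, 28, 28]) torch.Size([10])\n\ninference_dataset = TorchDataset(fake_datasamples, is_inference=True)\nprint(inference_dataset[0].shape)  # torch.Size([1, 28, 28])"
]
},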
{
"cell_type": "markdown",
"metadata": {},
"source": [
"SubstraFL algo definition\n==========================\n\nA SubstraFL Algo gathers all the defined elements that run locally in each organization.\nThis is the only SubstraFL object that is framework specific (here PyTorch specific).\n\nThe `TorchDataset` is passed **as a class** to the `Torch algorithm `_.\nIndeed, this `TorchDataset` will be instantiated directly on the data provider organization.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.algorithms.pytorch import TorchFedAvgAlgo\n\n\nclass MyAlgo(TorchFedAvgAlgo):\n def __init__(self):\n super().__init__(\n model=model,\n criterion=criterion,\n optimizer=optimizer,\n index_generator=index_generator,\n dataset=TorchDataset,\n seed=seed,\n )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Federated Learning strategies\n=============================\n\nA FL strategy specifies how to train a model on distributed data.\nThe most well known strategy is the Federated Averaging strategy: train locally a model on every organization,\nthen aggregate the weight updates from every organization, and then apply locally at each organization the averaged\nupdates.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.strategies import FedAvg\n\nstrategy = FedAvg()"
]
},
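{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the aggregation step concrete, here is a minimal NumPy sketch of the weighted averaging at the heart of FedAvg. It illustrates the arithmetic only and is not SubstraFL's internal implementation; the `local_updates` values are made up.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative FedAvg arithmetic (not SubstraFL's internal code): average the\n# organizations' weight updates, weighted by their number of training samples.\nlocal_updates = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]  # made-up updates\nn_samples_per_org = [30_000, 30_000]  # training samples per organization here\n\ntotal = sum(n_samples_per_org)\naveraged_update = sum((n / total) * update for n, update in zip(n_samples_per_org, local_updates))\nprint(averaged_update)  # [2. 3.]"
]
},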
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Where to train where to aggregate\n=================================\n\nWe specify on which data we want to train our model, using the `substrafl_doc/api/nodes:TrainDataNode` object.\nHere we train on the two datasets that we have registered earlier.\n\nThe `substrafl_doc/api/nodes:AggregationNode` specifies the organization on which the aggregation operation\nwill be computed.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.nodes import TrainDataNode\nfrom substrafl.nodes import AggregationNode\n\n\naggregation_node = AggregationNode(ALGO_ORG_ID)\n\ntrain_data_nodes = list()\n\nfor org_id in DATA_PROVIDER_ORGS_ID:\n\n # Create the Train Data Node (or training task) and save it in a list\n train_data_node = TrainDataNode(\n organization_id=org_id,\n data_manager_key=dataset_keys[org_id],\n data_sample_keys=[train_datasample_keys[org_id]],\n )\n train_data_nodes.append(train_data_node)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Where and when to test\n======================\n\nWith the same logic as the train nodes, we create `substrafl_doc/api/nodes:TestDataNode` to specify on which\ndata we want to test our model.\n\nThe `substrafl_doc/api/evaluation_strategy:Evaluation Strategy` defines where and at which frequency we\nevaluate the model, using the given metric(s) that you registered in a previous section.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.nodes import TestDataNode\nfrom substrafl.evaluation_strategy import EvaluationStrategy\n\n\ntest_data_nodes = list()\n\nfor org_id in DATA_PROVIDER_ORGS_ID:\n\n # Create the Test Data Node (or testing task) and save it in a list\n test_data_node = TestDataNode(\n organization_id=org_id,\n data_manager_key=dataset_keys[org_id],\n test_data_sample_keys=[test_datasample_keys[org_id]],\n metric_keys=[metric_key],\n )\n test_data_nodes.append(test_data_node)\n\n# Test at the end of every round\nmy_eval_strategy = EvaluationStrategy(test_data_nodes=test_data_nodes, rounds=1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Running the experiment\n**********************\n\nWe now have all the necessary objects to launch our experiment. Please see a summary below of all the objects we created so far:\n\n- A `documentation/references/sdk:Client` to add or retrieve the assets of our experiment, using their keys to\n identify them.\n- An `Torch algorithm `_ to define the training parameters *(optimizer, train\n function, predict function, etc...)*.\n- A `Federated Strategy `_, to specify how to train the model on\n distributed data.\n- `Train data nodes `_ to indicate on which data to train.\n- An `substrafl_doc/api/evaluation_strategy:Evaluation Strategy`, to define where and at which frequency we\n evaluate the model.\n- An `substrafl_doc/api/nodes:AggregationNode`, to specify the organization on which the aggregation operation\n will be computed.\n- The **number of rounds**, a round being defined by a local training step followed by an aggregation operation.\n- An **experiment folder** to save a summary of the operation made.\n- The `substrafl_doc/api/dependency:Dependency` to define the libraries on which the experiment needs to run.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.experiment import execute_experiment\n\n# A round is defined by a local training step followed by an aggregation operation\nNUM_ROUNDS = 3\n\n# The Dependency object is instantiated in order to install the right libraries in\n# the Python environment of each organization.\nalgo_deps = Dependency(pypi_dependencies=[\"numpy==1.23.1\", \"torch==1.11.0\"])\n\ncompute_plan = execute_experiment(\n client=clients[ALGO_ORG_ID],\n algo=MyAlgo(),\n strategy=strategy,\n train_data_nodes=train_data_nodes,\n evaluation_strategy=my_eval_strategy,\n aggregation_node=aggregation_node,\n num_rounds=NUM_ROUNDS,\n experiment_folder=str(pathlib.Path.cwd() / \"tmp\" / \"experiment_summaries\"),\n dependencies=algo_deps,\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Explore the results\n*******************\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"List results\n============\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import pandas as pd\n\nperformances_df = pd.DataFrame(client.get_performances(compute_plan.key).dict())\nprint(\"\\nPerformance Table: \\n\")\nprint(performances_df[[\"worker\", \"round_idx\", \"performance\"]])"
]
},
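{
"cell_type": "markdown",
"metadata": {},
"source": [
"For readability, the same results can be pivoted so that each organization is a column and each row is a round (this assumes a single metric per worker and round, which is the case here).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Pivot the performance table: one row per round, one column per organization.\nprint(performances_df.pivot(index=\"round_idx\", columns=\"worker\", values=\"performance\"))"
]
},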
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot results\n============\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n\nplt.title(\"Test dataset results\")\nplt.xlabel(\"Rounds\")\nplt.ylabel(\"Accuracy\")\n\nfor id in DATA_PROVIDER_ORGS_ID:\n df = performances_df.query(f\"worker == '{id}'\")\n plt.plot(df[\"round_idx\"], df[\"performance\"], label=id)\n\nplt.legend(loc=\"lower right\")\nplt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Download a model\n================\n\nAfter the experiment, you might be interested in downloading your trained model.\nTo do so, you will need the source code in order to reload your code architecture in memory.\nYou have the option to choose the client and the round you are interested in downloading.\n\nIf `round_idx` is set to `None`, the last round will be selected by default.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substrafl.model_loading import download_algo_files\nfrom substrafl.model_loading import load_algo\n\nclient_to_dowload_from = DATA_PROVIDER_ORGS_ID[0]\nround_idx = None\n\nalgo_files_folder = str(pathlib.Path.cwd() / \"tmp\" / \"algo_files\")\n\ndownload_algo_files(\n client=clients[client_to_dowload_from],\n compute_plan_key=compute_plan.key,\n round_idx=round_idx,\n dest_folder=algo_files_folder,\n)\n\nmodel = load_algo(input_folder=algo_files_folder).model\n\nprint(model)"
]
}
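,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a final, purely illustrative step, the downloaded model can be used for local inference on a dummy input (a real use case would feed actual 28x28 grayscale images).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative local inference on a dummy input; the model expects 1x28x28 inputs.\ndummy_input = torch.zeros(1, 1, 28, 28)\nwith torch.no_grad():\n    log_probs = model(dummy_input, eval=True)\nprint(log_probs.argmax(dim=1))  # predicted class for the dummy image"
]
}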
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 0
}