{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Substrafl FedAvg on MNIST dataset\n\n\nThis example illustrates the basic usage of Substrafl and proposes a model training by federated learning\nusing the Federated Averaging strategy.\n\nIt is based on `the MNIST Dataset of handwritten digits `__.\n\nIn this example, we work on **the grayscale images** of size **28x28 pixels**. The problem considered is a\nclassification problem aiming to recognize the number written on each image.\n\nThe objective of this example is to launch a *federated learning* experiment on two organizations, using the **FedAvg strategy** on a\n**convolutional neural network** (CNN)\ntorch model.\n\nThis example does not use the deployed platform of Substra and will run in local mode.\n\n**Requirements:**\n\n - To run this example locally, please make sure to download and unzip, in the same directory as this example, the\n assets needed to run it:\n\n .. only:: builder_html or readthedocs\n\n :download:`assets required to run this example <../../../../../tmp/substrafl_fedavg_assets.zip>`\n\n Please ensure all the libraries are installed; a *requirements.txt* file is included in the zip file, and\n you can run the command `pip install -r requirements.txt` to install them.\n\n - **Substra** and **Substrafl** should already be installed; if not, follow the instructions described here:\n `substrafl_doc/substrafl_overview:Installation`\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Client and data preparation\n***************************\n\nImports\n=======\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import codecs\nimport os\nimport pathlib\nimport sys\nimport zipfile\n\nimport numpy as np\nfrom torchvision.datasets import MNIST"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Creating the Substra Client\n===========================\n\nWe work with two different organizations, defined by their IDs. Both organizations provide a dataset. One of them will also provide the algorithm and register the machine learning tasks.\n\nOnce these variables are defined, we can create our Substra `documentation/references/sdk:Client`.\n\nThis example runs in local mode, simulating a **federated learning** experiment.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substra import Client\n\n# Choose the subprocess mode to locally simulate the FL process\nDEBUG_SPAWNER = \"subprocess\"\nos.environ[\"DEBUG_SPAWNER\"] = DEBUG_SPAWNER\n\n# Create the substra clients\nN_CLIENTS = 2\nclients = [Client(debug=True) for _ in range(N_CLIENTS)]\nclients = {client.organization_info().organization_id: client for client in clients}\n\n# Store their IDs\nORGS_ID = list(clients.keys())\n\n# The org id on which your computation tasks are registered\nALGO_ORG_ID = ORGS_ID[1]\n\n\n# Create the temporary directory for generated data\n(pathlib.Path.cwd() / \"tmp\").mkdir(exist_ok=True)\n\ndata_path = pathlib.Path.cwd() / \"tmp\" / \"data\"\nassets_directory = pathlib.Path.cwd() / \"assets\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Download and extract MNIST dataset\n==================================\n\nThis section downloads (if needed) the **MNIST dataset** using the `torchvision library\n`__.\nIt extracts the images from the raw files and locally creates two folders: one for each organization.\n\nEach organization will have access to half the train data and half the test data (which corresponds to **30,000**\nimages for training and **5,000** for testing for each organization).\n\n"
]
},
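The per-organization halving described above is done with `numpy.split`, which cuts an array into equal-sized folds along its first axis. A minimal standalone sketch with toy data (not the real MNIST arrays):

```python
import numpy as np

# Toy stand-in for the MNIST training images: 6 "images" of 2x2 pixels
images = np.arange(24).reshape(6, 2, 2)

# Split along the first axis into two equal folds, one per organization
folds = np.split(images, 2)

assert len(folds) == 2
assert folds[0].shape == (3, 2, 2)  # each organization receives half the samples
```

Note that `np.split` requires the array length to be divisible by the number of folds, which holds here since MNIST's 60,000 train and 10,000 test samples divide evenly between two organizations.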
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def get_int(b: bytes) -> int:\n return int(codecs.encode(b, \"hex\"), 16)\n\n\ndef MNISTraw2numpy(path: str, strict: bool = True) -> np.ndarray:\n # read\n with open(path, \"rb\") as f:\n data = f.read()\n # parse\n magic = get_int(data[0:4])\n nd = magic % 256\n assert 1 <= nd <= 3\n numpy_type = np.uint8\n s = [get_int(data[4 * (i + 1) : 4 * (i + 2)]) for i in range(nd)]\n\n num_bytes_per_value = np.iinfo(numpy_type).bits // 8\n # The MNIST format uses the big endian byte order. If the system uses little endian byte order by default,\n # we need to reverse the bytes before we can read them with np.frombuffer().\n needs_byte_reversal = sys.byteorder == \"little\" and num_bytes_per_value > 1\n parsed = np.frombuffer(bytearray(data), dtype=numpy_type, offset=(4 * (nd + 1)))\n if needs_byte_reversal:\n parsed = np.flip(parsed, 0)\n\n assert parsed.shape[0] == np.prod(s) or not strict\n return parsed.reshape(*s)\n\n\nraw_path = pathlib.Path(data_path) / \"MNIST\" / \"raw\"\n\n# Download the dataset\nMNIST(data_path, download=True)\n\n# Extract numpy arrays from the raw data\ntrain_images = MNISTraw2numpy(str(raw_path / \"train-images-idx3-ubyte\"))\ntrain_labels = MNISTraw2numpy(str(raw_path / \"train-labels-idx1-ubyte\"))\ntest_images = MNISTraw2numpy(str(raw_path / \"t10k-images-idx3-ubyte\"))\ntest_labels = MNISTraw2numpy(str(raw_path / \"t10k-labels-idx1-ubyte\"))\n\n# Split the arrays into the number of organizations\ntrain_images_folds = np.split(train_images, N_CLIENTS)\ntrain_labels_folds = np.split(train_labels, N_CLIENTS)\ntest_images_folds = np.split(test_images, N_CLIENTS)\ntest_labels_folds = np.split(test_labels, N_CLIENTS)\n\n# Save the splits in different folders to simulate the different organizations\nfor i in range(N_CLIENTS):\n\n # Save the train dataset on each org\n os.makedirs(str(data_path / f\"org_{i+1}/train\"), exist_ok=True)\n filename = data_path / f\"org_{i+1}/train/train_images.npy\"\n np.save(str(filename), train_images_folds[i])\n filename = data_path / f\"org_{i+1}/train/train_labels.npy\"\n np.save(str(filename), train_labels_folds[i])\n\n # Save the test dataset on each org\n os.makedirs(str(data_path / f\"org_{i+1}/test\"), exist_ok=True)\n filename = data_path / f\"org_{i+1}/test/test_images.npy\"\n np.save(str(filename), test_images_folds[i])\n filename = data_path / f\"org_{i+1}/test/test_labels.npy\"\n np.save(str(filename), test_labels_folds[i])"
]
},
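The `get_int` helper above decodes big-endian integers from the IDX file header. For instance, the magic number of the images file is stored as the four bytes `00 00 08 03`, i.e. 2051, and its last byte encodes the number of dimensions. A quick check, reusing the same implementation:

```python
import codecs

def get_int(b: bytes) -> int:
    # Interpret the bytes as one big-endian unsigned integer
    return int(codecs.encode(b, "hex"), 16)

# Magic number of an IDX3 (images) file: dtype code 0x08 (uint8), 3 dimensions
assert get_int(b"\x00\x00\x08\x03") == 2051
# The last byte of the magic number is the number of dimensions
assert get_int(b"\x00\x00\x08\x03") % 256 == 3
```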
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Registering assets\n******************\n\nSubstra and Substrafl imports\n==============================\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from substra.sdk.schemas import (\n DatasetSpec,\n AlgoInputSpec,\n AlgoOutputSpec,\n AssetKind,\n Permissions,\n DataSampleSpec,\n AlgoCategory,\n AlgoSpec,\n)\nfrom substrafl.nodes import TestDataNode, TrainDataNode"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Permissions\n===========\n\nAs data cannot be seen once it is registered on the platform, we set `documentation/concepts:Permissions` for\neach `documentation/concepts:Assets` to define their access rights to the different data.\n\nThe metadata are visible to all the users of a :term:`Channel`.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"permissions = Permissions(public=False, authorized_ids=ORGS_ID)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Registering dataset\n===================\n\nA `documentation/concepts:Dataset` is composed of an **opener**, which is a Python script with the instructions\non *how to load the data* from the files into memory, and a **description markdown** file.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dataset = DatasetSpec(\n name=\"MNIST\",\n type=\"npy\",\n data_opener=assets_directory / \"dataset\" / \"opener.py\",\n description=assets_directory / \"dataset\" / \"description.md\",\n permissions=permissions,\n logs_permission=permissions,\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Adding Metrics\n==============\n\nA metric corresponds to an algorithm used to compute the score of predictions on a\n**datasample**.\nConcretely, a metric corresponds to an archive *(tar or zip file)*, automatically built\nfrom:\n\n- a **Python script** that implements the metric computation\n- a `Dockerfile `__ to specify the required dependencies of the\n **Python script**\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"inputs_metrics = [\n AlgoInputSpec(identifier=\"datasamples\", kind=AssetKind.data_sample, optional=False, multiple=True),\n AlgoInputSpec(identifier=\"opener\", kind=AssetKind.data_manager, optional=False, multiple=False),\n AlgoInputSpec(identifier=\"predictions\", kind=AssetKind.model, optional=False, multiple=False),\n]\n\noutputs_metrics = [AlgoOutputSpec(identifier=\"performance\", kind=AssetKind.performance, multiple=False)]\n\nobjective = AlgoSpec(\n category=AlgoCategory.metric,\n inputs=inputs_metrics,\n outputs=outputs_metrics,\n name=\"Accuracy\",\n description=assets_directory / \"metric\" / \"description.md\",\n file=assets_directory / \"metric\" / \"metrics.zip\",\n permissions=permissions,\n)\n\nMETRICS_DOCKERFILE_FILES = [\n assets_directory / \"metric\" / \"metrics.py\",\n assets_directory / \"metric\" / \"Dockerfile\",\n]\n\narchive_path = objective.file\nwith zipfile.ZipFile(archive_path, \"w\") as z:\n for filepath in METRICS_DOCKERFILE_FILES:\n z.write(filepath, arcname=filepath.name)\n\nmetric_key = clients[ALGO_ORG_ID].add_algo(objective)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Train and test data nodes\n=========================\n\nThe `documentation/concepts:Dataset` object itself does not contain the data. The proper asset to access them\nis the **datasample asset**.\n\nA **datasample** contains a local path to the data, and the key identifying the `documentation/concepts:Dataset`\nit is based on, in order to have access to the proper `opener.py` file.\n\nNow that all our `documentation/concepts:Assets` are well defined, we can create\n`substrafl_doc/api/nodes:TrainDataNode` and `substrafl_doc/api/nodes:TestDataNode` to gather the\n`documentation/concepts:Dataset` and the **datasamples** on the specified nodes.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"train_data_nodes = list()\ntest_data_nodes = list()\n\nfor ind, org_id in enumerate(ORGS_ID):\n client = clients[org_id]\n\n # Add the dataset to the client to provide access to the opener in each organization.\n dataset_key = client.add_dataset(dataset)\n assert dataset_key, \"Missing data manager key\"\n\n # Add the training data on each organization.\n data_sample = DataSampleSpec(\n data_manager_keys=[dataset_key],\n test_only=False,\n path=data_path / f\"org_{ind+1}\" / \"train\",\n )\n train_datasample_key = client.add_data_sample(\n data_sample,\n local=True,\n )\n\n # Create the Train Data Node (or training task) and save it in a list\n train_data_node = TrainDataNode(\n organization_id=org_id,\n data_manager_key=dataset_key,\n data_sample_keys=[train_datasample_key],\n )\n train_data_nodes.append(train_data_node)\n\n # Add the testing data on each organization.\n data_sample = DataSampleSpec(\n data_manager_keys=[dataset_key],\n test_only=True,\n path=data_path / f\"org_{ind+1}\" / \"test\",\n )\n test_datasample_key = client.add_data_sample(\n data_sample,\n local=True,\n )\n\n # Create the Test Data Node (or testing task) and save it in a list\n test_data_node = TestDataNode(\n organization_id=org_id,\n data_manager_key=dataset_key,\n test_data_sample_keys=[test_datasample_key],\n metric_keys=[metric_key],\n )\n test_data_nodes.append(test_data_node)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Machine Learning specification\n******************************\n\nTorch imports\n=============\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import torch\nfrom torch import nn\nimport torch.nn.functional as F"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"CNN definition\n==============\n\nWe choose to use a classic torch CNN as the model to train. The model structure is defined by the user independently\nof Substrafl.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"seed = 42\ntorch.manual_seed(seed)\n\n\nclass CNN(nn.Module):\n def __init__(self):\n super(CNN, self).__init__()\n self.conv1 = nn.Conv2d(1, 32, kernel_size=5)\n self.conv2 = nn.Conv2d(32, 32, kernel_size=5)\n self.conv3 = nn.Conv2d(32, 64, kernel_size=5)\n self.fc1 = nn.Linear(3 * 3 * 64, 256)\n self.fc2 = nn.Linear(256, 10)\n\n def forward(self, x, eval=False):\n x = F.relu(self.conv1(x))\n x = F.relu(F.max_pool2d(self.conv2(x), 2))\n x = F.dropout(x, p=0.5, training=not eval)\n x = F.relu(F.max_pool2d(self.conv3(x), 2))\n x = F.dropout(x, p=0.5, training=not eval)\n x = x.view(-1, 3 * 3 * 64)\n x = F.relu(self.fc1(x))\n x = F.dropout(x, p=0.5, training=not eval)\n x = self.fc2(x)\n return F.log_softmax(x, dim=1)\n\n\nmodel = CNN()\noptimizer = torch.optim.Adam(model.parameters(), lr=0.001)\ncriterion = torch.nn.CrossEntropyLoss()"
]
},
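The `3 * 3 * 64` flatten size of `fc1` follows from the shape arithmetic of the convolutions and poolings above. A small helper traces it (plain Python; the helper names are ours, not part of the example):

```python
def conv_out(size: int, kernel: int) -> int:
    # Valid convolution, stride 1, no padding: output = input - kernel + 1
    return size - kernel + 1

def pool_out(size: int, kernel: int = 2) -> int:
    # Max pooling with stride equal to the kernel size (floor division)
    return size // kernel

s = 28                        # input images are 28x28
s = conv_out(s, 5)            # conv1: 28 -> 24
s = pool_out(conv_out(s, 5))  # conv2 + max_pool2d: 24 -> 20 -> 10
s = pool_out(conv_out(s, 5))  # conv3 + max_pool2d: 10 -> 6 -> 3
assert s == 3                 # hence the 3 * 3 * 64 input features of fc1
```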
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Substrafl imports\n==================\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from typing import Any\n\nfrom substrafl.algorithms.pytorch import TorchFedAvgAlgo\nfrom substrafl.dependency import Dependency\nfrom substrafl.strategies import FedAvg\nfrom substrafl.nodes import AggregationNode\nfrom substrafl.evaluation_strategy import EvaluationStrategy\nfrom substrafl.index_generator import NpIndexGenerator\nfrom substrafl.experiment import execute_experiment"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Substrafl algo definition\n==========================\n\nTo instantiate a Substrafl `substrafl_doc/api/algorithms:Torch Algorithms`, you need to define a torch Dataset\nwith a specific `__init__` signature that must contain `(self, x, y, is_inference)`. This torch Dataset is useful to\npreprocess your data in the `__getitem__` function.\nThe `__getitem__` function is expected to return x and y if is_inference is False, else x.\nThis behavior can be changed by overriding the `_local_train` or `predict` methods.\n\nThis dataset is passed **as a class** to the `substrafl_doc/api/algorithms:Torch Algorithms`.\nIndeed, this torch Dataset will be instantiated within the algorithm, using the opener functions as x and y\nparameters.\n\nThe index generator will be used as the batch sampler of the dataset, in order to save the state of the seen samples\nduring the training, as federated algorithms have a notion of `num_updates`, which forces the batch sampler of the\ndataset to be stateful.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Number of model updates between each FL strategy aggregation.\nNUM_UPDATES = 15\n\n# Number of samples per update.\nBATCH_SIZE = 32\n\nindex_generator = NpIndexGenerator(\n batch_size=BATCH_SIZE,\n num_updates=NUM_UPDATES,\n)\n\n\nclass TorchDataset(torch.utils.data.Dataset):\n def __init__(self, x, y, is_inference: bool):\n self.x = x\n self.y = y\n self.is_inference = is_inference\n\n def __getitem__(self, idx):\n if not self.is_inference:\n return self.x[idx] / 255, self.y[idx]\n else:\n return self.x[idx] / 255\n\n def __len__(self):\n return len(self.x)\n\n\nclass MyAlgo(TorchFedAvgAlgo):\n def __init__(self):\n super().__init__(\n model=model,\n criterion=criterion,\n optimizer=optimizer,\n index_generator=index_generator,\n dataset=TorchDataset,\n seed=seed,\n )"
]
},
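The `__getitem__` above normalizes the raw `uint8` pixel values to the `[0, 1]` range and drops the label at inference time. A self-contained sketch of that contract, using a plain Python stand-in rather than the torch Dataset itself:

```python
import numpy as np

class ToyDataset:
    # Mirrors the (x, y, is_inference) contract expected by Substrafl torch algos
    def __init__(self, x, y, is_inference: bool):
        self.x, self.y, self.is_inference = x, y, is_inference

    def __getitem__(self, idx):
        if not self.is_inference:
            return self.x[idx] / 255, self.y[idx]  # training: features and label
        return self.x[idx] / 255                   # inference: features only

    def __len__(self):
        return len(self.x)

x = np.array([[0, 128, 255]], dtype=np.uint8)
y = np.array([7])

sample_x, sample_y = ToyDataset(x, y, is_inference=False)[0]
assert sample_x.max() == 1.0 and sample_y == 7
assert ToyDataset(x, y, is_inference=True)[0].shape == (3,)
```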
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Algo dependencies\n=================\n\nThe **dependencies** needed for the `substrafl_doc/api/algorithms:Torch Algorithms` are specified by a\n`substrafl_doc/api/dependency:Dependency` object, in order to install the right libraries in the Python\nenvironment of each organization.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"algo_deps = Dependency(pypi_dependencies=[\"numpy==1.23.1\", \"torch==1.11.0\"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Federated Learning strategies\n=============================\n\nFor this example, we choose to use the **Federated Averaging strategy** (`substrafl_doc/api/strategies:Strategies`),\nbased on `the FedAvg paper by McMahan et al., 2017 `__.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"strategy = FedAvg()"
]
},
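FedAvg aggregates the locally updated models by averaging their parameters, weighted by the number of samples each organization trained on. A minimal numpy sketch of that aggregation step (illustrative only, not Substrafl's implementation; the variable names are ours):

```python
import numpy as np

# Flattened model parameters after local training on each organization
local_weights = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]
n_samples = [30_000, 30_000]  # both organizations hold half of MNIST

# Weighted average: sum_k (n_k / n_total) * w_k
total = sum(n_samples)
global_weights = sum(w * (n / total) for w, n in zip(local_weights, n_samples))

assert np.allclose(global_weights, [2.0, 3.0])
```

With equal sample counts, as in this example, the weighted average reduces to a plain mean of the local parameters.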
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Running the experiment\n**********************\n\nWe now have all the necessary objects to launch our experiment. Below is a summary of all the objects we created so far:\n\n- A `documentation/references/sdk:Client` to orchestrate all the assets of our project, using their keys to\n identify them\n- A `substrafl_doc/api/algorithms:Torch Algorithms` to define the training parameters *(optimizer, train function,\n predict function, etc.)*\n- A `substrafl_doc/api/strategies:Strategies` to specify the federated learning aggregation operation\n- `substrafl_doc/api/nodes:TrainDataNode` to indicate where we can process training tasks, on which data and using\n which *opener*\n- An `substrafl_doc/api/evaluation_strategy:Evaluation Strategy` to define where and at which frequency we\n evaluate the model\n- An `substrafl_doc/api/nodes:AggregationNode` to specify the node on which the aggregation operation will be\n computed\n- The **number of rounds**, a round being defined by a local training step followed by an aggregation operation\n- An **experiment folder** to save a summary of the operations made\n- The `substrafl_doc/api/dependency:Dependency` to define the libraries the experiment needs to run.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"aggregation_node = AggregationNode(ALGO_ORG_ID)\n\nmy_eval_strategy = EvaluationStrategy(test_data_nodes=test_data_nodes, rounds=1)\n\n# Number of times to apply the compute plan.\nNUM_ROUNDS = 3\n\ncompute_plan = execute_experiment(\n client=clients[ALGO_ORG_ID],\n algo=MyAlgo(),\n strategy=strategy,\n train_data_nodes=train_data_nodes,\n evaluation_strategy=my_eval_strategy,\n aggregation_node=aggregation_node,\n num_rounds=NUM_ROUNDS,\n experiment_folder=str(pathlib.Path.cwd() / \"tmp\" / \"experiment_summaries\"),\n dependencies=algo_deps,\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Listing results\n===============\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import pandas as pd\n\n# Use an explicit client rather than the variable leaked from the registration loop\nperformances_df = pd.DataFrame(clients[ALGO_ORG_ID].get_performances(compute_plan.key).dict())\nprint(\"\\nPerformance Table: \\n\")\nprint(performances_df[[\"worker\", \"round_idx\", \"performance\"]])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot results\n============\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n\nplt.title(\"Test dataset results\")\nplt.xlabel(\"Rounds\")\nplt.ylabel(\"Accuracy\")\n\n# Plot one accuracy curve per organization (avoid shadowing the built-in `id`)\nfor org_id in ORGS_ID:\n df = performances_df.query(f\"worker == '{org_id}'\")\n plt.plot(df[\"round_idx\"], df[\"performance\"], label=org_id)\n\nplt.legend(loc=\"lower right\")\nplt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 0
}