src.core package

Submodules

src.core.data_types module

class src.core.data_types.ParameterServerStatus(value)

Bases: Enum

Enum for the status of the parameter server.

ACCEPTED = 0
REJECTED = 1
SHUTDOWN = 2

src.core.parameter_server module

class src.core.parameter_server.ParameterServer(model, param)

Bases: object

Parameter Server for Stale Synchronous Parallel training. The server manages the global model parameters and coordinates the gradient updates from multiple workers. Each worker computes gradients locally and with a push operation sends the result to the server, which aggregates the gradients and updates the model parameters. Each worker can receive the latest model parameters with a pull operation.

Parameters:
  • model (nn.Module) – PyTorch model instance.

  • param (ConfigParameters) – Configuration parameters.
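The push/pull protocol described above can be sketched with a toy server. This is only an illustration under stated assumptions, not the package's implementation: `ToyParameterServer` is a hypothetical name, parameters are plain floats instead of tensors, `pull()` is assumed to return a `(version, parameters)` pair, and a push is assumed to be rejected when the worker's version lags the server's by more than the staleness bound.

```python
import threading
from enum import Enum


class ParameterServerStatus(Enum):
    # Values mirror the enum documented in src.core.data_types.
    ACCEPTED = 0
    REJECTED = 1
    SHUTDOWN = 2


class ToyParameterServer:
    """Hypothetical stand-in: plain floats instead of tensors."""

    def __init__(self, params, lr, staleness):
        self._params = list(params)
        self._lr = lr
        self._staleness = staleness  # assumed maximum allowed version lag
        self._version = 0
        self._lock = threading.Lock()

    def pull(self):
        # Assumption: pull returns the current version and a copy of the parameters.
        with self._lock:
            return self._version, list(self._params)

    def push(self, wid, w_version, grads):
        with self._lock:
            # Assumption: gradients older than the bound are rejected.
            if self._version - w_version > self._staleness:
                return ParameterServerStatus.REJECTED
            # Plain SGD step, then bump the global version.
            self._params = [p - self._lr * g for p, g in zip(self._params, grads)]
            self._version += 1
            return ParameterServerStatus.ACCEPTED
```

A worker in this sketch would call `pull()`, compute gradients against the returned parameters, and pass the pulled version back as `w_version` when it calls `push()`.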

get_hist() → list[int]

Return the raw counts of staleness occurrences for this run.
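One plausible shape for such raw counts is a list indexed by staleness value. The helper below is a hypothetical sketch: the name `staleness_histogram` and the indexing convention (entry `i` counts pushes whose staleness was exactly `i`) are assumptions, not taken from the source.

```python
def staleness_histogram(staleness_values):
    """Count how often each staleness value occurred (index = staleness)."""
    if not staleness_values:
        return []
    counts = [0] * (max(staleness_values) + 1)
    for s in staleness_values:
        counts[s] += 1
    return counts
```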

get_staleness_stats()

Return a dict of staleness statistics, per worker and combined across all workers:

{"per_worker": {wid: {"mean": …, "median": …, "std": …, "pct_over_bound": …}, …}, "combined": {"mean": …, "median": …, "std": …, "pct_over_bound": …}}
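A dict with this layout could be assembled as in the sketch below. It makes several assumptions that the source does not confirm: the input is a mapping from worker id to a list of observed staleness values, "std" is the population standard deviation, and "pct_over_bound" is the percentage of observations strictly greater than the staleness bound. The function names are hypothetical.

```python
import statistics


def summarize(values, bound):
    """Summary statistics for one list of staleness observations."""
    over = sum(1 for v in values if v > bound)
    return {
        "mean": statistics.fmean(values),
        "median": statistics.median(values),
        "std": statistics.pstdev(values),  # assumption: population std
        "pct_over_bound": 100.0 * over / len(values),
    }


def staleness_stats(per_worker, bound):
    """Build the per-worker and combined summaries."""
    combined = [v for vals in per_worker.values() for v in vals]
    return {
        "per_worker": {wid: summarize(vals, bound) for wid, vals in per_worker.items()},
        "combined": summarize(combined, bound),
    }
```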

get_time_push()

Return the time spent in push and pull operations.

get_version()

Return the current version of the model parameters.

pull()
push(wid, w_version: int, grads: list[Tensor]) → ParameterServerStatus

src.core.parameter_server_asap module

class src.core.parameter_server_asap.ParameterServerASAP_SGD(model: Module, param: ConfigParameters)

Bases: ParameterServer

Parameter server variant based on the paper “Instance-based Adaptiveness to Staleness in Asynchronous SGD” (https://proceedings.mlr.press/v162/backstrom22a/backstrom22a.pdf).

push(wid, w_version: int, grads: list[Tensor]) → ParameterServerStatus

src.core.parameter_server_dasgd module

class src.core.parameter_server_dasgd.ParameterServerDASGD(model: Module, param: ConfigParameters)

Bases: ParameterServer

Parameter server variant based on the paper “Asynchronous SGD with stale gradient dynamic adjustment for deep learning training” (https://www.sciencedirect.com/science/article/pii/S0020025524011344?via%3Dihub).

push(wid, w_version: int, grads: list[Tensor]) → ParameterServerStatus

src.core.parameter_server_saasgd module

class src.core.parameter_server_saasgd.ParameterServerSAASGD(model: Module, param: ConfigParameters)

Bases: ParameterServer

Staleness-aware Async-SGD implementation from the paper “Staleness-aware Asynchronous SGD for Distributed Deep Learning” (https://arxiv.org/pdf/1511.05950).
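The central idea of the cited paper is to scale the learning rate down by the staleness of each gradient. The sketch below illustrates that idea on plain floats; it is not the code of this class. The function names are hypothetical, staleness is assumed to be the difference between the server version and the worker's version, and clamping the divisor to at least 1 is an assumption made to avoid division by zero for fresh gradients.

```python
def staleness_scaled_lr(base_lr: float, staleness: int) -> float:
    """Divide the base learning rate by the gradient's staleness."""
    # Assumption: a fresh gradient (staleness 0) uses the base rate unchanged.
    return base_lr / max(1, staleness)


def apply_update(params, grads, base_lr, server_version, worker_version):
    """One SGD step whose step size shrinks as the gradient gets staler."""
    staleness = server_version - worker_version
    lr = staleness_scaled_lr(base_lr, staleness)
    return [p - lr * g for p, g in zip(params, grads)]
```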

push(wid, w_version: int, grads: list[Tensor]) → ParameterServerStatus

src.core.train_runner module

class src.core.train_runner.PSManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Bases: BaseManager

ParameterServer(*args, **kwds)
get_hist(*args, **kwds)
get_staleness_stats(*args, **kwds)
get_time_push(*args, **kwds)
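Entries such as `ParameterServer(*args, **kwds)` are what a `multiprocessing.managers.BaseManager` subclass exposes after a type has been registered on it. The sketch below shows that general pattern with a hypothetical `Counter` class and `ToyManager`; how `PSManager` is actually wired up is not shown in this reference. The "fork" context is requested explicitly only so the snippet runs as a flat script on POSIX systems, which is an assumption of the sketch and is unavailable on Windows.

```python
import multiprocessing as mp
from multiprocessing.managers import BaseManager


class Counter:
    """Hypothetical shared object that only tracks a version number."""

    def __init__(self, start: int = 0) -> None:
        self._version = start

    def push(self) -> int:
        self._version += 1
        return self._version

    def get_version(self) -> int:
        return self._version


class ToyManager(BaseManager):
    """Manager subclass; shared types are attached with register()."""


# After registration, manager.Counter(...) creates the object in the
# manager's server process and returns a proxy to it.
ToyManager.register("Counter", Counter)


def demo() -> int:
    ctx = mp.get_context("fork")  # assumption: POSIX-only start method
    with ToyManager(ctx=ctx) as manager:
        proxy = manager.Counter(10)
        proxy.push()  # each call is forwarded to the server process
        proxy.push()
        return proxy.get_version()
```

Every process that holds the proxy talks to the same underlying object, which is what lets several workers share one server instance.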
src.core.train_runner.run_training(dataset_builder: Callable[[int, int, int], Tuple[DataLoader, int]], model: Callable[[int], Module], param: ConfigParameters = ConfigParameters(num_workers=5, staleness=50, lr=0.01, local_steps=1, batch_size=10, device='cpu', log_level=20, tol=1e-08, Amplitude=1), parameter_server: Callable = <class 'src.core.parameter_server.ParameterServer'>, asgd_worker: Callable = <function worker>) → list[Tensor]

Helper function to run the Stale Synchronous Parallel training with the provided dataset builder, model and configuration parameters.

Parameters:
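  • dataset_builder – Function used to build the dataset.

  • model – Model class to be trained.

  • param (ConfigParameters) – SSP configuration parameters.

  • parameter_server (Callable) – Parameter server class to instantiate. Defaults to ParameterServer.

  • asgd_worker (Callable) – Worker function to run for each worker. Defaults to worker.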

Returns:

The final model parameters after training.

Return type:

list[torch.Tensor]

src.core.utils module

src.core.utils.build_model(theta: list[Tensor], model, input_dim: int) → Module

Build a model instance from the provided parameters.

Parameters:
  • theta (list[torch.Tensor]) – List of model parameters.

  • model (Callable[[int], nn.Module]) – Model class to be instantiated.

  • input_dim (int) – Input dimension of the model.

Returns:

Model instance with the provided parameters.

Return type:

nn.Module

src.core.utils.evaluate_model(name: str, model: Module, X_eval: ndarray, y_eval: ndarray) → float

Evaluate the model on the provided evaluation dataset.

Parameters:
  • model (nn.Module) – Model instance to be evaluated.

  • X_eval (np.ndarray) – Evaluation dataset features.

  • y_eval (np.ndarray) – Evaluation dataset labels.

Returns:

Accuracy of the model on the evaluation dataset.

Return type:

float

Raises:

ValueError – If the model is not in evaluation mode.

src.core.utils.exp_delay(num_workers: int, scale: float = 0.0001) → None
src.core.utils.l1_norm(w: ndarray) → float
src.core.utils.l2_norm(w: ndarray) → float
src.core.utils.parse_args()

Parse command line arguments. The user must choose the amount of overparametrization: 110%, 150%, or 200%.

Parameters:
  • overparam – Percentage of features vs. samples.

Returns:

Parsed arguments.

src.core.utils.set_seed(seed: int)
src.core.utils.sgd_training(X_train, y_train, num_epochs=10000, criterion=MSELoss(), batch_size=10, lr=0.01, tol=1e-08)
src.core.utils.sparsity_ratio(w: ndarray) → float

L1/L2 ratio: higher → more diffuse weights, lower → more concentrated.
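The ratio can be illustrated with plain Python lists. This is a dependency-free sketch rather than the package's ndarray-based code; the function name `l1_l2_ratio` and the choice to return 0.0 for an all-zero vector are assumptions. For a vector of length n, the ratio ranges from 1 (a single nonzero entry) to √n (all entries equal in magnitude).

```python
import math


def l1_l2_ratio(w):
    """Ratio of the L1 norm to the L2 norm of a weight vector."""
    l1 = sum(abs(x) for x in w)
    l2 = math.sqrt(sum(x * x for x in w))
    if l2 == 0.0:
        return 0.0  # assumption: define the ratio as 0 for an all-zero vector
    return l1 / l2
```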

src.core.utils.weight_kurtosis(w)

src.core.worker module

src.core.worker.worker(w_id: int, server: ParameterServer, model: Callable[[int], Module], input_dim: int, dataset_builder: Callable[[int, int, int], Tuple[DataLoader, int]], param: ConfigParameters, start_evt) → None

Worker function for Stale Synchronous Parallel training.

Parameters:
  • w_id (int) – Worker ID.

  • server (ParameterServer) – Parameter server

  • model (Callable[[int], nn.Module]) – Model class to be trained.

  • input_dim (int) – Input dimension of the model.

  • dataset_builder (Callable[[int, int, int], Tuple[torch.utils.data.DataLoader, int]]) – Function used to build the dataset.

  • param (ConfigParameters) – SSP Configuration parameters.

Returns:

None

Module contents