Hivemind Server

class hivemind.server.Server(dht: Optional[hivemind.dht.DHT], expert_backends: Dict[str, hivemind.server.expert_backend.ExpertBackend], listen_on: str = '0.0.0.0:*', num_connection_handlers: int = 1, update_period: int = 30, start=False, checkpoint_dir=None, **kwargs)[source]

Server allows you to host “experts” - pytorch sub-networks used by Decentralized Mixture of Experts. After creation, a server should be started: see Server.run or Server.run_in_background.

A working server does 3 things:
  • processes incoming forward/backward requests via Runtime (created by the server)
  • publishes updates to expert status every update_period seconds
  • follows orders from HivemindController - if it exists
Parameters:
  • expert_backends – dict {expert uid (str): ExpertBackend} for all experts hosted by this server.
  • listen_on – server’s address that determines how it can be accessed: hostname/IP and an optional port
  • num_connection_handlers – maximum number of simultaneous requests. Please note that the default value of 1 is too small for normal functioning; we recommend 4 handlers per expert backend.
  • update_period – how often will server attempt to publish its state (i.e. experts) to the DHT; if dht is None, this parameter is ignored.
  • start – if True, the server will immediately start as a background thread and return control once it is ready (see .ready below)
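
For illustration, here is a minimal construction sketch based on the signature above. The expert uid and the ExpertBackend arguments are placeholders (see ExpertBackend below), the import path follows the class path shown in this document, and dht=None keeps the sketch self-contained at the cost of not publishing experts to a DHT.

>>> from hivemind.server import Server, ExpertBackend
>>> expert_backends = {'expert.0': ExpertBackend(**kwargs)}  # see ExpertBackend below for the required arguments
>>> server = Server(dht=None, expert_backends=expert_backends, num_connection_handlers=4)
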
run()[source]

Starts Server in the current thread. Initializes dht if necessary, starts connection handlers, runs Runtime (self.runtime) to process incoming requests.

run_in_background(await_ready=True, timeout=None)[source]

Starts Server in a background thread. If await_ready is True, this method will wait until the background server is ready to process incoming requests, or for at most timeout seconds.

ready[source]

An event (multiprocessing.Event) that is set when the server is ready to process requests.

Example

>>> server.start()
>>> server.ready.wait(timeout=10)
>>> print("Server ready" if server.ready.is_set() else "Server didn't start in 10 seconds")
shutdown()[source]

Gracefully terminate a hivemind server, process-safe. Please note that terminating the server any other way (e.g. by killing its processes) may result in zombie processes. If you have already caused a zombie outbreak, your only option is to kill them with -9 (SIGKILL).
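
A typical lifecycle, sketched under the same assumptions as the construction example above, pairs a background start with an explicit shutdown so that no child processes are left behind:

>>> server.run_in_background(await_ready=True, timeout=30)
>>> try:
...     pass  # serve forward/backward requests while the rest of the application runs
... finally:
...     server.shutdown()  # terminates connection handlers and the Runtime, process-safe
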

class hivemind.server.Runtime(expert_backends: Dict[str, hivemind.server.expert_backend.ExpertBackend], prefetch_batches=64, sender_threads: int = 1, device: torch.device = None)[source]

A group of processes that processes incoming requests for multiple experts on a shared device. Runtime is usually created and managed by Server, humans need not apply.

For debugging, you can start runtime manually with .start() or .run()

>>> expert_backends = {'expert_name': ExpertBackend(**kwargs)}
>>> runtime = Runtime(expert_backends)
>>> runtime.start()  # start runtime in background thread. To start in current thread, use runtime.run()
>>> runtime.ready.wait()  # wait for the runtime to load all experts on device and create request pools
>>> future = runtime.expert_backends['expert_name'].forward_pool.submit_task(*expert_inputs)
>>> print("Returned:", future.result())
>>> runtime.shutdown()
Parameters:
  • expert_backends – a dict [expert uid -> ExpertBackend]
  • prefetch_batches – form up to this many batches in advance
  • start – start runtime immediately (at the end of __init__)
  • sender_threads – dispatches outputs from finished batches using this many asynchronous threads
  • device – if specified, moves all experts and data to this device via .to(device=device). If you want to manually specify devices for each expert (in their forward pass), leave device=None (default)
run()[source]

Run the Runtime in the current thread: load experts onto the device (if specified), signal that the runtime is ready (self.ready), then keep forming batches from the experts’ task pools, processing them, and dispatching the outputs until shutdown.

shutdown()[source]

Gracefully terminate a running runtime.

iterate_minibatches_from_pools(timeout=None)[source]

Chooses pool according to priority, then copies exposed batch and frees the buffer

class hivemind.server.ExpertBackend(name: str, expert: torch.nn.modules.module.Module, opt: torch.optim.optimizer.Optimizer, *, args_schema: Tuple[hivemind.utils.tensor_descr.BatchTensorDescriptor, ...] = None, kwargs_schema: Dict[str, hivemind.utils.tensor_descr.BatchTensorDescriptor] = None, outputs_schema: Union[hivemind.utils.tensor_descr.BatchTensorDescriptor, Tuple[hivemind.utils.tensor_descr.BatchTensorDescriptor, ...]] = None, **kwargs)[source]

ExpertBackend is a wrapper around a torch module that allows it to run tasks asynchronously with Runtime. By default, ExpertBackend handles three types of requests:

  • forward - receive inputs and compute outputs. Concurrent requests will be batched for better GPU utilization.
  • backward - receive gradients w.r.t. outputs, compute gradients w.r.t. inputs and update expert. Also batched.
  • get_info - return expert metadata. Not batched.
Parameters:
  • expert

    nn.Module to be wrapped into a backend. Arbitrary pytorch module with a few limitations:

    • Experts must always receive the same set of args and kwargs and produce output tensors of the same type
    • All args, kwargs and outputs must be tensors where the 0-th dimension represents the batch size
    • We recommend using experts that are ~invariant to the order in which they process batches
    • Using randomness (e.g. Dropout) leads to different samples at forward and backward. If you want consistency,
      you should explicitly register these random variables as model inputs or outputs. See hivemind.utils.custom_layers.DeterministicDropout for an example
  • opt – torch optimizer to be applied on every backward call
  • args_schema – description of positional arguments to expert.forward, list of BatchTensorDescriptor
  • kwargs_schema – description of keyword arguments to expert.forward, dict of BatchTensorDescriptor
  • outputs_schema – description of outputs from expert.forward, nested structure of BatchTensorDescriptor
  • kwargs – extra parameters to be forwarded into TaskPool.__init__
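
For illustration, a hedged construction sketch for a single feed-forward expert; the layer sizes, optimizer settings and the max_batch_size keyword (forwarded to TaskPool.__init__ as described above) are arbitrary choices, not recommendations:

>>> import torch
>>> from hivemind.server import ExpertBackend
>>> from hivemind.utils.tensor_descr import BatchTensorDescriptor
>>> expert = torch.nn.Sequential(torch.nn.Linear(64, 64), torch.nn.ReLU(), torch.nn.Linear(64, 64))
>>> backend = ExpertBackend(
...     name='expert.0',
...     expert=expert,
...     opt=torch.optim.Adam(expert.parameters()),
...     args_schema=(BatchTensorDescriptor(64),),  # one positional input of shape [batch_size, 64]
...     outputs_schema=BatchTensorDescriptor(64),  # one output of the same shape
...     max_batch_size=512,                        # forwarded to TaskPool.__init__ via **kwargs
... )
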
forward(*inputs) → Tuple[torch.Tensor, ...][source]

Apply forward pass to an aggregated batch of requests. Used by Runtime; do not call this manually. To submit a request for asynchronous processing, please use ExpertBackend.forward_pool.submit_task.

Subclassing:

This method receives a sequence of torch tensors following nested_flatten(self.forward_schema);

It should return outputs that follow nested_flatten(self.outputs_schema);

backward(*inputs) → Tuple[torch.Tensor, ...][source]

Apply backward pass to an aggregated batch of requests. Used by Runtime; do not call this manually. To submit a request for asynchronous processing, please use ExpertBackend.backward_pool.submit_task.

Subclassing:

This method receives a sequence of torch tensors following nested_flatten(self.backward_schema);

It should return gradients w.r.t. inputs that follow nested_flatten(self.forward_schema);

Runtime doesn’t guarantee that backward will be performed in the same order and for the same data as forward, so we recommend a stateless backward pass that re-runs the expert’s forward pass inside backward.

Please make sure to call ExpertBackend.apply_gradients here, otherwise the expert will not train.
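
As a sketch of this contract for a single-input, single-output expert (the attribute name self.expert and the exact layout of the flattened inputs are assumptions for illustration, not part of the documented interface):

>>> import torch
>>> class MyExpertBackend(ExpertBackend):
...     def backward(self, *inputs):
...         # assumption: backward_schema flattens to (input_batch, grad_outputs) for this expert
...         input_batch, grad_outputs = inputs
...         with torch.enable_grad():
...             input_batch = input_batch.detach().requires_grad_(True)
...             outputs = self.expert(input_batch)  # stateless: re-run the forward pass inside backward
...             torch.autograd.backward(outputs, grad_tensors=grad_outputs)
...         self.apply_gradients()  # required, otherwise the expert will not train
...         return (input_batch.grad,)  # gradients w.r.t. inputs, following forward_schema
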

apply_gradients() → None[source]

Train the expert for one step. This method is called by ExpertBackend.backward after computing gradients.

get_info() → Dict[str, Any][source]

Get expert parameters and stats. Used by RemoteExpert to check shapes and for DMoE orchestration.

get_pools() → Sequence[hivemind.server.task_pool.TaskPool][source]

return all pools that should be processed by Runtime

class hivemind.server.TaskPool(process_func: callable, max_batch_size: int, min_batch_size=1, timeout=None, pool_size=None, prefetch_batches=1, uid=None, daemon=True, start=False)[source]

Request aggregator that accepts processing requests, groups them into batches, waits for Runtime to process these batches and dispatches results back to request sources. Operates as a background process.

Parameters:
  • process_func – function to be applied to every formed batch; called by Runtime. Note that process_func should accept only positional args (Tensors) and return a flat tuple of Tensors
  • max_batch_size – process at most this many inputs in a batch (a task may contain one or several inputs)
  • min_batch_size – process at least this many inputs in a batch, otherwise wait for more
  • timeout – wait for a subsequent task for at most this many seconds
  • pool_size – store at most this many unprocessed tasks in a queue
  • prefetch_batches – prepare up to this many batches in background for faster off-loading to runtime
  • uid – pool identifier used for shared array allocation
  • start – if True, start automatically at the end of __init__
submit_task(*args) → concurrent.futures._base.Future[source]

Add task to this pool’s queue, return Future for its output

iterate_minibatches(*args, **kwargs)[source]

Form minibatches by grouping one or more tasks together up to self.max_batch_size

load_batch_to_runtime(timeout=None, device=None) → Tuple[Any, List[torch.Tensor]][source]

receive the next batch of inputs as torch tensors, moved to the given device if specified

send_outputs_from_runtime(batch_index: int, batch_outputs: List[torch.Tensor])[source]

send results for a processed batch, previously loaded through load_batch_to_runtime
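
To illustrate how these methods fit together, here is a hedged round-trip sketch that plays the role of Runtime by hand; the assumption that load_batch_to_runtime returns a (batch index, batch tensors) pair matching send_outputs_from_runtime is inferred from the signatures above.

>>> import torch
>>> from hivemind.server import TaskPool
>>> def double(x):  # process_func: positional tensor args in, flat tuple of tensors out
...     return (x * 2,)
>>> pool = TaskPool(double, max_batch_size=16, uid='demo_pool', start=True)
>>> future = pool.submit_task(torch.ones(2, 4))  # one task whose single input has batch size 2
>>> batch_index, batch_tensors = pool.load_batch_to_runtime()  # normally called by Runtime
>>> pool.send_outputs_from_runtime(batch_index, list(double(*batch_tensors)))
>>> print(future.result())
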

get_task_size(task: hivemind.server.task_pool.Task) → int[source]

compute task processing complexity (used for batching); defaults to batch size
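
If batching should be weighted by something other than batch size (for example, total token count for variable-length inputs), get_task_size can be overridden. This sketch assumes a task exposes its input tensors as task.args, which is an implementation detail rather than part of the documented interface:

>>> class TokenWeightedTaskPool(TaskPool):
...     def get_task_size(self, task) -> int:
...         # assumption: task.args holds the positional input tensors of this task
...         batch_size, seq_length = task.args[0].shape[:2]
...         return batch_size * seq_length  # weight tasks by total number of tokens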