hivemind.averaging¶
This module lets you average tensors in a decentralized manner.
DecentralizedAverager(averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start: bool, prefix: str, target_group_size: int, min_group_size: int = 2, initial_group_bits: Optional[str] = None, averaging_expiration: float = 15, request_timeout: float = 3, averaging_alpha: float = 1.0, part_size_bytes: int = 1048576, allreduce_timeout: Optional[float] = None, compression_type: runtime_pb2.CompressionType = 0, bandwidth: Optional[float] = None, min_vector_size: int = 0, auxiliary: bool = False, allow_state_sharing: Optional[bool] = None, client_mode: bool = False, listen_on: Endpoint = '0.0.0.0:*', daemon: bool = True, announced_host: Optional[str] = None, channel_options: Sequence[Tuple[str, Any]] = (), shutdown_timeout: float = 5, **kwargs)¶
Parameter averaging service. A trainer can run this service in the background to periodically average its parameters with other trainers. The averaging pattern is chosen so that (1) you only need to average with a small group of peers at a time, but (2) all trainers converge to the global average in a logarithmic number of steps.
- averaged_tensors – a sequence of pytorch tensors that will be averaged in each all-reduce
- dht – a DHT node that will be used to find groups
- start – if True, starts the background process immediately
- prefix – a shared prefix for all group keys
- target_group_size – attempts to form groups with up to this many peers (recommended: a power of 2, e.g. 16)
- initial_group_bits – a string of bits (‘0’ and ‘1’) that define the initial group key (bucket index)
- averaging_expiration – attempt to find a group for this many seconds, otherwise try again. Note: this expiration time only applies to looking for a group; passing tensors in allreduce may take more time
- compression_type – optionally compress tensors with this compression algorithm before sending them to peers
- allreduce_timeout – spend at most this many seconds for allreduce (after group is formed)
- averaging_alpha – optional “learning rate” for averaging. If specified, local parameters will be shifted towards the (estimated) average by this coefficient. By default, local parameters are set equal to average.
- request_timeout – when looking for group, wait for a response from leader for at most this many seconds.
- part_size_bytes – tensors for AllReduce are processed in parts of up to this size (after compression)
- bandwidth – if specified, this value represents the network bandwidth available to the averager. By default, the averager is assumed to have the average bandwidth of its group. If bandwidth == 0, the averager will rely on its groupmates to do all the averaging.
- client_mode – if False (default), this averager will accept incoming requests from other peers; if True, the averager will only join existing groups where at least one peer has client_mode=False
- listen_on – network interface, e.g. “0.0.0.0:1337”, “localhost:*” (* means pick any free port) or “[::]:7654”
- announced_host – visible IP address the averager will announce for external connections from other peers. If None, the address will be chosen from p2p.get_visible_maddrs() (global IPv4 addresses are preferred)
- channel_options – options for grpc.aio.insecure_channel, e.g. [(‘grpc.enable_retries’, 0)] see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for a list of all options
- kwargs – extra parameters forwarded to grpc.aio.server
- auxiliary – if this flag is specified, averager.step will only assist others without sending local tensors for averaging
- allow_state_sharing – if set to True, other peers can download this peer’s state. Can be overwritten with averager.allow_state_sharing = True / False
- shutdown_timeout – when calling .shutdown, wait for up to this many seconds before terminating
request_timeout must be smaller than averaging_expiration to avoid potential deadlocks.
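The averaging_alpha parameter admits a one-line description of its update rule. The helper below is a hypothetical stand-in (not part of hivemind's API) that shows the arithmetic: with alpha = 1.0 (the default) local parameters are replaced by the group average, while smaller alpha only shifts them part of the way.

```python
def apply_averaging_step(local, group_average, averaging_alpha=1.0):
    """Shift each local parameter toward the group average by `averaging_alpha`.

    alpha = 1.0 replaces local values with the average; alpha = 0.5 moves
    them halfway toward it.
    """
    return [x + averaging_alpha * (avg - x) for x, avg in zip(local, group_average)]

apply_averaging_step([0.0, 2.0], [1.0, 1.0], averaging_alpha=1.0)  # -> [1.0, 1.0]
apply_averaging_step([0.0, 2.0], [1.0, 1.0], averaging_alpha=0.5)  # -> [0.5, 1.5]
```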
>>> averager = DecentralizedAverager(...)
>>> with averager.get_tensors() as tensors:
>>>     # run some code, modify tensors if necessary
>>>     tensors += 1
>>> # do not use tensors after the lock is released
>>> metadata = averager.step(gather=dict(my_batch_size=32))
>>> # run averaging once (in-place), gather metadata from groupmates
>>> with averager.get_tensors() as tensors_after_averaging:
>>>     pass  # use the averaged tensors
allow_state_sharing – if set to True, other peers can download this peer’s state
Run the averager function in a background thread; this is needed to avoid a heisenbug with broken OMP on fork. It turns out that using a non-main thread creates a separate OMP pool that works even if the original pool is corrupted. Read more: https://github.com/pytorch/pytorch/issues/17199
Starts the averager in a background process. If await_ready is set, this method will wait until the background process is ready to handle incoming requests, or for at most timeout seconds.
shutdown() → None¶
Shut down the averager process
step(gather: Optional[Any] = None, weight: Optional[float] = None, timeout: Optional[float] = None, allow_retries: bool = True, wait: bool = True) → Union[Dict[str, Any], None, hivemind.utils.mpfuture.MPFuture]¶
Set up the averager to look for a group and run one round of averaging; return the gathered metadata on success or None on failure
- gather – optionally send this information to all peers in the next group and gather it from every groupmate (this operation is known as all-gather). The gathered data will be available as the output of this function.
- weight – averaging weight for this peer, int or float, must be strictly positive
- allow_retries – if averager fails to run one round of allreduce, this option will allow it to try again within the specified timeout
- timeout – if the averager was unable to find a group in this many seconds, consider allreduce failed
- wait – if True (default), return when finished. Otherwise return MPFuture and run in background.
Returns: on success, update averaged_tensors and return group info; on failure, return None
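The weight parameter determines how much each peer contributes to the group average. The helper below is a hypothetical illustration (not hivemind code) of the weighted average that one all-reduce round produces; for example, weight can be set proportional to the peer's batch size so that larger batches count for more.

```python
def weighted_group_average(tensors_by_peer, weights):
    """Compute the weighted average of per-peer parameter vectors.

    Each peer's contribution is scaled by its strictly positive `weight`
    and the result is normalized by the total weight.
    """
    total = sum(weights)
    length = len(tensors_by_peer[0])
    return [
        sum(w * peer[i] for peer, w in zip(tensors_by_peer, weights)) / total
        for i in range(length)
    ]

# two peers; the second processed twice as many samples per step
weighted_group_average([[0.0, 3.0], [3.0, 0.0]], weights=[1.0, 2.0])
# -> [2.0, 1.0]
```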
get_current_state() → Tuple[Any, Sequence[torch.Tensor]]¶
Get the current state to send to a peer. Executed in the host process. Meant to be overridden.
Returns: a tuple of (small metadata, sequence of torch tensors)
Note: metadata must be serializable with self.serializer (default = MSGPackSerializer)
load_state_from_peers(wait=True) → Optional[Tuple[Any, Sequence[torch.Tensor]]]¶
Try to download the latest optimizer state from one of the existing peers.
Returns: on success, a 2-tuple of (metadata, tensors), where
- metadata is a small object containing metadata (e.g. hyperparameters, scalars, etc)
- tensors is a sequence of pytorch tensors meant to contain peer’s model weights and optimizer statistics
The exact contents of both metadata and tensors are determined by get_current_state method
rpc_download_state(request: averaging_pb2.DownloadRequest, context: grpc.ServicerContext) → AsyncIterator[averaging_pb2.DownloadData]¶
Get the up-to-date trainer state from a peer. The state consists of two parts: (serialized_metadata, tensors)
- serialized_metadata is a small serialized bytestring meant to store scalars and hyperparameters
- tensors is a sequence of pytorch tensors that represent model parameters or optimizer statistics
get_group_bits(wait: bool = True)¶
- wait – if True, return bits immediately; otherwise return an awaitable MPFuture
Returns: averager’s current group key bits (without prefix)
set_group_bits(group_bits: str, wait: bool = True)¶
- group_bits – group bits (string of ‘0’ or ‘1’) to be used in averager’s group key
- wait – if True, wait until the update is confirmed by the averager. Otherwise return immediately
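Group bits act as a bucket index during matchmaking: only peers whose current bits match can form an averaging group under the same prefix. The sketch below is a simplified model (not hivemind's DHT-based matchmaking) of how a string of bits partitions peers into buckets.

```python
from collections import defaultdict

def bucket_peers_by_group_bits(peer_bits):
    """Group peers whose current group key bits match exactly.

    `peer_bits` maps peer name -> its bit string; only peers that land in
    the same bucket would be eligible to average together.
    """
    buckets = defaultdict(list)
    for peer, bits in peer_bits.items():
        assert set(bits) <= {"0", "1"}, "group bits must be a string of 0s and 1s"
        buckets[bits].append(peer)
    return dict(buckets)

bucket_peers_by_group_bits({"A": "01", "B": "01", "C": "10"})
# -> {"01": ["A", "B"], "10": ["C"]}
```

Calling set_group_bits moves an averager between buckets, which is how a peer can steer which neighborhood of the network it averages with next.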