hivemind.dht

This is a Distributed Hash Table optimized for rapidly accessing large amounts of lightweight metadata. Hivemind DHT is based on Kademlia [1] with added support for improved bulk store/get operations and caching.

The code is organized as follows:

  • class DHT (__init__.py) - high-level class for model training. Runs DHTNode in a background process.
  • class DHTNode (node.py) - an asyncio implementation of dht server, stores AND gets keys.
  • class DHTProtocol (protocol.py) - an RPC protocol to request data from dht nodes.
  • async def traverse_dht (traverse.py) - a search algorithm that crawls DHT peers.
  • [1] Maymounkov P., Mazieres D. (2002) Kademlia: A Peer-to-Peer Information System Based on the XOR Metric.
  • [2] https://github.com/bmuller/kademlia , Brian, if you’re reading this: THANK YOU! you’re awesome :)

Here’s a high level scheme of how these components interact with one another:

[Figure dht.png: a high-level scheme of how the DHT components interact]

DHT and DHTNode

class hivemind.dht.DHT(listen_on: str = '0.0.0.0:*', initial_peers: Sequence[str] = (), *, start: bool, daemon: bool = True, max_workers: Optional[int] = None, parallel_rpc: Optional[int] = None, receiver_threads: int = 1, negative_caching: bool = True, expiration: float = 300, **kwargs)[source]

High-level interface to hivemind.dht that is designed to allow RemoteMixtureOfExperts to select best experts.

  • hivemind servers periodically announce their experts via DHT.declare_experts
  • trainers find most suitable experts via DHT.find_best_experts
Parameters:
  • initial_peers – one or multiple endpoints pointing to active DHT peers. Similar format to listen_on.
  • listen_on – an interface for incoming connections, e.g. “127.0.0.1:*”, “0.0.0.0:1234” or “ipv6:[::]:*”
  • start – if True, automatically starts the background process on creation. Otherwise, the background process must be started manually
  • daemon – if True, the background process is marked as daemon and automatically terminated when the main process exits
  • max_workers – declare_experts and get_experts will use up to this many parallel workers (but no more than one per key)
  • expiration – experts declared from this node expire after this many seconds (default = 5 minutes)
  • receiver_threads – uses this many threads to await on input pipe. Default = 1 should be enough in most cases
  • negative_caching

    if True, whenever DHT is unable to find an expert or prefix, it will cache the “no key” result inside the DHT for :expiration: seconds. Caching only affects beam search and has three main effects:

    1. Faster beam search under node failures: if there are inconsistencies in DHT keys, such as a prefix pointing to a now-defunct expert, these inconsistencies will be overwritten by the first peer that stumbles upon them. As a result, beam search will not have to wait for non-existent experts until the expiration of their DHT entries;
    2. Delayed expert availability: Without negative cache, new experts are always immediately available for beam search after they are published to the DHT. With negative cache, there are rare cases (e.g. when adding new experts in place of recently defunct ones) when new experts will be initially invisible, but gradually become visible to more peers as those peers refresh their cache. This process takes at most :expiration: seconds;
    3. Faster beam search in very sparse grids: there is one edge case where negative cache will improve beam search performance; if an expert grid is very sparse, there can be empty indices in the first grid dimension (i.e. indices {i} such that no experts start with “{prefix}.{i}.*”). If so, the default beam search will be very slow due to the way it forms the initial beam. Beam search with negative cache enabled will run normally. However, this is a pathological case (e.g. only 90 experts in an oversized 100x100 grid) that should be avoided.
  • kwargs – any other params will be forwarded to DHTNode upon creation
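
For reference, here is a minimal usage sketch based on the parameters above (the peer endpoint in the comment is a placeholder, not a real address):

    import hivemind

    # start a DHT peer in a background process; "*" in listen_on means "pick any free port"
    # the very first peer of a new DHT is started with no initial_peers
    dht = hivemind.DHT(listen_on="0.0.0.0:*", initial_peers=[], start=True)

    # another machine would join the same DHT by pointing at an existing peer, e.g.
    # hivemind.DHT(initial_peers=["192.0.2.1:1337"], start=True)

    dht.shutdown()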

Each expert has an identifier in the form of {prefix}.{i}.{j}.{…}, e.g. “ffn_expert.98.76.54.32.10”. An expert identifier consists of:

  • optional prefix that determines expert role, experiment name, etc.
  • one or more integers that determine that expert’s position in an N-dimensional grid

A hivemind.Server can call DHT.declare_experts(expert_uids: List[str]) to make its experts visible to everyone. When declaring experts, DHT will store each expert’s uid and all its prefixes until :expiration: (specified at init). For instance, declaring “ffn_expert.98.76.54.32.10” will store the following keys in the DHT: "ffn_expert.98", "ffn_expert.98.76", "ffn_expert.98.76.54", ..., "ffn_expert.98.76.54.32.10"
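
The resulting set of keys can be illustrated in plain Python (a sketch of the key scheme only, not the actual storage code):

    uid = "ffn_expert.98.76.54.32.10"
    parts = uid.split(".")  # ["ffn_expert", "98", "76", "54", "32", "10"]

    # every prefix that contains at least one grid index becomes a separate DHT key
    keys = [".".join(parts[:i]) for i in range(2, len(parts) + 1)]
    # ['ffn_expert.98', 'ffn_expert.98.76', 'ffn_expert.98.76.54',
    #  'ffn_expert.98.76.54.32', 'ffn_expert.98.76.54.32.10']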

In order to enable fast beam search, DHT maintains dictionaries of all active suffixes for every prefix (e.g. “ffn_expert.98”: {76: ffn_expert.98.76…, 123: ffn_expert.98.123…, 225: ffn_expert.98.225…})

RemoteMixtureOfExperts can use these prefixes to find top-k most suitable experts with a left-to-right beam search. For instance, consider RemoteMixtureOfExperts with prefix “ffn_expert” and grid size [100, 100, 100, 100, 100]. This MoE can query all experts with that prefix and arbitrary indices in 0…99 along each dimension. However, not every expert in such 100^5 grid can be alive at a given moment of time (the grid size is redundant). In order to find k best “alive” experts, MoE first ranks indices along the first dimension with its gating function. It can then check which of those indices correspond to “alive” experts by querying keys such as “ffn_expert.98”.

After selecting k best indices along the first dimension, MoE moves to the second dimension. It can find top-k index pairs (e.g. “expert.98.76”) that use one of the k best indices from the previous step. This beam search explores one additional dimension per step and finds the k best experts from across the DHT in O(k * num_dimensions * dimension_size) time for the chosen grid dimensions.

run() → None[source]

Serve DHT forever. This function will not return until DHT node is shut down

run_in_background(await_ready=True, timeout=None)[source]

Starts DHT in a background process. If await_ready is True, this method will wait until the background DHT is ready to process incoming requests, or for at most :timeout: seconds.

shutdown() → None[source]

Shut down a running dht process

get(key: Any, latest: bool = False, return_future: bool = False, **kwargs) → Union[hivemind.utils.timed_storage.ValueWithExpiration[typing.Any][Any], None, hivemind.utils.mpfuture.MPFuture][source]

Search for a key across DHT and return either first or latest entry (if found).

Parameters:
  • key – same key as in node.store(…)
  • latest – if True, finds the latest value, otherwise finds any non-expired value (which is much faster)
  • return_future – if False (default), return when finished. Otherwise return MPFuture and run in background.
  • kwargs – parameters forwarded to DHTNode.get_many_by_id
Returns:

(value, expiration time); if value was not found, returns None

store(key: Any, value: Any, expiration_time: float, subkey: Optional[Any] = None, return_future: bool = False, **kwargs) → Union[bool, hivemind.utils.mpfuture.MPFuture][source]

Find num_replicas best nodes to store (key, value) and store it there until expiration time.

Parameters:
  • return_future – if False (default), return when finished. Otherwise return MPFuture and run in background.
Note:

store is a simplified interface to store_many; all kwargs will be forwarded there

Returns:

True if store succeeds, False if it fails (due to no response or newer value)
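
A short sketch of using these two methods together (assuming get_dht_time is exported at the package level, as referenced elsewhere in these docs):

    import hivemind

    dht = hivemind.DHT(start=True)

    # store a value for 60 seconds; True means at least one peer accepted the store
    assert dht.store("my_key", "my_value", expiration_time=hivemind.get_dht_time() + 60)

    found = dht.get("my_key")  # ValueWithExpiration or None
    if found is not None:
        value, expiration_time = found
        print(value, expiration_time)

    dht.shutdown()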

get_visible_address(num_peers: Optional[int] = None, peers: Sequence[str] = ()) → str[source]

Get this machine’s visible address by requesting other peers or using pre-specified network addresses. If no parameters are specified, this function will check for a manually specified endpoint; if none is available, it will ask one random peer.

Parameters:
  • num_peers – if specified, ask multiple peers and check that they perceive the same endpoint
  • peers – if specified, ask these exact peers instead of choosing random known peers
Note:

if this node has no known peers in routing table, one must specify :peers: manually

declare_experts(uids: Sequence[str], endpoint: str, wait: bool = True, timeout: Optional[float] = None) → Dict[str, bool][source]

Make experts visible to all DHT peers; update timestamps if declared previously.

Parameters:
  • uids – a list of expert ids to update
  • endpoint – endpoint that serves these experts, usually your server endpoint (e.g. “201.111.222.333:1337”)
  • wait – if True, waits for the declaration to finish, otherwise runs in background
  • timeout – waits for the procedure to finish for up to this long, None means wait indefinitely
Returns:

if wait, returns store status for every key (True = store succeeded, False = store rejected)

get_experts(uids: List[str], expiration_time: Optional[float] = None, return_future: bool = False) → List[Optional[hivemind.client.expert.RemoteExpert]][source]
Parameters:
  • uids – find experts with these ids from across the DHT
  • expiration_time – if specified, return experts that expire no sooner than this (based on get_dht_time)
  • return_future – if False (default), return when finished. Otherwise return MPFuture and run in background.
Returns:

a list of [RemoteExpert if found else None]
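
A hypothetical round trip between a server and a trainer sharing one DHT (the endpoint and expert uids below are placeholders):

    import hivemind

    dht = hivemind.DHT(start=True)

    # the server side announces which experts it hosts and where to reach it
    statuses = dht.declare_experts(["ffn_expert.0.3", "ffn_expert.7.2"], endpoint="192.0.2.1:1337")
    assert all(statuses.values())

    # the trainer side (normally another process or machine in the same DHT) looks them up by uid
    experts = dht.get_experts(["ffn_expert.0.3", "ffn_expert.99.99"])
    print(experts)  # [RemoteExpert(...), None] - the second uid was never declared

    dht.shutdown()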

get_initial_beam(prefix: str, scores: Sequence[float], beam_size: int, num_workers: Optional[int] = None, return_future: bool = False) → List[Tuple[float, str, Dict[int, hivemind.dht.UidEndpoint]]][source]
Parameters:
  • prefix – search for experts whose uids start with this prefix
  • scores – prefer suffix coordinates that have highest scores
  • beam_size – select this many active suffixes with highest scores
  • num_workers – maintain up to this many concurrent DHT searches
  • return_future – if False (default), return when finished. Otherwise return MPFuture and run in background.
Returns:

a list of up to beam_size tuples of (prefix score, prefix itself, dict{suffix: example expert})

get_active_successors(prefixes: List[str], grid_size: Optional[int] = None, num_workers: Optional[int] = None, return_future: bool = False) → Dict[str, Dict[int, hivemind.dht.UidEndpoint]][source]
Parameters:
  • prefixes – a list of prefixes for which to find active successor uids
  • grid_size – if specified, only return successors that are in range [0, grid_size)
  • num_workers – how many parallel workers to use for DHTNode.get_many
  • return_future – if False (default), find and return successors. Otherwise return MPFuture and fill later.
Returns:

for every prefix, return a dict{active_next_coordinate: (matching_expert_uid, matching_endpoint)}

Note:

if a prefix is not found, get_active_successors will return an empty dictionary for that prefix

find_best_experts(prefix: str, grid_scores: Sequence[Sequence[float]], beam_size: int, num_workers: Optional[int] = None, return_future: bool = False) → Union[List[hivemind.client.expert.RemoteExpert], hivemind.utils.mpfuture.MPFuture][source]

Find and return :beam_size: active experts with highest scores, use both local cache and DHT

Parameters:
  • prefix – common prefix for all expert uids in grid
  • grid_scores (model scores for each grid dimension, list of arrays of shape grid_size[i]) – scores predicted for each dimension in the grid
  • beam_size – how many best experts beam search should return. After time_budget is reached, beam search won’t search for more experts and will instead fall back on the local cache. Please note that any queries that fall outside the budget will still be performed in the background and cached for subsequent iterations, as long as DHTNode.cache_locally is True
  • num_workers – use up to this many concurrent workers to search DHT
  • return_future – if set to True, returns MPFuture that can be awaited to get the actual result
Returns:

a list that contains up to beam_size RemoteExpert instances
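
For illustration, a hedged sketch of beam search over a small 2-dimensional grid (the scores are arbitrary, and it assumes experts with prefix "ffn_expert" were declared by servers elsewhere in the same DHT):

    import hivemind

    dht = hivemind.DHT(start=True)

    # one score vector per grid dimension, here a 4 x 4 grid
    grid_scores = [[0.5, 1.2, -0.3, 0.0],
                   [0.1, 0.7, 0.2, -1.0]]

    best_experts = dht.find_best_experts("ffn_expert", grid_scores, beam_size=4)
    print(best_experts)  # a list of up to beam_size RemoteExpert instances

    dht.shutdown()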

batch_find_best_experts(prefix: str, batch_grid_scores: Sequence[Sequence[Sequence[float]]], beam_size: int, *, workers_per_sample: Optional[int] = None, return_future=False) → Union[List[List[hivemind.client.expert.RemoteExpert]], hivemind.utils.mpfuture.MPFuture][source]

Find and return :beam_size: active experts with highest scores, use both local cache and DHT

Parameters:
  • prefix – common prefix for all expert uids in grid
  • batch_grid_scores (list of arrays of shape (batch_size, grid_size[i])) – scores predicted for each batch example and each dimension in the grid
  • beam_size – how many best experts beam search should return. After time_budget is reached, beam search won’t search for more experts and will instead fall back on the local cache. Please note that any queries that fall outside the budget will still be performed in the background and cached for subsequent iterations, as long as DHTNode.cache_locally is True
  • workers_per_sample – use up to this many concurrent workers for every sample in batch
  • return_future – if set to True, returns MPFuture that can be awaited to get the actual result
Returns:

for each sample in the batch, a list that contains up to beam_size RemoteExpert instances

class hivemind.dht.DHTNode(*, _initialized_with_create=False)[source]

Asyncio-based class that represents one DHT participant. Created via await DHTNode.create(…). Each DHTNode has an identifier, a local storage, and access to other nodes via DHTProtocol.

Note: Hivemind DHT is optimized to store many pieces of temporary metadata that are updated regularly, for example, expert heartbeats emitted by the hivemind.Server responsible for each expert. Such metadata does not require regular maintenance by peers or persistence on shutdown. Instead, DHTNode is designed to rapidly send bulk data and resolve conflicts.

Every (key, value) pair in this DHT has an expiration time, a float computed as get_dht_time() (Unix time by default). DHT nodes always prefer values with higher expiration times and may delete any value past its expiration.

Similar to Kademlia RPC protocol, hivemind DHT has 3 RPCs:

  • ping - request peer’s identifier and update routing table (same as Kademlia PING RPC)
  • store - send several (key, value, expiration_time) pairs to the same peer (like Kademlia STORE, but in bulk)
  • find - request one or several keys; get their values and expiration times (if the peer has them locally) and the :bucket_size: nearest peers from the recipient’s routing table (ordered nearest-to-farthest, not including the recipient itself). This RPC is a mixture of Kademlia FIND_NODE and FIND_VALUE with multiple keys per call.

A DHTNode follows the following contract:

  • when asked to get(key), a node must find and return the value with the highest expiration time that it found across the DHT, IF that time has not come yet. If the expiration time is smaller than the current get_dht_time(), the node may return None;
  • when requested to store(key: value, expiration_time), a node must store (key => value) until the expiration time or until DHTNode gets the same key with a greater expiration time. If a node is asked to store a key but it already has the same key with a newer expiration, the store will be rejected. Store returns True if accepted, False if rejected;
  • when requested to store(key: value, expiration_time, subkey=subkey), adds a sub-key to a dictionary value type. Dictionary values can have multiple sub-keys stored by different peers with individual expiration times. A subkey will be accepted to a dictionary either if there is no such sub-key or if new subkey’s expiration is later than previous expiration under that subkey. See DHTProtocol.call_store for details.

DHTNode also features several (optional) caching policies:

  • cache_locally: after GET, store the result in node’s own local cache
  • cache_nearest: after GET, send the result to this many nearest nodes that don’t have that value yet (see Kademlia)
  • cache_on_store: after STORE, either save or remove that key from node’s own cache depending on store status
  • cache_refresh_before_expiry: if a value in cache was used and is about to expire, try to GET it this many seconds before expiration. The motivation here is that some frequent keys should be always kept in cache to avoid latency.
  • reuse_get_requests: if there are several concurrent GET requests, when one request finishes, DHTNode will attempt to reuse the result of this GET request for other requests with the same key. Useful for batch-parallel requests.
classmethod create(node_id: Optional[hivemind.dht.routing.DHTID] = None, initial_peers: List[str] = (), bucket_size: int = 20, num_replicas: int = 5, depth_modulo: int = 5, parallel_rpc: Optional[int] = None, wait_timeout: float = 3, refresh_timeout: Optional[float] = None, bootstrap_timeout: Optional[float] = None, cache_locally: bool = True, cache_nearest: int = 1, cache_size=None, cache_refresh_before_expiry: float = 5, cache_on_store: bool = True, reuse_get_requests: bool = True, num_workers: int = 1, chunk_size: int = 16, blacklist_time: float = 5.0, backoff_rate: float = 2.0, listen: bool = True, listen_on: str = '0.0.0.0:*', endpoint: Optional[str] = None, validate: bool = True, strict: bool = True, **kwargs) → hivemind.dht.node.DHTNode[source]
Parameters:
  • node_id – current node’s identifier, determines which keys it will store locally, defaults to random id
  • initial_peers – connects to these peers to populate routing table, defaults to no peers
  • bucket_size – max number of nodes in one k-bucket (k). Trying to add the {k+1}st node will cause the bucket to either split in two buckets along the midpoint or reject the new node (but still save it as a replacement). Recommended value: k is chosen s.t. any given k nodes are very unlikely to all fail after staleness_timeout
  • num_replicas – number of nearest nodes that will be asked to store a given key, default = bucket_size (≈k)
  • depth_modulo – split full k-bucket if it contains root OR up to the nearest multiple of this value (≈b)
  • parallel_rpc – maximum number of concurrent outgoing RPC requests emitted by DHTProtocol Reduce this value if your RPC requests register no response despite the peer sending the response.
  • wait_timeout – a kademlia rpc request is deemed lost if we did not receive a reply in this many seconds
  • refresh_timeout – refresh buckets if no node from that bucket was updated in this many seconds; if refresh_timeout is None, DHTNode will not refresh stale buckets (which is usually okay)
  • bootstrap_timeout – after one of peers responds, await other peers for at most this many seconds
  • cache_locally – if True, caches all values (stored or found) in a node-local cache
  • cache_on_store – if True, update cache entries for a key after storing a new item for that key
  • cache_nearest – whenever DHTNode finds a value, it will also store (cache) this value on this many of the nearest nodes visited by the search algorithm. Prefers nodes that are nearest to :key: but have no value yet
  • cache_size – if specified, local cache will store up to this many records (as in LRU cache)
  • cache_refresh_before_expiry – if nonzero, refreshes locally cached values if they are accessed this many seconds before expiration time.
  • reuse_get_requests – if True, DHTNode allows only one traverse_dht procedure for every key; all concurrent get requests for the same key will reuse the procedure that is currently in progress
  • num_workers – concurrent workers in traverse_dht (see traverse_dht num_workers param)
  • chunk_size – maximum number of concurrent calls in get_many and cache refresh queue
  • blacklist_time – excludes non-responsive peers from search for this many seconds (set 0 to disable)
  • backoff_rate – blacklist time will be multiplied by :backoff_rate: for each successive non-response
  • validate – if True, use initial peers to validate that this node is accessible and synchronized
  • strict – if True, any error encountered in validation will interrupt the creation of DHTNode
  • listen – if True (default), this node will accept incoming requests and otherwise be a full DHT “citizen”; if False, this node will refuse any incoming requests, effectively acting only as a “client”
  • listen_on – network interface, e.g. “0.0.0.0:1337” or “localhost:*” (* means pick any port) or “[::]:7654”
  • endpoint – if specified, this is peer’s preferred public endpoint. Otherwise let peers infer endpoint
  • channel_options – options for grpc.aio.insecure_channel, e.g. [('grpc.enable_retries', 0)]; see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for a list of all options
  • kwargs – extra parameters used in grpc.aio.server
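
A minimal asyncio sketch of creating a standalone node and exercising the (key, value, expiration) contract described above (it assumes get_dht_time is exported at the package level):

    import asyncio
    import hivemind

    async def main():
        # the first node of a new DHT has no initial peers
        node = await hivemind.dht.DHTNode.create(listen_on="0.0.0.0:*")

        now = hivemind.get_dht_time()
        print(await node.store("key", "value_a", expiration_time=now + 60))  # True
        # per the contract above, a store with an earlier expiration should be rejected
        print(await node.store("key", "value_b", expiration_time=now + 10))  # expected: False

        found = await node.get("key")  # ValueWithExpiration or None
        if found is not None:
            value, expiration_time = found
            print(value, expiration_time)

        await node.shutdown()

    asyncio.run(main())
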
find_nearest_nodes(queries: Collection[hivemind.dht.routing.DHTID], k_nearest: Optional[int] = None, beam_size: Optional[int] = None, num_workers: Optional[int] = None, node_to_endpoint: Optional[Dict[hivemind.dht.routing.DHTID, str]] = None, exclude_self: bool = False, **kwargs) → Dict[hivemind.dht.routing.DHTID, Dict[hivemind.dht.routing.DHTID, str]][source]
Parameters:
  • queries – find k nearest nodes for each of these DHTIDs
  • k_nearest – return this many nearest nodes for every query (if there are enough nodes)
  • beam_size – replacement for self.beam_size, see traverse_dht beam_size param
  • num_workers – replacement for self.num_workers, see traverse_dht num_workers param
  • node_to_endpoint – if specified, uses this dict[node_id => endpoint] as initial peers
  • exclude_self – if True, nearest nodes will not contain self.node_id (default = use local peers)
  • kwargs – additional params passed to traverse_dht
Returns:

for every query, return nearest peers ordered dict[peer DHTID -> network Endpoint], nearest-first

get(key: Any, latest=False, **kwargs) → Optional[hivemind.utils.timed_storage.ValueWithExpiration[typing.Any][Any]][source]

Search for a key across DHT and return either first or latest entry (if found).

Parameters:
  • key – same key as in node.store(…)
  • latest – if True, finds the latest value, otherwise finds any non-expired value (which is much faster)
  • kwargs – parameters forwarded to get_many_by_id
Returns:

(value, expiration time); if value was not found, returns None

get_many(keys: Collection[Any], sufficient_expiration_time: Optional[float] = None, **kwargs) → Dict[Any, Union[hivemind.utils.timed_storage.ValueWithExpiration[typing.Any][Any], None, Awaitable[Optional[hivemind.utils.timed_storage.ValueWithExpiration[typing.Any][Any]]]]][source]

Traverse DHT to find a list of keys. For each key, return latest (value, expiration) or None if not found.

Parameters:
  • keys – traverse the DHT and find the value for each of these keys (or (None, None) if the key is not found)
  • sufficient_expiration_time – if the search finds a value that expires after this time, it may stop early. Default = time of call, i.e. find any value that did not expire by the time of call. If sufficient_expiration_time=float(‘inf’), this method will find a value with the _latest_ expiration
  • kwargs – for full list of parameters, see DHTNode.get_many_by_id
Returns:

for each key: value and its expiration time. If nothing is found, returns (None, None) for that key

Note:

in order to check whether get returned a value, check whether expiration_time is None (None means the key was not found)
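
A sketch of the recommended check, run inside an async function with an existing DHTNode called node (a hypothetical variable):

    results = await node.get_many(["key_a", "key_b"])
    for key, result in results.items():
        value, expiration_time = result if result is not None else (None, None)
        if expiration_time is None:  # the documented way to detect "not found"
            print(key, "-> not found")
        else:
            print(key, "->", value, "expires at", expiration_time)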

get_many_by_id(key_ids: Collection[hivemind.dht.routing.DHTID], sufficient_expiration_time: Optional[float] = None, num_workers: Optional[int] = None, beam_size: Optional[int] = None, return_futures: bool = False, _is_refresh=False) → Dict[hivemind.dht.routing.DHTID, Union[hivemind.utils.timed_storage.ValueWithExpiration[typing.Any][Any], None, Awaitable[Optional[hivemind.utils.timed_storage.ValueWithExpiration[typing.Any][Any]]]]][source]

Traverse DHT to find a list of DHTIDs. For each key, return latest (value, expiration) or None if not found.

Parameters:
  • key_ids – traverse the DHT and find the value for each of these keys (or (None, None) if the key is not found)
  • sufficient_expiration_time – if the search finds a value that expires after this time, it may stop early. Default = time of call, i.e. find any value that did not expire by the time of call. If sufficient_expiration_time=float(‘inf’), this method will find a value with the _latest_ expiration
  • beam_size – maintains up to this many nearest nodes when crawling dht, default beam_size = bucket_size
  • num_workers – override for default num_workers, see traverse_dht num_workers param
  • return_futures – if True, immediately return an asyncio.Future for every key before interacting with the network. The algorithm will populate these futures with (value, expiration) when it finds the corresponding key. Note: cancelling a future will stop the search for the corresponding key
  • _is_refresh – internal flag, set to True by an internal cache refresher (if enabled)
Returns:

for each key: value and its expiration time. If nothing is found, returns (None, None) for that key

Note:

in order to check whether get returned a value, check whether expiration_time is None (None means the key was not found)

shutdown(timeout=None)[source]

Process existing requests, close all connections and stop the server

store(key: Any, value: Any, expiration_time: float, subkey: Optional[Any] = None, **kwargs) → bool[source]

Find num_replicas best nodes to store (key, value) and store it there at least until expiration time.

Note:

store is a simplified interface to store_many; all kwargs will be forwarded there

Returns:

True if store succeeds, False if it fails (due to no response or newer value)

store_many(keys: List[Any], values: List[Any], expiration_time: Union[float, List[float]], subkeys: Union[Any, List[Optional[Any]], None] = None, exclude_self: bool = False, await_all_replicas=True, **kwargs) → Dict[Any, bool][source]

Traverse the DHT to find up to :num_replicas: best nodes to store multiple (key, value, expiration_time) pairs.

Parameters:
  • keys – arbitrary serializable keys associated with each value
  • values – serializable “payload” for each key
  • expiration_time – either one expiration time for all keys or individual expiration times (see class doc)
  • subkeys – an optional list of the same shape as keys. If specified, each value is stored under its corresponding sub-key as part of a dictionary value type (see DHTProtocol.call_store for details)
  • kwargs – any additional parameters passed to traverse_dht function (e.g. num workers)
  • exclude_self – if True, never store value locally even if you are one of the nearest nodes
  • await_all_replicas – if False, this function returns after the first store_ok and proceeds in the background; if True, the function will wait for num_replicas successful stores or until it runs out of beam_size nodes
Note:

if exclude_self is True and self.cache_locally == True, value will still be __cached__ locally

Returns:

for each key: True if store succeeds, False if it fails (due to no response or newer value)
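
A bulk-store sketch, again assuming an async context with an existing DHTNode called node (a hypothetical variable) and one shared expiration time:

    import hivemind

    expiration_time = hivemind.get_dht_time() + 300
    statuses = await node.store_many(
        keys=["expert.1.2", "expert.3.4", "expert.5.6"],
        values=[b"alive", b"alive", b"alive"],
        expiration_time=expiration_time,  # one expiration shared by all three keys
    )
    print(statuses)  # e.g. {"expert.1.2": True, "expert.3.4": True, "expert.5.6": False}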

DHT communication protocol

RPC protocol that provides nodes a way to communicate with each other. Based on gRPC.AIO.

class hivemind.dht.protocol.DHTProtocol(*, _initialized_with_create=False)[source]
call_find(peer: str, keys: Collection[hivemind.dht.routing.DHTID]) → Optional[Dict[hivemind.dht.routing.DHTID, Tuple[Optional[hivemind.utils.timed_storage.ValueWithExpiration[typing.Union[bytes, hivemind.dht.storage.DictionaryDHTValue]][Union[bytes, hivemind.dht.storage.DictionaryDHTValue]]], Dict[hivemind.dht.routing.DHTID, str]]]][source]
Request keys from a peer. For each key, look for its (value, expiration time) locally and the k additional peers that are most likely to have this key (ranked by XOR distance).

Returns:

A dict key => Tuple[optional value, optional expiration time, nearest neighbors], where:
  • value – value stored by the recipient under that key, or None if the peer doesn’t have this value
  • expiration time – expiration time of the returned value, None if no value was found
  • neighbors – a dictionary[node_id : endpoint] containing nearest neighbors from the peer’s routing table
If the peer didn’t respond, returns None
call_ping(peer: str, validate: bool = False, strict: bool = True) → Optional[hivemind.dht.routing.DHTID][source]

Get peer’s node id and add it to the routing table. If the peer doesn’t respond, return None.

Parameters:
  • peer – string network address, e.g. 123.123.123.123:1337 or [2a21:6c8:b192:2105]:8888
  • validate – if True, validates that the node’s endpoint is available
  • strict – if strict=True, validation will raise an exception on failure, otherwise it will only warn
Note:

if DHTProtocol was created with listen=True, also request the peer to add this node to its routing table

Returns:

the peer’s DHTID, if the peer responded and decided to send its node_id
call_store(peer: str, keys: Sequence[hivemind.dht.routing.DHTID], values: Sequence[Union[bytes, hivemind.dht.storage.DictionaryDHTValue]], expiration_time: Union[float, Sequence[float]], subkeys: Union[Any, Sequence[Optional[Any]], None] = None, in_cache: Union[bool, Sequence[bool], None] = None) → Optional[List[bool]][source]

Ask a recipient to store several (key, value, expiration_time) items or update their older values

Parameters:
  • peer – request this peer to store the data
  • keys – a list of N keys digested by DHTID.generate(source=some_dict_key)
  • values – a list of N serialized values (bytes) for each respective key
  • expiration_time – a list of N expiration timestamps for each respective key-value pair(see get_dht_time())
  • subkeys – a list of N optional sub-keys. If None, stores the value normally. If a subkey is not None: 1) if local storage doesn’t have :key:, create a new dictionary {subkey: (value, expiration_time)}; 2) if local storage already has a dictionary under :key:, try to add (subkey, value, exp_time) to that dictionary; 3) if local storage associates :key: with a normal value with smaller expiration, clear :key: and perform (1); 4) finally, if local storage currently associates :key: with a normal value with larger expiration, do nothing
  • in_cache – a list of booleans: True = store the i-th key in cache, False = store the i-th key in regular local storage
Note:

the difference between storing normally and in cache is that normal storage is guaranteed to be stored until expiration time (best-effort), whereas cached storage can be evicted early due to limited cache size

Returns:

a list of [True / False]: True = stored, False = failed (found newer value or no response). If the peer did not respond at all (e.g. due to timeout or congestion), returns None

classmethod create(node_id: hivemind.dht.routing.DHTID, bucket_size: int, depth_modulo: int, num_replicas: int, wait_timeout: float, parallel_rpc: Optional[int] = None, cache_size: Optional[int] = None, listen=True, listen_on='0.0.0.0:*', endpoint: Optional[str] = None, channel_options: Sequence[Tuple[str, Any]] = (), **kwargs) → hivemind.dht.protocol.DHTProtocol[source]

A protocol that allows DHT nodes to request keys/neighbors from other DHT nodes. As a side-effect, DHTProtocol also maintains a routing table as described in https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf

See DHTNode (node.py) for a more detailed description.

Note: the rpc_* methods defined in this class are automatically exposed to other DHT nodes; for instance, def rpc_ping can be called as protocol.call_ping(endpoint, dht_id) from a remote machine. Only the call_* methods are meant to be called publicly, e.g. from DHTNode. Read more: https://github.com/bmuller/rpcudp/tree/master/rpcudp
get_outgoing_request_endpoint(peer: str) → Optional[str][source]

ask this peer how it perceives this node’s outgoing request address

rpc_find(request: dht_pb2.FindRequest, context: grpc.ServicerContext) → dht_pb2.FindResponse[source]

Someone wants to find keys in the DHT. For all keys that we have locally, return the value and expiration. Also return the :bucket_size: nearest neighbors from our routing table for each key (whether or not we found a value)

rpc_ping(request: dht_pb2.PingRequest, context: grpc.ServicerContext)[source]

Some node wants us to add it to our routing table.

rpc_store(request: dht_pb2.StoreRequest, context: grpc.ServicerContext) → dht_pb2.StoreResponse[source]

Some node wants us to store this (key, value) pair

shutdown(timeout=None)[source]

Process existing requests, close all connections and stop the server

update_routing_table(node_id: Optional[hivemind.dht.routing.DHTID], peer_endpoint: str, responded=True)[source]

This method is called on every incoming AND outgoing request to update the routing table

Parameters:
  • peer_endpoint – sender endpoint for incoming requests, recipient endpoint for outgoing requests
  • node_id – sender node id for incoming requests, recipient node id for outgoing requests
  • responded – for outgoing requests, this indicates whether the recipient responded or not. For incoming requests, this should always be True
class hivemind.dht.routing.RoutingTable(node_id: hivemind.dht.routing.DHTID, bucket_size: int, depth_modulo: int)[source]

A data structure that contains DHT peers bucketed according to their distance to node_id. Follows Kademlia routing table as described in https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf

Parameters:
  • node_id – node id used to measure distance
  • bucket_size – parameter $k$ from Kademlia paper Section 2.2
  • depth_modulo – parameter $b$ from Kademlia paper Section 2.2.
Note:

you can find a more detailed description of parameters in DHTNode, see node.py

get_bucket_index(node_id: hivemind.dht.routing.DHTID) → int[source]

Get the index of the bucket that the given node would fall into.

add_or_update_node(node_id: hivemind.dht.routing.DHTID, endpoint: str) → Optional[Tuple[hivemind.dht.routing.DHTID, str]][source]

Update routing table after an incoming request from :endpoint: or outgoing request to :endpoint:

Returns:If we cannot add node_id to the routing table, return the least-recently-updated node (Section 2.2)
Note: DHTProtocol calls this method for every incoming and outgoing request if there was a response. If this method returns a node to be pinged, the protocol will ping it to check availability and either move it to the start of the table or remove that node and replace it with the new one.
split_bucket(index: int) → None[source]

Split bucket range in two equal parts and reassign nodes to the appropriate half

get(*, node_id: Optional[hivemind.dht.routing.DHTID] = None, endpoint: Optional[str] = None, default=None)[source]

Find endpoint for a given DHTID or vice versa

get_nearest_neighbors(query_id: hivemind.dht.routing.DHTID, k: int, exclude: Optional[hivemind.dht.routing.DHTID] = None) → List[Tuple[hivemind.dht.routing.DHTID, str]][source]

Find k nearest neighbors from routing table according to XOR distance, does NOT include self.node_id

Parameters:
  • query_id – find neighbors of this node
  • k – find this many neighbors. If there aren’t enough nodes in the table, returns all nodes
  • exclude – if specified, results will not contain this node id even if it is in the table
Returns:

a list of tuples (node_id, endpoint) for up to k neighbors sorted from nearest to farthest
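
A small standalone sketch of the routing table (node ids are random, endpoints are placeholder addresses):

    from hivemind.dht.routing import DHTID, RoutingTable

    my_id = DHTID.generate()
    table = RoutingTable(my_id, bucket_size=20, depth_modulo=5)

    # register a few peers, as DHTProtocol would after successful requests
    peers = {DHTID.generate(): f"192.0.2.{i}:1337" for i in range(1, 4)}
    for node_id, endpoint in peers.items():
        table.add_or_update_node(node_id, endpoint)

    # find the peers nearest to some target id (self.node_id is never included)
    target = DHTID.generate()
    print(table.get_nearest_neighbors(target, k=2))  # [(DHTID, endpoint), ...] nearest-first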

class hivemind.dht.routing.KBucket(lower: int, upper: int, size: int, depth: int = 0)[source]

A bucket containing up to :size: DHTIDs in the [lower, upper) semi-interval. Maps DHT node ids to their endpoints

has_in_range(node_id: hivemind.dht.routing.DHTID)[source]

Check if node_id is between this bucket’s lower and upper bounds

add_or_update_node(node_id: hivemind.dht.routing.DHTID, endpoint: str) → bool[source]

Add node to KBucket or update existing node, return True if successful, False if the bucket is full. If the bucket is full, keep track of node in a replacement list, per section 4.1 of the paper.

Parameters:
  • node_id – dht node identifier that should be added or moved to the front of bucket
  • endpoint – network address associated with that node id
Note:

this function has a side-effect of resetting KBucket.last_updated time

request_ping_node() → Optional[Tuple[hivemind.dht.routing.DHTID, str]][source]
Returns:least-recently updated node that isn’t already being pinged right now – if such node exists
split() → Tuple[hivemind.dht.routing.KBucket, hivemind.dht.routing.KBucket][source]

Split the bucket over its midpoint (rounded down) and assign nodes to the two halves according to their ids

class hivemind.dht.routing.DHTID[source]
classmethod generate(source: Optional[Any] = None, nbits: int = 255)[source]

Generates random uid based on SHA1

Parameters:source – if provided, converts this value to bytes and uses it as input for hashing function; by default, generates a random dhtid from :nbits: random bits
xor_distance(other: Union[hivemind.dht.routing.DHTID, Sequence[hivemind.dht.routing.DHTID]]) → Union[int, List[int]][source]
Parameters:other – one or multiple DHTIDs. If given multiple DHTIDs as other, this function will compute distance from self to each of DHTIDs in other.
Returns:a number or a list of numbers whose binary representations equal bitwise xor between DHTIDs.
to_bytes(length=20, byteorder='big', *, signed=False) → bytes[source]

A standard way to serialize DHTID into bytes

classmethod from_bytes(raw: bytes, byteorder='big', *, signed=False) → hivemind.dht.routing.DHTID[source]

reverse of to_bytes
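
A short sketch of the DHTID helpers (the key string is an arbitrary example):

    from hivemind.dht.routing import DHTID

    key_id = DHTID.generate(source="ffn_expert.98")  # derived by hashing the given source
    random_id = DHTID.generate()                     # random id from 255 random bits

    print(key_id.xor_distance(random_id))            # a single integer distance
    print(key_id.xor_distance([random_id, key_id]))  # a list of distances; the last one is 0

    raw = key_id.to_bytes()
    assert DHTID.from_bytes(raw) == key_id           # serialization round trip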

Traverse (crawl) DHT

Utility functions for crawling DHT nodes, used to get and store keys in a DHT

hivemind.dht.traverse.simple_traverse_dht(query_id: hivemind.dht.routing.DHTID, initial_nodes: Collection[hivemind.dht.routing.DHTID], beam_size: int, get_neighbors: Callable[[hivemind.dht.routing.DHTID], Awaitable[Tuple[Collection[hivemind.dht.routing.DHTID], bool]]], visited_nodes: Collection[hivemind.dht.routing.DHTID] = ()) → Tuple[Tuple[hivemind.dht.routing.DHTID], Set[hivemind.dht.routing.DHTID]][source]

Traverse the DHT graph using get_neighbors function, find :beam_size: nearest nodes according to DHTID.xor_distance.

Note:

This is a simplified (but working) algorithm provided for documentation purposes. The actual DHTNode uses traverse_dht, a generalization of this algorithm that allows multiple queries and concurrent workers.

Parameters:
  • query_id – search query, find k_nearest neighbors of this DHTID
  • initial_nodes – nodes used to pre-populate beam search heap, e.g. [my_own_DHTID, …maybe_some_peers]
  • beam_size – beam search will not give up until it exhausts this many nearest nodes (to query_id) from the heap. Recommended value: a beam size of k_nearest * (2-5) will yield near-perfect results.
  • get_neighbors – a function that returns neighbors of a given node and controls the beam search stopping criteria. async def get_neighbors(node: DHTID) -> neighbors_of_that_node: List[DHTID], should_continue: bool. If should_continue is False, beam search will halt and return the k_nearest of whatever it found by then.
  • visited_nodes – beam search will neither call get_neighbors on these nodes, nor return them as nearest
Returns:

a list of k nearest nodes (nearest to farthest), and a set of all visited nodes (including visited_nodes)
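
The algorithm can be exercised on a toy in-memory graph; this sketch fakes get_neighbors with a static adjacency dict instead of real network calls:

    import asyncio
    from hivemind.dht.routing import DHTID
    from hivemind.dht.traverse import simple_traverse_dht

    async def main():
        nodes = [DHTID.generate() for _ in range(10)]
        # a toy "network" where every node knows every other node
        adjacency = {node: [other for other in nodes if other != node] for node in nodes}

        async def get_neighbors(node: DHTID):
            # return this node's neighbors and a flag that tells the search to keep going
            return adjacency[node], True

        query = DHTID.generate()
        nearest, visited = await simple_traverse_dht(
            query, initial_nodes=nodes[:3], beam_size=4, get_neighbors=get_neighbors)
        print(nearest)  # up to beam_size DHTIDs, ordered nearest to the query first
        print(visited)  # the set of all visited nodes

    asyncio.run(main())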

hivemind.dht.traverse.traverse_dht(queries: Collection[hivemind.dht.routing.DHTID], initial_nodes: List[hivemind.dht.routing.DHTID], beam_size: int, num_workers: int, queries_per_call: int, get_neighbors: Callable[[hivemind.dht.routing.DHTID, Collection[hivemind.dht.routing.DHTID]], Awaitable[Dict[hivemind.dht.routing.DHTID, Tuple[Tuple[hivemind.dht.routing.DHTID], bool]]]], found_callback: Optional[Callable[[hivemind.dht.routing.DHTID, List[hivemind.dht.routing.DHTID], Set[hivemind.dht.routing.DHTID]], Awaitable[Any]]] = None, await_all_tasks: bool = True, visited_nodes: Optional[Dict[hivemind.dht.routing.DHTID, Set[hivemind.dht.routing.DHTID]]] = ()) → Tuple[Dict[hivemind.dht.routing.DHTID, List[hivemind.dht.routing.DHTID]], Dict[hivemind.dht.routing.DHTID, Set[hivemind.dht.routing.DHTID]]][source]

Search the DHT for nearest neighbors to :queries: (based on DHTID.xor_distance). Use get_neighbors to request peers. The algorithm can reuse intermediate results from each query to speed up search for other (similar) queries.

Parameters:
  • queries – a list of search queries, find beam_size neighbors for these DHTIDs
  • initial_nodes – nodes used to pre-populate beam search heap, e.g. [my_own_DHTID, …maybe_some_peers]
  • beam_size – beam search will not give up until it visits this many nearest nodes (to query_id) from the heap
  • num_workers – run up to this many concurrent get_neighbors requests, each querying one peer for neighbors. When selecting a peer to request neighbors from, workers try to balance concurrent exploration across queries. A worker will expand the nearest candidate to a query with least concurrent requests from other workers. If several queries have the same number of concurrent requests, prefer the one with nearest XOR distance.
  • queries_per_call – workers can pack up to this many queries in one get_neighbors call. These queries contain the primary query (see num_workers above) and up to queries_per_call - 1 nearest unfinished queries.
  • get_neighbors – A function that requests a given peer to find nearest neighbors for multiple queries async def get_neighbors(peer, queries) -> {query1: ([nearest1, nearest2, …], False), query2: ([…], True)} For each query in queries, return nearest neighbors (known to a given peer) and a boolean “should_stop” flag If should_stop is True, traverse_dht will no longer search for this query or request it from other peers. The search terminates iff each query is either stopped via should_stop or finds beam_size nearest nodes.
  • found_callback – if specified, call this callback for each finished query the moment it finishes or is stopped. More specifically, run asyncio.create_task(found_callback(query, nearest_to_query, visited_for_query)). Using this callback allows one to process results sooner, before traverse_dht finishes for all queries. It is guaranteed that found_callback will be called exactly once on each query in queries.
  • await_all_tasks – if True, wait for all tasks to finish before returning, otherwise returns after finding nearest neighbors and finishes the remaining tasks (callbacks and queries to known-but-unvisited nodes)
  • visited_nodes – for each query, do not call get_neighbors on these nodes, nor return them among nearest.
Note:

the source code of this function can get tricky to read. Take a look at simple_traverse_dht function for reference. That function implements a special case of traverse_dht with a single query and one worker.

Returns:

two dictionaries: nearest nodes and visited nodes
  • nearest nodes: { query -> a list of up to beam_size nearest nodes, ordered nearest-first }
  • visited nodes: { query -> a set of all nodes that received requests for a given query }