CHAPTER 10
In this section, you will learn something that is not clearly stated in the Kademlia specification at all: there are actually three kinds of data store, namely the originator store, the republish store, and the cache store.
The Kademlia specification does not discuss this distinction. The information in this section has been gleaned from the discussion on the eMule project forum, particularly miniminime’s discussion of the roles that a node can take on.[14] Implementing this approach requires that the receiving peer know whether to store a key-value in the republish store or in the cache store.
The originator store (the original publisher’s store) is never written to as a result of a Store RPC. It is updated only by the peer itself, when it is the originator of a key-value and wants to publish that key-value to other peers. Key-values in this store are republished once every 24 hours.
The republish store contains key-values that have been republished by other peers as part of the process of distributing data among peers. It never contains the peer’s (as an originator) own key-values, only key-values received from other peers. Key-values in this store are republished to other peers only if a closer peer is found. This check occurs every hour and is optimized to avoid calling the lookup algorithm, as described later in this chapter.
The cache store is used when a FindValue lookup succeeds and the value is stored onto the next closest node. The intention is to avoid hotspots during lookups of popular keys by temporarily republishing those key-values onto other nearby peers. These temporary key-values have an expiration time. Key-values in the cache store are never republished and are removed when they expire.
The three storage mechanisms are managed in the Dht class.
Code Listing 73: Storage Mechanisms
protected IStorage originatorStorage;
protected IStorage republishStorage;
protected IStorage cacheStorage;
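The IStorage interface itself is not shown in this chapter. Based on how the three stores are used in the listings that follow, a minimal sketch might look like the code below; the member names are inferred from their usage here and should be treated as assumptions rather than the definitive interface.

using System;
using System.Collections.Generic;
using System.Numerics;

// Minimal sketch of the storage abstraction, inferred from how the stores are used below.
public interface IStorage
{
    List<BigInteger> Keys { get; }                           // All stored key hashes.
    bool Contains(ID key);                                   // Is the key present?
    string Get(ID key);                                      // Value for a key.
    string Get(BigInteger key);                              // Convenience overload used in some listings.
    DateTime GetTimeStamp(BigInteger key);                   // Last time the key was touched.
    int GetExpirationTimeSec(BigInteger key);                // Per-key expiration (assumed member).
    void Set(ID key, string value, int expirationTimeSec);   // Add or update a key-value with an expiration.
    void Touch(BigInteger key);                              // Update the republish timestamp.
    void Remove(BigInteger key);                             // Remove an expired key.
}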
The Dht class implements a few constructor options, the first primarily for aiding unit testing.
Code Listing 74: Dht Constructors
/// <summary>
/// Use this constructor to initialize the three stores from the same storage factory.
/// </summary>
public Dht(ID id, IProtocol protocol, Func<IStorage> storageFactory, BaseRouter router)
{
    originatorStorage = storageFactory();
    republishStorage = storageFactory();
    cacheStorage = storageFactory();

    FinishInitialization(id, protocol, router);
    SetupTimers();
}

/// <summary>
/// Supports different concrete storage types. For example, you may want
/// a persistent store for originator and republished key-values but only
/// an in-memory store for the cache.
/// </summary>
public Dht(ID id, IProtocol protocol, BaseRouter router,
    IStorage originatorStorage, IStorage republishStorage, IStorage cacheStorage)
{
    this.originatorStorage = originatorStorage;
    this.republishStorage = republishStorage;
    this.cacheStorage = cacheStorage;

    FinishInitialization(id, protocol, router);
    SetupTimers();
}
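As a usage sketch, assuming the constructor signatures above: the Router class and the exact wiring of VirtualProtocol come from elsewhere in the implementation, so treat the argument lists here as assumptions; ID.Mid is used simply as a placeholder ID.

// All three stores created from the same in-memory storage factory...
Dht dht = new Dht(ID.Mid, new VirtualProtocol(), () => new VirtualStorage(), new Router());

// ...or supply different concrete stores explicitly, for example a persistent
// IStorage implementation for originator/republished data and an in-memory cache.
Dht dht2 = new Dht(ID.Mid, new VirtualProtocol(), new Router(),
    new VirtualStorage(),      // originator store
    new VirtualStorage(),      // republish store
    new VirtualStorage());     // cache store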
The ability to specify different storage mechanisms can be very useful; however, this means that a Node must store the key-value in the appropriate storage.
Code Listing 75: The Node Store Method
/// <summary>
/// Store a key-value pair in the republish or cache storage.
/// </summary>
public void Store(Contact sender, ID key, string val, bool isCached = false, int expirationTimeSec = 0)
{
    Validate.IsFalse<SendingQueryToSelfException>(sender.ID == ourContact.ID,
        "Sender should not be ourself!");

    bucketList.AddContact(sender);

    if (isCached)
    {
        cacheStorage.Set(key, val, expirationTimeSec);
    }
    else
    {
        SendKeyValuesIfNewContact(sender);
        storage.Set(key, val, Constants.EXPIRATION_TIME_SECONDS);
    }
}
From the spec: “To ensure the persistence of key-value pairs, nodes must periodically republish keys. Otherwise, two phenomena may cause lookups for valid keys to fail. First, some of the k nodes that initially get a key-value pair when it is published may leave the network. Second, new nodes may join the network with IDs closer to some published key than the nodes on which the key-value pair was originally published. In both cases, the nodes with a key-value pair must republish it so as once again to ensure it is available on the k nodes closest to the key.
To compensate for nodes leaving the network, Kademlia republishes each key-value pair once an hour. A naive implementation of this strategy would require many messages—each of up to k nodes storing a key-value pair would perform a node lookup followed by k - 1 STORE RPCs every hour.”
From Wikipedia,[15] which can be helpful for understanding the spec with different phrasing:
“Periodically, a node that stores a value will explore the network to find the k nodes that are close to the key value and replicate the value onto them. This compensates for disappeared nodes.”
And:
“The node that is providing the file [key-value] will periodically refresh the information onto the network (perform FIND_NODE and STORE messages). When all of the nodes having the file [key-value] go offline, nobody will be refreshing its values (sources and keywords) and the information will eventually disappear from the network.”
The Wikipedia write-up clarifies what is meant by “on the k nodes closest to the key.” In other words, for each key, a FindNode is performed to find the closer nodes, and the value is republished onto them. Without the spec’s optimizations, this can be a time-consuming process if there are a lot of key-values in a node’s store; those optimizations are discussed next.
From the spec: “Fortunately, the republishing process can be heavily optimized. First, when a node receives a STORE RPC for a given key-value pair, it assumes the RPC was also issued to the other k - 1 closest nodes, and thus the recipient will not republish the key-value pair in the next hour. This ensures that as long as republication intervals are not exactly synchronized, only one node will republish a given key-value pair every hour.”
This first optimization is simple: when receiving a store, update the timestamp on the key-value. Any key-value that has been touched within the last hour is not republished, as we can assume the sender also issued the Store RPC to the other k - 1 closest nodes.
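A minimal sketch of this idea, assuming a simple in-memory store (the class below is illustrative, not the book’s VirtualStorage): every Set or Touch records the current time, and the hourly republish pass skips anything touched within the republish interval.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;

// Illustrative sketch: every Set and Touch records the current time, so the
// hourly republish pass can skip any key-value touched within the last hour.
public class TimestampedStoreSketch
{
    private readonly Dictionary<BigInteger, string> values = new Dictionary<BigInteger, string>();
    private readonly Dictionary<BigInteger, DateTime> touched = new Dictionary<BigInteger, DateTime>();

    public void Set(BigInteger key, string value)
    {
        values[key] = value;
        Touch(key);                   // Receiving a store counts as a "touch."
    }

    public void Touch(BigInteger key)
    {
        touched[key] = DateTime.Now;
    }

    public IEnumerable<BigInteger> KeysNeedingRepublish(TimeSpan republishInterval)
    {
        DateTime now = DateTime.Now;

        // Keys touched within the interval are assumed to have been stored on
        // the other k - 1 closest nodes already, so they are skipped.
        return touched.Where(kv => now - kv.Value >= republishInterval).Select(kv => kv.Key);
    }
}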
From the spec: “A second optimization avoids performing node lookups before republishing keys. As described in Section 2.4, to handle unbalanced trees, nodes split k-buckets as required to ensure they have complete knowledge of a surrounding subtree with at least k nodes. If, before republishing key-value pairs, a node u refreshes all k-buckets in this subtree of k nodes, it will automatically be able to figure out the k closest nodes to a given key. These bucket refreshes can be amortized over the republication of many keys.”
This second optimization is straightforward: if we’ve done a bucket refresh within the last hour, we can avoid calling FindNode (the node lookup algorithm). Which bucket do we test to see whether it has been refreshed? The bucket whose range contains the key, because it should hold the closest contacts we’ve seen for that key. While the answer might be obvious, it’s worthwhile to walk through the reasoning.
In this implementation, buckets in the bucket list are maintained in range order rather than in a tree, which naturally orders them by their prefix.
Table 1: Bucket Range(s)
| State | Bucket Range(s) | Prefix(es) |
|---|---|---|
| Initial Bucket | 0 .. 2^160 | 1 |
| Two Buckets | 0 .. 2^159, 2^159 .. 2^160 | 01, 1 |
| Four Buckets | 0 .. 2^158, 2^158 .. 2^159, 2^159 .. 2^159 + 2^158, 2^159 + 2^158 .. 2^160 | 001, 01, 10, 1 |
When we identify the bucket for a given key, the contacts in that bucket are the closest, as per the XOR computation on the prefix. For example, looking at the four buckets with prefixes 001, 01, 10, and 1, the contacts in the bucket whose range covers the key are the closest: in each row of Table 2, the smallest XOR result comes from that bucket (see the Explanation column).
Table 2: Key Prefixes
| Key Prefix | Key Prefix ^ Bucket Prefixes | Explanation |
|---|---|---|
| 1 | 101, 11, 01, 0 | Bucket with prefix 1 always has contacts that are closer |
| 01 | 011, 00, 11, 11 | Bucket with prefix 01 always has contacts that are closer |
| 001 | 000, 011, 101, 101 | Bucket with prefix 001 always has contacts that are closer |
| 0001 | 0011, 0101, 1001, 1001 | Bucket with prefix 001 always has contacts that are closer |
For this reason, we use the bucket whose range contains the key. Also, new key-values that are published onto closer nodes persist for 24 hours.
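To see the prefix reasoning in miniature, here is a small standalone C# illustration (not part of the DHT code) using 8-bit IDs: the contact sharing the longest prefix with the key always has the smallest XOR distance, which is why the bucket covering the key’s range holds the closest contacts.

using System;

class PrefixXorDemo
{
    static void Main()
    {
        // 8-bit illustration: the contact sharing the longest prefix with the
        // key has the smallest XOR distance to it.
        byte key = 0b0100_0001;
        byte[] contacts =
        {
            0b0100_1100,   // shares prefix "0100" with the key
            0b0010_0000,   // shares only the leading "0"
            0b1000_0000    // shares no prefix bits
        };

        foreach (byte c in contacts)
        {
            Console.WriteLine($"{Convert.ToString(c, 2).PadLeft(8, '0')} ^ key = {key ^ c}");
        }

        // Distances printed: 13, 97, 193 -- the longest-shared-prefix contact is closest.
    }
}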
Key-values in the republish store are republished at a particular interval, typically every hour.
Code Listing 76: KeyValueRepublishElapsed
/// <summary>
/// Replicate key-values if the key-value hasn’t been touched within
/// the republish interval.
/// </summary>
protected void KeyValueRepublishElapsed(object sender, ElapsedEventArgs e)
{
    DateTime now = DateTime.Now;

    // Only republish keys that haven't been touched within the republish interval.
    republishStorage.Keys.
        Where(k => (now - republishStorage.GetTimeStamp(k)).TotalMilliseconds >=
                   Constants.KEY_VALUE_REPUBLISH_INTERVAL).
        ForEach(k =>
    {
        ID key = new ID(k);
        StoreOnCloserContacts(key, republishStorage.Get(key));
        republishStorage.Touch(k);
    });
}
Note how a lookup is only performed if the bucket containing the key hasn’t itself been refreshed recently (within the past hour).
Code Listing 77: StoreOnCloserContacts
/// <summary>
/// Perform a lookup if the bucket containing the key has not been refreshed,
/// otherwise just get the k closest contacts we already know about.
/// </summary>
protected void StoreOnCloserContacts(ID key, string val)
{
    DateTime now = DateTime.Now;
    KBucket kbucket = node.BucketList.GetKBucket(key);
    List<Contact> contacts;

    if ((now - kbucket.TimeStamp).TotalMilliseconds < Constants.BUCKET_REFRESH_INTERVAL)
    {
        // Bucket has been refreshed recently, so don't do a lookup as we
        // already have the closer contacts in this bucket.
        contacts = node.BucketList.GetCloseContacts(key, node.OurContact.ID);
    }
    else
    {
        contacts = router.Lookup(key, router.RpcFindNodes).contacts;
    }

    contacts.ForEach(c =>
    {
        RpcError error = c.Protocol.Store(node.OurContact, key, val);
        HandleError(error, c);
    });
}
Expired key-values are removed from the republish and cache storage, which happens in the Dht class.
Code Listing 78: ExpireKeysElapsed
/// <summary>
/// Any expired keys in the republish or node's cache storage are removed.
/// </summary>
protected virtual void ExpireKeysElapsed(object sender, ElapsedEventArgs e)
{
    RemoveExpiredData(cacheStorage);
    RemoveExpiredData(republishStorage);
}

protected void RemoveExpiredData(IStorage store)
{
    DateTime now = DateTime.Now;

    // ToList so our key list is resolved now as we remove keys.
    store.Keys.
        Where(k => (now - store.GetTimeStamp(k)).TotalSeconds >= store.GetExpirationTimeSec(k)).
        ToList().
        ForEach(k => store.Remove(k));
}
From the spec: “For Kademlia’s current application (file sharing), we also require the original publisher of a (key,value) pair to republish it every 24 hours. Otherwise, (key,value) pairs expire 24 hours after publication, so as to limit stale index information in the system. For other applications, such as digital certificates or cryptographic hash to value mappings, longer expiration times may be appropriate.”
Republishing originator data is handled in a timer event that resends the key-values in the originator’s storage.
Code Listing 79: OriginatorRepublishElapsed
protected void OriginatorRepublishElapsed(object sender, ElapsedEventArgs e)
{
    DateTime now = DateTime.Now;

    // Republish originator key-values that haven't been republished within the 24-hour interval.
    originatorStorage.Keys.
        Where(k => (now - originatorStorage.GetTimeStamp(k)).TotalMilliseconds >=
                   Constants.ORIGINATOR_REPUBLISH_INTERVAL).
        ForEach(k =>
    {
        ID key = new ID(k);

        // Just use close contacts, don't do a lookup.
        var contacts = node.BucketList.GetCloseContacts(key, node.OurContact.ID);

        contacts.ForEach(c =>
        {
            RpcError error = c.Protocol.Store(ourContact, key, originatorStorage.Get(key));
            HandleError(error, c);
        });

        originatorStorage.Touch(k);
    });
}
Republished key-values persist for 24 hours.
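The constructors in Code Listing 74 call SetupTimers, which is not shown in this chapter. A plausible sketch, assuming interval constants on the Constants class (the constant names beyond EXPIRATION_TIME_SECONDS are assumptions), wires the three handlers above to System.Timers timers:

using System.Timers;

protected Timer keyValueRepublishTimer;
protected Timer originatorRepublishTimer;
protected Timer expireKeysTimer;

protected void SetupTimers()
{
    // Republish other peers' key-values every hour.
    keyValueRepublishTimer = new Timer(Constants.KEY_VALUE_REPUBLISH_INTERVAL);
    keyValueRepublishTimer.Elapsed += KeyValueRepublishElapsed;
    keyValueRepublishTimer.Start();

    // Republish our own (originator) key-values every 24 hours.
    originatorRepublishTimer = new Timer(Constants.ORIGINATOR_REPUBLISH_INTERVAL);
    originatorRepublishTimer.Elapsed += OriginatorRepublishElapsed;
    originatorRepublishTimer.Start();

    // Periodically sweep the republish and cache stores for expired key-values.
    expireKeysTimer = new Timer(Constants.KEY_VALUE_EXPIRE_INTERVAL);   // assumed constant name
    expireKeysTimer.Elapsed += ExpireKeysElapsed;
    expireKeysTimer.Start();
}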
From the spec: “When a new node joins the system, it must store any key-value pair to which it is one of the k closest. Existing nodes, by similarly exploiting complete knowledge of their surrounding subtrees, will know which key-value pairs the new node should store. Any node learning of a new node therefore issues STORE RPCs to transfer relevant key-value pairs to the new node. To avoid redundant STORE RPCs, however, a node only transfers a key-value pair if its own ID is closer to the key than are the IDs of other nodes.”
Interpretation:
A new node (contact) will be instructed to store key-values that exist on the bootstrapping node (the one it’s bootstrapping with) when they meet the following condition: the key XOR’d with the bootstrapping node’s ID is less than (closer than) the key XOR’d with the IDs of the other nodes.
What does “other nodes” mean? Are these all other contacts the bootstrapping node knows about, just the k closest contacts in the joining node’s bucket, or some other interpretation? We have to understand what “exploiting complete knowledge of their surrounding subtrees” means. First, it indicates that it isn’t just the joining node’s bucket. It makes sense to interpret this as: store the values onto the joining node for any key-value where the joining node will be closer to that key, that is, when there are no other known nodes that are closer. If the joining node becomes the closest node to a key-value, then it is requested to store that key-value.
It’s interesting to note that this algorithm executes regardless of whether the bootstrapping node actually added the joining node to a k-bucket. Remember also that “joining” actually means contacting another node with any one of the four RPC calls. When a new node registers, republished key-values persist for 24 hours.
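As a small worked example (with hypothetical 8-bit IDs, purely for illustration): suppose the bootstrapping node’s ID is 0b10000000, the only other contact it knows has ID 0b01000000, and it stores a key 0b10000001. The distances are 0b10000000 ^ 0b10000001 = 1 and 0b01000000 ^ 0b10000001 = 0b11000001 = 193, so the bootstrapping node is closer to the key than any other contact it knows about, and it sends that key-value to the joining node. The unit test later in this chapter works through the same math with 160-bit IDs.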
The implementation is as follows.
Code Listing 80: SendKeyValuesIfNewContact
/// <summary>
/// For a new contact, we store values to that contact whose keys ^ ourContact.ID
/// are less than the distance from those keys to any other contact we know about.
/// </summary>
protected void SendKeyValuesIfNewContact(Contact sender)
{
    List<Contact> contacts = new List<Contact>();

    if (IsNewContact(sender))
    {
        lock (bucketList)
        {
            // Clone so we can release the lock.
            contacts = new List<Contact>(bucketList.Buckets.SelectMany(b => b.Contacts));
        }

        if (contacts.Count() > 0)
        {
            // For each key, if our distance to the key < any other contact's distance to the key...
            storage.Keys.AsParallel().ForEach(k =>
            {
                // The minimum distance between the key and any contact we know about.
                var distance = contacts.Min(c => k ^ c.ID);

                // If we are closer to the key than any other contact, store the key-value on the new contact.
                if ((k ^ ourContact.ID) < distance)
                {
                    var error = sender.Protocol.Store(ourContact, new ID(k), storage.Get(k));
                    dht?.HandleError(error, sender);
                }
            });
        }
    }
}
Annoyingly, for every stored value, there just isn’t any way to avoid performing the XOR computation against every contact. This could get expensive, and it is currently mitigated by parallelizing the computation with LINQ’s AsParallel feature.
Determining whether a contact is new is slightly more complicated than one would think. We need to check not only whether the contact exists in any of our buckets, but also whether it’s a pending contact, one that wasn’t placed in a bucket because the bucket was full but that has nonetheless already been sent the key-values it should store.
Code Listing 81: IsNewContact
/// <summary>
/// Returns true if the contact isn't in the bucket list or in the
/// pending contacts list.
/// </summary>
protected bool IsNewContact(Contact sender)
{
    bool ret;

    lock (bucketList)
    {
        // Do we already know about this contact?
        ret = bucketList.ContactExists(sender);
    }

    if (dht != null)            // For unit testing, dht may be null.
    {
        lock (dht.PendingContacts)
        {
            ret |= dht.PendingContacts.ContainsBy(sender, c => c.ID);
        }
    }

    return !ret;
}
From the spec: “To avoid ‘over-caching,’ we make the expiration time of a (key,value) pair in any node’s database exponentially inversely proportional to the number of nodes between the current node and the node whose ID is closest to the key ID.”
The specification provides no guidance as to what the calculation for “exponentially inversely proportional” should actually be. It is also undefined what the time constants are: what is the baseline time for which a key-value should persist? It is assumed here that this should be a maximum of 24 hours. We also need to track an expiration time that is separate from the key-value republish timestamp. Furthermore, up to this point, I haven’t implemented the accelerated lookup optimization, which is where the value of b comes from. In this implementation, where we have bucket ranges rather than a bucket per bit in the key space, the accelerated lookup optimization is irrelevant, so we’ll use b == 5, which is the spec’s recommended value for that optimization.
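One plausible reading, used for the sketch below, is to halve the 24-hour expiration time for each node separating the caching node from the node closest to the key; the spec does not define the formula, so this is an assumption:

// Assumed interpretation of "exponentially inversely proportional":
// halve the base expiration time for each separating node.
int ExpirationTimeSec(int separatingNodes)
{
    return (int)(Constants.EXPIRATION_TIME_SECONDS / Math.Pow(2, separatingNodes));
}

// With a 24-hour base (86,400 seconds):
//   0 separating nodes -> 86,400 s (24 hours)
//   3 separating nodes -> 10,800 s (3 hours)
//  10 separating nodes -> 84 s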
Also, which node performs the computation “between the current node and the node whose ID is closest to the key ID”? In this implementation, it is the node performing the lookup: it counts the nodes separating itself from the contact it is about to cache the value on, as shown in Code Listing 82.
As discussed earlier, the entire concept of having separate stores (originator, republished, cached) is never discussed in the Kademlia specification. Without understanding these three different stores, trying to understand how caching works is probably impossible.
Caching occurs in only one place—when a value being looked up (and successfully found) is stored on a “close” node:
Code Listing 82: Handling Over-Caching
public (bool found, List<Contact> contacts, string val) FindValue(ID key)
{
    (bool found, List<Contact> contacts, string val) ret = (false, null, null);

    var lookup = router.Lookup(key, router.RpcFindValue);

    if (lookup.found)
    {
        ret = (true, null, lookup.val);

        // Find the first close contact (other than the one the value was found by)
        // in which to *cache* the key-value.
        var storeTo = lookup.contacts.
            Where(c => c != lookup.foundBy).
            OrderBy(c => c.ID ^ key).
            FirstOrDefault();

        if (storeTo != null)
        {
            int separatingNodes = GetSeparatingNodesCount(ourContact, storeTo);
            int expTimeSec = (int)(Constants.EXPIRATION_TIME_SECONDS / Math.Pow(2, separatingNodes));
            RpcError error = storeTo.Protocol.Store(node.OurContact, key, lookup.val, true, expTimeSec);
            HandleError(error, storeTo);
        }
    }

    return ret;
}
Note the true flag, indicating that this RPC Store call is for caching purposes.
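The helper GetSeparatingNodesCount is not shown in this chapter. One plausible interpretation, offered here as an assumption rather than the actual implementation, counts the contacts we currently know about whose IDs fall between the two contacts’ IDs:

// Sketch (assumed implementation): count the known contacts whose IDs lie
// between the two given contacts' IDs. Assumes ID exposes its BigInteger as Value.
protected int GetSeparatingNodesCount(Contact a, Contact b)
{
    BigInteger low = BigInteger.Min(a.ID.Value, b.ID.Value);
    BigInteger high = BigInteger.Max(a.ID.Value, b.ID.Value);

    return node.BucketList.Buckets.
        SelectMany(bucket => bucket.Contacts).
        Count(c => c.ID.Value > low && c.ID.Value < high);
}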
It is reasonable for a cached key-value to expire, but we may never want to expire originator or republished key-values. A good example is a distributed blockchain (or distributed ledger), where data should never disappear, even if the original publisher disappears from the peer network. There are a variety of ways to do this, such as overriding the ExpireKeysElapsed method, as in Code Listing 83.
Code Listing 83: Never Expiring Republished Key-Value
protected override void ExpireKeysElapsed(object sender, ElapsedEventArgs e)
{
    RemoveExpiredData(cacheStorage);
    // RemoveExpiredData(republishStorage);
}
In a Dht subclass that overrides the method this way, only the cache store expires.
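A minimal sketch of such a subclass, assuming the factory-based constructor from Code Listing 74; the class name is hypothetical:

// Hypothetical subclass in which republished key-values never expire.
public class PermanentDht : Dht
{
    public PermanentDht(ID id, IProtocol protocol, Func<IStorage> storageFactory, BaseRouter router)
        : base(id, protocol, storageFactory, router) { }

    protected override void ExpireKeysElapsed(object sender, ElapsedEventArgs e)
    {
        // Only cached key-values expire; republished key-values are kept forever.
        RemoveExpiredData(cacheStorage);
    }
}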
The following unit test verifies that a new contact is sent the key-values it should store. There’s a lot of setup here for creating two existing contacts and two key-values whose IDs have been specifically chosen; see the comments in the listing for the XOR distance math.
Code Listing 84: TestNewContactGetsStoredContactsTest
[TestMethod]
public void TestNewContactGetsStoredContactsTest()
{
    // Set up a node at the midpoint.
    // The existing node has the ID 10000....
    Node existing = new Node(new Contact(null, ID.Mid), new VirtualStorage());
    string val1 = "Value 1";
    string valMid = "Value Mid";

    // The existing node stores two items, one with an ID "hash" of 1,
    // the other with an ID "hash" at the midpoint.
    // Simple storage, rather than executing the code for Store.
    existing.SimpleStore(ID.One, val1);
    existing.SimpleStore(ID.Mid, valMid);
    Assert.IsTrue(existing.Storage.Keys.Count == 2, "Expected the existing node to have two key-values.");

    // Create a contact in the existing node's bucket list that is closer
    // to val1 than the existing node is.
    // This contact has the prefix 010000....
    Contact otherContact = new Contact(null, ID.Zero.SetBit(158));
    Node other = new Node(otherContact, new VirtualStorage());
    existing.BucketList.Buckets[0].Contacts.Add(otherContact);

    // The unseen contact has a prefix 0110000....
    VirtualProtocol unseenvp = new VirtualProtocol();
    Contact unseenContact = new Contact(unseenvp, ID.Zero.SetBit(157));
    Node unseen = new Node(unseenContact, new VirtualStorage());
    unseenvp.Node = unseen;     // Final fixup.
    Assert.IsTrue(unseen.Storage.Keys.Count == 0, "Expected the unseen node's store to be empty.");

    // An unseen node pings, and it should get valMid only,
    // because the existing node is closer to ID.Mid than the other contact is.
    existing.Ping(unseenContact);

    // Contacts     V1          V2
    // 10000000     00...0001   10...0000
    // 01000000
    // Math:
    // c1 ^ V1      c1 ^ V2     c2 ^ V1     c2 ^ V2
    // 100...001    000...000   010...001   110...000
    // c1 ^ V1 > c2 ^ V1, so V1 doesn't get sent to the unseen node.
    // c1 ^ V2 < c2 ^ V2, so V2 does get sent.
    Assert.IsTrue(unseen.Storage.Keys.Count == 1, "Expected one key-value to be stored on the unseen node.");
    Assert.IsTrue(unseen.Storage.Contains(ID.Mid), "Expected valMid's key to be stored.");
    Assert.IsTrue(unseen.Storage.Get(ID.Mid) == valMid, "Expected valMid to be stored.");
}
Ping simply responds with the random ID that was sent. Internally, the buckets are potentially updated, and if the contact is new, Store RPC calls are made to it for any values it should store, as discussed above for when a new node registers.
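Given that description, a plausible sketch of the Node-level ping handler follows; the return type and body are assumptions, and echoing the random ID is assumed to happen in the protocol/server layer rather than here.

// Sketch of the server-side ping handler; signature and body are assumptions
// based on the behavior described above.
public Contact Ping(Contact sender)
{
    Validate.IsFalse<SendingQueryToSelfException>(sender.ID == ourContact.ID,
        "Sender should not be ourself!");

    // A new contact gets any key-values it should now be storing...
    SendKeyValuesIfNewContact(sender);

    // ...and the bucket list is potentially updated with the sender.
    bucketList.AddContact(sender);

    return ourContact;
}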
In his paper, Bruno Spori writes:[16]
“The situation is different when the first message a node received is a request message. In this case, the receiver cannot be sure whether the sender’s contact information [is] correct. It could be that the request was faked. To determine this, the piggy-backed ping is used. The effect of the piggy-backed ping is that the original sender of the request must send a ping reply upon receiving the reply message. Thus, the receiver of the request message is able to determine the correctness of the sender as well.”
We will instead rely on the error-handling mechanism for evicting contacts that do not respond or respond incorrectly or with errors. Error handling will be discussed later.