left-icon

The Kademlia Protocol Succinctly®
by Marc Clifton

Previous
Chapter

of
A
A
A

CHAPTER 13

A Basic TCP Subnet Protocol

A Basic TCP Subnet Protocol


There’s a few points to make in this implementation:

  • A single port is used along with a subnet identifier to route requests to the correct handler. The subnet identifier makes it easier to test multiple “servers” on the same machine as we don’t need to open (and “allow”) a unique port number per “server.”
  • JSON is used as the serialization format for request and response data.
  • RPC calls are synchronous, in that they wait for a response. There are other implementations that continue based on the random ID and handler, which I chose not to implement.

Request messages

Requests issued by the DHT are serialized to JSON from the following classes.

Code Listing 95: BaseRequest Class

public abstract class BaseRequest

{

  public object Protocol { get; set; }

  public string ProtocolName { get; set; }

  public BigInteger RandomID { get; set; }

  public BigInteger Sender { get; set; }

  public BaseRequest()

  {

     RandomID = ID.RandomID.Value;

  }

}

public class FindNodeRequest : BaseRequest

{

  public BigInteger Key { get; set; }

}

public class FindValueRequest : BaseRequest

{

  public BigInteger Key { get; set; }

}

public class PingRequest : BaseRequest { }

public class StoreRequest : BaseRequest

{

  public BigInteger Key { get; set; }

  public string Value { get; set; }

  public bool IsCached { get; set; }

  public int ExpirationTimeSec { get; set; }

}

public interface ITcpSubnet

{

  int Subnet { get; set; }

}

public class FindNodeSubnetRequest : FindNodeRequest, ITcpSubnet

{

  public int Subnet { get; set; }

}

public class FindValueSubnetRequest : FindValueRequest, ITcpSubnet

{

  public int Subnet { get; set; }

}

public class PingSubnetRequest : PingRequest, ITcpSubnet

{

  public int Subnet { get; set; }

}

public class StoreSubnetRequest : StoreRequest, ITcpSubnet

{

  public int Subnet { get; set; }

}

On the server side, which receives these messages, they are handled by a common request class.

Code Listing 96: CommonRequest—Server Side

/// <summary>

/// For passing to Node handlers with common parameters.

/// </summary>

public class CommonRequest

{

  public object Protocol { get; set; }

  public string ProtocolName { get; set; }

  public BigInteger RandomID { get; set; }

  public BigInteger Sender { get; set; }

  public BigInteger Key { get; set; }

  public string Value { get; set; }

  public bool IsCached { get; set; }

  public int ExpirationTimeSec { get; set; }

}

As the comment states, the common request simplifies the server implementation by having RPC handler methods with the same parameter.

Request handlers

The request handlers extract the pertinent pieces of the CommonRequest and call the appropriate method of the Node class. The important part here is that the contact protocol must be returned as part of the FindNode and FindValue response. Note that the returns are anonymous objects.

Code Listing 97: Request Handlers

public object ServerPing(CommonRequest request)

{

  IProtocol protocol = Protocol.InstantiateProtocol(
    request.Protocol, request.ProtocolName);

  Ping(new Contact(protocol, new ID(request.Sender)));

  return new { RandomID = request.RandomID };

}

public object ServerStore(CommonRequest request)

{

  IProtocol protocol = Protocol.InstantiateProtocol(
    request.Protocol, request.ProtocolName);

  Store(new Contact(protocol, new ID(request.Sender)),
    new ID(request.Key), request.Value, request.IsCached,
    request.ExpirationTimeSec);

  return new { RandomID = request.RandomID };

}

public object ServerFindNode(CommonRequest request)

{

  IProtocol protocol = Protocol.InstantiateProtocol(
    request.Protocol, request.ProtocolName);

  var (contacts, val) =
    FindNode(new Contact(protocol, new ID(request.Sender)), new ID(request.Key));

  return new

  {

    Contacts = contacts.Select(c =>

    new

    {

      Contact = c.ID.Value,

      Protocol = c.Protocol,

      ProtocolName = c.Protocol.GetType().Name

    }).ToList(),

    RandomID = request.RandomID

  };

}

public object ServerFindValue(CommonRequest request)

{

  IProtocol protocol = Protocol.InstantiateProtocol(
    request.Protocol, request.ProtocolName);

  var (contacts, val) =
    FindValue(new Contact(protocol, new ID(request.Sender)), new ID(request.Key));

  return new

  {

    Contacts = contacts?.Select(c =>

    new

    {

      Contact = c.ID.Value,

      Protocol = c.Protocol,

      ProtocolName = c.Protocol.GetType().Name

    })?.ToList(),

    RandomID = request.RandomID,

    Value = val

  };

}

Responses

JSON responses are deserialized into the following classes.

Code Listing 98: Server Responses

public abstract class BaseResponse

{

  public BigInteger RandomID { get; set; }

}

public class ErrorResponse : BaseResponse

{

  public string ErrorMessage { get; set; }

}

public class ContactResponse

{

  public BigInteger Contact { get; set; }

  public object Protocol { get; set; }

  public string ProtocolName { get; set; }

}

public class FindNodeResponse : BaseResponse

{

  public List<ContactResponse> Contacts { get; set; }

}

public class FindValueResponse : BaseResponse

{

  public List<ContactResponse> Contacts { get; set; }

  public string Value { get; set; }

}

public class PingResponse : BaseResponse { }

public class StoreResponse : BaseResponse { }

Server implementation

The server is a straightforward HttpListener implemented as a C# HttpListenerContext object, but note how the subnet ID is used to route the request to the specific node associated with the subnet.

Code Listing 99: ProcessRequest

protected override async void ProcessRequest(HttpListenerContext context)

{

  string data = new StreamReader(context.Request.InputStream,
    context.Request.ContentEncoding).ReadToEnd();

  if (context.Request.HttpMethod == "POST")

  {

    Type requestType;

    string path = context.Request.RawUrl;

    // Remove "//"

    // Prefix our call with "Server" so that the method name is unambiguous.

    string methodName = "Server" + path.Substring(2);

    if (routePackets.TryGetValue(path, out requestType))

    {

      CommonRequest commonRequest =
        JsonConvert.DeserializeObject<CommonRequest>(data);
      int subnet = ((ITcpSubnet)JsonConvert.DeserializeObject(
        data, requestType)).Subnet;

      INode node;

      if (subnets.TryGetValue(subnet, out node))

      {

        await Task.Run(() =>
          CommonRequestHandler(methodName, commonRequest, node, context));

      }

      else

      {

        SendErrorResponse(context, new ErrorResponse()
          { ErrorMessage = "Method not recognized." });

      }

    }

    else

    {

      SendErrorResponse(context, new ErrorResponse()
        { ErrorMessage = "Subnet node not found." });

    }

    context.Response.Close();

  }

}

The TCP subnet protocol handlers

The protocol implements the four RPC calls, issuing HTTP POSTs to the server. Notice how the protocol is instantiated from the JSON return, and if the protocol isn’t supported, the contact is removed from the contacts returned by FindNode and FindValue.

Code Listing 100: FindNode, FindValue, Ping, and Store Handlers

public (List<Contact> contacts, RpcError error) FindNode(Contact sender, ID key)

{

  ErrorResponse error;

  ID id = ID.RandomID;

  bool timeoutError;

  var ret = RestCall.Post<FindNodeResponse, ErrorResponse>(
    url + ":" + port + "//FindNode",

  new FindNodeSubnetRequest()

  {

    Protocol = sender.Protocol,

    ProtocolName = sender.Protocol.GetType().Name,

    Subnet = subnet,

    Sender = sender.ID.Value,

    Key = key.Value,

    RandomID = id.Value

  }, out error, out timeoutError);

  try

  {

    var contacts = ret?.Contacts?.Select(
      val => new Contact(Protocol.InstantiateProtocol(
        val.Protocol, val.ProtocolName), new ID(val.Contact))).ToList();

    // Return only contacts with supported protocols.

    return (contacts?.Where(c => c.Protocol != null).ToList() ?? EmptyContactList(),
    GetRpcError(id, ret, timeoutError, error));

  }

  catch (Exception ex)

  {

    return (null, new RpcError() { ProtocolError = true,
      ProtocolErrorMessage = ex.Message });

  }

}

/// <summary>

/// Attempt to find the value in the peer network.

/// </summary>

/// <returns>A null contact list is acceptable here as it is a valid return
/// if the value is found.

/// The caller is responsible for checking the timeoutError flag to make
/// sure null contacts is not

/// the result of a timeout error.</returns>

public (List<Contact> contacts, string val, RpcError error)
  FindValue(Contact sender, ID key)

{

  ErrorResponse error;

  ID id = ID.RandomID;

  bool timeoutError;

  var ret = RestCall.Post<FindValueResponse, ErrorResponse>(
    url + ":" + port + "//FindNode",

  new FindValueSubnetRequest()

  {

    Protocol = sender.Protocol,

    ProtocolName = sender.Protocol.GetType().Name,

    Subnet = subnet,

    Sender = sender.ID.Value,

    Key = key.Value,

    RandomID = id.Value

  }, out error, out timeoutError);

  try

  {

    var contacts = ret?.Contacts?.Select(
      val => new Contact(Protocol.InstantiateProtocol(val.Protocol,
        val.ProtocolName), new ID(val.Contact))).ToList();

    // Return only contacts with supported protocols.

    return (contacts?.Where(c => c.Protocol != null).ToList(),
      ret.Value, GetRpcError(id, ret, timeoutError, error));

  }

  catch (Exception ex)

  {

    return (null, null, new RpcError() { ProtocolError = true,
      ProtocolErrorMessage = ex.Message });

  }

}


public RpcError Ping(Contact sender)

{

  ErrorResponse error;

  ID id = ID.RandomID;

  bool timeoutError;

  var ret = RestCall.Post<FindValueResponse, ErrorResponse>(
    url + ":" + port + "//Ping",

  new PingSubnetRequest()

  {

    Protocol = sender.Protocol,

    ProtocolName = sender.Protocol.GetType().Name,

    Subnet = subnet,

    Sender = sender.ID.Value,

    RandomID = id.Value

  },

  out error, out timeoutError);

  return GetRpcError(id, ret, timeoutError, error);

}

public RpcError Store(Contact sender, ID key, string val, bool isCached = false,
  int expirationTimeSec = 0)

{

  ErrorResponse error;

  ID id = ID.RandomID;

  bool timeoutError;

  var ret = RestCall.Post<FindValueResponse, ErrorResponse>(
    url + ":" + port + "//Store",

  new StoreSubnetRequest()

  {

    Protocol = sender.Protocol,

    ProtocolName = sender.Protocol.GetType().Name,

    Subnet = subnet,

    Sender = sender.ID.Value,

    Key = key.Value,

    Value = val,

    IsCached = isCached,

    ExpirationTimeSec = expirationTimeSec,

    RandomID = id.Value

  },

  out error, out timeoutError);

  return GetRpcError(id, ret, timeoutError, error);

}

The RpcError class manages the kinds of errors that we can encounter, and is instantiated in the GetRpcError method.

Code Listing 101: Handling RPC Errors

public class RpcError

{

  public bool HasError
  {
    get { return TimeoutError || IDMismatchError || PeerError || ProtocolError; }
  }

  public bool TimeoutError { get; set; }

  public bool IDMismatchError { get; set; }

  public bool PeerError { get; set; }

  public bool ProtocolError { get; set; }

  public string PeerErrorMessage { get; set; }

  public string ProtocolErrorMessage { get; set; }

}

protected RpcError GetRpcError(
  ID id,
  BaseResponse resp,
  bool timeoutError,
  ErrorResponse peerError)

{

  return new RpcError()
  {
    IDMismatchError = id != resp.RandomID,
    TimeoutError = timeoutError,
    PeerError = peerError != null,
    PeerErrorMessage = peerError?.ErrorMessage
  };

}

Note that this class reflects several different errors that can occur:

  • Timeout: The peer failed to respond in the RestCall.REQUEST_TIMEOUT period, which by default is 500 ms.
  • ID mismatch: The peer responded, but not with an ID that matched the sender’s random ID.
  • Peer: The peer encountered an exception, in which case the exception message is returned to the caller.
  • Deserialization: The Post method catches JSON deserialization errors, which also indicates an error with the peer response.

TCP subnet unit testing

As with the unit tests for the protocol itself, studying these unit tests is useful for how one sets up a server and client. The following unit tests validate the round-trip calls, exercising the protocol calls and the server. Each test initializes the server and then tears it down.

Code Listing 102: TCP Subnet Setup and Teardown

[TestClass]

public class TcpSubnetTests

{

  protected string localIP = "http://127.0.0.1";

  protected int port = 2720;

  protected TcpSubnetServer server;

  [TestInitialize]

  public void Initialize()

  {

    server = new TcpSubnetServer(localIP, port);

  }

  [TestCleanup]

  public void TestCleanup()

  {

    server.Stop();

  }

...

The unit tests exercise each of the four RPC calls as well as a timeout error.

PingRouteTest

This test verifies the Ping RPC call.

Code Listing 103: PingRouteTest

[TestMethod]

public void PingRouteTest()

{

  TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1);

  TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2);

  ID ourID = ID.RandomID;

  Contact c1 = new Contact(p1, ourID);

  Node n1 = new Node(c1, new VirtualStorage());

  Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage());

  server.RegisterProtocol(p1.Subnet, n1);

  server.RegisterProtocol(p2.Subnet, n2);

  server.Start();

  p2.Ping(c1);

}

Oddly there’s no assertion here, as nothing of note happens. The point of this is that no exceptions are thrown.

StoreRouteTest

This test verifies the Store RPC call.

Code Listing 104: StoreRoutTest

[TestMethod]

public void StoreRouteTest()

{

  TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1);

  TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2);

  ID ourID = ID.RandomID;

  Contact c1 = new Contact(p1, ourID);

  Node n1 = new Node(c1, new VirtualStorage());

  Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage());

  server.RegisterProtocol(p1.Subnet, n1);

  server.RegisterProtocol(p2.Subnet, n2);

  server.Start();

  Contact sender = new Contact(p1, ID.RandomID);

  ID testID = ID.RandomID;

  string testValue = "Test";

  p2.Store(sender, testID, testValue);

  Assert.IsTrue(n2.Storage.Contains(testID),
    "Expected remote peer to have value.");

  Assert.IsTrue(n2.Storage.Get(testID) == testValue,
    "Expected remote peer to contain stored value.");

}

FindNodesRouteTest

This test verifies the FindNodes RPC call.

Code Listing 105: FindNodesRouteTest

[TestMethod]

public void FindNodesRouteTest()

{

  TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1);

  TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2);

  ID ourID = ID.RandomID;

  Contact c1 = new Contact(p1, ourID);

  Node n1 = new Node(c1, new VirtualStorage());

  Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage());

  // Node 2 knows about another contact, that isn't us (because we're excluded.)

  ID otherPeer = ID.RandomID;

  n2.BucketList.Buckets[0].Contacts.Add(new Contact(
    new TcpSubnetProtocol(localIP, port, 3), otherPeer));

  server.RegisterProtocol(p1.Subnet, n1);

  server.RegisterProtocol(p2.Subnet, n2);

  server.Start();

  ID id = ID.RandomID;

  List<Contact> ret = p2.FindNode(c1, id).contacts;

  Assert.IsTrue(ret.Count == 1, "Expected 1 contact.");

  Assert.IsTrue(ret[0].ID == otherPeer,
    "Expected contact to the other peer (not us).");

}

FindValueRouteTest

This test verifies the FindValue RPC call.

Code Listing 106: FindValueRouteTest

[TestMethod]

public void FindValueRouteTest()

{

  TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1);

  TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2);

  ID ourID = ID.RandomID;

  Contact c1 = new Contact(p1, ourID);

  Node n1 = new Node(c1, new VirtualStorage());

  Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage());

  server.RegisterProtocol(p1.Subnet, n1);

  server.RegisterProtocol(p2.Subnet, n2);

  server.Start();

  ID testID = ID.RandomID;

  string testValue = "Test";

  p2.Store(c1, testID, testValue);

  Assert.IsTrue(n2.Storage.Contains(testID),
    "Expected remote peer to have value.");

  Assert.IsTrue(n2.Storage.Get(testID) == testValue,
    "Expected remote peer to contain stored value.");

  var ret = p2.FindValue(c1, testID);

  Assert.IsTrue(ret.contacts == null, "Expected to find value.");

  Assert.IsTrue(ret.val == testValue,
    "Value does not match expected value from peer.");

}

UnresponsiveNodeTest

This test verifies that an unresponsive node results in a timeout error.

Code Listing 107: UnresponsiveNodeTest

[TestMethod]

public void UnresponsiveNodeTest()

{

  TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1);

  TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2);

  p2.Responds = false;

  ID ourID = ID.RandomID;

  Contact c1 = new Contact(p1, ourID);

  Node n1 = new Node(c1, new VirtualStorage());

  Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage());

  server.RegisterProtocol(p1.Subnet, n1);

  server.RegisterProtocol(p2.Subnet, n2);

  server.Start();

  ID testID = ID.RandomID;

  string testValue = "Test";

  RpcError error = p2.Store(c1, testID, testValue);

  Assert.IsTrue(error.TimeoutError, "Expected timeout.");

}

Scroll To Top
Disclaimer
DISCLAIMER: Web reader is currently in beta. Please report any issues through our support system. PDF and Kindle format files are also available for download.

Previous

Next



You are one step away from downloading ebooks from the Succinctly® series premier collection!
A confirmation has been sent to your email address. Please check and confirm your email subscription to complete the download.