CHAPTER 13
There are a few points to make about this implementation:
Requests issued by the DHT are serialized to JSON from the following classes.
Code Listing 95: BaseRequest Class
/// <summary>
/// Fields shared by every request the DHT issues. Requests are serialized to JSON
/// from these classes.
/// </summary>
public abstract class BaseRequest
{
    /// <summary>The sender's protocol object, sent along so the receiver can reach the sender.</summary>
    public object Protocol { get; set; }

    /// <summary>Name of the sender's protocol type.</summary>
    public string ProtocolName { get; set; }

    /// <summary>Random value identifying this request; initialized by the constructor.</summary>
    public BigInteger RandomID { get; set; }

    /// <summary>ID value of the sending contact.</summary>
    public BigInteger Sender { get; set; }

    /// <summary>
    /// Assigns a fresh random ID to the request.
    /// The constructor is protected because an abstract type can only be constructed
    /// through a derived class; a public constructor would be misleading.
    /// </summary>
    protected BaseRequest()
    {
        RandomID = ID.RandomID.Value;
    }
}

/// <summary>FindNode request: asks for contacts relative to <see cref="Key"/>.</summary>
public class FindNodeRequest : BaseRequest
{
    /// <summary>The key being looked up.</summary>
    public BigInteger Key { get; set; }
}

/// <summary>FindValue request: asks for the value stored under <see cref="Key"/>.</summary>
public class FindValueRequest : BaseRequest
{
    /// <summary>The key being looked up.</summary>
    public BigInteger Key { get; set; }
}

/// <summary>Ping request: carries only the common fields.</summary>
public class PingRequest : BaseRequest
{
}

/// <summary>Store request: asks the receiver to store a key-value pair.</summary>
public class StoreRequest : BaseRequest
{
    /// <summary>The key to store under.</summary>
    public BigInteger Key { get; set; }

    /// <summary>The value to store.</summary>
    public string Value { get; set; }

    /// <summary>Flag passed through to the receiver's store operation.</summary>
    public bool IsCached { get; set; }

    /// <summary>Expiration time, in seconds, passed through to the receiver.</summary>
    public int ExpirationTimeSec { get; set; }
}

/// <summary>Implemented by requests that name a subnet.</summary>
public interface ITcpSubnet
{
    /// <summary>The subnet identifier.</summary>
    int Subnet { get; set; }
}

/// <summary>FindNode request that carries a subnet.</summary>
public class FindNodeSubnetRequest : FindNodeRequest, ITcpSubnet
{
    /// <inheritdoc/>
    public int Subnet { get; set; }
}

/// <summary>FindValue request that carries a subnet.</summary>
public class FindValueSubnetRequest : FindValueRequest, ITcpSubnet
{
    /// <inheritdoc/>
    public int Subnet { get; set; }
}

/// <summary>Ping request that carries a subnet.</summary>
public class PingSubnetRequest : PingRequest, ITcpSubnet
{
    /// <inheritdoc/>
    public int Subnet { get; set; }
}

/// <summary>Store request that carries a subnet.</summary>
public class StoreSubnetRequest : StoreRequest, ITcpSubnet
{
    /// <inheritdoc/>
    public int Subnet { get; set; }
}
On the server side, which receives these messages, they are handled by a common request class.
Code Listing 96: CommonRequest—Server Side
/// <summary>
/// Server-side request shape passed to the Node handlers. It carries every field
/// any of the handlers needs, so that all handlers can take the same parameter.
/// </summary>
public class CommonRequest
{
    /// <summary>The sender's protocol object as received in the request.</summary>
    public object Protocol { get; set; }

    /// <summary>Name of the sender's protocol type.</summary>
    public string ProtocolName { get; set; }

    /// <summary>Random ID of the request; the handlers echo it back in their response.</summary>
    public BigInteger RandomID { get; set; }

    /// <summary>ID value of the sending contact.</summary>
    public BigInteger Sender { get; set; }

    /// <summary>Key of the request, for the requests that carry one.</summary>
    public BigInteger Key { get; set; }

    /// <summary>Value to store, for Store requests.</summary>
    public string Value { get; set; }

    /// <summary>Cache flag of a Store request.</summary>
    public bool IsCached { get; set; }

    /// <summary>Expiration time in seconds of a Store request.</summary>
    public int ExpirationTimeSec { get; set; }
}
As the comment states, the common request simplifies the server implementation by letting every RPC handler method take the same parameter type.
The request handlers extract the pertinent pieces of the CommonRequest and call the appropriate method of the Node class. The important part here is that the contact protocol must be returned as part of the FindNode and FindValue responses. Note that the return values are anonymous objects.
Code Listing 97: Request Handlers
// Server-side RPC handlers: ServerPing, ServerStore, ServerFindNode and ServerFindValue.
// Each takes a CommonRequest and returns an anonymous object that echoes request.RandomID.
// The two Find handlers also return the contacts (ID value, protocol object, protocol type name),
// and ServerFindValue additionally returns Value.
// NOTE(review): this listing appears truncated -- the Protocol.InstantiateProtocol( arguments,
// the tail of the Store(...) call and the right-hand side of `var (contacts, val) =` are missing,
// so it does not compile as shown. Restore the missing text from the original source.
public object ServerPing(CommonRequest request) { IProtocol protocol = Protocol.InstantiateProtocol( Ping(new Contact(protocol, new ID(request.Sender))); return new { RandomID = request.RandomID }; } public object ServerStore(CommonRequest request) { IProtocol protocol = Protocol.InstantiateProtocol( Store(new Contact(protocol, new ID(request.Sender)), return new { RandomID = request.RandomID }; } public object ServerFindNode(CommonRequest request) { IProtocol protocol = Protocol.InstantiateProtocol( var (contacts, val) = return new { Contacts = contacts.Select(c => new { Contact = c.ID.Value, Protocol = c.Protocol, ProtocolName = c.Protocol.GetType().Name }).ToList(), RandomID = request.RandomID }; } public object ServerFindValue(CommonRequest request) { IProtocol protocol = Protocol.InstantiateProtocol( var (contacts, val) = return new { Contacts = contacts?.Select(c => new { Contact = c.ID.Value, Protocol = c.Protocol, ProtocolName = c.Protocol.GetType().Name })?.ToList(), RandomID = request.RandomID, Value = val }; } |
JSON responses are deserialized into the following classes.
Code Listing 98: Server Responses
/// <summary>
/// Common part of every server response; JSON responses are deserialized into these classes.
/// </summary>
public abstract class BaseResponse
{
    /// <summary>Random ID carried by the response.</summary>
    public BigInteger RandomID { get; set; }
}

/// <summary>Response describing a failure.</summary>
public class ErrorResponse : BaseResponse
{
    /// <summary>Text describing the error.</summary>
    public string ErrorMessage { get; set; }
}

/// <summary>One contact entry inside a FindNode or FindValue response.</summary>
public class ContactResponse
{
    /// <summary>ID value of the contact.</summary>
    public BigInteger Contact { get; set; }

    /// <summary>The contact's protocol object as it appears in the JSON.</summary>
    public object Protocol { get; set; }

    /// <summary>Name of the contact's protocol type.</summary>
    public string ProtocolName { get; set; }
}

/// <summary>Response to a FindNode request.</summary>
public class FindNodeResponse : BaseResponse
{
    /// <summary>The contacts returned by the peer.</summary>
    public List<ContactResponse> Contacts { get; set; }
}

/// <summary>Response to a FindValue request.</summary>
public class FindValueResponse : BaseResponse
{
    /// <summary>The contacts returned by the peer, if any.</summary>
    public List<ContactResponse> Contacts { get; set; }

    /// <summary>The value returned by the peer, if any.</summary>
    public string Value { get; set; }
}

/// <summary>Response to a Ping request; carries only the common fields.</summary>
public class PingResponse : BaseResponse
{
}

/// <summary>Response to a Store request; carries only the common fields.</summary>
public class StoreResponse : BaseResponse
{
}
The server is a straightforward HttpListener; each incoming request is handed to ProcessRequest as a C# HttpListenerContext object. Note how the subnet ID is used to route the request to the specific node associated with the subnet.
Code Listing 99: ProcessRequest
// Handles one incoming HTTP request. For a POST, the raw URL is looked up in routePackets and the
// subnet is looked up in subnets; a failed lookup is answered through SendErrorResponse, and the
// response appears to be closed at the end of the POST branch.
// NOTE(review): this listing appears truncated -- the StreamReader call, the CommonRequest
// initializer, the Task.Run lambda and both ErrorResponse initializers are cut off. In this
// collapsed one-line form the inline `//` comments also swallow the rest of the line. Restore the
// original multi-line source before compiling.
// NOTE(review): the method is `async void`, so an exception thrown after the first await cannot be
// observed by the caller -- confirm the base class requires this signature.
protected override async void ProcessRequest(HttpListenerContext context) { string data = new StreamReader(context.Request.InputStream, if (context.Request.HttpMethod == "POST") { Type requestType; string path = context.Request.RawUrl; // Remove "//" // Prefix our call with "Server" so that the method name is unambiguous. string methodName = "Server" + path.Substring(2); if (routePackets.TryGetValue(path, out requestType)) { CommonRequest commonRequest = INode node; if (subnets.TryGetValue(subnet, out node)) { await Task.Run(() => } else { SendErrorResponse(context, new ErrorResponse() } } else { SendErrorResponse(context, new ErrorResponse() } context.Response.Close(); } } |
The protocol implements the four RPC calls, issuing HTTP POSTs to the server. Notice how the protocol is instantiated from the JSON return, and if the protocol isn’t supported, the contact is removed from the contacts returned by FindNode and FindValue.
Code Listing 100: FindNode, FindValue, Ping, and Store Handlers
// Client-side RPC calls of the protocol: FindNode, FindValue, Ping and Store. Each builds the
// matching *SubnetRequest (Protocol, ProtocolName, Subnet, Sender and RandomID, plus Key/Value
// fields where the request has them) and sends it with RestCall.Post. Ping and Store return
// GetRpcError(id, ret, timeoutError, error); the Find methods filter out contacts whose Protocol is null.
// NOTE(review): this listing appears truncated -- the leading RestCall.Post argument(s), the
// Select(...) projections, the tails of the tuple returns, the FindValue method name and parameter
// list, and the last parameter of Store are missing. Restore them from the original source.
// NOTE(review): Ping and Store call RestCall.Post with FindValueResponse as the response type,
// although PingResponse and StoreResponse classes exist -- confirm whether that is intended.
public (List<Contact> contacts, RpcError error) FindNode(Contact sender, ID key) { ErrorResponse error; ID id = ID.RandomID; bool timeoutError; var ret = RestCall.Post<FindNodeResponse, ErrorResponse>( new FindNodeSubnetRequest() { Protocol = sender.Protocol, ProtocolName = sender.Protocol.GetType().Name, Subnet = subnet, Sender = sender.ID.Value, Key = key.Value, RandomID = id.Value }, out error, out timeoutError); try { var contacts = ret?.Contacts?.Select( // Return only contacts with supported protocols. return (contacts?.Where(c => c.Protocol != null).ToList() ?? EmptyContactList(), } catch (Exception ex) { return (null, new RpcError() { ProtocolError = true, } } /// <summary> /// Attempt to find the value in the peer network. /// </summary> /// <returns>A null contact list is acceptable here as it is a valid return /// The caller is responsible for checking the timeoutError flag to make /// the result of a timeout error.</returns> public (List<Contact> contacts, string val, RpcError error) { ErrorResponse error; ID id = ID.RandomID; bool timeoutError; var ret = RestCall.Post<FindValueResponse, ErrorResponse>( new FindValueSubnetRequest() { Protocol = sender.Protocol, ProtocolName = sender.Protocol.GetType().Name, Subnet = subnet, Sender = sender.ID.Value, Key = key.Value, RandomID = id.Value }, out error, out timeoutError); try { var contacts = ret?.Contacts?.Select( // Return only contacts with supported protocols. 
return (contacts?.Where(c => c.Protocol != null).ToList(), } catch (Exception ex) { return (null, null, new RpcError() { ProtocolError = true, } } public RpcError Ping(Contact sender) { ErrorResponse error; ID id = ID.RandomID; bool timeoutError; var ret = RestCall.Post<FindValueResponse, ErrorResponse>( new PingSubnetRequest() { Protocol = sender.Protocol, ProtocolName = sender.Protocol.GetType().Name, Subnet = subnet, Sender = sender.ID.Value, RandomID = id.Value }, out error, out timeoutError); return GetRpcError(id, ret, timeoutError, error); } public RpcError Store(Contact sender, ID key, string val, bool isCached = false, { ErrorResponse error; ID id = ID.RandomID; bool timeoutError; var ret = RestCall.Post<FindValueResponse, ErrorResponse>( new StoreSubnetRequest() { Protocol = sender.Protocol, ProtocolName = sender.Protocol.GetType().Name, Subnet = subnet, Sender = sender.ID.Value, Key = key.Value, Value = val, IsCached = isCached, ExpirationTimeSec = expirationTimeSec, RandomID = id.Value }, out error, out timeoutError); return GetRpcError(id, ret, timeoutError, error); } |
The RpcError class manages the kinds of errors that we can encounter, and is instantiated in the GetRpcError method.
Code Listing 101: Handling RPC Errors
// RpcError carries the error flags of an RPC call (TimeoutError, IDMismatchError, PeerError,
// ProtocolError) together with the peer and protocol error messages.
// NOTE(review): this listing appears truncated -- the body of HasError is missing, and the trailing
// fragment `{ return new RpcError() }` has lost its method signature and object initializer.
// Restore both from the original source.
public class RpcError { public bool HasError public bool TimeoutError { get; set; } public bool IDMismatchError { get; set; } public bool PeerError { get; set; } public bool ProtocolError { get; set; } public string PeerErrorMessage { get; set; } public string ProtocolErrorMessage { get; set; } } { return new RpcError() } |
Note that this class reflects several different errors that can occur:
As with the unit tests for the protocol itself, studying these unit tests is a useful way to see how one sets up a server and client. The following unit tests validate the round-trip calls, exercising the protocol calls and the server. Each test initializes the server and then tears it down.
Code Listing 102: TCP Subnet Setup and Teardown
// Test fixture for the TCP subnet round-trip tests. A new TcpSubnetServer is created for
// localIP and port before each test ([TestInitialize]) and stopped after each test ([TestCleanup]).
// The trailing `...` is an elision in the listing; presumably the test methods follow, and the
// class's closing brace is not shown.
[TestClass] public class TcpSubnetTests { protected string localIP = "http://127.0.0.1"; protected int port = 2720; protected TcpSubnetServer server; [TestInitialize] public void Initialize() { server = new TcpSubnetServer(localIP, port); } [TestCleanup] public void TestCleanup() { server.Stop(); } ... |
The unit tests exercise each of the four RPC calls as well as a timeout error.
This test verifies the Ping RPC call.
Code Listing 103: PingRouteTest
/// <summary>
/// Sends a Ping through the server from subnet 1's contact to the node registered on subnet 2.
/// There is no assertion; the test passes as long as no exception is thrown.
/// </summary>
[TestMethod]
public void PingRouteTest()
{
    var firstProtocol = new TcpSubnetProtocol(localIP, port, 1);
    var secondProtocol = new TcpSubnetProtocol(localIP, port, 2);

    // The contact that acts as the sender of the ping.
    ID senderID = ID.RandomID;
    var senderContact = new Contact(firstProtocol, senderID);

    var firstNode = new Node(senderContact, new VirtualStorage());
    var secondNode = new Node(new Contact(secondProtocol, ID.RandomID), new VirtualStorage());

    // Route each subnet to its node before the server starts listening.
    server.RegisterProtocol(firstProtocol.Subnet, firstNode);
    server.RegisterProtocol(secondProtocol.Subnet, secondNode);
    server.Start();

    secondProtocol.Ping(senderContact);
}
Oddly, there’s no assertion here, as nothing of note happens. The point of this test is that no exceptions are thrown.
This test verifies the Store RPC call.
Code Listing 104: StoreRouteTest
// Round-trips a Store RPC: registers one node per subnet, starts the server, stores
// testValue under testID through p2, then checks node 2's storage for the key and the value.
// NOTE(review): this listing appears truncated -- both Assert.IsTrue calls have lost their message
// argument and closing `);`. Restore them from the original source.
[TestMethod] public void StoreRouteTest() { TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1); TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2); ID ourID = ID.RandomID; Contact c1 = new Contact(p1, ourID); Node n1 = new Node(c1, new VirtualStorage()); Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage()); server.RegisterProtocol(p1.Subnet, n1); server.RegisterProtocol(p2.Subnet, n2); server.Start(); Contact sender = new Contact(p1, ID.RandomID); ID testID = ID.RandomID; string testValue = "Test"; p2.Store(sender, testID, testValue); Assert.IsTrue(n2.Storage.Contains(testID), Assert.IsTrue(n2.Storage.Get(testID) == testValue, } |
This test verifies the FindNode RPC call.
Code Listing 105: FindNodesRouteTest
// Round-trips a FindNode RPC: node 2 is given one extra contact in its first bucket, then
// p2.FindNode(c1, id) is expected to return exactly one contact whose ID is otherPeer.
// NOTE(review): this listing appears truncated -- the arguments of the `new Contact(` added to the
// bucket and the message of the last Assert.IsTrue are missing. In this collapsed one-line form
// the inline `//` comment also swallows the rest of the line. Restore the original multi-line source.
[TestMethod] public void FindNodesRouteTest() { TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1); TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2); ID ourID = ID.RandomID; Contact c1 = new Contact(p1, ourID); Node n1 = new Node(c1, new VirtualStorage()); Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage()); // Node 2 knows about another contact, that isn't us (because we're excluded.) ID otherPeer = ID.RandomID; n2.BucketList.Buckets[0].Contacts.Add(new Contact( server.RegisterProtocol(p1.Subnet, n1); server.RegisterProtocol(p2.Subnet, n2); server.Start(); ID id = ID.RandomID; List<Contact> ret = p2.FindNode(c1, id).contacts; Assert.IsTrue(ret.Count == 1, "Expected 1 contact."); Assert.IsTrue(ret[0].ID == otherPeer, } |
This test verifies the FindValue RPC call.
Code Listing 106: FindValueRouteTest
// Round-trips a FindValue RPC: stores testValue under testID through p2, checks node 2's
// storage, then calls p2.FindValue(c1, testID) and expects a null contact list and val == testValue.
// NOTE(review): this listing appears truncated -- three Assert.IsTrue calls have lost their message
// argument and closing `);`. Restore them from the original source.
[TestMethod] public void FindValueRouteTest() { TcpSubnetProtocol p1 = new TcpSubnetProtocol(localIP, port, 1); TcpSubnetProtocol p2 = new TcpSubnetProtocol(localIP, port, 2); ID ourID = ID.RandomID; Contact c1 = new Contact(p1, ourID); Node n1 = new Node(c1, new VirtualStorage()); Node n2 = new Node(new Contact(p2, ID.RandomID), new VirtualStorage()); server.RegisterProtocol(p1.Subnet, n1); server.RegisterProtocol(p2.Subnet, n2); server.Start(); ID testID = ID.RandomID; string testValue = "Test"; p2.Store(c1, testID, testValue); Assert.IsTrue(n2.Storage.Contains(testID), Assert.IsTrue(n2.Storage.Get(testID) == testValue, var ret = p2.FindValue(c1, testID); Assert.IsTrue(ret.contacts == null, "Expected to find value."); Assert.IsTrue(ret.val == testValue, } |
This test verifies that an unresponsive node results in a timeout error.
Code Listing 107: UnresponsiveNodeTest
/// <summary>
/// Marks subnet 2's protocol as not responding, issues a Store through it, and expects
/// the returned RpcError to report a timeout.
/// </summary>
[TestMethod]
public void UnresponsiveNodeTest()
{
    var firstProtocol = new TcpSubnetProtocol(localIP, port, 1);
    var secondProtocol = new TcpSubnetProtocol(localIP, port, 2);
    secondProtocol.Responds = false;

    // The contact that acts as the sender of the store request.
    ID senderID = ID.RandomID;
    var senderContact = new Contact(firstProtocol, senderID);

    var firstNode = new Node(senderContact, new VirtualStorage());
    var secondNode = new Node(new Contact(secondProtocol, ID.RandomID), new VirtualStorage());

    // Route each subnet to its node before the server starts listening.
    server.RegisterProtocol(firstProtocol.Subnet, firstNode);
    server.RegisterProtocol(secondProtocol.Subnet, secondNode);
    server.Start();

    ID storedKey = ID.RandomID;
    string storedValue = "Test";
    RpcError result = secondProtocol.Store(senderContact, storedKey, storedValue);

    Assert.IsTrue(result.TimeoutError, "Expected timeout.");
}