Request Waiting List


Each cluster node maintains a waiting list that maps a key to
a callback function. The key is chosen according to the criterion
that triggers the callback.
For example, if the callback needs to be invoked whenever
a message from another cluster node is received, the key can be the
Correlation Identifier of the message.
In the case of Replicated Log, it is the
High-Water Mark. The callback handles
the response and decides whether the client request can be fulfilled.

Consider the example of a key-value store where data is replicated
on multiple servers. Here, Quorum can be used
to decide when a replication can be considered successful,
so that a response can be sent to the client.
The cluster node tracks the requests it sends to other cluster nodes,
and registers a callback with each request.
Each request is marked with a Correlation Identifier,
which is used to map a response to its request.
When a response from another cluster node is received,
the waiting list is notified to invoke the callback.

For the sake of this example, let’s call our three cluster nodes
athens, byzantium and cyrene.
The client connects to athens to store “title” as “Microservices”.
Athens replicates the value on byzantium and cyrene: it sends
a request to itself to store the key-value pair and, concurrently,
sends requests to both byzantium and cyrene.
To track responses, athens creates a WriteQuorumCallback
and adds it to the waiting list for each of the requests sent.

For every response received, the WriteQuorumCallback is
invoked to handle the response. It checks whether the required number
of responses has been received.
With three nodes the quorum is two, so once the response from byzantium
arrives in addition to the one from athens itself, the quorum is reached
and the pending client request is completed.
Cyrene can respond later, but the response can be sent to the client
without waiting for it.
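
The scenario above can be traced with a small, single-threaded sketch. QuorumTracker and QuorumScenario are hypothetical names used only for illustration; they are not part of the sample code in this article, and the reply text is made up. The sketch assumes a majority quorum (replicas / 2 + 1) and counts only successful responses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded stand-in for the quorum check described above.
class QuorumTracker {
    private final int quorum;
    private int acks;
    private boolean done;
    // Replies "sent" to the client; a list so the example can be inspected.
    final List<String> clientReplies = new ArrayList<>();

    QuorumTracker(int replicas) {
        this.quorum = replicas / 2 + 1; // majority: 2 of 3, 3 of 5
    }

    int quorum() {
        return quorum;
    }

    // Records one successful replica response and replies to the client
    // the first time the quorum is reached.
    void onAck(String from) {
        acks++;
        if (acks >= quorum && !done) {
            done = true;
            clientReplies.add("Success after " + from);
        }
    }
}

class QuorumScenario {
    public static void main(String[] args) {
        QuorumTracker tracker = new QuorumTracker(3);
        tracker.onAck("athens");    // athens stores the value locally
        tracker.onAck("byzantium"); // second response: quorum of 2 reached
        tracker.onAck("cyrene");    // late response: client was already answered
        System.out.println(tracker.clientReplies);
    }
}
```

Only one reply is recorded, and it is produced when byzantium responds; the response from cyrene changes nothing for the client.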

Note that every cluster node maintains its own instance of a waiting list.
The waiting list tracks each key and its associated callback, and
stores the timestamp at which the callback was registered.
The timestamp is used to expire callbacks whose responses
haven’t been received within the expected time.
The code looks like the sample below:

public class RequestWaitingList<Key, Response> {
    private Map<Key, CallbackDetails> pendingRequests = new ConcurrentHashMap<>();

    // Registers the callback under the key, recording when it was added.
    // The clock field is declared in the expiry excerpt later in this section.
    public void add(Key key, RequestCallback<Response> callback) {
        pendingRequests.put(key, new CallbackDetails(callback, clock.nanoTime()));
    }
}

class CallbackDetails {
    RequestCallback requestCallback;
    long createTime; // nanoseconds, as returned by clock.nanoTime()

    public CallbackDetails(RequestCallback requestCallback, long createTime) {
        this.requestCallback = requestCallback;
        this.createTime = createTime;
    }

    public RequestCallback getRequestCallback() {
        return requestCallback;
    }

    // Nanoseconds elapsed since the callback was registered.
    public long elapsedTime(long now) {
        return now - createTime;
    }
}

public interface RequestCallback<T> {
    void onResponse(T r);
    void onError(Throwable e);
}

The waiting list is asked to handle the response or error
once it has been received from the other cluster node.

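class RequestWaitingList…

  public void handleResponse(Key key, Response response) {
      // remove() is atomic: a key that is unknown or already handled yields null.
      CallbackDetails callbackDetails = pendingRequests.remove(key);
      if (callbackDetails == null) {
          return;
      }
      callbackDetails.getRequestCallback().onResponse(response);
  }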


class RequestWaitingList…

  public void handleError(Key key, Throwable e) {
      CallbackDetails callbackDetails = pendingRequests.remove(key);
      if (callbackDetails != null) {
          callbackDetails.getRequestCallback().onError(e);
      }
  }
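
To see the register-then-dispatch flow end to end, here is a minimal, self-contained sketch. SimpleWaitingList, RecordingCallback and WaitingListDemo are hypothetical names introduced for this example. The sketch leaves out timestamps and expiry, and assumes integer correlation identifiers and string responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

interface RequestCallback<T> {
    void onResponse(T r);
    void onError(Throwable e);
}

// Hypothetical minimal waiting list: key -> callback, no timestamps or expiry.
class SimpleWaitingList<K, R> {
    private final Map<K, RequestCallback<R>> pending = new ConcurrentHashMap<>();

    void add(K key, RequestCallback<R> callback) {
        pending.put(key, callback);
    }

    void handleResponse(K key, R response) {
        RequestCallback<R> callback = pending.remove(key); // atomic remove
        if (callback != null) {
            callback.onResponse(response);
        }
    }

    void handleError(K key, Throwable e) {
        RequestCallback<R> callback = pending.remove(key);
        if (callback != null) {
            callback.onError(e);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}

// Records what it was told, so the flow can be inspected afterwards.
class RecordingCallback implements RequestCallback<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onResponse(String r) {
        events.add("response:" + r);
    }

    @Override
    public void onError(Throwable e) {
        events.add("error:" + e.getMessage());
    }
}

class WaitingListDemo {
    public static void main(String[] args) {
        SimpleWaitingList<Integer, String> waitingList = new SimpleWaitingList<>();
        RecordingCallback callback = new RecordingCallback();
        waitingList.add(1, callback); // one callback shared by two requests
        waitingList.add(2, callback);
        waitingList.handleResponse(1, "ok");
        waitingList.handleResponse(1, "duplicate"); // ignored: key 1 was already removed
        waitingList.handleError(2, new RuntimeException("connection refused"));
        System.out.println(callback.events);
    }
}
```

Because each key is removed before its callback runs, a duplicate or late message for the same key is ignored.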

The waiting list can then be used to handle quorum responses
with the implementation looking something like this:

static class WriteQuorumCallback implements RequestCallback<RequestOrResponse> {
    private final int quorum;
    // Note: ++ on a volatile int is not atomic, so these counters are only
    // safe if the callback is invoked from one thread at a time.
    private volatile int expectedNumberOfResponses;
    private volatile int receivedResponses;
    private volatile int receivedErrors;
    private volatile boolean done;

    private final RequestOrResponse request;
    private final ClientConnection clientConnection;

    public WriteQuorumCallback(int totalExpectedResponses, RequestOrResponse clientRequest, ClientConnection clientConnection) {
        this.expectedNumberOfResponses = totalExpectedResponses;
        this.quorum = expectedNumberOfResponses / 2 + 1;
        this.request = clientRequest;
        this.clientConnection = clientConnection;
    }

    @Override
    public void onResponse(RequestOrResponse response) {
        receivedResponses++;
        if (receivedResponses == quorum && !done) {
            respondToClient("Success"); // reply text is illustrative
            done = true;
        }
    }

    @Override
    public void onError(Throwable t) {
        receivedErrors++;
        if (receivedErrors == quorum && !done) {
            respondToClient("Error"); // reply text is illustrative
            done = true;
        }
    }

    private void respondToClient(String response) {
        clientConnection.write(new RequestOrResponse(RequestId.SetValueResponse.getId(), response.getBytes(), request.getCorrelationId()));
    }
}
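
If responses can arrive on several threads at once (for example, a network thread and the expiry thread), incrementing a volatile int is not atomic and two threads could both decide to reply. The sketch below shows one way to guard against that with atomic counters. AtomicQuorumCallback is a hypothetical name, its methods only mirror the RequestCallback interface, and the Consumer<String> stands in for the ClientConnection used in the article's sample.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical variant of the quorum callback using atomic counters.
class AtomicQuorumCallback<T> {
    private final int quorum;
    private final AtomicInteger receivedResponses = new AtomicInteger();
    private final AtomicInteger receivedErrors = new AtomicInteger();
    private final AtomicBoolean done = new AtomicBoolean(false);
    // Stand-in for writing the reply to the client connection.
    private final Consumer<String> respondToClient;

    AtomicQuorumCallback(int totalExpectedResponses, Consumer<String> respondToClient) {
        this.quorum = totalExpectedResponses / 2 + 1;
        this.respondToClient = respondToClient;
    }

    public void onResponse(T response) {
        // incrementAndGet is atomic; compareAndSet lets exactly one caller reply.
        if (receivedResponses.incrementAndGet() >= quorum && done.compareAndSet(false, true)) {
            respondToClient.accept("Success");
        }
    }

    public void onError(Throwable t) {
        if (receivedErrors.incrementAndGet() >= quorum && done.compareAndSet(false, true)) {
            respondToClient.accept("Error");
        }
    }
}
```

The compareAndSet on done ensures the client receives a single reply even when a success quorum and an error quorum are detected concurrently.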

Whenever a cluster node sends requests to other nodes,
it adds a callback to the waiting list, keyed by the Correlation Identifier
of the request sent.

class ClusterNode…

  private void handleSetValueClientRequestRequiringQuorum(List<InetAddressAndPort> replicas, RequestOrResponse request, ClientConnection clientConnection) {
      int totalExpectedResponses = replicas.size();
      // One callback is shared by all replica requests for this client request.
      RequestCallback requestCallback = new WriteQuorumCallback(totalExpectedResponses, request, clientConnection);
      for (InetAddressAndPort replica : replicas) {
          int correlationId = nextRequestId();
          requestWaitingList.add(correlationId, requestCallback);
          try {
              SocketClient client = new SocketClient(replica);
              client.sendOneway(new RequestOrResponse(RequestId.SetValueRequest.getId(), request.getMessageBodyJson(), correlationId, listenAddress));
          } catch (IOException e) {
              // A failed send counts as an error response for this replica.
              requestWaitingList.handleError(correlationId, e);
          }
      }
  }

Once the response is received, the waiting list is asked to handle it:

class ClusterNode…

  private void handleSetValueResponse(RequestOrResponse response) {
      requestWaitingList.handleResponse(response.getCorrelationId(), response);
  }

The waiting list then invokes the associated WriteQuorumCallback.
The WriteQuorumCallback instance checks whether
a quorum of responses has been received and, if so,
responds to the client.

Expiring Long Pending Requests

Sometimes, responses from the other cluster nodes are
delayed. In these instances the waiting list generally has
a mechanism to expire requests after a timeout:

class RequestWaitingList…

  private SystemClock clock;
  private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private long expirationIntervalMillis = 2000;

  public RequestWaitingList(SystemClock clock) {
      this.clock = clock;
      // MILLISECONDS is statically imported from java.util.concurrent.TimeUnit.
      executor.scheduleWithFixedDelay(this::expire, expirationIntervalMillis, expirationIntervalMillis, MILLISECONDS);
  }

  private void expire() {
      long now = clock.nanoTime();
      List<Key> expiredRequestKeys = getExpiredRequestKeys(now);
      expiredRequestKeys.forEach(expiredRequestKey -> {
          CallbackDetails request = pendingRequests.remove(expiredRequestKey);
          if (request != null) { // a response may have arrived in the meantime
              request.requestCallback.onError(new TimeoutException("Request expired"));
          }
      });
  }

  private List<Key> getExpiredRequestKeys(long now) {
      // Timestamps are in nanoseconds, so convert the timeout before comparing.
      long expirationIntervalNanos = MILLISECONDS.toNanos(expirationIntervalMillis);
      return pendingRequests.entrySet().stream()
              .filter(entry -> entry.getValue().elapsedTime(now) > expirationIntervalNanos)
              .map(e -> e.getKey())
              .collect(Collectors.toList());
  }
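
Expiry logic is easier to check when the current time is passed in rather than read from a real clock. The sketch below isolates just the registration and expiry steps. ExpiringWaitingList and its method signatures are hypothetical, and the error callback is reduced to a Consumer<Throwable>. It assumes timestamps in nanoseconds and a timeout configured in milliseconds, converted once in the constructor.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical stripped-down waiting list: only registration and expiry,
// with the current time supplied by the caller so it can be faked.
class ExpiringWaitingList<K> {
    private static final class Pending {
        final Consumer<Throwable> onError;
        final long createTimeNanos;

        Pending(Consumer<Throwable> onError, long createTimeNanos) {
            this.onError = onError;
            this.createTimeNanos = createTimeNanos;
        }
    }

    private final Map<K, Pending> pending = new ConcurrentHashMap<>();
    private final long expirationNanos;

    ExpiringWaitingList(long expirationMillis) {
        // Convert once so later comparisons are nanoseconds against nanoseconds.
        this.expirationNanos = TimeUnit.MILLISECONDS.toNanos(expirationMillis);
    }

    void add(K key, Consumer<Throwable> onError, long nowNanos) {
        pending.put(key, new Pending(onError, nowNanos));
    }

    // Fails every request older than the timeout and returns the expired keys.
    List<K> expire(long nowNanos) {
        List<K> expiredKeys = pending.entrySet().stream()
                .filter(e -> nowNanos - e.getValue().createTimeNanos > expirationNanos)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        for (K key : expiredKeys) {
            Pending p = pending.remove(key);
            if (p != null) { // a response may have removed it in the meantime
                p.onError.accept(new TimeoutException("Request expired"));
            }
        }
        return expiredKeys;
    }

    int size() {
        return pending.size();
    }
}
```

With a 2000 ms timeout, a request registered at time 0 is expired by a sweep at 2500 ms, while one registered at 1500 ms stays pending until a later sweep.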