How to implement retry policies while sending data to another application?


I am not able to work out all the details of how to use the relevant APIs, but as for the algorithm, you could try the following:

  • The retry policy needs some state attached to each message: at least the number of times the message has been retried, and possibly the current delay. Decide whether the RetryPolicy keeps that state itself or whether it is stored inside the message.
  • Instead of allowRetry, you could have a method that calculates when the next retry should occur (as an absolute time or as a number of milliseconds in the future), as a function of the state mentioned above.
  • The retry queue should record when each message is due to be retried.
  • Instead of using scheduleAtFixedRate, find the message in the retry queue with the lowest when_is_next_retry (possibly by sorting on the absolute retry timestamp and picking the first), and let the task reschedule itself on the executorService using schedule and the time_to_next_retry.
  • For each retry, pull the message from the retry queue, send it, use the RetryPolicy to calculate when the next retry should be (if there is to be one), and insert it back into the retry queue with a new value for when_is_next_retry. If the RetryPolicy returns -1, that could mean the message should not be retried any more.
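The queue and policy parts of these steps could be sketched roughly as below. This is only an illustration under assumptions: the names DelayPolicy, FixedDelayPolicy, PendingMessage and RetryQueue are hypothetical and do not come from the question, the retry state is stored in the message rather than in the policy, and time is passed in explicitly so the logic can be followed without a running scheduler.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical policy: delay before the given 1-based retry, or -1 to give up. */
interface DelayPolicy {
    long nextDelayMs(int attempt);
}

/** Simplest possible policy: constant delay, limited number of retries. */
final class FixedDelayPolicy implements DelayPolicy {
    private final long delayMs;
    private final int maxAttempts;

    FixedDelayPolicy(long delayMs, int maxAttempts) {
        this.delayMs = delayMs;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public long nextDelayMs(int attempt) {
        return attempt > maxAttempts ? -1 : delayMs;
    }
}

/** Per-message retry state, stored with the message (an assumption of this sketch). */
final class PendingMessage {
    final long address;
    final byte[] payload;
    int attempts;        // retries performed so far
    long nextRetryAtMs;  // absolute time of the next retry

    PendingMessage(long address, byte[] payload) {
        this.address = address;
        this.payload = payload;
    }
}

/** Retry queue ordered by the absolute time of the next retry. */
final class RetryQueue {
    private final PriorityQueue<PendingMessage> queue =
        new PriorityQueue<>(Comparator.comparingLong((PendingMessage m) -> m.nextRetryAtMs));
    private final DelayPolicy policy;

    RetryQueue(DelayPolicy policy) {
        this.policy = policy;
    }

    /** Queues the message for its next retry; returns false if the policy gives up. */
    synchronized boolean add(PendingMessage m, long nowMs) {
        long delay = policy.nextDelayMs(m.attempts + 1);
        if (delay < 0) {
            return false;
        }
        m.nextRetryAtMs = nowMs + delay;
        queue.add(m);
        return true;
    }

    /** Removes and returns the earliest message if it is due, otherwise null. */
    synchronized PendingMessage pollDue(long nowMs) {
        PendingMessage head = queue.peek();
        if (head == null || head.nextRetryAtMs > nowMs) {
            return null;
        }
        queue.poll();
        head.attempts++;
        return head;
    }

    /** Milliseconds until the earliest retry, or -1 if nothing is queued. */
    synchronized long timeToNextRetryMs(long nowMs) {
        PendingMessage head = queue.peek();
        return head == null ? -1 : Math.max(0, head.nextRetryAtMs - nowMs);
    }
}
```

A single task could then call pollDue in a loop, send each returned message, call add again to re-queue it, and finally reschedule itself with executorService.schedule(task, delay, TimeUnit.MILLISECONDS), where delay is the value of timeToNextRetryMs when that value is non-negative.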


This is not a perfect approach, but it can also be done as follows.

    public interface RetryPolicy {
        public boolean allowRetry();

        public void decreaseRetryCount();
    }

Create two implementations. For RetryNTimes:

    public class RetryNTimes implements RetryPolicy {
        private int maxRetryCount;

        public RetryNTimes(int maxRetryCount) {
            this.maxRetryCount = maxRetryCount;
        }

        public boolean allowRetry() {
            return maxRetryCount > 0;
        }

        public void decreaseRetryCount() {
            maxRetryCount = maxRetryCount - 1;
        }
    }

For ExponentialBackoffRetry:

    import java.util.Date;

    // Note: as written, this class limits retries by count and by a deadline;
    // it does not itself compute a growing delay between retries.
    public class ExponentialBackoffRetry implements RetryPolicy {
        private int maxRetryCount;
        private final Date retryUpto;

        public ExponentialBackoffRetry(int maxRetryCount, Date retryUpto) {
            this.maxRetryCount = maxRetryCount;
            this.retryUpto = retryUpto;
        }

        public boolean allowRetry() {
            Date date = new Date();
            if (maxRetryCount <= 0 || date.compareTo(retryUpto) >= 0) {
                return false;
            }
            return true;
        }

        public void decreaseRetryCount() {
            maxRetryCount = maxRetryCount - 1;
        }
    }
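The class above only decides whether a retry is still allowed. If the delay itself should grow between retries, one possible way to compute it is sketched here. The BackoffDelay class and its parameters are hypothetical, not part of the code above, and doubling with an upper cap is just one common choice.

```java
/** Hypothetical helper: the delay doubles on each retry and is capped at maxDelayMs. */
final class BackoffDelay {
    private final long baseDelayMs;
    private final long maxDelayMs;

    BackoffDelay(long baseDelayMs, long maxDelayMs) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** Delay in milliseconds before the given 1-based retry number. */
    long delayMs(int retry) {
        long delay = baseDelayMs;
        // Stop doubling once the cap is reached, so large retry numbers stay cheap
        // and the value cannot grow without bound.
        for (int i = 1; i < retry && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }
}
```

With new BackoffDelay(100, 1000), retries 1 to 4 wait 100, 200, 400 and 800 ms, and every later retry waits 1000 ms. To use it, the retry task would have to schedule each message individually rather than run at a fixed rate.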

You need to make some changes to the SendToZeroMQ class:

    public class SendToZeroMQ {
        private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        private final Cache<Long, RetryMessage> retryQueue =
            CacheBuilder
                .newBuilder()
                .maximumSize(10000000)
                .concurrencyLevel(200)
                .removalListener(
                    RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

        private static class Holder {
            private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
        }

        public static SendToZeroMQ getInstance() {
            return Holder.INSTANCE;
        }

        private SendToZeroMQ() {
            executorService.submit(new ResponsePoller());
            // retry every 30 seconds for now
            executorService.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    for (Map.Entry<Long, RetryMessage> entry : retryQueue.asMap().entrySet()) {
                        RetryMessage retryMessage = entry.getValue();
                        if (retryMessage.getRetryPolicy().allowRetry()) {
                            retryMessage.getRetryPolicy().decreaseRetryCount();
                            entry.setValue(retryMessage);
                            sendTo(entry.getKey(), retryMessage.getMessage(), retryMessage);
                        } else {
                            retryQueue.asMap().remove(entry.getKey());
                        }
                    }
                }
            }, 0, 30, TimeUnit.SECONDS);
        }

        public boolean sendTo(final long address, final byte[] encodedRecords, RetryMessage retryMessage) {
            Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
            if (!liveSockets.isPresent()) {
                return false;
            }
            if (null == retryMessage) {
                RetryPolicy retryPolicy = new RetryNTimes(10);
                retryMessage = new RetryMessage(retryPolicy, encodedRecords);
                retryQueue.asMap().put(address, retryMessage);
            }
            return sendTo(address, encodedRecords, liveSockets.get().getSocket());
        }

        public boolean sendTo(final long address, final byte[] encodedByteArray, final ZMQ.Socket socket) {
            ZMsg msg = new ZMsg();
            msg.add(encodedByteArray);
            boolean sent = msg.send(socket);
            msg.destroy();
            return sent;
        }

        public void removeFromRetryQueue(final long address) {
            retryQueue.invalidate(address);
        }
    }


Here is a working little simulation of your environment that shows how this can be done. Note the Guava cache is the wrong data structure here, since you aren't interested in eviction (I think). So I'm using a concurrent hashmap:

    package experimental;

    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;

    class Experimental {

      /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
      interface RetryStrategy {
        long getDelayMs(int retry);
      }

      enum ConstantBackoff implements RetryStrategy {
        INSTANCE;

        @Override
        public long getDelayMs(int retry) {
          return 1000L;
        }
      }

      enum ExponentialBackoff implements RetryStrategy {
        INSTANCE;

        @Override
        public long getDelayMs(int retry) {
          return 100 + (1L << retry);
        }
      }

      static class Sender {
        private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();

        /** Send the given data with given address on the given socket. */
        void sendTo(long addr, byte[] data, int socket) {
          System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
        }

        private class Retrier implements Runnable {
          private final RetryStrategy retryStrategy;
          private final long addr;
          private final byte[] data;
          private final int socket;
          private int retry;
          private Future<?> future;

          Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
            this.retryStrategy = retryStrategy;
            this.addr = addr;
            this.data = data;
            this.socket = socket;
            this.retry = 0;
          }

          synchronized void start() {
            if (future == null) {
              future = executorService.submit(this);
              pending.put(addr, this);
            }
          }

          synchronized void cancel() {
            if (future != null) {
              future.cancel(true);
              future = null;
            }
          }

          private synchronized void reschedule() {
            if (future != null) {
              future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
            }
          }

          @Override
          synchronized public void run() {
            sendTo(addr, data, socket);
            reschedule();
          }
        }

        long getVerifiedAddr() {
          System.err.println("Pending messages: " + pending.size());
          Iterator<Long> i = pending.keySet().iterator();
          long addr = i.hasNext() ? i.next() : 0;
          return addr;
        }

        class CancellationPoller implements Runnable {
          @Override
          public void run() {
            while (!Thread.currentThread().isInterrupted()) {
              try {
                Thread.sleep(1000);
              } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
              }
              long addr = getVerifiedAddr();
              if (addr == 0) {
                continue;
              }
              System.err.println("Verified message (to be cancelled) " + addr);
              Retrier retrier = pending.remove(addr);
              if (retrier != null) {
                retrier.cancel();
              }
            }
          }
        }

        Sender initialize() {
          executorService.submit(new CancellationPoller());
          return this;
        }

        void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
          new Retrier(retryStrategy, addr, data, socket).start();
        }
      }

      public static void main(String[] args) {
        Sender sender = new Sender().initialize();
        for (long i = 1; i <= 10; i++) {
          sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
        }
        for (long i = -1; i >= -10; i--) {
          sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
        }
      }
    }