Send record and wait for its acknowledgement to receive


The code has a number of potential issues:

  • An answer may be received before the call to retryHolder#put.
  • There may also be a race condition when messages are retried.
  • If two messages are sent to the same address, does the second overwrite the first?
  • send always wastes time sleeping for the full timeout; use wait/notify instead.

I would store a class with more state instead. It could contain a flag (retryIfNoAnswer yes/no) that the retry handler could check. It could provide waitForAnswer/markAnswerReceived methods using wait/notify so that send doesn't have to sleep for a fixed time. The waitForAnswer method can return true if an answer was obtained and false on timeout. Put the object in the retry handler before sending and use a timestamp so that only messages older than a certain age are retried. That fixes the first race condition.

EDIT: updated example code below, compiles with your code, not tested:

public class SendToQueue {

    private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

    // Not sure why you are using a cache rather than a standard ConcurrentHashMap?
    private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
            .maximumSize(1000000)
            .concurrencyLevel(100)
            .removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor))
            .build();

    private static class PendingMessage {
        private final long _address;
        private final byte[] _encodedRecords;
        private final Socket _socket;
        private final boolean _retryEnabled;
        private final Object _monitor = new Object();
        private long _sendTimeMillis;
        private volatile boolean _acknowledged;

        public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
            _address = address;
            _sendTimeMillis = System.currentTimeMillis();
            _encodedRecords = encodedRecords;
            _socket = socket;
            _retryEnabled = retryEnabled;
        }

        public synchronized boolean hasExpired() {
            return System.currentTimeMillis() - _sendTimeMillis > 500L;
        }

        public synchronized void markResent() {
            _sendTimeMillis = System.currentTimeMillis();
        }

        public boolean shouldRetry() {
            return _retryEnabled && !_acknowledged;
        }

        public boolean waitForAck() {
            long deadline = System.currentTimeMillis() + 500L;
            synchronized (_monitor) {
                try {
                    // Check the flag in a loop: the ack may already have arrived
                    // before we start waiting, and wait() may wake up spuriously.
                    long remaining = deadline - System.currentTimeMillis();
                    while (!_acknowledged && remaining > 0) {
                        _monitor.wait(remaining);
                        remaining = deadline - System.currentTimeMillis();
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt status
                    return false;
                }
                return _acknowledged;
            }
        }

        public void ackReceived() {
            synchronized (_monitor) {
                // Set the flag under the monitor so a waiter cannot miss the notification
                _acknowledged = true;
                _monitor.notifyAll();
            }
        }

        public long getAddress() {
            return _address;
        }

        public byte[] getEncodedRecords() {
            return _encodedRecords;
        }

        public Socket getSocket() {
            return _socket;
        }
    }

    private static class Holder {
        private static final SendToQueue INSTANCE = new SendToQueue();
    }

    public static SendToQueue getInstance() {
        return Holder.INSTANCE;
    }

    private void handleRetries() {
        List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
        for (PendingMessage m : messages) {
            if (m.hasExpired()) {
                if (m.shouldRetry()) {
                    m.markResent();
                    doSendAsync(m, m.getSocket());
                }
                else {
                    // Or leave the message and let send remove it
                    cache.invalidate(m.getAddress());
                }
            }
        }
    }

    private SendToQueue() {
        // another thread which receives acknowledgements and then deletes the entry from the cache accordingly
        executorService.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                handleRetries();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
        PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
        // Register before sending so an early ack finds the entry
        cache.put(address, m);
        return doSendAsync(m, socket);
    }

    private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(pendingMessage.getEncodedRecords());
        try {
            // send data on a socket LINE A
            return msg.send(socket);
        }
        finally {
            msg.destroy();
        }
    }

    public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
        // Retry is disabled for synchronous sends
        PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
        cache.put(address, m);
        try {
            if (doSendAsync(m, socket)) {
                return m.waitForAck();
            }
            return false;
        }
        finally {
            // Alternatively (checks that address points to m):
            // cache.asMap().remove(address, m);
            cache.invalidate(address);
        }
    }

    public void handleAckReceived(final long address) {
        PendingMessage m = cache.getIfPresent(address);
        if (m != null) {
            m.ackReceived();
            cache.invalidate(address);
        }
    }
}

handleAckReceived is then called from ResponsePoller:

SendToQueue.getInstance().handleAckReceived(addressFrom);


Design-wise: I feel like you are trying to write a thread-safe and reasonably efficient NIO message sender/receiver, but neither version of the code I see here is OK, and neither will be without significant changes. The best thing to do is either:

  • make full use of the 0MQ framework. I see needs and expectations here that are already covered out of the box by ZMQ and the java.util.concurrent API.
  • or have a look at Netty (https://netty.io/index.html) preferably if it applies to your project. "Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients." This will save you time if your project gets complex, otherwise it might be overkill to start with (but then expect issues ...).

However, if you think you are almost there with your code or @john's code, here is some advice for finishing it:

  • don't use wait() and notify(). Don't sleep() either.
  • use a single thread for your "flow tracker" (i.e. ~the pending message Cache).

You don't actually need 3 threads to process pending messages unless the processing itself is slow or heavy, which is not the case here: you basically make an async call (assuming it really is async. Is it?).

The same goes for the reverse path: use an executor service (multiple threads) for processing received packets only if the actual processing is slow, blocking, or heavy.
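As a rough illustration of the single-thread "flow tracker" idea, here is a minimal sketch. RetryTracker, its method names, and the resend callback are hypothetical (they are not part of your code or of 0MQ), and the sketch assumes that resending is a cheap, non-blocking call. One scheduler thread scans the pending map and resends only entries older than a given age:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Hypothetical sketch: a single scheduler thread owns all retry decisions. */
final class RetryTracker {
    // address -> time (ms) of the last send attempt
    private final Map<Long, Long> sentAtMillis = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long maxAgeMillis;
    private final LongConsumer resend; // assumed to be a cheap, non-blocking send

    RetryTracker(long maxAgeMillis, LongConsumer resend) {
        this.maxAgeMillis = maxAgeMillis;
        this.resend = resend;
    }

    /** Starts the periodic scan on the single scheduler thread. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> retryExpired(System.currentTimeMillis()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Call before sending so that an early ack finds the entry. */
    void track(long address, long nowMillis) {
        sentAtMillis.put(address, nowMillis);
    }

    /** Call from the receiving side; unknown or late acks are simply ignored. */
    void acknowledge(long address) {
        sentAtMillis.remove(address);
    }

    /** Resends entries older than maxAgeMillis and returns how many were resent. */
    int retryExpired(long nowMillis) {
        int resent = 0;
        for (Map.Entry<Long, Long> e : sentAtMillis.entrySet()) {
            long sentAt = e.getValue();
            // replace(key, old, new) only succeeds if the entry was not
            // acknowledged or updated in the meantime
            if (nowMillis - sentAt > maxAgeMillis
                    && sentAtMillis.replace(e.getKey(), sentAt, nowMillis)) {
                resend.accept(e.getKey());
                resent++;
            }
        }
        return resent;
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

Passing the current time into track and retryExpired is only there to make the logic easy to test; in real use you would call start(...) once and pass System.currentTimeMillis() to track.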

I'm not an expert in 0MQ at all, but as long as socket.send(...) is thread-safe and non-blocking (which I'm personally not sure about, so tell me), the advice above should be correct and should make things simpler.
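To illustrate the first piece of advice (no wait()/notify(), no sleep()), here is a minimal sketch built on CountDownLatch from java.util.concurrent. AckTracker and its method names are hypothetical, not part of your code or of 0MQ. The idea is to register a latch before sending, so that an acknowledgement arriving early is not lost, and to let await(timeout) return as soon as the ack comes in:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one latch per in-flight address. */
final class AckTracker {
    private final ConcurrentMap<Long, CountDownLatch> pending = new ConcurrentHashMap<>();

    /** Register BEFORE sending, so an ack that arrives early is not missed. */
    CountDownLatch register(long address) {
        CountDownLatch latch = new CountDownLatch(1);
        pending.put(address, latch);
        return latch;
    }

    /** Called by the receiving thread; returns false for late or unknown acks. */
    boolean ackReceived(long address) {
        CountDownLatch latch = pending.remove(address);
        if (latch == null) {
            return false;
        }
        latch.countDown();
        return true;
    }

    /** Blocks until the ack arrives or the timeout elapses; always unregisters. */
    boolean awaitAck(long address, CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
            return false;
        } finally {
            // Two-argument remove: only drops the entry if it is still our latch
            pending.remove(address, latch);
        }
    }
}
```

In send(), the flow would be: register(address), send the message, then awaitAck(address, latch, 500, TimeUnit.MILLISECONDS). Unlike a fixed sleep, the caller is released the moment the ack is recorded.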

That said, to strictly answer your question:

"Do we need separate bucket for all the sync calls just for acknowledgement and we dont retry from that bucket?"

I'd say no. What do you think of the following? It is based on your code and, my own preferences aside, it seems acceptable:

public class SendToQueue {
    // ...
    private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
    // ...

    private void startTransaction(long address) {
        this.transactions.put(address, Boolean.FALSE);
    }

    public void updateTransaction(long address) {
        // Atomic: only updates if the transaction is still registered, so a late
        // ack cannot re-insert an entry that send() has already cleared.
        this.transactions.replace(address, Boolean.TRUE);
    }

    private void clearTransaction(long address) {
        this.transactions.remove(address);
    }

    public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
        boolean success = false;

        // If address is enough randomized or atomically counted (then ok for parallel send())
        startTransaction(address);
        try {
            boolean sent = sendAsync(address, encodedRecords, socket);
            // if the record was sent successfully, then only sleep for timeout period
            if (sent) {
                // wait for acknowledgement
                success = waitDoneUntil(new DoneCondition() {
                    @Override
                    public boolean isDone() {
                        // null-safe: no NPE even if the entry has been removed
                        return Boolean.TRUE.equals(SendToQueue.this.transactions.get(address));
                    }
                }, 500, TimeUnit.MILLISECONDS);

                if (success) {
                    // Message acknowledged!
                }
            }
        } finally {
            clearTransaction(address);
        }
        return success;
    }

    public static interface DoneCondition {
        public boolean isDone();
    }

    /**
     * Polls the given condition until it is met or the wait time elapses.
     * Note: includes a sleep(50) between checks.
     *
     * @param f Condition to poll; blocks until it is done or maxWaitMillis is reached
     * @param waitTime Duration expressed in (time) unit.
     * @param unit Time unit.
     * @return DoneCondition finally met or not
     */
    public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
        long curMillis = 0;
        long maxWaitMillis = unit.toMillis(waitTime);

        while (!f.isDone() && curMillis < maxWaitMillis) {
            try {
                Thread.sleep(50);   // define your step here accordingly or set as parameter
            } catch (InterruptedException ex1) {
                //logger.debug("waitDoneUntil() interrupted.");
                Thread.currentThread().interrupt(); // keep the interrupt status
                break;
            }
            curMillis += 50L;
        }

        return f.isDone();
    }
    //...
}

public class ResponsePoller {
    //...

    public void onReceive(long address) {   // sample prototype
        // ...
        SendToQueue.getInstance().updateTransaction(address);
        // The interested sender will know that its transaction is complete.
        // While subsequent (late) calls will have no effect.
    }
}