How could we make use of ClusterListener in Mongo? How could we make use of ClusterListener in Mongo? mongodb mongodb

How could we make use of ClusterListener in Mongo?


TL;DR

The ClusterListener interface can be used to monitor some aspects of a replicaset but if you want to dig deeper and/or if you want to interrogate the replicaset status outside of the events for which the ClusterListener provides callbacks then you might prefer to invoke the replSetGetStatus command and inspect its output.

Detail

The ClusterListener provides call backs which allow you to watch/respond to changes to your replicaset. For example, the following CLusterListener ...

public class LoggingClusterListener implements ClusterListener {    private static final Logger logger = LoggerFactory.getLogger(LoggingClusterListener.class);    @Override    public void clusterOpening(final ClusterOpeningEvent clusterOpeningEvent) {        logger.info("clusterOpening: {}", clusterOpeningEvent.getClusterId().getValue());    }    @Override    public void clusterClosed(final ClusterClosedEvent clusterClosedEvent) {        logger.info("clusterClosed: {}", clusterClosedEvent.getClusterId().getValue());    }    @Override    public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) {        logger.info("clusterDescriptionChanged: {}", event.getClusterId().getValue());        for (ServerDescription sd : event.getNewDescription().getServerDescriptions()) {            logger.info("{} / {} / {} / {}", sd.getType(), sd.getCanonicalAddress(), sd.getState().name());        }    }}

... when associated with a MongoClient like this ...

final MongoClientOptions options = MongoClientOptions.builder()  .addClusterListener(new LoggingClusterListener())  .build();return new MongoClient(serverAddresses, options);

... will emit the following logging:

// cluster starting up ...2017-08-17 12:49:55,977 [main]  clusterOpening: 599582e36d47c231ec963b0b2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostB:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostC:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged   599582e36d47c231ec963b0b2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostB:27017 / CONNECTED / {}    2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostC:27017 / CONNECTED / {}    2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_SECONDARY / hostA:27017 / CONNECTED / {}    // ... the primary fails over to hostA:270172017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged:  599582e36d47c231ec963b0b2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostB:27017 / CONNECTED / {}    2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_SECONDARY / hostC:27017 / CONNECTED / {}    2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_PRIMARY / hostA:27017 / CONNECTED / {}  2017-08-17 12:50:07,126 [main]  clusterClosed: 599582e36d47c231ec963b0b

Perhaps this is sufficient for you but if not, if for example you want to actively monitor replicaset status - rather than only responding when one of the following happens ...

  • Cluster start
  • Cluster stop
  • Cluster description changes

... then you might prefer to periodically sample the replicaset status and report/log/alert on the results. You can do this by executing the replSetGetStatus command and interrogating the results. This command returns a BsonDocument (the format of which is described here) which can be interrogated and logged.

Logging the status document is the simplest response but that approach could be enhanced to form the basis of a monitoring solution by raising alerts on the basis of the document's contents e.g.

  • replicationLag > configured threadhold
  • lastHeartbeat > now() - configured threshold
  • identity of the primary has changed
  • health != 1
  • etc

The following code reads the replicaset status document, interrogates it (including calculating the replication lag) and logs the output.

MongoReplicaSetStatusLogger mongoReplicaSetStatusLogger = new MongoReplicaSetStatusLogger();// periodically ...MongoClient mongoClient = getMongoClient();MongoDatabase admin = mongoClient.getDatabase("admin");BsonDocument commandResult = admin.runCommand(new BsonDocument("replSetGetStatus", new BsonInt32(1)), BsonDocument.class);mongoReplicaSetStatusLogger.report(commandResult);

Here's the MongoReplicaSetStatusLogger implementation:

import org.bson.BsonDocument;import org.bson.BsonInvalidOperationException;import org.bson.BsonNumber;import org.bson.BsonValue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Optional;public class MongoReplicaSetStatusLogger {    private static final Logger logger = LoggerFactory.getLogger(MongoReplicaSetStatusLogger.class);    private static final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSSZ");    private static final String DEFAULT_VALUE = "UNKNOWN";    private static final String MEMBERS = "members";    public void report(BsonDocument replicasetStatusDocument) {        if (hasMembers(replicasetStatusDocument)) {            replicasetStatusDocument.getArray(MEMBERS).stream()                    .filter(BsonValue::isDocument)                    .map(memberDocument -> (BsonDocument) memberDocument)                    .forEach(memberDocument -> logMemberDocument(memberDocument));        } else {            logger.warn("The replicaset status document does not contain a '{}' attributes, perhaps there has been " +                    "a MongoDB upgrade and the format has changed!", MEMBERS);        }    }    private boolean hasMembers(BsonDocument replicasetStatusDocument) {        return replicasetStatusDocument.containsKey(MEMBERS) && replicasetStatusDocument.get(MEMBERS).isArray();    }    private void logMemberDocument(BsonDocument memberDocument) {        StringBuilder stringBuilder = new StringBuilder()                .append(logAttribute("node", getStringValue(memberDocument, "name")))                .append(logAttribute("health", getNumericValue(memberDocument, "health")))                .append(logAttribute("state", getStringValue(memberDocument, "stateStr")))                .append(logAttribute("uptime(s)", getNumericValue(memberDocument, "uptime")))                .append(logAttribute("lastOptime", getDateTimeValue(memberDocument, "optimeDate")))                .append(logAttribute("lastHeartbeat", getDateTimeValue(memberDocument, "lastHeartbeat")))                .append(logAttribute("lastHeartbeatRecv", getDateTimeValue(memberDocument, "lastHeartbeatRecv")))                .append(logAttribute("ping(ms)", getNumericValue(memberDocument, "pingMs")))                .append(logAttribute("replicationLag(s)", getReplicationLag(memberDocument)));        logger.error(stringBuilder.toString());    }    private String logAttribute(String key, Optional<String> value) {        return new StringBuilder(key).append("=").append(value.orElse(DEFAULT_VALUE)).append("|").toString();    }    private Optional<String> getStringValue(BsonDocument memberDocument, String key) {        if (memberDocument.containsKey(key)) {            try {                return Optional.of(memberDocument.getString(key).getValue().toUpperCase());            } catch (BsonInvalidOperationException e) {                logger.warn("Exception reading: {} from replicaset status document, message: {}.", key, e.getMessage());            }        }        return Optional.empty();    }    private Optional<String> getNumericValue(BsonDocument memberDocument, String key) {        if (memberDocument.containsKey(key)) {            BsonNumber bsonNumber = memberDocument.getNumber(key);            if (bsonNumber.isInt32()) {                return Optional.of(Integer.toString(bsonNumber.intValue()));            } else if (bsonNumber.isInt64()) {                return Optional.of(Long.toString(bsonNumber.longValue()));            } else if (bsonNumber.isDouble()) {                return Optional.of(Double.toString(bsonNumber.doubleValue()));            }        }        return Optional.empty();    }    private Optional<String> getDateTimeValue(BsonDocument memberDocument, String key) {        if (memberDocument.containsKey(key)) {            try {                return Optional.of(dateFormatter.format(new Date(memberDocument.getDateTime(key).getValue())));            } catch (BsonInvalidOperationException e) {                logger.warn("Exception reading: {} from replicaset status document due to: {}!", key, e.getMessage());            }        }        return Optional.empty();    }    private Optional<String> getReplicationLag(BsonDocument memberDocument) {        if (memberDocument.containsKey("optimeDate") && memberDocument.containsKey("lastHeartbeat")) {            try {                long optimeDate = memberDocument.getDateTime("optimeDate").getValue();                long lastHeartbeat = memberDocument.getDateTime("lastHeartbeat").getValue();                long replicationLag = lastHeartbeat - optimeDate;                return Optional.of(Long.toString(replicationLag));            } catch (BsonInvalidOperationException e) {                logger.warn("Exception reading 'optimeDate' or 'lastHeartbeat' from replicaset status document due to: {}!", e.getMessage());            } catch (IllegalArgumentException e) {                logger.warn("Exception calculating the replication lag due to: {}!", e.getMessage());            }        }        return Optional.empty();    }}

Here's an example of the output:

2017-08-17 15:44:35,192|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostA:27017|health=1.0|state=PRIMARY|uptime(s)=21|lastOptime=2017-08-17T15:43:32,000+0100|lastHeartbeat=UNKNOWN|lastHeartbeatRecv=UNKNOWN|ping(ms)=UNKNOWN|replicationLag(s)=UNKNOWN|2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostB:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,443+0100|lastHeartbeatRecv=2017-08-17T15:43:36,412+0100|ping(ms)=0|replicationLag(s)=15443|2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostC:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,444+0100|lastHeartbeatRecv=2017-08-17T15:43:36,470+0100|ping(ms)=0|replicationLag(s)=15444|