HeartbeatManager.java
package emissary.directory;
import emissary.client.EmissaryClient;
import emissary.client.EmissaryResponse;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.server.mvc.adapters.HeartbeatAdapter;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
/**
* Facility for directory instances to check up on each other by sending a heartbeat message
*/
public class HeartbeatManager {
// Our logger
protected static final Logger logger = LoggerFactory.getLogger(HeartbeatManager.class);
// parameter constants
public static final String FROM_PLACE_NAME = "hbf";
public static final String TO_PLACE_NAME = "hbt";
public static final String BAD_RESPOSNE = "Bad request -> status: 500";
/** The timer drives the tasks and scheduling of heartbeat pings */
@Nullable
protected Timer timer = null;
/** Directory this instance acts on behalf of */
protected String thisDirectory;
/** Default delay seconds before timer starts {@value} */
public static final int DEFAULT_INITIAL_DELAY_SECONDS = 60;
/**
* Configured value before timer starts in seconds, default is {@value #DEFAULT_INITIAL_DELAY_SECONDS}
*/
protected int initialDelaySeconds = DEFAULT_INITIAL_DELAY_SECONDS;
/** Default delay between timer runs {@value} */
public static final int DEFAULT_INTERVAL_SECONDS = 30;
/**
* Configured value between timer runs in seconds, default is {@value #DEFAULT_INTERVAL_SECONDS}
*/
protected int intervalSeconds = DEFAULT_INTERVAL_SECONDS;
/** Number of consecutive failures to trigger notice */
protected int failThreshold = 3;
/** Number of consecutive failures to trigger permanent failure notice */
protected int permanentFailThreshold = 20;
/** Status value for callers to use when setting initially healthy */
public static final boolean IS_ALIVE = true;
/** Status value for callers to use when setting initially not healthy */
public static final boolean NO_CONTACT = false;
/** The remote directories we are checking on and their health */
protected Map<String, Health> directories = new ConcurrentHashMap<>(100, 0.8f, 3);
/**
* Setup to manage heartbeats to remote directories
*
* @param directoryKey Key for directory I act on behalf of
* @param initialDelaySeconds seconds to wait before initial timer task
* @param intervalSeconds how often the timeer task kicks off
*/
public HeartbeatManager(final String directoryKey, final int initialDelaySeconds, final int intervalSeconds) {
this(directoryKey, null, initialDelaySeconds, intervalSeconds);
}
@Deprecated
@SuppressWarnings("InconsistentOverloads")
public HeartbeatManager(final String directoryKey, @Nullable final List<String> dirList, final int initialDelaySeconds,
final int intervalSeconds) {
this(directoryKey, initialDelaySeconds, intervalSeconds, dirList);
}
/**
* Setup to manage heartbeats to remote directories
*
* @param directoryKey Key for directory I act on behalf of
* @param initialDelaySeconds seconds to wait before initial timer task
* @param intervalSeconds how often the timeer task kicks off
* @param dirList list of directory keys for remote directories
*/
public HeartbeatManager(final String directoryKey, final int initialDelaySeconds, final int intervalSeconds,
@Nullable final List<String> dirList) {
this.initialDelaySeconds = initialDelaySeconds;
this.intervalSeconds = intervalSeconds;
logger.debug("Starting with initialDelay={}, interval={}", initialDelaySeconds, intervalSeconds);
// new daemon timer
this.timer = new Timer("HeartbeatManager", true);
// Save directory key
this.thisDirectory = directoryKey;
// Save each directory in dirList along with good
// starting health value
if (dirList != null) {
for (final String key : dirList) {
addRemoteDirectory(key);
}
}
// "smooth" execution every 30 seconds starting in 2 minutes
this.timer.schedule(new HeartbeatTask(), (this.initialDelaySeconds * 1000L), (this.intervalSeconds * 1000L));
}
/**
* Set the failure threshold
*/
public void setFailThreshold(final int t) {
this.failThreshold = t;
logger.debug("Set new fail threshold to {}", t);
}
/**
* Set the second failure threshold
*/
public void setPermanentFailThreshold(final int t) {
this.permanentFailThreshold = t;
logger.debug("Set new permanent fail threshold to {}", t);
}
/**
* Shutdown processing
*/
public void shutDown() {
this.timer.cancel();
}
/**
* Add another directory to monitor
*
* @param key four-tuple for the remote directory
*/
public void addRemoteDirectory(final String key) {
addRemoteDirectory(key, true);
}
/**
* Add another directory to monitor with initial status
*
* @param key four-tuple for the remote directory
* @param isAlive initial status
*/
public void addRemoteDirectory(final String key, final boolean isAlive) {
// Skip if on same JVM
if (!KeyManipulator.isLocalTo(this.thisDirectory, key)) {
final String dkey = KeyManipulator.getDefaultDirectoryKey(key);
this.directories.put(dkey, new Health(isAlive, "Initial status"));
logger.debug("Added remote {} with initial status {} now monitoring {} remote directories", dkey, isAlive, this.directories.size());
} else {
logger.debug("Skipping local directory {}, is not remote", key);
}
}
/**
* Remove a directory from the monitor list
*
* @param key four-tuple for the remote directory
*/
public void removeRemoteDirectory(final String key) {
this.directories.remove(KeyManipulator.getDefaultDirectoryKey(key));
}
/**
* Access to see if remote is healthy
*
* @param key four-tuple key for remote directory
*/
public boolean isHealthy(final String key) {
final String dirKey = KeyManipulator.getDefaultDirectoryKey(key);
final Health val = this.directories.get(dirKey);
if (val != null) {
return val.isHealthy();
}
return false;
}
/**
* Access to see if remote is alive
*
* @param key four-tuple key for remote directory
*/
public boolean isAlive(final String key) {
final String dirKey = KeyManipulator.getDefaultDirectoryKey(key);
final Health val = this.directories.get(dirKey);
if (val != null) {
return val.isAlive();
}
return false;
}
/**
* Externally set the status. Does not call trigger
*
* @param key directory to set status for
* @param status new status
* @param reason the exception message on bad status
*/
void setHealthStatus(final String key, final boolean status, final String reason) {
final String dirKey = KeyManipulator.getDefaultDirectoryKey(key);
final Health v = this.directories.get(dirKey);
if (v == null) {
logger.error("Not monitoring directory {}", dirKey);
return;
}
logger.debug("Externally setting {} status for {} - {}", status, key, reason);
v.setStatus(status, reason);
}
/**
* Set health status and call the trigger on transition
*
* @param key key for directory
* @param status new status
* @param reason the exception message on bad status
*/
protected void healthReport(final String key, final boolean status, final String reason) {
final String dirKey = KeyManipulator.getDefaultDirectoryKey(key);
final Health v = this.directories.get(dirKey);
if (v == null) {
logger.error("Not monitoring directory {}", dirKey);
return;
}
final boolean wasAlive = v.isAlive();
final boolean wasHealthy = v.isHealthy();
v.addReport(status, reason);
final boolean isAlive = v.isAlive();
final boolean isHealthy = v.isHealthy();
if (logger.isDebugEnabled()) {
logger.debug("Reporting on {} status={}, wasAlive/Healthy={}/{}, isAlive/Healthy={}/{}", key, status, wasAlive, wasHealthy, isAlive,
isHealthy);
}
if (wasAlive && !isAlive) {
// We had a permanent failure.
takeFailureAction(key, true);
} else if (wasHealthy && !isHealthy) {
// We had a negative transition.
takeFailureAction(key, false);
} else if (!wasHealthy && isHealthy) {
// We had a positive transition.
takeSuccessAction(key);
}
}
/**
* Notify our directory that there was a falure
*
* @param key string key of the directory that failed
*/
public void takeFailureAction(final String key, final boolean permanent) {
final String myKey = KeyManipulator.getServiceLocation(this.thisDirectory);
try {
final IRemoteDirectory d = (IRemoteDirectory) Namespace.lookup(myKey);
final int count = d.irdFailDirectory(key, permanent);
logger.info("Notified {} of failed directory {}{}, {} keys removed", myKey, key, (permanent ? " permanently!" : ""), count);
} catch (NamespaceException ne) {
logger.error("Tried to fail a remote directory {} but cannot look up my own directory using {}", key, myKey, ne);
}
}
/**
* Notify our directory that a directory has been contacted. This could initiate a zone transfer or other action
*
* @param key string key of the directory that was contacted
*/
void takeSuccessAction(final String key) {
final String myKey = KeyManipulator.getServiceLocation(this.thisDirectory);
try {
final DirectoryPlace d = (DirectoryPlace) Namespace.lookup(myKey);
if (d.isStaticPeer(key)) {
logger.info("Notifying {} of re-established contact with {}", myKey, key);
d.contactedRemoteDirectory(key);
} else {
logger.info("Ignoring contact with non-configured peer {}", key);
}
} catch (NamespaceException ne) {
logger.error("Tried to reestablish a remote directory " + key + " but cannot look up my own directory using " + myKey, ne);
}
}
/**
* The Task thread, monitors all remote directories
*/
class HeartbeatTask extends TimerTask {
@Override
public void run() {
try {
logger.debug("Running timer task on {} directories", HeartbeatManager.this.directories.size());
for (final String dir : HeartbeatManager.this.directories.keySet()) {
heartbeat(dir);
}
logger.debug("Ending the HeartbeatTask run method");
} catch (RuntimeException e) {
logger.error("Unexpected problem in heartbeat timer", e);
}
}
}
/**
* Send a heartbeat message to the directory represented by key and take follow-on actions as appropriate Called from
* the timer task normally, but can be called externally by the impatient
*
* @see emissary.directory.DirectoryPlace#heartbeatRemoteDirectory(String)
* @param key key representing the directory to heartbeat
* @return true if the directory referenced by key is up
*/
public final boolean heartbeat(final String key) {
boolean isup = false;
try {
logger.debug("Sending heartbeat msg to {}", key);
EmissaryResponse response = getHeartbeat(this.thisDirectory, key);
if (response.getStatus() == 200) {
healthReport(key, true, response.getContentString());
isup = true;
} else {
healthReport(key, false, response.getContentString());
isup = false;
}
} catch (RuntimeException e) {
logger.error("Cannot perform heartbeat", e);
healthReport(key, false, e.getMessage());
isup = false;
}
return isup;
}
public static EmissaryResponse getHeartbeat(String fromPlace, String toPlace) {
return getHeartbeat(fromPlace, toPlace, new EmissaryClient());
}
public static EmissaryResponse getHeartbeat(String fromPlace, String toPlace, EmissaryClient client) {
final String directoryUrl = KeyManipulator.getServiceHostUrl(toPlace);
final HttpPost method = client.createHttpPost(directoryUrl, EmissaryClient.context, "/Heartbeat.action");
final String loc = KeyManipulator.getServiceLocation(toPlace);
final List<NameValuePair> nvps = new ArrayList<>();
nvps.add(new BasicNameValuePair(HeartbeatAdapter.FROM_PLACE_NAME, fromPlace));
nvps.add(new BasicNameValuePair(HeartbeatAdapter.TO_PLACE_NAME, loc));
method.setEntity(new UrlEncodedFormEntity(nvps, StandardCharsets.UTF_8));
return client.send(method);
}
/**
* Holder class for the health information for a single remote directory
*/
class Health {
// Count consecutive failures
private int failCounter = 0;
private String lastMessage;
/**
* Create a new Health object with the specified status and msg
*
* @param isAlive true if the initial status is no failures
* @param msg the initial msg value
*/
public Health(final boolean isAlive, final String msg) {
setStatus(isAlive, msg);
}
/**
* Accumulate another heartbeat result
*
* @param v the most recent status
* @param msg the most recent message
*/
public void addReport(final boolean v, final String msg) {
this.lastMessage = msg;
if (v) {
this.failCounter = 0;
} else {
this.failCounter++;
}
}
/**
* Set the status now
*
* @param isAlive false means permanent failure indicated
* @param message message to asocciate with this statsu
*/
void setStatus(final boolean isAlive, final String message) {
if (!isAlive) {
this.failCounter = HeartbeatManager.this.permanentFailThreshold;
this.lastMessage = message;
} else {
this.failCounter = 0;
this.lastMessage = message;
}
}
/**
* Report our health status
*
* @return true if failed less than threshold times
*/
public boolean isHealthy() {
return this.failCounter < HeartbeatManager.this.failThreshold;
}
/**
* Report our aliveness status
*
* @return true if failed less than permanent threshold times
*/
public boolean isAlive() {
return this.failCounter < HeartbeatManager.this.permanentFailThreshold;
}
/**
* Access to the last saved message
*/
public String getLastMessage() {
return this.lastMessage;
}
}
}