MonitorCommand.java
package emissary.command;
import emissary.client.EmissaryClient;
import emissary.client.response.BaseResponseEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Option;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
public abstract class MonitorCommand<T extends BaseResponseEntity> extends HttpCommand {
static final Logger LOG = LoggerFactory.getLogger(MonitorCommand.class);
public static final String COMMAND_NAME = "MonitorCommand";
private final Object lock = new Object();
@Option(names = {"--mon"},
description = "runs the agents command in monitor mode, executing every 30 seconds by default\nDefault: ${DEFAULT-VALUE}")
private boolean monitor = false;
@Option(names = {"-i", "--interval"}, description = "how many seconds to wait between each endpoint call\nDefault: ${DEFAULT-VALUE}")
private int sleepInterval = 30;
@Option(names = {"--cluster"}, description = "sets endpoint to clustered mode\nDefault: ${DEFAULT-VALUE}")
private boolean clustered = false;
public boolean getMonitor() {
return monitor;
}
public int getSleepInterval() {
return sleepInterval;
}
public boolean getClustered() {
return clustered;
}
public abstract T sendRequest(EmissaryClient client, String endpoint);
public abstract String getTargetEndpoint();
@Override
public void run(CommandLine c) {
setup();
try {
do {
LOG.info(Instant.now().toString());
collectEndpointData();
if (getMonitor()) {
TimeUnit.SECONDS.sleep(getSleepInterval());
}
} while (getMonitor());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void collectEndpointData() {
EmissaryClient client = new EmissaryClient();
T entity = sendRequest(client, buildEndpoint(getHost(), getPort()));
try {
if (getClustered()) {
sendClusterRequests(client, entity);
}
} catch (IOException e) {
LOG.error("Problem generating peer list. Something is very wrong.");
}
displayEntityResults(entity);
}
private void sendClusterRequests(final EmissaryClient client, final T entity) throws IOException {
PeersCommand.getPeers(getHostAndPort(), true).parallelStream().forEach(hostAndPort -> {
try {
String endpoint = buildEndpoint(hostAndPort);
T response = sendRequest(client, endpoint);
synchronized (lock) {
entity.append(response);
}
} catch (RuntimeException e) {
LOG.error("Problem hitting agents endpoint: {}\n{}", hostAndPort, e.getMessage());
synchronized (lock) {
entity.addError(e.getMessage());
}
}
});
}
// Here as a hook in case commands have summarize/custom display options
@SuppressWarnings("SystemOut")
protected void displayEntityResults(T entity) {
entity.dumpToConsole();
for (String error : entity.getErrors()) {
System.err.print(error);
}
}
private String buildEndpoint(final String host, final int port) {
return buildEndpoint(host + ":" + port);
}
private String buildEndpoint(final String hostAndPort) {
return getScheme() + "://" + hostAndPort + "/" + getTargetEndpoint();
}
}