// ResourceWatcher.java
package emissary.core;
import emissary.place.IServiceProviderPlace;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
 * Track mobile agents and make them obey resource limitations.
 * <p>
 * Each instance binds itself into the {@link Namespace} under {@link #DEFAULT_NAMESPACE_NAME} and starts its own daemon
 * thread, which polls the tracked {@link TimedResource}s until {@link #quit()} is called or the thread is interrupted.
 * <p>
 * Thread-safety: {@link #starting} and the accessors may be called from threads other than the watcher thread; the
 * shared state they touch is held in concurrent collections or {@code volatile} fields.
 */
public class ResourceWatcher implements Runnable {

    protected static final Logger LOG = LoggerFactory.getLogger(ResourceWatcher.class);

    /** Name under which the watcher binds itself in the {@link Namespace}. */
    public static final String DEFAULT_NAMESPACE_NAME = "ResourceWatcher";

    /** Pause between two sweeps over the tracked resources. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    // This is a default that can be modified for every place.
    // volatile: written through setTimeLimitMillis() and read from other threads; a plain long
    // gives no visibility guarantee and may even be read torn on 32-bit JVMs.
    protected volatile long timeLimitMillis = TimeUnit.SECONDS.toMillis(30);

    // Time limit can be overridden per place by config.
    // This structure is a cache of place timeouts that have been gathered, stored by place name
    // (not class name, as some classes function in different ways, e.g. UnixCommandPlace).
    protected Map<String, Long> placeTimeLimits = new ConcurrentHashMap<>();

    // The thread we plan to run on.
    // NOTE(review): never assigned anywhere in this class (the constructor keeps the thread in a local),
    // so it stays null unless a subclass sets it.
    @Nullable
    protected transient Thread monitor = null;

    // Loop control.
    // volatile: set by quit() on the caller's thread and polled by run() on the watcher thread.
    protected volatile boolean timeToQuit = false;

    protected MetricRegistry metrics;

    protected MetricsFormatter metricsFormatter = MetricsFormatter.builder().withDurationUnit(TimeUnit.MILLISECONDS).withRateUnit(TimeUnit.SECONDS)
            .build();

    // Things we are tracking
    protected Queue<TimedResource> tracking = new LinkedBlockingQueue<>();

    /**
     * Create a resource watcher backed by a fresh {@link MetricsManager}.
     */
    public ResourceWatcher() {
        this(new MetricsManager());
    }

    /**
     * Create a resource watcher, bind it into the {@link Namespace} and set it running on a daemon thread.
     *
     * @param metricsManager supplies the metric registry that holds one timer per place
     */
    @SuppressWarnings("ThreadPriorityCheck")
    public ResourceWatcher(final MetricsManager metricsManager) {
        this.metrics = metricsManager.getMetricRegistry();
        final Thread thread = new Thread(this, "ResourceWatcher");
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(true);
        // Bind before starting so the watcher can be looked up as soon as its thread runs
        Namespace.bind(DEFAULT_NAMESPACE_NAME, this);
        thread.start();
    }

    /**
     * Resolve the time limit for a place, caching the answer by place name.
     * <p>
     * A limit of {@code -1} or greater reported by the place is used as-is; anything lower falls back to the default
     * in effect at the time of the first lookup. NOTE(review): the meaning of {@code -1} and {@code 0} is decided by
     * {@link TimedResource}, not here - presumably "no limit"; confirm there.
     *
     * @param place the place about to do work
     * @return the allowed duration in millis for that place
     */
    private long getPlaceDuration(final IServiceProviderPlace place) {
        String placeName = place.getPlaceName();
        Long allowedDuration = placeTimeLimits.get(placeName);
        // Read and cache the duration for this place
        if (allowedDuration == null) {
            final long d = place.getResourceLimitMillis();
            allowedDuration = d >= -1 ? d : timeLimitMillis;
            placeTimeLimits.put(placeName, allowedDuration);
        }
        return allowedDuration;
    }

    /**
     * Register an agent to start tracking it
     *
     * @param agent the agent to track
     * @param place place executing
     * @return TimedResource for the place and agent
     */
    public TimedResource starting(final IMobileAgent agent, final IServiceProviderPlace place) {
        TimedResource tr = new TimedResource(agent, place, getPlaceDuration(place), metrics.timer(place.getPlaceName()));
        tracking.offer(tr);
        return tr;
    }

    /**
     * Lookup the default ResourceWatcher in the Namespace
     *
     * @return The registered ResourceWatcher
     * @throws NamespaceException if the Namespace lookup fails
     */
    public static ResourceWatcher lookup() throws NamespaceException {
        return (ResourceWatcher) Namespace.lookup(DEFAULT_NAMESPACE_NAME);
    }

    /**
     * Safely stop the monitoring Thread. The request is picked up by {@link #run()} on its next loop check.
     */
    public void quit() {
        LOG.info("Stopping resource watcher...");
        this.timeToQuit = true;
    }

    /**
     * Set the default time limit in millis. Limits already cached for a place are not changed.
     *
     * @param limit the new value
     */
    public void setTimeLimitMillis(final long limit) {
        this.timeLimitMillis = limit;
    }

    /**
     * Get the default time limit in millis
     *
     * @return time limit
     */
    public long getTimeLimitMillis() {
        return this.timeLimitMillis;
    }

    /**
     * Runnable interface where we get to monitor stuff.
     * <p>
     * Sweeps the tracked resources every {@value #POLL_INTERVAL_MILLIS} ms, dropping each one whose
     * {@code checkState} returns {@code true}. The loop ends when {@link #quit()} has been called or when the thread
     * is interrupted; either way the watcher unbinds itself from the {@link Namespace} before returning.
     */
    @Override
    public void run() {
        LOG.debug("ResourceWatcher is starting");
        while (!this.timeToQuit) {
            // Delay this loop
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag for whoever inspects it later and stop. Carrying on with the flag set would
                // make every following sleep() throw immediately, turning this loop into a busy spin.
                Thread.currentThread().interrupt();
                LOG.warn("ResourceWatcher interrupted, shutting down");
                break;
            }

            Iterator<TimedResource> it = tracking.iterator();
            while (it.hasNext()) {
                // Sampled per resource so each check sees an up-to-date clock
                final long now = System.currentTimeMillis();
                final TimedResource val = it.next();
                if (val.checkState(now)) {
                    it.remove();
                }
            }
        }
        Namespace.unbind(DEFAULT_NAMESPACE_NAME);
        LOG.info("Resource watcher stopped.");
    }

    /**
     * Write one formatted line per timer to the supplied logger, skipping timers that have recorded no events.
     *
     * @param loggerArg the logger that receives the stats at info level
     */
    public void logStats(final Logger loggerArg) {
        for (final Map.Entry<String, Timer> e : this.metrics.getTimers().entrySet()) {
            // We only want to log stats for places that have had events
            if (e.getValue().getCount() > 0) {
                loggerArg.info(this.metricsFormatter.formatTimer(e.getKey(), e.getValue()));
            }
        }
    }

    /**
     * Reset the statistics of every registered timer.
     * <p>
     * We use reflection to swap in a fresh histogram for finished events, which leaves the timers themselves
     * registered so active timings keep their namespace. Failures are logged, not thrown.
     */
    public void resetStats() {
        final SortedMap<String, Timer> timers = this.metrics.getTimers();
        if (timers.isEmpty()) {
            return;
        }

        // The field is the same for every timer, so resolve it once rather than per iteration
        final Field histogramField;
        try {
            histogramField = Timer.class.getDeclaredField("histogram");
            histogramField.setAccessible(true);
        } catch (NoSuchFieldException e) {
            LOG.error("Issue resetting placeStats in ResourceWatcher", e);
            return;
        }

        for (Timer timer : timers.values()) {
            try {
                histogramField.set(timer, new Histogram(new ExponentiallyDecayingReservoir()));
            } catch (IllegalAccessException e) {
                LOG.error("Issue resetting placeStats in ResourceWatcher", e);
            }
        }
    }

    /**
     * Get all timers known to the metric registry.
     *
     * @return the registry's timers keyed by name
     */
    public SortedMap<String, Timer> getStats() {
        return this.metrics.getTimers();
    }

    /**
     * Get the timer registered under a key, as returned by the registry's {@code timer(String)} call.
     *
     * @param statKey the timer name; {@link #starting} registers timers under the place name
     * @return the timer for that key
     */
    public Timer getStat(final String statKey) {
        return this.metrics.timer(statKey);
    }

    @Override
    public String toString() {
        return "Watching " + this.tracking.size() + " agents with default time limit " + this.timeLimitMillis + "ms";
    }
}