MoveSpool.java
package emissary.pool;
import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.directory.DirectoryEntry;
import emissary.directory.IDirectoryPlace;
import emissary.directory.KeyManipulator;
import emissary.place.IServiceProviderPlace;
import emissary.util.PayloadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
* Provide a storage area for incoming "moveTo(here)" payloads so that the http transfer can become more asnychronous.
* This class provides a FIFO for payloads that are arriving and a thread that will put them into agents from the pool
* as agents become available
*/
public class MoveSpool implements Runnable {
// Our logger
private static final Logger logger = LoggerFactory.getLogger(MoveSpool.class);
// The payload FIFO
protected final Deque<SpoolItem> spool = new ArrayDeque<>();
// Reference to the agent pool
protected AgentPool pool;
// The thread that stuffs payloads into pool agents
Thread watcher;
// thread quit control
boolean timeToQuit = false;
// How we want to be registerd in the namespace
public static final String NAMESPACE_NAME = "ArrivalSpool";
// Stats on how many moves and for what types arrived here
public final Map<String, Integer> moveCountMap = new HashMap<>();
// Stats collection
private int highWaterMark = 0;
@SuppressWarnings("NonFinalStaticField")
private static long lookupCount = 0;
private long enqueCount = 0;
private long dequeCount = 0;
// Cached ref to my local directory
@Nullable
IDirectoryPlace localDirectory = null;
// Methods for using the spool to dispatch
public enum Method {
ARRIVE, GO
}
/**
* Make one and bind it in the namespace
*/
public MoveSpool() {
configure();
// register this pool in the namespace
Namespace.bind(NAMESPACE_NAME, this);
}
/**
* Configure stuff
*/
@SuppressWarnings("ThreadPriorityCheck")
private void configure() {
// Get the agent pool
resetPool();
// start the watcher thread
watcher = new Thread(this, "MoveSpool");
watcher.setPriority(Thread.MAX_PRIORITY - 2);
watcher.setDaemon(true);
watcher.start();
}
public void resetPool() {
// grab the default pool
try {
pool = AgentPool.lookup();
logger.debug("Found the AgentPool on MoveSpool#resetPool");
} catch (NamespaceException nex) {
logger.error("Unable to find agent pool, " + "please create the agent pool before creating the MoveSpool");
}
}
/**
* Shut down the spooling thread and clear out any remaining payloads.
*/
public void quit() {
logger.warn("Purging the spool...");
synchronized (spool) {
if (!spool.isEmpty()) {
spool.clear();
}
spool.notifyAll();
}
timeToQuit = true;
Namespace.unbind(NAMESPACE_NAME);
logger.info("Done stopping the move spool");
}
/**
* Get a reference to the local directory on this machine
*
* @return a reference to the local directory from the namespace
*/
private IDirectoryPlace getLocalDirectory() {
if (localDirectory == null) {
for (String key : Namespace.keySet()) {
try {
Object value = Namespace.lookup(key);
if (value instanceof IDirectoryPlace) {
localDirectory = (IDirectoryPlace) value;
break;
}
} catch (NamespaceException ex) {
logger.info("Problem in namespace", ex);
}
}
}
return localDirectory;
}
/**
* Run the thread to watch the spool
*/
@Override
@SuppressWarnings("ThreadPriorityCheck")
public void run() {
int consecutiveSendCounter = 0;
// Run until we are told to quit
while (!timeToQuit) {
// Check the spool for work to be done
int sz = 0;
synchronized (spool) {
sz = spool.size();
}
if (sz == 0) {
// No payloads to look at. Sleep a while
consecutiveSendCounter = 0;
try {
logger.debug("Nothing in spool, time to wait...");
Thread.yield();
synchronized (spool) {
if (spool.isEmpty()) {
spool.wait(60000);
}
}
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
continue;
}
// Get an agent and a sool item
IMobileAgent agent = null;
SpoolItem item = null;
String itemName = null;
try {
// This may block for the max time the
// pool is configured to use if no
// agents available
agent = pool.borrowAgent();
if (agent == null) {
logger.debug("Got a null agent from pool!");
continue;
}
// Get the oldest payload from the spool
item = removeFirstPayload();
if (item == null) {
logger.debug("Got a null item from move spool!");
pool.returnAgent(agent);
continue;
}
// We have both an agent and a spool item
// so hook em up and send it on the way
itemName = PayloadUtil.getName(item.getPayload());
logger.debug("Handing over " + itemName + " to an agent, method=" + item.getMethod());
if (item.getMethod() == Method.GO) {
IServiceProviderPlace place = item.getPlace();
if (place == null) {
place = getLocalDirectory();
}
Object payload = item.getPayload();
agent.go(payload, place);
consecutiveSendCounter++;
} else if (item.getMethod() == Method.ARRIVE) {
agent.arrive(item.getPayload(), item.getPlace(), item.getErrorCount(), item.getItineraryItems());
consecutiveSendCounter++;
} else {
logger.error("Illegal spooler method specified " + item.getMethod() + ", payload=" + item.getPayload()
+ " will be irretreivably lost");
}
if (consecutiveSendCounter % 10 == 0) {
logger.debug("Sent 10 consecutive entries, " + "time to yield the MoveSpool");
Thread.yield();
}
} catch (Throwable t) {
if (agent != null) {
logger.error("Unable to start agent, payload " + itemName + " is irretrievably lost", t);
try {
pool.returnAgent(agent);
} catch (RuntimeException ex) {
logger.error("Unable to return agent to the pool", ex);
}
} else {
logger.debug("Cannot get agent from pool, trying again", t);
}
} finally {
// hold no references to this stuff
agent = null;
item = null;
}
}
}
/**
* Remove the oldest payload item on the spool
*
* @return SpoolItem from the spool
*/
protected SpoolItem removeFirstPayload() {
SpoolItem s = null;
synchronized (spool) {
// Do some stats
if (spool.size() > highWaterMark) {
highWaterMark = spool.size();
}
s = spool.removeFirst();
dequeCount++;
}
return s;
}
/**
* Add an item to the spool for sending. Can be the result of a sprout or a new item being ingested into the system. The
* arrivalPlace is null so we call MobileAgent.go rather than MobileAgent.arrive
*
* @param payload the dataObject or Collection to save
* @return number of items on the queue
*/
public int send(Object payload) {
return enqueue(Method.GO, payload, null, 0, (List<DirectoryEntry>) null);
}
/**
* Add an item to the spool for sending. Can be the result of a sprout or a new item being ingested into the system. We
* call MobileAgent.go rather than MobileAgent.arrive
*
* @param payload the dataObject or Collection to save
* @param place the sending or sprouting place reference
* @return number of items on the queue
*/
public int send(Object payload, IServiceProviderPlace place) {
return enqueue(Method.GO, payload, place, 0, (List<DirectoryEntry>) null);
}
/**
* Add an arriving payload and associated state transfer info to the spool Calls MobileAgent.arrive in this case
*
* @param payload the data object or Collection to save
* @param place IServiceProviderPlace ref for the agent to visit
* @param errorCount state from the transferred MobileAgent
* @param itineraryItems state from the transferred MobileAgent
* @return number of items in the queue
*/
public int arrive(Object payload, IServiceProviderPlace place, int errorCount, List<DirectoryEntry> itineraryItems) {
return enqueue(Method.ARRIVE, payload, place, errorCount, itineraryItems);
}
/**
* Add an item to the spool. When an agent becomes available it is assigned to this payload in turn. We call
* MobileAgent.go or MobileAgent.arrive depending on the Method requested
*
* @param method ARRIVE or GO
* @param payload the data object or Collection to save
* @param place IServiceProviderPlace ref for the agent to visit, possibly null
* @param errorCount state from the transferred MobileAgent or null for GO
* @param itineraryItems state from the transferred MobileAgent or empty for GO
* @return number of items on the queue
*/
protected int enqueue(Method method, Object payload, @Nullable IServiceProviderPlace place, int errorCount,
@Nullable List<DirectoryEntry> itineraryItems) {
String itemName = PayloadUtil.getName(payload);
logger.debug("Enqueue item " + itemName + " for place " + place + ", method=" + method);
SpoolItem s = new SpoolItem(method, payload, place, errorCount, itineraryItems);
int size = 0;
synchronized (spool) {
spool.addLast(s);
enqueCount++;
size = spool.size();
spool.notifyAll();
}
// Collect the stats
synchronized (moveCountMap) {
String serviceName = s.getServiceName();
if (moveCountMap.containsKey(serviceName)) {
Integer count = moveCountMap.get(serviceName);
moveCountMap.put(serviceName, Integer.valueOf(count.intValue() + 1));
} else {
moveCountMap.put(serviceName, Integer.valueOf(1));
}
}
logger.debug("Done enqueue of " + itemName + ", size=" + size);
return size;
}
/**
* Look up the instance in the namespace
*/
public static MoveSpool lookup() throws NamespaceException {
lookupCount++;
return (MoveSpool) Namespace.lookup(NAMESPACE_NAME);
}
/**
* Provide a copy of the map for stats gathering applications. This map shows how many items of each type have arrived
* on this node
*/
public Map<String, Integer> getMoveCountMap() {
synchronized (moveCountMap) {
return new HashMap<>(moveCountMap);
}
}
/**
* Provide statistics in string form
*
* @return list of types and counts that have spooled here
*/
public String getStatPairs() {
StringBuilder sb = new StringBuilder();
sb.append("{");
synchronized (moveCountMap) {
for (String key : moveCountMap.keySet()) {
if (sb.length() > 1) {
sb.append(",");
}
sb.append(key).append("=").append(moveCountMap.get(key));
}
}
sb.append("}");
return sb.toString();
}
/**
* To String for the namespace display
*/
@Override
public String toString() {
// Spool size is deliberately not synchronized
return "MoveSpool current/high " + spool.size() + "/" + highWaterMark + ", en/dequeue " + enqueCount + "/" + dequeCount + ", serviceNames="
+ getStatPairs();
}
/**
* Non-public encapsulation of what we need to hold on the spool
*/
protected static class SpoolItem {
final Method method;
final Object payload;
final IServiceProviderPlace place;
final int errorCount;
final List<DirectoryEntry> itineraryItems;
public SpoolItem(Method method, Object payload, IServiceProviderPlace place, int errorCount, List<DirectoryEntry> itineraryItems) {
this.method = method;
this.payload = payload;
this.place = place;
this.errorCount = errorCount;
this.itineraryItems = itineraryItems;
}
/**
* Get the payload
*/
public Object getPayload() {
return payload;
}
/**
* Get the place
*/
public IServiceProviderPlace getPlace() {
return place;
}
/**
* Get the error count
*/
public int getErrorCount() {
return errorCount;
}
/**
* Get serviceName from place key
*
* @return string service name from key
*/
public String getServiceName() {
if (place != null) {
return KeyManipulator.getServiceName(place.getKey());
}
return "sprout";
}
/**
* Get the list of itinerary items
*
* @return List of DirectoryEntry
*/
public List<DirectoryEntry> getItineraryItems() {
return itineraryItems;
}
/**
* Get the spool method
*/
public Method getMethod() {
return method;
}
}
/**
* Get the lookupCount
*
* @return the lookupCount
*/
public static long getLookupCount() {
return lookupCount;
}
/**
* Get the dequeCount
*
* @return the dequeCount
*/
public long getDequeCount() {
return dequeCount;
}
/**
* Get the enqueCount
*
* @return the enqueCount
*/
public long getEnqueCount() {
return enqueCount;
}
/**
* Get the highWaterMark
*
* @return the highWaterMark
*/
public int getHighWaterMark() {
return highWaterMark;
}
public int getCurrentSpoolSize() {
return spool.size();
}
}