package emissary.pool;
import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.util.PayloadUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
* Provide a storage area for incoming "moveTo(here)" payloads so that the http transfer can become more asnychronous.
* This class provides a FIFO for payloads that are arriving and a thread that will put them into agents from the pool
* as agents become available
public class MoveSpool implements Runnable {
// Our logger
private static final Logger logger = LoggerFactory.getLogger(MoveSpool.class);
// The payload FIFO
protected final Deque<SpoolItem> spool = new ArrayDeque<>();
// Reference to the agent pool
protected AgentPool pool;
// The thread that stuffs payloads into pool agents
Thread watcher;
// thread quit control
boolean timeToQuit = false;
// How we want to be registerd in the namespace
public static final String NAMESPACE_NAME = "ArrivalSpool";
// Stats on how many moves and for what types arrived here
public final Map<String, Integer> moveCountMap = new HashMap<>();
// Stats collection
private int highWaterMark = 0;
private static long lookupCount = 0;
private long enqueCount = 0;
private long dequeCount = 0;
// Cached ref to my local directory
IDirectoryPlace localDirectory = null;
// Methods for using the spool to dispatch
public enum Method {
* Make one and bind it in the namespace
public MoveSpool() {
// register this pool in the namespace
Namespace.bind(NAMESPACE_NAME, this);
* Configure stuff
private void configure() {
// Get the agent pool
// start the watcher thread
watcher = new Thread(this, "MoveSpool");
watcher.setPriority(Thread.MAX_PRIORITY - 2);
public void resetPool() {
// grab the default pool
try {
pool = AgentPool.lookup();
logger.debug("Found the AgentPool on MoveSpool#resetPool");
} catch (NamespaceException nex) {
logger.error("Unable to find agent pool, " + "please create the agent pool before creating the MoveSpool");
* Shut down the spooling thread and clear out any remaining payloads.
public void quit() {
logger.warn("Purging the spool...");
synchronized (spool) {
if (!spool.isEmpty()) {
timeToQuit = true;
Namespace.unbind(NAMESPACE_NAME);"Done stopping the move spool");
* Get a reference to the local directory on this machine
* @return a reference to the local directory from the namespace
private IDirectoryPlace getLocalDirectory() {
if (localDirectory == null) {
for (String key : Namespace.keySet()) {
try {
Object value = Namespace.lookup(key);
if (value instanceof IDirectoryPlace) {
localDirectory = (IDirectoryPlace) value;
} catch (NamespaceException ex) {"Problem in namespace", ex);
return localDirectory;
* Run the thread to watch the spool
public void run() {
int consecutiveSendCounter = 0;
// Run until we are told to quit
while (!timeToQuit) {
// Check the spool for work to be done
int sz = 0;
synchronized (spool) {
sz = spool.size();
if (sz == 0) {
// No payloads to look at. Sleep a while
consecutiveSendCounter = 0;
try {
logger.debug("Nothing in spool, time to wait...");
synchronized (spool) {
if (spool.isEmpty()) {
} catch (InterruptedException ignore) {
// Get an agent and a sool item
IMobileAgent agent = null;
SpoolItem item = null;
String itemName = null;
try {
// This may block for the max time the
// pool is configured to use if no
// agents available
agent = pool.borrowAgent();
if (agent == null) {
logger.debug("Got a null agent from pool!");
// Get the oldest payload from the spool
item = removeFirstPayload();
if (item == null) {
logger.debug("Got a null item from move spool!");
// We have both an agent and a spool item
// so hook em up and send it on the way
itemName = PayloadUtil.getName(item.getPayload());
logger.debug("Handing over " + itemName + " to an agent, method=" + item.getMethod());
if (item.getMethod() == Method.GO) {
IServiceProviderPlace place = item.getPlace();
if (place == null) {
place = getLocalDirectory();
Object payload = item.getPayload();
agent.go(payload, place);
} else if (item.getMethod() == Method.ARRIVE) {
agent.arrive(item.getPayload(), item.getPlace(), item.getErrorCount(), item.getItineraryItems());
} else {
logger.error("Illegal spooler method specified " + item.getMethod() + ", payload=" + item.getPayload()
+ " will be irretreivably lost");
if (consecutiveSendCounter % 10 == 0) {
logger.debug("Sent 10 consecutive entries, " + "time to yield the MoveSpool");
} catch (Throwable t) {
if (agent != null) {
logger.error("Unable to start agent, payload " + itemName + " is irretrievably lost", t);
try {
} catch (RuntimeException ex) {
logger.error("Unable to return agent to the pool", ex);
} else {
logger.debug("Cannot get agent from pool, trying again", t);
} finally {
// hold no references to this stuff
agent = null;
item = null;
* Remove the oldest payload item on the spool
* @return SpoolItem from the spool
protected SpoolItem removeFirstPayload() {
SpoolItem s = null;
synchronized (spool) {
// Do some stats
if (spool.size() > highWaterMark) {
highWaterMark = spool.size();
s = spool.removeFirst();
return s;
* Add an item to the spool for sending. Can be the result of a sprout or a new item being ingested into the system. The
* arrivalPlace is null so we call MobileAgent.go rather than MobileAgent.arrive
* @param payload the dataObject or Collection to save
* @return number of items on the queue
public int send(Object payload) {
return enqueue(Method.GO, payload, null, 0, (List<DirectoryEntry>) null);
* Add an item to the spool for sending. Can be the result of a sprout or a new item being ingested into the system. We
* call MobileAgent.go rather than MobileAgent.arrive
* @param payload the dataObject or Collection to save
* @param place the sending or sprouting place reference
* @return number of items on the queue
public int send(Object payload, IServiceProviderPlace place) {
return enqueue(Method.GO, payload, place, 0, (List<DirectoryEntry>) null);
* Add an arriving payload and associated state transfer info to the spool Calls MobileAgent.arrive in this case
* @param payload the data object or Collection to save
* @param place IServiceProviderPlace ref for the agent to visit
* @param errorCount state from the transferred MobileAgent
* @param itineraryItems state from the transferred MobileAgent
* @return number of items in the queue
public int arrive(Object payload, IServiceProviderPlace place, int errorCount, List<DirectoryEntry> itineraryItems) {
return enqueue(Method.ARRIVE, payload, place, errorCount, itineraryItems);
* Add an item to the spool. When an agent becomes available it is assigned to this payload in turn. We call
* MobileAgent.go or MobileAgent.arrive depending on the Method requested
* @param method ARRIVE or GO
* @param payload the data object or Collection to save
* @param place IServiceProviderPlace ref for the agent to visit, possibly null
* @param errorCount state from the transferred MobileAgent or null for GO
* @param itineraryItems state from the transferred MobileAgent or empty for GO
* @return number of items on the queue
protected int enqueue(Method method, Object payload, @Nullable IServiceProviderPlace place, int errorCount,
@Nullable List<DirectoryEntry> itineraryItems) {
String itemName = PayloadUtil.getName(payload);
logger.debug("Enqueue item " + itemName + " for place " + place + ", method=" + method);
SpoolItem s = new SpoolItem(method, payload, place, errorCount, itineraryItems);
int size = 0;
synchronized (spool) {
size = spool.size();
// Collect the stats
synchronized (moveCountMap) {
String serviceName = s.getServiceName();
if (moveCountMap.containsKey(serviceName)) {
Integer count = moveCountMap.get(serviceName);
moveCountMap.put(serviceName, Integer.valueOf(count.intValue() + 1));
} else {
moveCountMap.put(serviceName, Integer.valueOf(1));
logger.debug("Done enqueue of " + itemName + ", size=" + size);
return size;
* Look up the instance in the namespace
public static MoveSpool lookup() throws NamespaceException {
return (MoveSpool) Namespace.lookup(NAMESPACE_NAME);
* Provide a copy of the map for stats gathering applications. This map shows how many items of each type have arrived
* on this node
public Map<String, Integer> getMoveCountMap() {
synchronized (moveCountMap) {
return new HashMap<>(moveCountMap);
* Provide statistics in string form
* @return list of types and counts that have spooled here
public String getStatPairs() {
StringBuilder sb = new StringBuilder();
synchronized (moveCountMap) {
for (String key : moveCountMap.keySet()) {
if (sb.length() > 1) {
return sb.toString();
* To String for the namespace display
public String toString() {
// Spool size is deliberately not synchronized
return "MoveSpool current/high " + spool.size() + "/" + highWaterMark + ", en/dequeue " + enqueCount + "/" + dequeCount + ", serviceNames="
+ getStatPairs();
* Non-public encapsulation of what we need to hold on the spool
protected static class SpoolItem {
final Method method;
final Object payload;
final IServiceProviderPlace place;
final int errorCount;
final List<DirectoryEntry> itineraryItems;
public SpoolItem(Method method, Object payload, IServiceProviderPlace place, int errorCount, List<DirectoryEntry> itineraryItems) {
this.method = method;
this.payload = payload; = place;
this.errorCount = errorCount;
this.itineraryItems = itineraryItems;
* Get the payload
public Object getPayload() {
return payload;
* Get the place
public IServiceProviderPlace getPlace() {
return place;
* Get the error count
public int getErrorCount() {
return errorCount;
* Get serviceName from place key
* @return string service name from key
public String getServiceName() {
if (place != null) {
return KeyManipulator.getServiceName(place.getKey());
return "sprout";
* Get the list of itinerary items
* @return List of DirectoryEntry
public List<DirectoryEntry> getItineraryItems() {
return itineraryItems;
* Get the spool method
public Method getMethod() {
return method;
* Get the lookupCount
* @return the lookupCount
public static long getLookupCount() {
return lookupCount;
* Get the dequeCount
* @return the dequeCount
public long getDequeCount() {
return dequeCount;
* Get the enqueCount
* @return the enqueCount
public long getEnqueCount() {
return enqueCount;
* Get the highWaterMark
* @return the highWaterMark
public int getHighWaterMark() {
return highWaterMark;
public int getCurrentSpoolSize() {
return spool.size();