// QueServer.java
package emissary.pickup;
import emissary.core.Pausable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
/**
 * Monitor thread for a PickupQueue that hands queued work bundles off for processing. Operates in pull mode from a
 * PickupSpace or in push mode just monitoring the queue.
 * <p>
 * The loop in {@link #run()} keeps going until {@link #shutdown()} has been called.
 */
public abstract class QueServer extends Pausable {

    // Logger
    private static final Logger logger = LoggerFactory.getLogger(QueServer.class);

    // Poll interval in millis
    public static final long DEFAULT_POLLING_INTERVAL = 1000L;
    protected long pollingInterval = DEFAULT_POLLING_INTERVAL;

    // Loop control. Volatile so that a shutdown() call made from another thread is seen by the run() loop.
    protected volatile boolean timeToShutdown = false;

    // The queue this thread will monitor
    protected final PickupQueue queue;

    // For Pull mode from a PickupSpace
    protected IPickUpSpace space;

    /**
     * Create using the default polling interval and thread name
     *
     * @param space the pickupspace controller
     * @param queue the queue this thread monitors
     */
    public QueServer(IPickUpSpace space, PickupQueue queue) {
        this(space, queue, DEFAULT_POLLING_INTERVAL);
    }

    /**
     * Create with polling interval and the default thread name
     *
     * @param space the pickupspace controller
     * @param queue the queue this thread monitors
     * @param pollingInterval value in millis
     */
    public QueServer(IPickUpSpace space, PickupQueue queue, long pollingInterval) {
        this(space, queue, pollingInterval, "PickupQueServer");
    }

    /**
     * Create with polling interval and thread name. The thread priority is set one step above
     * {@link Thread#NORM_PRIORITY}.
     *
     * @param space the pickupspace controller
     * @param queue the queue this thread monitors
     * @param pollingInterval value in millis
     * @param name value to supply to Thread name
     */
    @SuppressWarnings("ThreadPriorityCheck")
    public QueServer(IPickUpSpace space, PickupQueue queue, long pollingInterval, String name) {
        super(name);
        this.space = space;
        this.queue = queue;
        this.pollingInterval = pollingInterval;
        this.setPriority(Thread.NORM_PRIORITY + 1);
    }

    /**
     * Processing loop to monitor the queue.
     * <p>
     * Each pass drains the queue through {@link #checkQue()}, then either tries to pull more work from the space
     * (when the space count is positive and the queue can hold one more item) or waits on the queue monitor for up
     * to the polling interval. The loop ends once {@link #shutdown()} has been called.
     */
    @Override
    public void run() {
        logger.debug("Starting the QueServer run method");

        while (!timeToShutdown) {
            // Process something on the queue; a failure here must not kill the monitor thread
            try {
                checkQue();
            } catch (RuntimeException e) {
                logger.warn("Exception in checkQue():{}", e.toString(), e);
            }

            if (checkPaused()) {
                // check to see if we want to stop taking work
                continue;
            }

            if (space.getSpaceCount() > 0 && queue.canHold(1)) {
                // If pull mode and we have room for one more.
                logger.debug("Que can hold more, trying take()");
                // NOTE(review): assumes take() returns true when work was obtained - confirm against IPickUpSpace
                boolean gotWork = space.take();
                if (!gotWork) {
                    // Nothing was taken: back off for one polling interval instead of retrying in a tight loop.
                    // When work was taken we fall through and process it on the next pass without delay.
                    try {
                        Thread.sleep(pollingInterval);
                    } catch (InterruptedException ignore) {
                        // NOTE(review): with the interrupt flag restored, later sleep()/wait() calls throw
                        // immediately, so the loop no longer blocks until shutdown() - confirm this is intended
                        Thread.currentThread().interrupt();
                    }
                }
            } else {
                // We must be in push mode or the queue is full,
                // just monitor the queue and try again
                logger.debug("Que full or push mode, waiting, space = {} spacenames = {}, queCanHold(1)? = {}", space,
                        space.getSpaceNames(), queue.canHold(1));
                try {
                    synchronized (queue) {
                        queue.wait(pollingInterval);
                    }
                } catch (InterruptedException e) {
                    logger.debug("Woke me up so lets check the queue!");
                    // NOTE(review): same caveat as above about the restored interrupt flag
                    Thread.currentThread().interrupt();
                }
            }
        }

        logger.debug("Off the end of the QueServer.run method");
    }

    /**
     * Check the queue for waiting objects and process them.
     * <p>
     * Bundles are dequeued until the queue returns null. Each one is passed to
     * {@link #processQueueItem(WorkBundle)} and the outcome is reported to the space via {@code bundleCompleted}. If
     * a RuntimeException is thrown while handling a bundle, its file names are logged and the bundle is reported as
     * failed.
     */
    @SuppressWarnings("ThreadPriorityCheck")
    public void checkQue() {
        WorkBundle paths = queue.deque();
        while (paths != null) {
            logger.debug("checkQue got a work bundle {}", paths);

            // We have work so parse it out and wait for the next agent.
            // This will send the work on the agent's thread.
            // Once the agents are sent we notify the
            // workspace of completion of this bundle
            try {
                boolean status = processQueueItem(paths);
                logger.debug("Initiating bundle completed msg for {}, status={}", paths.getBundleId(), status);
                space.bundleCompleted(paths.getBundleId(), status);
            } catch (RuntimeException e) {
                // Report filenames on error
                StringBuilder fnb = new StringBuilder();
                for (Iterator<String> i = paths.getFileNameIterator(); i.hasNext();) {
                    fnb.append(i.next()).append(",");
                }
                logger.warn("Processing exception on {}", fnb.toString(), e);
                logger.debug("Initiating bundle failed msg for {}", paths.getBundleId());
                space.bundleCompleted(paths.getBundleId(), false);
            }

            // Yield but don't go back to sleep if
            // there is still work to do
            Thread.yield();
            paths = queue.deque();
        }
        logger.debug("QueServer.checkQue ran out of data");
    }

    /**
     * Action to take when an item is removed from queue
     *
     * @param path the bundle from the queue
     * @return true if it worked
     */
    public abstract boolean processQueueItem(WorkBundle path);

    /**
     * Schedule this thread to stop soon. Only sets the loop-control flag; the run loop notices it the next time it
     * evaluates its loop condition.
     */
    public void shutdown() {
        this.timeToShutdown = true;
    }

    /**
     * Pass through to get size of injected queue
     *
     * @return size of the queue
     */
    public int getQueSize() {
        return queue.getQueSize();
    }

    /**
     * Pass through to enqueue a work bundle on the injected queue
     *
     * @param bundle the work bundle to enqueue
     * @return true if it was enqueued, false if we are too busy to handle it
     */
    public boolean enque(WorkBundle bundle) {
        return queue.enque(bundle);
    }
}