HDMobileAgent.java
package emissary.core;
import emissary.directory.DirectoryEntry;
import emissary.directory.KeyManipulator;
import emissary.log.MDCConstants;
import emissary.place.EmptyFormPlace;
import emissary.place.IServiceProviderPlace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
/**
* This mobile agent carries around an ArrayList of payload that can be added onto instead of sprouting. The agent is
* responsible for getting all payloads on the list processed to completion before going idle
*/
public class HDMobileAgent extends MobileAgent {
// Our logger, shadow the superclass for our own name in the log
protected static final Logger logger = LoggerFactory.getLogger(HDMobileAgent.class);
// Serializability
static final long serialVersionUID = 786319119844306571L;
// What we carry around with us
protected List<IBaseDataObject> payloadList = Collections.synchronizedList(new ArrayList<>());
/**
* Still have the uncaught exception handler but not really in a true ThreadGroup
*/
public HDMobileAgent() {
super();
}
/**
* Constructor for the factory, a reusable HD Agent
*/
public HDMobileAgent(final ThreadGroup threadGroup, final String threadName) {
super(threadGroup, threadName);
logger.debug("Constructed HD agent {}", threadName);
}
/**
* Override getPayload to just return the first on list or null
*/
@Override
public synchronized IBaseDataObject getPayload() {
return getPayload(0);
}
/**
* New method to get a payload from the list
*
* @param num the specified payload
*/
@Nullable
public synchronized IBaseDataObject getPayload(final int num) {
if (this.payloadList == null || this.payloadList.size() <= num) {
return null;
}
return this.payloadList.get(num);
}
/**
* Add payload to the list, warn if not empty
*
* @param p the payload, clear list if null to retain previous behavior
*/
@Override
protected synchronized void setPayload(@Nullable final IBaseDataObject p) {
if (p == null) {
this.payloadList.clear();
super.setPayload(null);
return;
}
if (payloadCount() != 0) {
logger.warn("Unanticipated call to psetPayload when payloadList is not empty.");
}
addPayload(p);
}
/**
* Add a new payload (i.e. instead of sprouting a new agent for it)
*
* @param p the new payload
* @return true
*/
public synchronized boolean addPayload(final IBaseDataObject p) {
return this.payloadList.add(p);
}
/**
* Add a collection of new payload objects
*
* @param c the collection to add
* @return true
*/
public synchronized boolean addPayload(final Collection<IBaseDataObject> c) {
return this.payloadList.addAll(c);
}
/**
* Get number of payload objects on list
*/
@Override
public synchronized int payloadCount() {
return this.payloadList.size();
}
/**
* Clear the payloadList and all other state info
*/
@Override
protected synchronized void clear() {
super.clear();
this.payloadList.clear();
}
/**
* The arrive method that takes in a list of payloads arriving on the new machine
*
* @param payload the real payload, exisitng if any will be cleared
* @param arrivalPlace the place we start at
* @param moveErrorCount state transfer from sending agent
* @param queuedItineraryItems state transfer from sending agent
*/
@Override
@SuppressWarnings("unchecked")
public synchronized void arrive(final Object payload, final IServiceProviderPlace arrivalPlace, final int moveErrorCount,
final List<DirectoryEntry> queuedItineraryItems) throws Exception {
logger.debug("Arrived at {}", arrivalPlace.toString());
clear();
moveErrorsOccurred = moveErrorCount;
nextKeyQueue.addAll(queuedItineraryItems);
if (payload instanceof IBaseDataObject) {
go(payload, arrivalPlace, true);
} else if (payload instanceof Collection) {
addPayload((Collection<IBaseDataObject>) payload);
setAgentId(getPayload().shortName());
go(null, arrivalPlace, true);
} else {
throw new Exception("Illegal payload sent to HDMobileAgent, cannot handle " + payload.getClass().getName());
}
}
@Override
@SuppressWarnings("unchecked")
public synchronized void go(final Object payload, final IServiceProviderPlace arrivalPlace) {
if (payload instanceof IBaseDataObject) {
super.go(payload, arrivalPlace);
} else if (payload instanceof Collection) {
addPayload((Collection<IBaseDataObject>) payload);
setAgentId(getPayload().shortName());
go(null, arrivalPlace, false);
} else {
logger.error("Illegal payload sent to HDMobileAgent, cannot handle {}", payload.getClass().getName());
}
}
/**
* The main control loop to determine and go through an itinerary Since we have a list of payload object that can
* potentially grow at every place we visit, we need to iterate over them until we get done but we cannot use a normal
* iterator which will throw an exception if the underlying collection mutates while iterating. So we make the selection
* of the first payload that needs work, find the place for it, process all other payloads that have the same form and
* lastPlace, then move on.
*/
@Override
protected void agentControl(final IServiceProviderPlace currentPlaceArg) {
DirectoryEntry newEntry = currentPlaceArg.getDirectoryEntry();
logger.debug("In agentControlHD {} for {}", currentPlaceArg, agentId);
// Set into the super classes payload member...
IBaseDataObject mypayload = getPayload();
// Go until all the payloads disappear or get done
IServiceProviderPlace currentPlace = currentPlaceArg;
int loopCount = 0;
boolean nextKeyRecorded = true;
boolean controlError = false;
while (currentPlace != null && newEntry != null && mypayload != null) {
// One based loop counter
loopCount++;
// Remember the payload's form and last place before
// doing the processing
final String primaryCurrentForm = mypayload.currentForm();
final DirectoryEntry primaryLastEntry = mypayload.getLastPlaceVisited();
if (logger.isDebugEnabled()) {
logger.debug("Starting control loop for {}, currentPlace={}, newEntry= {}, loopCount={}", mypayload.shortName(),
currentPlace.getKey(), newEntry.getFullKey(), loopCount);
}
// First time in, we just have the pickup place where we started
// our mission. We dont process there, just use it to call through
// to the directory, so skip the processing if this is true
if ((loopCount > 1 || getProcessFirstPlace()) && !controlError) {
// If we are at IO phase, add them all since the deferrals
// below should make everyone ready to drop off at the same time
if ("IO".equals(currentPlace.getDirectoryEntry().getServiceType())) {
// Drop off doesn't sprout so ignore return value
if (!nextKeyRecorded) {
logger.debug("Recording history drop off case");
recordHistory(newEntry, this.payloadList);
nextKeyRecorded = true;
}
atPlaceHD(currentPlace, this.payloadList);
} else {
// Add the primary payload object to a list
final List<IBaseDataObject> toBeProcessed = new ArrayList<>();
toBeProcessed.add(mypayload);
// Add any other payload that has the same current form
// and last place visited as this one while we are here...
for (final IBaseDataObject slug : this.payloadList) {
final DirectoryEntry slugLastPlaceVisited = slug.getLastPlaceVisited();
if (slug != mypayload
&& slug.searchCurrentForm(primaryCurrentForm) > -1
&& ((primaryLastEntry == null && slugLastPlaceVisited == null) || (primaryLastEntry != null
&& slugLastPlaceVisited != null && slugLastPlaceVisited.getKey().equals(primaryLastEntry.getKey())))) {
// We don't need to call getNextKey but do
// need to simulate this side effect of it...
slug.pullFormToTop(primaryCurrentForm);
toBeProcessed.add(slug);
if (logger.isDebugEnabled()) {
logger.debug("Adding slug {} with key {} to ride with {} having key {} current form {}", slug.shortName(),
(slugLastPlaceVisited == null ? "null" : slugLastPlaceVisited.getKey()), mypayload.shortName(),
(primaryLastEntry == null ? "null" : primaryLastEntry.getKey()), primaryCurrentForm);
}
}
}
// Process everything on the list
if (!nextKeyRecorded) {
if (loopCount == 1 && !getProcessFirstPlace()) {
// ArrivalPlace for MobileAgent.send()
logger.debug("Recording history two normal loop-1 case");
recordHistory(currentPlace, toBeProcessed);
} else {
logger.debug("Recording history two non-loop-1 case");
recordHistory(newEntry, toBeProcessed);
}
nextKeyRecorded = true;
}
final List<IBaseDataObject> sprouts = atPlaceHD(currentPlace, toBeProcessed);
// Add any sprouts collected from the payloads
if (!sprouts.isEmpty()) {
addPayload(sprouts);
}
}
}
// Where to go next...
controlError = false;
newEntry = getNextKey(currentPlace, mypayload);
nextKeyRecorded = false;
// Defer IO phase for now if there are attachments to process
// and we aren't already in the io phase
if ((newEntry != null) && (payloadCount() > 1) && "IO".equals(newEntry.getServiceType())
&& !"IO".equals(currentPlace.getDirectoryEntry().getServiceType())) {
logger.debug("Deferring IO Phase place for {}", newEntry);
newEntry = null;
} else {
if (newEntry != null) {
logger.debug("Continuing with place {}", newEntry);
}
}
// Choose the first place on the list that
// doesn't have a null nextKey when we run out
// of key for the one we were working on intitially
DirectoryEntry dropOffEntry = null;
int haveDropOffFor = -1;
if (newEntry == null) {
logger.debug("Got null newEntry for {} looking for a better payload...", mypayload.shortName());
for (int i = 0; i < payloadCount(); i++) {
final IBaseDataObject p = getPayload(i);
if (p == mypayload) {
continue;
}
setParallelTrackingInfoFor(p);
newEntry = getNextKey(currentPlace, p);
if (newEntry != null) {
// Defer IO Phase until sure we are all done
if ("IO".equals(newEntry.getServiceType())) {
logger.debug("Found IO service for part {}, {} deferring that and continuing to look", i, p.shortName());
if (haveDropOffFor == -1) {
haveDropOffFor = i;
dropOffEntry = newEntry;
}
newEntry = null;
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Found good key {} for new payload {}, serviceName={}", newEntry, p.shortName(), newEntry.getServiceName());
}
// Found a new top dog to process
// Pull it to the top of the list in case we have to move
if (i != 0) {
switchPrimaryPayload(i);
}
// Remember it for processing
mypayload = p;
break; // out of the for loop
}
}
}
// Reset drop off if we deferred it above and found nothing better
if (newEntry == null && haveDropOffFor > -1) {
// Pull entry to top
if (haveDropOffFor != 0) {
switchPrimaryPayload(haveDropOffFor);
mypayload = getPayload(0);
setParallelTrackingInfoFor(mypayload);
logger.debug("Pulling payload {} to top before IO reinstatement", haveDropOffFor);
}
// Set newEntry and go to drop off, deferred as long as possible
newEntry = dropOffEntry;
logger.debug("Resetting newEntry to IO phase");
}
// Null entry at this point means we are all done
// with all the payloads, normal processing termination
if (newEntry == null) {
break;
}
// Local processing, go around the loop and process there
if (newEntry.isLocal()) {
logger.debug("Choosing local place {}", newEntry.getFullKey());
currentPlace = newEntry.getLocalPlace();
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Recording history one move case loopCount={} getProcessFirstPlace={} currentPlace={} newEntry={}", loopCount,
getProcessFirstPlace(), currentPlace, newEntry.getFullKey());
}
// Time to move, entry is remote, record the history and go
recordHistory(newEntry, mypayload);
nextKeyRecorded = true;
controlError = true;
if (!KeyManipulator.isKeyComplete(mypayload.currentForm())) {
mypayload.replaceCurrentForm(ERROR_FORM);
} else {
mypayload.popCurrentForm();
}
}
// If null we are completely finished, otherwise we
// should just be moving to another machine
if (newEntry == null) {
logAgentCompletion();
}
}
/**
* Make the payload at the specified index the new primary one and reset the logger context to the new value
*
* @param i the index of the desired payload
*/
protected void switchPrimaryPayload(final int i) {
synchronized (this) {
// Pull them both
final IBaseDataObject oldTop = getPayload(0);
final IBaseDataObject p = getPayload(i);
// Swap them
this.payloadList.set(0, p);
this.payloadList.set(i, oldTop);
// switch logger context
MDC.put(MDCConstants.SHORT_NAME, p.shortName());
}
}
/**
* Do work now that we have arrived at the specified place
*
* @param place the place we are asking to work for us
* @param payloadListArg list of IBaseDataObject for the place to operate on
* @return list of "sprouted" payloads
*/
@SuppressWarnings("MemberName")
protected List<IBaseDataObject> atPlaceHD(final IServiceProviderPlace place, final List<IBaseDataObject> payloadListArg) {
MDC.put(MDCConstants.SERVICE_LOCATION, place.toString());
logger.debug("In atPlaceHD {} with {} payload items", place, payloadListArg.size());
List<IBaseDataObject> ret = Collections.emptyList();
try (TimedResource tr = resourceWatcherStart(place)) {
assert tr != null; // to silence an unused resource warning
// Process and get back a list of sprouted payloads
lastPlaceProcessed = place.getDirectoryEntry().getKey();
if (moveErrorsOccurred > 0) {
addMoveErrorCount(payloadListArg);
}
ret = place.agentProcessHeavyDuty(payloadListArg);
for (Iterator<IBaseDataObject> it = ret.iterator(); it.hasNext();) {
final IBaseDataObject ibdo = it.next();
if (ibdo == null) {
logger.error("{} violated contract and returned null IBaseDataObject. Child counts and IDs may be inconsistent.", place);
it.remove();
}
}
if (moveErrorsOccurred > 0) {
deleteMoveErrorCount(payloadListArg);
}
logger.debug("done with agentProcessHD for {} with {} sprouted results along for the ride", place, ret.size());
} catch (Throwable problem) {
logger.warn("**{} caught {} with {} payloads", place, problem, payloadListArg.size(), problem);
// We don't know here which one of the items on the list
// caused the exception so we are going to error them all
// If place providers don't catch their own exceptions
// we are conservative in what we think is safe to do here
for (final IBaseDataObject p : payloadListArg) {
p.addProcessingError("agentProcessHeavyDury(" + place + "): " + problem);
p.replaceCurrentForm(MobileAgent.ERROR_FORM);
}
} finally {
if (!(place instanceof EmptyFormPlace)) {
for (final IBaseDataObject p : payloadListArg) {
if (p.currentFormSize() == 0) {
logger.error("Place {} left an empty form stack, changing it to ERROR", place);
p.addProcessingError(place + " left an empty form stack");
p.pushCurrentForm(ERROR_FORM);
}
}
}
MDC.remove(MDCConstants.SERVICE_LOCATION);
checkInterrupt(place);
}
return ret;
}
/**
* Add the move error count to each payload
*/
protected void addMoveErrorCount(final List<IBaseDataObject> payloadListArg) {
for (final IBaseDataObject payload : payloadListArg) {
payload.setParameter("AGENT_MOVE_ERRORS", Integer.toString(moveErrorsOccurred));
}
}
/**
* Delete the move error count from each payload
*/
protected void deleteMoveErrorCount(final List<IBaseDataObject> payloadListArg) {
for (final IBaseDataObject payload : payloadListArg) {
payload.deleteParameter("AGENT_MOVE_ERRORS");
}
}
/**
* Record history for a bunch of payload objects (IBaseDataObject)
*/
protected void recordHistory(final IServiceProviderPlace place, final List<IBaseDataObject> payloadListArg) {
logger.debug("In recordHistory with {} payloads", payloadListArg.size());
final DirectoryEntry placeEntry = place.getDirectoryEntry();
for (final IBaseDataObject d : payloadListArg) {
recordHistory(placeEntry, d);
}
}
/**
* Record history for a bunch of payload objects (IBaseDataObject)
*/
protected void recordHistory(final DirectoryEntry placeEntry, final List<IBaseDataObject> payloadListArg) {
logger.debug("In recordHistory with {} payloads", payloadListArg.size());
for (final IBaseDataObject d : payloadListArg) {
recordHistory(placeEntry, d);
}
}
/**
* Make the log message for all the payloads
*/
protected void logAgentCompletion() {
for (final IBaseDataObject payload : this.payloadList) {
logAgentCompletion(payload);
}
}
/**
* Return whatever we carry as an object for serialization
*/
@Override
public Object getPayloadForTransport() {
return this.payloadList;
}
/**
* Setup the parallel type set tracking variable for a possible new payload
*/
protected void setParallelTrackingInfoFor(final IBaseDataObject d) {
// Clear out current value
clearParallelTrackingInfo();
// Look at history from the tail backwards in time and
// while looking at the same parallelType serviceType
// add values to the visitedPlaces tracking variable.
// We need to do this because this data object may have
// been processed through some of them as a slug and it
// will not have filled in the visitedPlace properly.
final List<String> history = d.transformHistory();
int lastParallelType = -1;
for (int i = history.size() - 1; i >= 0; i--) {
final String key = history.get(i);
final int typeSet = typeLookup(KeyManipulator.getServiceType(key));
if (lastParallelType == -1 && isParallelServiceType(typeSet)) {
lastParallelType = typeSet;
}
if (typeSet != lastParallelType) {
break;
}
addParallelTrackingInfo(KeyManipulator.getServiceName(key));
}
}
/**
* To string method useful from the Namespace when bound there
*/
@Override
public String toString() {
if (isZombie()) {
return "Closed";
} else if (!isInUse()) {
return "Idle";
}
String sn = null;
int sz = 0;
if (this.payloadList != null && !this.payloadList.isEmpty()) {
// Avoid synchronizing this [don't call getPayload()]
try {
sn = this.payloadList.get(0).shortName();
sz = this.payloadList.size();
} catch (Throwable t) {
// empty catch block
}
}
if (sn == null) {
sn = "Missing payload";
}
return sn + "(" + sz + ") - " + lastPlaceProcessed;
}
/**
* @return the lastPlaceProcessed
*/
@Override
public String getLastPlaceProcessed() {
return this.lastPlaceProcessed;
}
/**
* Interrupt the agent's thread Seems a little weird to be public, but there aren't a lot of choices.
*/
@Override
@SuppressWarnings("Interruption")
public void interrupt() {
this.thread.interrupt();
}
/**
* Determine if this agent is walking un-dead
*/
@Override
public boolean isZombie() {
return this.timeToQuit;
}
}