// MobileAgent.java

package emissary.core;

import emissary.directory.DirectoryEntry;
import emissary.directory.DirectoryPlace;
import emissary.directory.KeyManipulator;
import emissary.log.MDCConstants;
import emissary.place.CoordinationPlace;
import emissary.place.EmptyFormPlace;
import emissary.place.IServiceProviderPlace;
import emissary.pool.AgentPool;
import emissary.pool.AgentThreadGroup;
import emissary.util.JMXUtil;
import emissary.util.PayloadUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
 * An autonomous hunk of software
 */
public abstract class MobileAgent implements IMobileAgent, MobileAgentMBean {

    // Explicit serialization version id for this class
    static final long serialVersionUID = 2656898442450171891L;

    // Our logger
    protected static final Logger logger = LoggerFactory.getLogger(MobileAgent.class);

    // Probe logger, used by logAgentCompletion for payloads carrying the DIRECTORY_PROBE parameter
    protected static final Logger probeLogger = LoggerFactory.getLogger(MobileAgent.class.getPackage().toString() + ".PROBE");

    // The thread we run on; created and started by the constructor (we are autonomous, in a limited sense)
    @Nullable
    protected transient Thread thread = null;

    // Name prefix for our threads; the default constructor appends agentCounter to it
    public static final String AGENT_THREAD = "MobileAgent-";
    // Incremented without synchronization by the default constructor
    @SuppressWarnings("NonFinalStaticField")
    private static int agentCounter = 0;

    // Limit on moveErrorsOccurred before agentControl gives up on the payload
    public static final int DEFAULT_MAX_MOVE_ERRORS = 3;
    protected int maxMoveErrors = DEFAULT_MAX_MOVE_ERRORS;

    // Limit on transform history size, for stopping infinite loops
    public static final int DEFAULT_MAX_ITINERARY_STEPS = 100;
    protected int maxItinerarySteps = DEFAULT_MAX_ITINERARY_STEPS;

    // Form names that mark the error and done stages of processing
    protected static final String ERROR_FORM = Form.ERROR;
    protected static final String DONE_FORM = Form.DONE;

    // What we carry around with us; null while the agent has no work
    @Nullable
    protected IBaseDataObject payload = null;

    // Track if the MobileAgent is currently in use: set to false by go(), back to true by agentReturn()
    protected AtomicBoolean idle = new AtomicBoolean(true);

    // Place we are at now
    @Nullable
    protected transient IServiceProviderPlace arrivalPlace = null;
    // True when the arrival place itself should process the payload (arrive) rather than only route it (go)
    protected boolean processFirstPlace = false;
    // Directory key of the last place handed to atPlace, reset to null by agentReturn()
    @Nullable
    protected String lastPlaceProcessed = null;

    // ID string for this agent
    protected static final String NO_AGENT_ID = "No_AgentID_Set".intern();
    protected transient String agentId = NO_AGENT_ID;
    // Name of the thread group created by the default constructor
    private static final String TG_ID = "Agent Threads".intern();

    // Service names already chosen during a parallel stage, see getNextKey.
    // This might not be needed anymore, not carried with agent on a move...
    final Set<String> visitedPlaces = new HashSet<>();

    // To externally control the runnable loop; set by killAgent() and killAgentAsync()
    protected transient volatile boolean timeToQuit = false;

    // Queue of DirectoryEntry keys still to be processed, filled by nextKeyFromDirectory() and arrive()
    protected Deque<DirectoryEntry> nextKeyQueue = new ArrayDeque<>();

    // Track moveErrors on all parts of a given payload
    protected int moveErrorsOccurred = 0;

    /**
     * Create an agent in its own new {@link AgentThreadGroup} with a generated thread name of the form
     * {@code MobileAgent-<n>}. The agent still has an uncaught exception handler but is not really in a true ThreadGroup
     * with other agents.
     * <p>
     * NOTE(review): the name counter is a plain static int incremented without synchronization, so names may repeat if
     * agents are constructed concurrently.
     */
    public MobileAgent() {
        this(new AgentThreadGroup(TG_ID), AGENT_THREAD + agentCounter++);
    }

    /**
     * Create a new reusable Agent. A daemon thread of normal priority is created for the agent and started immediately,
     * after which the agent is registered as an MBean.
     * <p>
     * NOTE(review): the thread is started from inside the constructor, so {@link #run()} can begin before a subclass
     * constructor has finished initializing its own fields.
     *
     * @param threadGroup group we operate in
     * @param threadName symbolic name for this agent thread
     */
    @SuppressWarnings("ThreadPriorityCheck")
    public MobileAgent(final ThreadGroup threadGroup, final String threadName) {
        logger.debug("Constructing agent {}", threadName);
        this.thread = new Thread(threadGroup, this, threadName);
        this.thread.setPriority(Thread.NORM_PRIORITY);
        this.thread.setDaemon(true);
        // From here on run() executes concurrently with the remainder of construction
        this.thread.start();

        JMXUtil.registerMBean(this);
    }

    /**
     * Name of this agent for logging purposes, which is the name of the thread it runs on.
     *
     * @return the agent thread's name
     */
    @Override
    public String getName() {
        final Thread agentThread = this.thread;
        return agentThread.getName();
    }

    /**
     * Runnable interface, the body of this agent's own thread. While the agent is idle it waits on its own monitor; the
     * <em>go</em> and <em>arrive</em> methods supply a payload and a starting place and then notify us to come out of the
     * wait state and process the payload. After each payload the agent cleans up and hands itself back to the pool via
     * {@link #agentReturn()}. The loop ends once {@code timeToQuit} has been set.
     */
    @Override
    public void run() {
        logger.debug("Starting the 'run' loop");
        synchronized (this) {
            while (!this.timeToQuit) {

                if (!isInUse()) {
                    try {
                        // MAX time in case we miss a notify
                        // we bail out every 60 seconds just
                        // as a last resort
                        wait(60000);
                    } catch (InterruptedException e) {
                        if (this.timeToQuit) {
                            // Shutting down: keep the interrupt status, the loop condition ends the thread
                            Thread.currentThread().interrupt();
                        } else {
                            // Same policy as checkInterrupt: an interrupt that is not a shutdown request is
                            // dropped. Re-asserting it here would make every following wait() throw
                            // immediately and spin this loop while holding the monitor.
                            logger.debug("Idle agent {} was interrupted, continuing to wait", getName());
                        }
                    }
                }

                // Allow communications threads to take priority
                // Thread.yield();

                if (isInUse()) {
                    logger.debug("Starting work for {}", agentId());
                    try {
                        // Inside the try so that a missing payload or a failing shortName() cannot
                        // terminate the agent thread without the agent being returned to the pool
                        final IBaseDataObject current = getPayload();
                        if (current != null) {
                            MDC.put(MDCConstants.SHORT_NAME, current.shortName());
                        }
                        agentControl(this.arrivalPlace);
                    } catch (Throwable throwable) {
                        logger.error("Problem with agent", throwable);
                    } finally {
                        // prevent an interrupted thread from returning the agent
                        if (!this.timeToQuit) {
                            agentReturn();
                            MDC.clear(); // clear all MDC context
                        }
                    }
                }
            }
        }
    }

    /**
     * Ask the agent thread to stop permanently. The quit flag is raised under the agent's monitor and all waiters are
     * notified, so an idle agent wakes up and leaves its run loop; a busy agent sees the flag when it next checks it.
     */
    @Override
    public void killAgent() {
        logger.debug("killAgent called on {}", getName());
        synchronized (this) {
            // Flag first, then wake the run loop so it observes the flag
            this.timeToQuit = true;
            this.notifyAll();
        }
    }

    /**
     * Kill asynchronously: raise the quit flag without taking the agent's monitor, then lower the priority of the agent
     * thread and interrupt it. Any runtime failure while touching the thread is deliberately ignored.
     */
    @Override
    @SuppressWarnings({"Interruption", "ThreadPriorityCheck"})
    public void killAgentAsync() {
        logger.debug("killAgentAsync called on {}", getName());
        this.timeToQuit = true;
        final Thread agentThread = this.thread;
        try {
            agentThread.setPriority(Thread.MIN_PRIORITY);
            agentThread.interrupt();
        } catch (RuntimeException ignored) {
            // best effort only, the quit flag has already been set
        }
    }

    /**
     * Report whether we are busy or not.
     *
     * @return true when the agent is not idle
     */
    @Override
    public boolean isInUse() {
        final boolean idleNow = this.idle.get();
        return !idleNow;
    }

    /**
     * Remember the place we should kick off with.
     *
     * @param p the starting place, or null to forget the current one
     */
    protected synchronized void setArrivalPlace(@Nullable final IServiceProviderPlace p) {
        arrivalPlace = p;
    }

    /**
     * Replace the payload carried by this agent.
     *
     * @param p the new payload, or null to drop the current one
     */
    protected synchronized void setPayload(@Nullable final IBaseDataObject p) {
        payload = p;
    }

    /**
     * Report the ID string currently assigned to this agent.
     *
     * @return the agent ID
     */
    @Override
    public String agentId() {
        return agentId;
    }

    /**
     * Reset the per-payload state of this agent: drops the payload, reassigns the agent ID from the no-ID marker, zeroes
     * the move-error counter, empties the itinerary queue and forgets the parallel tracking information.
     */
    protected synchronized void clear() {
        logger.debug("Clearing payload");

        // payload and identity
        setPayload(null);
        setAgentId(NO_AGENT_ID);

        // routing state
        moveErrorsOccurred = 0;
        nextKeyQueue.clear();
        clearParallelTrackingInfo();
    }

    /**
     * Forget every service name recorded for parallel stage tracking.
     */
    protected void clearParallelTrackingInfo() {
        visitedPlaces.clear();
    }

    /**
     * Record a name in the parallel tracking set.
     *
     * @param t the name to remember, getNextKey passes the service name of the chosen entry
     */
    protected void addParallelTrackingInfo(final String t) {
        visitedPlaces.add(t);
    }

    /**
     * Test whether a name is already present in the parallel tracking set.
     *
     * @param type the name to look for
     * @return true if it was recorded earlier
     */
    protected boolean checkParallelTrackingFor(final String type) {
        final boolean alreadySeen = visitedPlaces.contains(type);
        return alreadySeen;
    }

    /**
     * Return a reference to the payload of this agent.
     *
     * @return the current payload, null when none is set
     */
    @Override
    public synchronized IBaseDataObject getPayload() {
        return payload;
    }

    /**
     * The main control loop to determine and go through an itinerary until the payload is finished (nowhere else to go).
     * <p>
     * Each pass optionally processes the payload at the current place, asks {@link #getNextKey} for the next directory
     * entry, records it in the transform history and, when the entry is local, continues with that place. An entry that
     * is not local is counted as a move error: the current form is replaced or popped so that a different route is tried
     * on the next pass, and the loop gives up once the move-error or itinerary-step limit is exceeded.
     * <p>
     * The payload is read once through {@link #getPayload()} and that same reference is used for the whole loop.
     *
     * @param currentPlaceArg where we are now
     */
    protected void agentControl(final IServiceProviderPlace currentPlaceArg) {
        logger.debug("In agentControl {} for {}", currentPlaceArg, this.agentId);
        DirectoryEntry newEntry = currentPlaceArg.getDirectoryEntry();

        IServiceProviderPlace currentPlace = currentPlaceArg;
        final IBaseDataObject mypayload = getPayload();
        int loopCount = 0;
        boolean controlError = false;

        while (currentPlace != null && newEntry != null && mypayload != null && !this.timeToQuit) {
            // One based counter
            loopCount++;

            // First time in, we just have the pickup place where we started
            // our mission. We dont process there, just use it to call through
            // to the directory, so skip the processing. See the difference
            // between the go() and arrive() methods for details.
            // After a failed move we also skip, the current place already processed this payload.
            if ((loopCount > 1 || getProcessFirstPlace()) && !controlError) {
                atPlace(currentPlace, mypayload);
            }

            // Choose next place
            controlError = false;
            newEntry = getNextKey(currentPlace, mypayload);

            // Nothing to do, bail out,
            // normal processing termination
            if (newEntry == null) {
                break;
            }

            // Record what we are doing in the history log. On the first pass of a
            // "go" the pickup place is recorded too, ahead of the chosen entry.
            if (loopCount == 1 && !getProcessFirstPlace()) {
                // Use arrivalPlace for MobileAgent.send()
                recordHistory(currentPlace, mypayload);
            }
            recordHistory(newEntry, mypayload);

            // A local place, around the loop and hit it
            if (newEntry.isLocal()) {
                logger.debug("Choosing local place {}", newEntry.getFullKey());
                currentPlace = newEntry.getLocalPlace();
                continue;
            }

            // Not a local place: count it as a move error
            controlError = true;
            if (++this.moveErrorsOccurred > this.maxMoveErrors || mypayload.transformHistory().size() > this.maxItinerarySteps) {
                logger.error("Too many move errors, giving up");
                newEntry = null;
                break;
            }

            if (!KeyManipulator.isKeyComplete(mypayload.currentForm())) {
                // Let it try going to the ERROR place, if any
                purgeNonFinalForms(mypayload);
                mypayload.replaceCurrentForm(ERROR_FORM);
            } else {
                // It was a full key, just pop it and try what
                // is next on the list. People who ask for full
                // key moves should be made aware of this somehow
                mypayload.popCurrentForm();
                if (mypayload.currentFormSize() == 0) {
                    mypayload.replaceCurrentForm(ERROR_FORM);
                }
            }
        }

        logger.debug("Out of the control loop");

        // If null we are completely finished, or the payload
        // is just moving to another machine to continue
        // Either way, log it and let this agent go back to
        // the pool
        if (newEntry == null) {
            logAgentCompletion(mypayload);
        }
    }

    /**
     * Do work now that we have arrived at the specified place. The place's processing call runs inside a
     * {@link TimedResource} obtained from {@link #resourceWatcherStart}. Any Throwable from the place is logged, recorded
     * as a processing error on the payload and turns the current form into ERROR. Afterwards the form stack is checked and
     * the interrupt status of the thread is examined by {@link #checkInterrupt}.
     *
     * @param place the place we are asking to work for us
     * @param payloadArg the data for the place to operate on
     */
    protected void atPlace(final IServiceProviderPlace place, final IBaseDataObject payloadArg) {
        logger.debug("In atPlace {} with {}", place, payloadArg.shortName());

        try (TimedResource timer = resourceWatcherStart(place)) {
            assert timer != null; // to silence an unused resource warning

            this.lastPlaceProcessed = place.getDirectoryEntry().getKey();
            // Expose the move error count to the place for the duration of the call
            if (this.moveErrorsOccurred > 0) {
                payloadArg.setParameter("AGENT_MOVE_ERRORS", Integer.toString(this.moveErrorsOccurred));
            }

            place.agentProcessCall(payloadArg);

            // Only reached when the place returned normally; if it threw, the parameter stays on the payload
            if (this.moveErrorsOccurred > 0) {
                payloadArg.deleteParameter("AGENT_MOVE_ERRORS");
            }
            logger.debug("done with agentProcessCall for {}", place);
        } catch (Throwable problem) {
            // The timer has already been closed by the time this handler runs
            logger.warn("** {} place caught problem:", place, problem);
            payloadArg.addProcessingError("atPlace(" + place + "): " + problem);
            payloadArg.replaceCurrentForm(ERROR_FORM);
        } finally {
            // Only an EmptyFormPlace is allowed to leave the payload without any current form
            if (!(place instanceof EmptyFormPlace) && payloadArg.currentFormSize() == 0) {
                logger.error("Place {} left an empty form stack, changing it to ERROR", place);
                payloadArg.addProcessingError(place + " left an empty form stack");
                payloadArg.pushCurrentForm(ERROR_FORM);
            }
            checkInterrupt(place);
        }
    }

    /**
     * Examine and clear the interrupt status of the current thread after a place has run. When the agent is not
     * quitting the interrupt is reported as a warning and stays cleared; when it is quitting the interrupt status is set
     * again.
     *
     * @param place the place that was executing, used for the log message only
     */
    protected final void checkInterrupt(final IServiceProviderPlace place) {
        if (!Thread.interrupted()) {
            return;
        }

        final Thread running = Thread.currentThread();
        // this should NEVER happen. if it does, we've done something bad
        if (this.thread != running) {
            logger.error("MobileAgent thread instance is not the current thread. Instance thread: {} \tCurrent thread: {}", this.thread,
                    Thread.currentThread());
        }

        if (this.timeToQuit) {
            // It must be time to quit so re-interrupt the current thread
            Thread.currentThread().interrupt();
            return;
        }

        // Log only when interrupted by the ResourceWatcher, not during shutdown.
        logger.warn("Place {} was interrupted during execution. Adjust place time out or modify code accordingly.", place);
    }

    /**
     * Start resource tracking for a place that is about to process our payload.
     *
     * @param place the place that is about to run
     * @return the timer supplied by the ResourceWatcher, or {@link TimedResource#EMPTY} when the place is a
     *         CoordinationPlace, no ResourceWatcher can be looked up, or the watcher hands back null
     */
    protected TimedResource resourceWatcherStart(final IServiceProviderPlace place) {
        // CoordinationPlaces are tracked individually
        if (place instanceof CoordinationPlace) {
            return TimedResource.EMPTY;
        }

        try {
            final TimedResource started = ResourceWatcher.lookup().starting(this, place);
            return (started != null) ? started : TimedResource.EMPTY;
        } catch (EmissaryException ex) {
            logger.debug("No resource monitoring enabled");
            return TimedResource.EMPTY;
        }
    }

    /**
     * Clean up, go idle, and hand this agent back to the pool. A failure to find the pool or to return the agent is
     * logged and otherwise ignored.
     */
    protected synchronized void agentReturn() {
        // wipe everything that belonged to the finished payload
        clear();
        setArrivalPlace(null);
        this.lastPlaceProcessed = null;

        // mark as available before the pool sees us again
        this.idle.set(true);
        try {
            AgentPool.lookup().returnAgent(this);
        } catch (Exception e) {
            logger.error("Cannot get return agent to pool", e);
        }
    }

    /**
     * Get the next key from the directory with error handling. Can return null if there is no place to handle the form.
     * <p>
     * The decision is made in this order:
     * <ol>
     * <li>null payload or place: return null</li>
     * <li>transform history larger than {@code maxItinerarySteps}: force the current form to ERROR</li>
     * <li>entries left on the internal queue from an earlier directory query: return the first one</li>
     * <li>no current form, or a current form starting with the DONE form: return null</li>
     * <li>current form is ERROR: purge the other forms (or all forms if the form below the top is ERROR as well)</li>
     * <li>current form is a complete key: return an entry built from it</li>
     * <li>otherwise walk the stages, beginning at the stage of the last place visited, and for each stage try every
     * current form against the directory until an entry is found</li>
     * </ol>
     *
     * @param place the place we will use to access the directory
     * @param payloadArg the current payload we care about
     * @return the SDE answer from the directory
     */
    @Nullable
    protected DirectoryEntry getNextKey(@Nullable final IServiceProviderPlace place, @Nullable final IBaseDataObject payloadArg) {

        logger.debug("start getNextKey");

        // Check for bad preconditions.
        if (payloadArg == null || place == null) {
            logger.warn("Null payload or place in getNextKey");
            return null;
        }

        // Stop looping from occurring
        if (payloadArg.transformHistory().size() > this.maxItinerarySteps &&
                !ERROR_FORM.equals(payloadArg.currentForm())) {
            payloadArg.replaceCurrentForm(ERROR_FORM);
            payloadArg.addProcessingError("Agent stopped due to larger than max transform history size (looping?)");
        }

        // Perhaps we already have additional keys to process
        // from the last time we asked the directory. If so,
        // choose the next one and spit it out
        if (!this.nextKeyQueue.isEmpty()) {
            logger.debug("Returning next key from stack size={}", this.nextKeyQueue.size());
            return this.nextKeyQueue.removeFirst();
        }

        // We would need a current form of the payload to continue
        if (payloadArg.currentFormSize() < 1) {
            logger.debug("No current forms on payload {}", payloadArg.shortName());
            return null;
        }

        // Maybe we are done, if so quit now
        if (payloadArg.currentForm().startsWith(DONE_FORM)) {
            return null;
        }

        // If we are in the error condition,
        // clean up and try for error drop off
        // NOTE: curKey is captured before any purge below and is not refreshed afterwards
        final String curKey = payloadArg.currentForm();
        if (ERROR_FORM.equals(curKey)) {
            if (payloadArg.currentFormSize() > 1 && ERROR_FORM.equals(payloadArg.currentFormAt(1))) {
                // ERROR on top of ERROR: drop every form and note the bypass in the history
                logger.error("ERROR handling place produced an error, purging all current forms");
                while (payloadArg.currentFormSize() > 0) {
                    payloadArg.popCurrentForm();
                }
                payloadArg.appendTransformHistory("ERROR.SKIP.*.http://Previous_Error_Bypass$99999");
            } else {
                if (payloadArg.currentFormSize() > 1) {
                    logger.warn("Got current form of ERROR, clearing form stack on {}: {}", payloadArg.shortName(), payloadArg.getAllCurrentForms());
                }
                purgeNonFinalForms(payloadArg);
            }
        }

        // If we have a fully specified key as current form
        // just go there and process
        if (KeyManipulator.isKeyComplete(curKey)) {
            logger.debug("Got current full key form of {}", curKey);
            return new DirectoryEntry(curKey);
        }

        /* Get the last entry from the payload */
        DirectoryEntry lastEntry = payloadArg.getLastPlaceVisited();

        // Forms as they stand now, after any purging done above
        final List<String> dataForms = payloadArg.getAllCurrentForms();
        logger.debug(">>> Current forms for {} are {}", payloadArg.shortName(), dataForms);

        /* Get the last service type from the last key, defaulting to the name of stage 0 */
        String lastServiceType = Stage.getStageName(0);
        if (lastEntry != null) {
            // lastServiceType = KeyManipulator.serviceType(lastEntry.key());
            lastServiceType = lastEntry.getServiceType();
        }

        logger.debug("Payload reports lastEntry is {} with serviceType {}", lastEntry, lastServiceType);

        // For Analyze type, don't allow it to go back to ID
        // *.INPUT.* and *.<SPROUT>.* are not in the list so will
        // both start at 0
        int startType = typeLookup(lastServiceType);

        // If we came from transform we can start at the beginning again
        if (lastEntry != null && startType != 0) {
            if ("TRANSFORM".equals(lastEntry.getServiceType())) {
                startType = 0;
            }
        }

        DirectoryEntry curEntry = null;
        // Outer loop: stages from startType onwards. Inner loop: every current form for that stage.
        for (int curType = startType; curType < Stage.values().length; curType++) {
            // Search the form stack starting with the top.
            final String stageName = Stage.getStageName(curType);
            for (String form : dataForms) {

                // Test a full key form to see if it is the correct stage to be chosen
                if (KeyManipulator.isKeyComplete(form)) {
                    if (KeyManipulator.getServiceType(form).equals(stageName)) {
                        logger.debug("Choosing cur form {} in stage {}", form, stageName);
                        payloadArg.pullFormToTop(form);
                        return new DirectoryEntry(form);
                    }
                }

                // Ask the directory for this form at this stage
                String formId = form + KeyManipulator.DATAIDSEPARATOR + stageName;
                curEntry = nextKeyFromDirectory(formId, place, lastEntry, payloadArg);

                // Process through the parallel service type once per place max
                // no matter how many forms would route there
                if (curEntry != null && isParallelServiceType(curType)) {
                    boolean parallelEntryRejected = false;
                    // Repeat while the directory keeps answering with a service we already visited
                    do {
                        parallelEntryRejected = false;
                        logger.debug(
                                "curEntry isParallel with curType={}, curEntry={}, visitedPlace={}, serviceName={}, lastServiceType={}, curTypeName={}",
                                curType, curEntry.getFullKey(), this.visitedPlaces, curEntry.getServiceName(), lastServiceType,
                                stageName);
                        if (this.visitedPlaces.isEmpty() || stageName.equals(lastServiceType)) {
                            if (checkParallelTrackingFor(curEntry.getServiceName())) {
                                // Already visited: re-query using the rejected entry as the last entry
                                lastEntry = new DirectoryEntry(curEntry);
                                lastEntry.setDataType(form);
                                formId = lastEntry.getDataId();
                                parallelEntryRejected = true;
                                logger.debug("Rejecting parallel entry found for {}: visitedPlaces={}", lastEntry.getFullKey(), this.visitedPlaces);
                                curEntry = nextKeyFromDirectory(formId, place, lastEntry, payloadArg);
                            } else {
                                addParallelTrackingInfo(curEntry.getServiceName());
                                logger.debug("Added parallel tracking = {}", this.visitedPlaces);
                            }
                        } else {
                            // Tracking info left over from a different stage, start fresh
                            clearParallelTrackingInfo();
                            logger.debug("Cleared parallel tracking info");
                        }
                    } while (parallelEntryRejected && curEntry != null);
                }

                if (curEntry != null) {
                    logger.debug("===== --- *** Doing {}.{}--{}", stageName, formId, curEntry.getServiceName());
                    payloadArg.pullFormToTop(form);
                    return curEntry;
                }
            }
        }
        // No stage/form combination produced an entry
        return null;
    }

    /**
     * Evaluate the parallel attribute of the stage at the specified index.
     *
     * @param typeSetPosition index of the stage
     * @return whatever {@link Stage#isParallelStage(int)} reports for that index
     */
    protected boolean isParallelServiceType(final int typeSetPosition) {
        final boolean parallel = Stage.isParallelStage(typeSetPosition);
        return parallel;
    }

    /**
     * Get the stage index for the specified stage name.
     *
     * @param s the stage name to look up
     * @return the ordinal of the matching stage, or 0 when {@link Stage#getByName} finds no stage for the name
     */
    public static int typeLookup(final String s) {
        final Stage stage = Stage.getByName(s);
        // ordinal() is never negative, so no further range check is needed
        return (stage == null) ? 0 : stage.ordinal();
    }

    /**
     * Communicate with the directory through the current place to get the next place to go. These are all local calls
     * since all the local directories have all the information.
     * <p>
     * The directory may answer with several entries. All of them are appended to the internal queue and the head of the
     * queue is handed back; the caller is expected to drain the queue before asking the directory again. A runtime
     * failure of the query is logged and treated like an empty answer so that the next form can be tried.
     *
     * @param dataId the form and stage identifier to ask the directory about
     * @param place the place used to reach the directory
     * @param lastEntry the entry last visited, passed through to the directory
     * @param payloadArg the payload being routed
     * @return the head of the internal queue after the query, or null when the queue is empty
     */
    protected DirectoryEntry nextKeyFromDirectory(final String dataId, final IServiceProviderPlace place, final DirectoryEntry lastEntry,
            final IBaseDataObject payloadArg) {

        try {
            logger.debug("Trying nextKey for {} with last={}, atPlace={}", dataId, lastEntry, place);

            // Query the directory and queue up whatever came back
            final List<DirectoryEntry> found = place.nextKeys(dataId, payloadArg, lastEntry);
            if (found != null && !found.isEmpty()) {
                this.nextKeyQueue.addAll(found);
                logger.debug("Added {} new key entries from the directory for {}", found.size(), dataId);
            }
        } catch (RuntimeException e) {
            // Returning instead of throwing will allow
            // the next form to be tried.
            logger.warn("cannot get key, I was working on: {}", payloadArg.shortName(), e);
        }

        // Hand the first queued item, if any, to the caller
        final DirectoryEntry head = this.nextKeyQueue.pollFirst();
        logger.debug("nextKeyFromDirectory found {}", head);
        return head;
    }

    /**
     * Build the agent ID used while carrying a payload around, mostly for error reporting. The ID has the form
     * {@code Agent-<millis % 10000>-<theId>}, with a placeholder standing in for a null {@code theId}.
     *
     * @param theId usually comes from the shortName of the payload
     */
    protected void setAgentId(@Nullable final String theId) {
        final long stamp = System.currentTimeMillis() % 10000;
        final String suffix = (theId == null) ? "blah" : theId;
        this.agentId = "Agent-" + stamp + "-" + suffix;
    }

    /**
     * Start an idle agent from a pickup place. Existing state is cleared first. The supplied place is not used for
     * processing, only to get the next key from the directory there.
     *
     * @param payloadArg the real payload, existing if any will be cleared
     * @param arrivalPlaceArg the place we start at
     */
    @Override
    public synchronized void go(final Object payloadArg, final IServiceProviderPlace arrivalPlaceArg) {
        // wipe leftovers, then start without processing at the first place
        clear();
        final boolean processAtFirstPlace = false;
        go(payloadArg, arrivalPlaceArg, processAtFirstPlace);
    }

    /**
     * Shared implementation behind the public go and arrive methods. Marks the agent as in use, stores the payload and
     * the arrival place, and wakes up the agent's own thread. The processFirstPlace flag tells that thread on which path
     * we entered.
     *
     * @param dataObject the real payload, may be null when a derived class manages its own payload
     * @param arrivalPlaceArg the place we start at, may be null
     * @param processAtFirstPlace true if we should call process on arrivalPlaceArg
     * @throws IllegalArgumentException if dataObject is neither null nor an IBaseDataObject
     */
    protected synchronized void go(@Nullable final Object dataObject, @Nullable final IServiceProviderPlace arrivalPlaceArg,
            final boolean processAtFirstPlace) {
        final boolean havePayload = dataObject != null;

        // Reject anything we cannot carry before touching any state
        if (havePayload && !(dataObject instanceof IBaseDataObject)) {
            throw new IllegalArgumentException("Illegal payload sent to MobileAgent, cannot handle " + dataObject.getClass().getName());
        }

        this.idle.set(false);
        setProcessFirstPlace(processAtFirstPlace);

        // A null payload is allowed so that derived classes can set
        // their own payload and still be able to use this method
        if (havePayload) {
            final IBaseDataObject incoming = (IBaseDataObject) dataObject;
            logger.debug("Setting payload {}", incoming.shortName());
            setPayload(incoming);
            setAgentId(incoming.shortName());
        }

        // Same leniency for the place
        if (arrivalPlaceArg != null) {
            setArrivalPlace(arrivalPlaceArg);

            // For a "go" rather than an "arrive", the arrival place is
            // logged on the transform history of the payload
            if (!processAtFirstPlace) {
                logger.debug("Adding history for arrival place {}", arrivalPlaceArg.getKey());
                recordHistory(arrivalPlaceArg, getPayload());
            }
        }

        // the run() loop now takes over on the agent's thread and we return
        // control of the currentThread to the caller of this method
        this.notifyAll();
    }

    /**
     * Provide access to the move-error counter for the MoveAdapter.
     *
     * @return the number of move errors counted for the current payload
     */
    @Override
    public int getMoveErrorCount() {
        return moveErrorsOccurred;
    }

    /**
     * Provide access to the itinerary queue for the MoveAdapter.
     *
     * @return a new array holding the queued entries in queue order
     */
    @Override
    public DirectoryEntry[] getItineraryQueueItems() {
        final DirectoryEntry[] template = new DirectoryEntry[0];
        return this.nextKeyQueue.toArray(template);
    }

    /**
     * This is for an already in process agent arriving at a new place from a "moveTo". Unlike
     * {@link #go(Object, IServiceProviderPlace)}, we presume we have arrived at this place in order to do some processing
     * here, not just because we got picked up by it. So we don't need to get a key first, just start processing.
     * <p>
     * The payload is validated before any existing state is cleared, so a rejected call leaves the agent untouched.
     *
     * @param dataObject the real payload, existing if any will be cleared
     * @param arrivalPlaceArg the place we start at
     * @param moveErrorCount transported move error counter
     * @param queuedItineraryItems transported itinerary items list of DirectoryEntry, null is treated as empty
     * @throws Exception if dataObject is null or not an IBaseDataObject
     */
    @Override
    public synchronized void arrive(final Object dataObject, final IServiceProviderPlace arrivalPlaceArg, final int moveErrorCount,
            final List<DirectoryEntry> queuedItineraryItems) throws Exception {

        if (!(dataObject instanceof IBaseDataObject)) {
            // instanceof is false for null, so guard the getClass() call used for the message
            final String offered = (dataObject == null) ? "null" : dataObject.getClass().getName();
            throw new Exception("Illegal payload sent to MobileAgent, cannot handle " + offered);
        }

        clear();
        this.moveErrorsOccurred = moveErrorCount;
        if (queuedItineraryItems != null) {
            this.nextKeyQueue.addAll(queuedItineraryItems);
        }
        go(dataObject, arrivalPlaceArg, true);
    }

    /**
     * Delete every form on the stack other than the ERROR form. This is called in error conditions to try and break out
     * of loops or terminate other badness and zip to the end. For each deleted form a pseudo key is appended to the
     * transform history.
     *
     * @param payloadArg the dataobject to work on
     */
    protected static void purgeNonFinalForms(final IBaseDataObject payloadArg) {
        // The index only advances past ERROR forms; a deletion moves the next form into the same slot
        for (int idx = 0; idx < payloadArg.currentFormSize();) {
            final String candidate = payloadArg.getAllCurrentForms().get(idx);
            if (candidate.equals(ERROR_FORM)) {
                idx++;
            } else {
                final String pseudoKey = payloadArg.currentFormAt(idx) + ".SKIP.*.http://Previous_Error_Bypass$100";

                logger.debug("Removed {} because of ERROR.SKIP", payloadArg.currentFormAt(idx));
                payloadArg.appendTransformHistory(pseudoKey);
                payloadArg.deleteCurrentFormAt(idx);
            }
        }
    }

    /**
     * Make a nice log message when we are done with the payload. Payloads carrying the DIRECTORY_PROBE parameter are
     * reported on the probe logger, all others on the regular logger.
     *
     * @param payloadArg the one we just finished with
     */
    protected void logAgentCompletion(final IBaseDataObject payloadArg) {
        // Keep this at a nice high level, above the debug chatter
        final Logger dest;
        if (payloadArg.getParameter("DIRECTORY_PROBE") == null) {
            dest = logger;
        } else {
            dest = probeLogger;
        }

        // Skip building the display string when nobody will see it
        if (!dest.isInfoEnabled()) {
            return;
        }
        dest.info(PayloadUtil.getPayloadDisplayString(payloadArg));
    }

    /**
     * Record the processing history in the data object, using the directory entry of the given place.
     *
     * @param place where the processing is taking place
     * @param payloadArg the dataobject that is being processed
     */
    protected void recordHistory(final IServiceProviderPlace place, final IBaseDataObject payloadArg) {
        final DirectoryEntry placeEntry = place.getDirectoryEntry();
        recordHistory(placeEntry, payloadArg);
    }

    /**
     * Record the processing history in the data object. The key that is appended depends on the current form:
     * <ul>
     * <li>current form is not a complete key: the place key with the current form spliced in as data type</li>
     * <li>complete key, before start: the place key as is</li>
     * <li>complete key, after start: the current form itself, followed by the expense of the last place visited when
     * that expense is positive</li>
     * </ul>
     *
     * @param placeEntry where the processing is taking place
     * @param payloadArg the data object that is being processed
     */
    protected void recordHistory(final DirectoryEntry placeEntry, final IBaseDataObject payloadArg) {

        final String currentForm = payloadArg.currentForm();
        final DirectoryEntry spliced = new DirectoryEntry(placeEntry);

        final String placeKey;
        if (!KeyManipulator.isKeyComplete(currentForm)) {
            // Splice this current form into the place key
            // for a proper representation of why we are here
            spliced.setDataType(currentForm);
            placeKey = spliced.getFullKey();
        } else if (payloadArg.beforeStart()) {
            // Full part key in current form and before start.
            // Must use key from "Sending Place" rather than
            // current form here
            placeKey = spliced.getFullKey();
        } else {
            // We already have a full key in the current form
            // just need to figure out the current cost
            final DirectoryEntry lastVisited = payloadArg.getLastPlaceVisited();
            int expense = lastVisited.getExpense();

            // Subtract one remote overhead if this represents a move
            final boolean hostChanged = !KeyManipulator.getServiceHostUrl(currentForm).equals(lastVisited.getServiceHostUrl());
            if (hostChanged && expense > DirectoryPlace.REMOTE_EXPENSE_OVERHEAD) {
                expense -= DirectoryPlace.REMOTE_EXPENSE_OVERHEAD;
            }

            // Current form cannot perhaps handle the cost, but
            // we need it here for the xform history
            placeKey = (expense > 0) ? currentForm + KeyManipulator.DOLLAR + expense : currentForm;
        }

        payloadArg.appendTransformHistory(placeKey);

        logger.debug("Appended {} to history which now has size {}", placeKey, payloadArg.transformHistory().size());
    }

    /**
     * Choose whether the arrival place should process the payload.
     *
     * @param arg the new value for processFirstPlace
     */
    protected void setProcessFirstPlace(final boolean arg) {
        processFirstPlace = arg;
    }

    /**
     * Tell whether the arrival place should process the payload.
     *
     * @return the value of processFirstPlace
     */
    protected boolean getProcessFirstPlace() {
        return processFirstPlace;
    }

    /**
     * Write the statistics held by the ResourceWatcher to this class's logger. If no ResourceWatcher can be looked up the
     * failure is logged and nothing else happens.
     */
    @Override
    public void dumpPlaceStats() {
        final ResourceWatcher watcher;
        try {
            watcher = ResourceWatcher.lookup();
        } catch (NamespaceException ne) {
            logger.error("Exception occurred while trying to lookup resource", ne);
            return;
        }

        final String separator = "===============";
        logger.info("Dumping All Stats for {}:", AGENT_THREAD);
        logger.info(separator);
        watcher.logStats(logger);
        logger.info(separator);
    }

    /**
     * Get the maximum number of move errors tolerated for a payload.
     *
     * @return the configured limit
     */
    @Override
    public int getMaxMoveErrors() {
        return maxMoveErrors;
    }

    /**
     * Set the maximum number of move attempts that can error out before this instance will quit trying and set the
     * workflow to be an ERROR condition.
     *
     * @param value the maximum number of move failures
     */
    @Override
    public void setMaxMoveErrors(final int value) {
        maxMoveErrors = value;
    }

    /**
     * Get the maximum number of itinerary steps.
     *
     * @return the configured limit
     */
    @Override
    public int getMaxItinerarySteps() {
        return maxItinerarySteps;
    }

    /**
     * Set the maximum number of itinerary steps before this instance will turn the workflow into an ERROR condition.
     *
     * @param value the new maximum number of steps
     */
    @Override
    public void setMaxItinerarySteps(final int value) {
        maxItinerarySteps = value;
    }
}