DirectoryPlace.java

package emissary.directory;

import emissary.config.Configurator;
import emissary.core.EmissaryException;
import emissary.core.IBaseDataObject;
import emissary.core.Namespace;
import emissary.log.MDCConstants;
import emissary.place.ServiceProviderPlace;
import emissary.server.mvc.adapters.DirectoryAdapter;

import org.slf4j.MDC;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.annotation.Nullable;

/**
 * The DirectoryPlace class is used to store information relating to Places/Services in the Emissary Agent-Based
 * architecture. When a Place comes up it calls the method addPlace passing in all the relevant information to store in
 * the Directory. Agents query the directory by calling the method nextKeys which requires a query String search
 * pattern.
 *
 * <p>
 * We try to support some network topographic constructions by providing a set of peer directories. Peers are monitored
 * and checked automatically by HeartbeatManager and the peer network is assumed to be fully connected. Peer directories
 * are a fairly-static list of peer directories read from a config file. At least one host must be listed in order to
 * bootstrap the network.
 *
 * <p>
 * Emissary directory instances are also observable with respect to Peer activities, and Place activities. Peer
 * observers will be called with a list of current members of the peer group (including this directory) whenever the
 * peer group loses or gains members. Place observers will be called with a key that matches the pattern supplied on
 * their subscription and an indication of whether it is a register or deregister or cost change.
 *
 */
public class DirectoryPlace extends ServiceProviderPlace implements IRemoteDirectory {

    /**
     * Map of DirectoryEntryList objects by data id. This map contains the actual advertisements seen by this directory and
     * available for MobilAgent/Place use via nextKeys
     */
    protected DirectoryEntryMap entryMap = new DirectoryEntryMap();

    /** Peer directories to this one */
    protected Set<DirectoryEntry> peerDirectories = new CopyOnWriteArraySet<>();

    /**
     * Statically configured peers. Remember them even when they shut down. A subset of peerDirectories
     */
    protected Set<String> staticPeers = new HashSet<>();

    /** Heartbeat manager for checking up on remote directories */
    protected HeartbeatManager heartbeat;

    /** Manage observers */
    protected DirectoryObserverManager observerManager;

    /** True if this directory is a rendezvous peer */
    protected boolean rdvPeer = false;

    /** True is this directory is shutdown */
    protected boolean shutdownInitiated = false;

    /** True if this directory is running */
    protected boolean running = false;

    /** Emissary node configuration for network topology */
    protected EmissaryNode emissaryNode;

    /**
     * Window of slop between asking for a zone and purging "stale" entries from the entry map. Since there is a window of
     * time when the remote directory might be spewing out addPlace calls while we are asking for the zone transfer we can't
     * just remove all entries once we get the zone, demarshall it and decide (finally) that it's ready to put into our map.
     * We have to allow things somewhat recent to stay around also. This time window looks back from the beginning of the
     * zone transfer request to provide some leniency.
     */
    protected long zoneSlopWindowMillis = 30000; // 30 sec

    /**
     * Create a new empty directory using this location and no parent
     *
     * @param placeLoc string key to register this directory
     * @param node EmissaryNode for this directory place
     * 
     * @throws IOException when configuration fails
     */
    public DirectoryPlace(final String placeLoc, EmissaryNode node) throws IOException {
        super(placeLoc);
        this.emissaryNode = node;
        setupDirectory();
    }

    /**
     * Create a new directory as specified by the config info with a parent for relaying through.
     *
     * @param configStream config info
     * @param parentDir the parent directory or null if none
     * @param placeLoc key for this place
     * @param node node configuration details or null for defaults
     * @throws IOException when configuration fails
     */
    public DirectoryPlace(final InputStream configStream, final String parentDir, final String placeLoc, final EmissaryNode node) throws IOException {
        super(configStream, parentDir, placeLoc);
        this.emissaryNode = node;
        setupDirectory();
    }

    /**
     * Create a new child directory as specified by the config info
     *
     * @param configInfo our config file to read
     * @param placeLoc string key to register this directory
     * @param node node configuration details or null for defaults
     * @throws IOException when configuration fails
     */
    public DirectoryPlace(final String configInfo, final String placeLoc, final EmissaryNode node) throws IOException {
        super(configInfo, placeLoc);
        this.emissaryNode = node;
        setupDirectory();
    }

    /**
     * Create a new directory as specified by the config info
     *
     * @param configStream config info
     * @param placeLoc key for this place
     * @param node node configuration details or null for defaults
     * @throws IOException when configuration fails
     */
    public DirectoryPlace(final InputStream configStream, final String placeLoc, final EmissaryNode node) throws IOException {
        super(configStream, placeLoc);
        this.emissaryNode = node;
        setupDirectory();
    }

    /**
     * Shared code for all the constructors to take advantage of in initializing directory services Configuration items read
     * here are
     * <ul>
     * <li>HEARTBEAT_DELAY_SECONDS, default is 30</li>
     * <li>HEARTBEAT_INTERVAL_SECONDS, default is 30</li>
     * <li>HEARTBEAT_FAILURE_THRESHOLD, set transient failure count, default owned by HeartbeatManager</li>
     * <li>HEARTBEAT_PERMANENT_FAILURE_THRESHOLD, set permanent failure count, default owned by HeartbeatManager</li>
     * </ul>
     */
    private void setupDirectory() {
        if (this.emissaryNode.isValid() && !this.emissaryNode.isStandalone()) {
            // Start a heart beat manager with initial and interval seconds
            final int initialSeconds = configG.findIntEntry("HEARTBEAT_DELAY_SECONDS", 30);
            final int intervalSeconds = configG.findIntEntry("HEARTBEAT_INTERVAL_SECONDS", 30);

            this.heartbeat = new HeartbeatManager(myKey, initialSeconds, intervalSeconds);

            final int heartbeatFailureThreshold = configG.findIntEntry("HEARTBEAT_FAILURE_THRESHOLD", -1);
            if (heartbeatFailureThreshold > 0) {
                this.heartbeat.setFailThreshold(heartbeatFailureThreshold);
            }

            final int heartbeatPermanentFailure = configG.findIntEntry("HEARTBEAT_PERMANENT_FAILURE_THRESHOLD", -1);
            if (heartbeatPermanentFailure > 0) {
                this.heartbeat.setPermanentFailThreshold(heartbeatPermanentFailure);
            }
        }

        // Set up deferred stuff from ServiceProviderPlace
        // for directories only we are our own localDirPlace
        // and the key is our own key
        localDirPlace = this;
        dirPlace = myKey;

        // Start an observer manager
        this.observerManager = new DirectoryObserverManager(myKey);

        // Configure my initial rendezvous peers
        configureNetworkTopology();

        // Add an entry representing myself into the
        // local entry map. This allows observers to
        // work for this case, and allows Jetty instances
        // with just a DirectoryPlace and some bunches
        // of other non-Place code to function well and trigger
        // the peer discovery mechanism when they zone transfer
        // this entry
        final List<String> list = new ArrayList<>();
        list.add(keys.get(0));
        addPlaces(list);
        this.running = true;
    }

    /**
     * Find an optional peer config stream or file and initialize tracking of the peers found there.
     * <p>
     * We don't actually contact any of the remote directories here, so we can get the heck out of the constructor code and
     * get this place registered in the namespace quick! so other directories can find us in a timely fashion.
     */
    private void configureNetworkTopology() {
        if (!this.emissaryNode.isValid()) {
            if (this.emissaryNode.isStandalone()) {
                logger.debug("Running as a standalone emissary node");
            } else {
                logger.debug("Not configured as an emissary node");
            }
            return;
        }

        logger.debug("Emissary node info: {}", this.emissaryNode);

        try {
            // Peer network configuration is from peer.cfg
            final Configurator peerConfig = this.emissaryNode.getPeerConfigurator();
            final Set<String> peers = peerConfig.findEntriesAsSet("RENDEZVOUS_PEER");
            this.staticPeers.addAll(peers);
            addPeerDirectories(peers, true);

            logger.debug("Configured {} rendezvous peers from {} config entries.", this.peerDirectories.size(), peers.size());
            logger.debug("This directory is {}a rendezvous peer.", (this.rdvPeer ? "" : "NOT (yet) "));
        } catch (IOException iox) {
            logger.debug("There is no peer.cfg data available");
        }
    }

    /**
     * Determine if the key is local to this directory
     *
     * @param key the key to query for
     * @return true iff the key host and port are the same (jvm locality test)
     */
    private boolean isLocal(final String key) {
        return KeyManipulator.isLocalTo(key, myKey);
    }

    /**
     * Determine if the entry is local to this directory
     *
     * @param entry the entry to query for
     * @return true iff the entry key host and port are the same (jvm locality test)
     */
    private boolean isLocal(final DirectoryEntry entry) {
        return isLocal(entry.getKey());
    }

    /**
     * Add a Set of peer directory to this one
     *
     * @param keys set of string key for peer directories
     */
    @Override
    public void irdAddPeerDirectories(@Nullable final Set<String> keys) {
        // Validate contract
        if ((keys == null) || keys.isEmpty()) {
            logger.warn("Ignoring irdAddPeerDirectories called with null or no keys");
            return;
        }

        // Validate remote parameters
        for (final String key : keys) {
            if (!KeyManipulator.isValid(key)) {
                logger.warn("Ignoring irdAddPeerDirectories called with {} keys, invalid key {}", keys.size(), key);
                return;
            }
        }
        addPeerDirectories(keys, false);
    }

    /**
     * Add a Set of peer directory to this one
     *
     * @param keys set of string key for peer directories
     * @param initPhase true if during place initialization
     */
    // TODO Look at DirectoryPlaceTest at the cases where spied methods are used
    public void addPeerDirectories(final Set<String> keys, final boolean initPhase) {

        if (this.shutdownInitiated) {
            logger.error("Shutdown has been initiated. Cannot add peer directories in this state.");
            return;
        }

        boolean changeMade = false;

        for (final String key : keys) {
            if (isLocal(key)) {
                // I am listed as a rendezvous for someone
                this.rdvPeer = true;
                continue;
            }

            if (this.emissaryNode.isStandalone()) {
                logger.debug("Not adding peers in standalone nodes");
                continue;
            }

            if (!isStaticPeer(key)) {
                logger.warn("Unknown peer requesting to be added: {}", key);
                continue;
            }

            if (!isKnownPeer(key)) {
                this.peerDirectories.add(new DirectoryEntry(key));
                logger.debug("Added peer directory {}", key);

                // Setup heartbeat to new peer directory
                if (initPhase) {
                    // not contacted yet
                    this.heartbeat.addRemoteDirectory(key, HeartbeatManager.NO_CONTACT);
                } else {
                    // already contacted
                    this.heartbeat.addRemoteDirectory(key, HeartbeatManager.IS_ALIVE);

                    // Initial transfer of remote directory info here
                    // It may not be up yet, so be resilient
                    loadPeerEntries(key);
                }


                changeMade = true;
            } else {
                logger.debug("We already knew about peer {}", key);
                if (!this.heartbeat.isAlive(key)) {
                    logger.debug("Forcing peer {} alive due to arriving registration", key);
                    this.heartbeat.setHealthStatus(key, HeartbeatManager.IS_ALIVE, "Received peer registration");
                    loadPeerEntries(key);
                }
            }
        }

        // Notify all observers
        if (changeMade) {
            this.observerManager.peerUpdate(new HashSet<>(this.peerDirectories));
        }
    }

    /**
     * Retrieve and load (zone transfer) all the entries from the specified peer directory. Zone transfers do not trigger
     * observables like addPlaces does
     *
     * @param peerKey the key of the peer directory
     */
    protected void loadPeerEntries(final String peerKey) {

        if (this.emissaryNode.isStandalone()) {
            logger.debug("Cannot load peer entries in standalone nodes");
            return;
        }

        logger.debug("Doing zone transfer with peer {}", peerKey);
        // TODO See DirectoryPlace for spy example which needs to be addressed
        final DirectoryEntryMap newEntries = loadRemoteEntries(peerKey, this.entryMap);
        if ((newEntries == null) || newEntries.isEmpty()) {
            logger.debug("We got nothing back from the peer zone xfer");
            return;
        }

        // We just did this guy remove his stuff
        newEntries.removeAllOnDirectory(peerKey);

        // Remove local stuff
        newEntries.removeAllOnDirectory(myKey);

        // Make note of any possible new peer directory
        // We should only be seeing peers here
        final Set<String> newPeers = new HashSet<>();
        for (final DirectoryEntry newEntry : newEntries.allEntries()) {
            if (!isLocal(newEntry)) {
                final String possiblePeer = KeyManipulator.getDefaultDirectoryKey(newEntry.getKey());
                if (!isKnownPeer(possiblePeer) && !newPeers.contains(possiblePeer)) {
                    logger.debug("Discovered new peer {} from {} during zt with {}", possiblePeer, newEntry.getKey(), peerKey);
                    newPeers.add(possiblePeer);
                }
            }
        }
        if (!newPeers.isEmpty()) {
            logger.debug("Adding {} new peers from zt with {}", newPeers.size(), peerKey);
            addPeerDirectories(newPeers, false);
        }
    }

    /**
     * Retrieve and load (zone transfer) all the entries from specified remote directory into the specified map. Remove any
     * stale entries from the destination map if one is specified and merge in the new entries. Zone transfers do not
     * trigger observables like addPlaces does
     *
     * @param key key of the remote directory to transfer from
     * @param loadMap the map to load into or null for no load. Observers are notified if loadMap is not null
     * @return the new entries
     */
    @Nullable
    private DirectoryEntryMap loadRemoteEntries(final String key, @Nullable final DirectoryEntryMap loadMap) {

        if (this.emissaryNode.isStandalone()) {
            logger.debug("Cannot load remote entries in standalone nodes");
            return null;
        }

        if (!isStaticPeer(key)) {
            logger.debug("Ignoring non-configured peer {}", key);
            return null;
        }

        // Track how long the zone transfer takes and use that
        // info along with the slop window to help determine if
        // there are stale entries and what they might be.

        final long startZone = System.currentTimeMillis();
        DirectoryEntryMap map = null;
        try {
            // Also registers as a peer with them
            // TODO should we need to get the current EmissaryClient to ensure parameters are set correctly
            final DirectoryAdapter da = new DirectoryAdapter();
            map = da.outboundRegisterPeer(key, myKey);

            if (logger.isDebugEnabled()) {
                logger.debug("Retrieved {} entries in zone transfer from {} in {} millis", map.entryCount(), key,
                        (System.currentTimeMillis() - startZone));
            }

            // No entries mean we got the remote message,
            // and they just don't have any places registered yet
            if (map.isEmpty()) {
                return map;
            }

            if (loadMap != null) {

                // Remove and notify of any stale entries in loadMap
                removeStaleEntries(loadMap, key, startZone - this.zoneSlopWindowMillis, map, true);

                // Remove any duplicate entries from map
                // so that they don't get double notified to observers
                // do the load and notify all observers
                cleanLoadNotifyEntries(map, loadMap, myKey, REMOTE_COST_OVERHEAD);
            } else {
                logger.debug("Skipping load of {} new entries from {} returning list to caller", map.entryCount(), key);
            }
        } catch (Exception ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Unable to zone transfer with {}", key, ex);
            } else {
                logger.info("Unable to zone transfer with {}", key);
            }
            // Failure condition. Trigger state change in heartbeat manager
            this.heartbeat.setHealthStatus(key, HeartbeatManager.NO_CONTACT, "Remote directory failed zone transfer");
        }

        return map;
    }

    /**
     * Remove stale entries from the specified map and notify any observers Nothing older than checkpoint time can be
     * considered stale and nothing that is on the incming newEntries list can be considered stale since we would just be
     * adding it back again. Duplicates (non-stale entries) are removed from the newEntries map to avoid further confusion
     * but only if the cost is the same. Otherwise, we leave it so that a cost-change event can propagete from later code
     * but still avoid triggering a place removed event.
     *
     * @param loadMap the map we are removing from
     * @param key the key of the directory whose entries might be stale
     * @param checkpoint the time window to determine possible staleness
     * @param newEntries the new map arriving
     * @param performNotification only use observerManager if true
     * @return list of entries that were removed
     */
    private List<DirectoryEntry> removeStaleEntries(final DirectoryEntryMap loadMap, final String key, final long checkpoint,
            @Nullable final DirectoryEntryMap newEntries, final boolean performNotification) {

        final List<DirectoryEntry> staleEntries = new ArrayList<>();

        // Nothing newer than the checkpoint time can be stale
        // Nothing that is in teh loadMap but also duplicated in
        // the newEntries map can be stale either. This helps eliminate
        // the problem of removing it just so we can add it back.
        // This uses a mark and sweep to prevent concurrent mod exceptions
        for (final DirectoryEntry d : loadMap.collectAllMatching(key)) {
            // is it old enough to be possibly stale
            if (d.getAge() < checkpoint) {
                // is it missing from the new list
                if (newEntries != null) {
                    final List<DirectoryEntry> matches = newEntries.collectAllMatching(d.getKey());
                    if (matches.isEmpty()) {
                        logger.debug("Marking stale entry {}", d.getKey());
                        staleEntries.add(d);
                    } else if (matches.size() == 1) {
                        // remove from newEntries if exact dup
                        final DirectoryEntry me = matches.get(0);
                        if (me.getFullKey().equals(d.getFullKey())) {
                            logger.debug("Removing duplcate key from incoming map {}", me.getKey());
                            newEntries.removeEntry(me.getKey());
                        }
                    }
                } else {
                    // must be stale if no newEntries
                    logger.debug("Marking stale entry (no new entries){}", d.getKey());
                    staleEntries.add(d);
                }
            }
        }

        // Remove and notify
        if (!staleEntries.isEmpty()) {
            for (final DirectoryEntry stale : staleEntries) {
                logger.debug("Removing stale entry {}", stale.getKey());
                loadMap.removeEntry(stale.getKey());
            }

            if (performNotification) {
                logger.debug("Notifying observers of {} stale entry removals", staleEntries.size());
                this.observerManager.placeRemoveEntries(staleEntries);
            }
        } else {
            logger.debug("There were no stale entries to remove");
        }

        return staleEntries;
    }

    /**
     * Grok the details of a new entry list and figure out which observers need to be notified. Remove any entries that are
     * not going to end up being added anyway.
     *
     * @param map the new entries to understand
     * @param loadMap the map the entries will be loaded into
     * @param purgeKey remove any keys matching
     * @param costBump add cost to incoming
     */
    private void cleanLoadNotifyEntries(final DirectoryEntryMap map, @Nullable final DirectoryEntryMap loadMap, @Nullable final String purgeKey,
            final int costBump) {
        // Remove local entries from the new map
        // We already know about our local stuff.
        if (purgeKey != null) {
            final List<DirectoryEntry> removed = map.removeAllOnDirectory(purgeKey);
            logger.debug("Clean/load removed {} entries based on {} remaining = {}", removed.size(), purgeKey, map.entryCount());
        }

        // Add remote overhead to remaining
        if (costBump > 0) {
            map.addCostToMatching("*.*.*.*", costBump);
            logger.debug("Clean/load did cost-bump of {} on {} entries", costBump, map.entryCount());
        }

        if (loadMap != null) {
            final DirectoryEntryMap newEntries = new DirectoryEntryMap();
            final DirectoryEntryMap costChangeEntries = new DirectoryEntryMap();
            for (final DirectoryEntry e : map.allEntries()) {
                final List<DirectoryEntry> matches = loadMap.collectAllMatching(e.getKey());
                if (matches.isEmpty()) {
                    newEntries.addEntry(e);
                } else if ((matches.size() == 1) && e.isBetterThan(matches.get(0))) {
                    costChangeEntries.addEntry(e);
                }
            }

            // Merge remaining truly new entries and notify observers
            final int newCount = newEntries.entryCount();
            final int cceCount = costChangeEntries.entryCount();
            if (newCount > 0) {
                logger.debug("Loading {} new entries", newCount);
                loadMap.addEntries(newEntries);
                this.observerManager.placeAdd(newEntries.allEntryKeys());
            } else {
                logger.debug("Nothing truly new from {} entries", map.entryCount());
            }

            // .. and cost change entries
            if (cceCount > 0) {
                logger.debug("Loading {} better cost entries", cceCount);
                loadMap.addEntries(costChangeEntries);
                this.observerManager.placeCostChange(costChangeEntries.allEntryKeys());
            } else {
                logger.debug("No cost change entries from {} entries", map.entryCount());
            }

            // Now let the map that gets returned have just the new
            // and cost changed entries, no already known stuff
            if ((newCount + cceCount) < map.entryCount()) {
                map.clear();
                map.addEntries(costChangeEntries);
                map.addEntries(newEntries);
                map.sort();
            }
        } else {
            logger.debug("Clean/load got a null loadMap so skipping the load for {} entries", map.entryCount());
        }
    }


    /**
     * Get a list of the keys of all the peer directories known here
     *
     * @return set of string names of peer directory keys
     */
    @Override
    public Set<String> getPeerDirectories() {
        final Set<String> l = new TreeSet<>();
        for (final DirectoryEntry sde : this.peerDirectories) {
            l.add(sde.getKey());
        }
        return l;
    }

    /**
     * Add a list of entries to the directory Entries are kept in a Hash by "datatype::serviceType" Each entry is a List of
     * sorted DirectoryEntries sorted order on cost and then quality, held in a DirectoryEntryList object
     *
     * @param entryList the new entries to add
     */
    protected void addEntries(final List<DirectoryEntry> entryList) {
        logger.debug("Adding {} new entries", entryList.size());

        // add them
        this.entryMap.addEntries(entryList);

        // notify all observers
        this.observerManager.placeAddEntries(entryList);

        final Set<String> peerSet = new HashSet<>();
        for (final DirectoryEntry newEntry : entryList) {
            // Make a note of any possible new peer directory
            if (!isLocal(newEntry)) {
                final String peerKey = KeyManipulator.getDefaultDirectoryKey(newEntry.getKey());
                if (!isKnownPeer(peerKey) && !peerSet.contains(peerKey)) {
                    logger.debug("Discovered new peer {} from  addEntries {}", peerKey, newEntry.getKey());
                    peerSet.add(peerKey);
                } else {
                    logger.debug("No new peer implications to {} from {}", peerKey, newEntry.getKey());
                }
            }
        }

        if (!peerSet.isEmpty()) {
            logger.debug("Adding {} newly discovered peer entries", peerSet.size());
            addPeerDirectories(peerSet, false);
        }
    }

    /**
     * Add an entry to the directory Entries are kept in a Hash by "datatype::serviceType" Each entry is a List of sorted
     * DirectoryEntries sorted order on cost and then quality, held in a DirectoryEntryList object
     *
     * @param newEntry the new entry to add
     */
    protected void addEntry(final DirectoryEntry newEntry) {
        logger.debug("Adding single new entry {}", newEntry.getKey());
        final List<DirectoryEntry> entryList = new ArrayList<>();
        entryList.add(newEntry);
        addEntries(entryList);
    }

    /**
     * Determine if key represents a configured peer
     */
    public boolean isStaticPeer(final String key) {
        return this.staticPeers.contains(key);
    }

    /**
     * Determine if key represents a known peer
     */
    private boolean isKnownPeer(final String key) {
        for (final DirectoryEntry sde : this.peerDirectories) {
            if (KeyManipulator.isLocalTo(sde.getKey(), key)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Remove a peer from the peer list
     *
     * @param key the peer to remove
     */
    @Nullable
    private DirectoryEntry removePeer(final String key) {
        if (this.emissaryNode.isStandalone()) {
            logger.debug("Cannot remove peers from standalone nodes");
            return null;
        }

        DirectoryEntry expeer = null;

        // Find it
        for (final DirectoryEntry sde : this.peerDirectories) {
            if (KeyManipulator.isLocalTo(sde.getKey(), key)) {
                // nb. COW Set does not support iterator.remove
                expeer = sde;
                break;
            }
        }

        // Nuke it
        if (expeer != null) {
            // Remove from COW set
            this.peerDirectories.remove(expeer);

            // Remove from heartbeat manager
            this.heartbeat.removeRemoteDirectory(expeer.getKey());

            // Notify all observers, but don't give them
            // access to our own Set object
            this.observerManager.peerUpdate(new HashSet<>(this.peerDirectories));

            // Remove the entries if any remain
            removePlaces(Collections.singletonList(KeyManipulator.getHostMatchKey(expeer.getKey())));
        }

        return expeer;
    }

    /**
     * Remove directory. Called from the heartbeat manager and from the EmissaryClient
     *
     * @param key string key of failed directory
     * @param permanent true if from a normal shutdown rather than a transient error
     * @return count of how many places were removed locally
     */
    @Override
    public int irdFailDirectory(final String key, final boolean permanent) {

        if (this.emissaryNode.isStandalone()) {
            logger.debug("Cannot fail remotes in standalone nodes");
            return 0;
        }

        // Validate remote input
        if (!KeyManipulator.isValid(key)) {
            logger.warn("Ignoring, called with invalid key {}", key);
            return 0;
        }

        if (this.shutdownInitiated) {
            logger.debug("Remote {} reported as failed, in shutdown", key);
            return 0;
        }

        // Reports of my demise are premature...
        if (isLocal(key)) {
            logger.warn(
                    "Someone reported me as failed, but I appear to be still running. Refusing to remove my own entries and propagate this filthy lie.");
            return 0;
        }

        final String dirKey = KeyManipulator.getDefaultDirectoryKey(key);
        final String hmKey = KeyManipulator.getHostMatchKey(key);
        int count = 0;

        logger.debug("irdFailDirectory {} {} permanent", key, (permanent ? "is" : "is not"));

        // Modify local entries for the failed remote directory
        // Permanent failure removes entries on failed directory.
        // Transient failure adjusts weight of entries on failed directory.
        if (permanent) {
            logger.debug("Permanent failure of remote {}", key);
            count += removePlaces(Collections.singletonList(hmKey));
        } else {
            // Change the cost for all places matching the
            // failed directory. This has the effect of causing them
            // not to be chosen as much.
            final List<DirectoryEntry> list = this.entryMap.collectAllMatching(hmKey);
            this.observerManager.placeCostChangeEntries(list);
        }

        // Handle permanent removal of remote directory
        if (permanent) {
            // Notify my heartbeat manager so that a normal deregistration
            // followed by a restart will trigger a state transition even
            // if under the timer check time
            this.heartbeat.setHealthStatus(key, HeartbeatManager.NO_CONTACT, "Permanent deregistration");

            // Remove from peer list
            if (isKnownPeer(dirKey)) {
                if (!isStaticPeer(dirKey)) {
                    logger.debug("Removing non-static peer {}", dirKey);
                    removePeer(dirKey);
                } else {
                    logger.debug("Static peer {} is deregistered but monitoring continues", dirKey);
                }
            } else {
                logger.warn("Directory {} failed but it isn't a peer??", dirKey);
            }
        }

        return count;
    }

    /**
     * Send directory failure message to another directory
     *
     * @param directory the place to send the message
     * @param failKey the key of the one that failed
     * @param permanent true if this is from normal deregistrtion
     */
    protected void sendFailMessage(final DirectoryEntry directory, final String failKey, final boolean permanent) {

        if (this.emissaryNode.isStandalone()) {
            logger.debug("No remote failure messages generated in standalone node");
            return;
        }

        try {
            new DirectoryAdapter().outboundFailDirectory(directory.getKey(), failKey, permanent);
        } catch (RuntimeException ex) {
            logger.error("Problem talking to directory {} to fail {}", directory.getKey(), failKey, ex);
        }
    }

    /**
     * Established or re-established contact with a remote directory. Check for presence on peer and initiate zone transfer
     * if needed.
     *
     * @param key the key of the directory we contacted
     */
    void contactedRemoteDirectory(final String key) {
        MDC.put(MDCConstants.SERVICE_LOCATION, KeyManipulator.getServiceLocation(myKey));
        logger.debug("Established contact with {}", key);

        if (isStaticPeer(key) && isKnownPeer(key)) {
            loadPeerEntries(key);
        } else {
            logger.warn("Contact established with {} but it is not a peer", key);
        }
        MDC.remove(MDCConstants.SERVICE_LOCATION);
    }

    /**
     * Register a place with all of its complete keys
     *
     * @param keys list of complete keys with expense
     */
    @Override
    public void addPlaces(@Nullable final List<String> keys) {
        // Validate contract
        if ((keys == null) || keys.isEmpty() || (keys.get(0) == null)) {
            logger.error("addPlaces skipping place with no keys");
            return;
        }

        // Build a list of DirectoryEntry out of these
        final List<DirectoryEntry> del = new ArrayList<>();
        for (final String key : keys) {
            final DirectoryEntry d = new DirectoryEntry(key);
            del.add(d);
        }

        irdAddPlaces(del, false);
    }

    /**
     * Register a list of entries. This signature only meant to be called from within EmissaryClient code. Each entry will
     * have a separate key, cost and quality but should all be local to each other.
     *
     * @param entryList list of directoryEntry to add
     * @param propagating true if going back down the directory chain
     */
    @Override
    public void irdAddPlaces(@Nullable final List<DirectoryEntry> entryList, final boolean propagating) {

        if ((entryList == null) || entryList.isEmpty()) {
            logger.debug("irdAddPlaces called with null or empty entryList!");
            return;
        }

        // Validate remote input
        for (final DirectoryEntry d : entryList) {
            if (d == null || !d.isValid()) {
                logger.warn("Ignoring irdAddPlaces called with {} DirectoryEntry objects due to invalid key in {}", entryList.size(), d);
                return;
            }
        }

        // These keys better all be from the same emissary node
        // We should check that they are and throw if not
        final String place = entryList.get(0).getKey(); // !!
        final boolean isLocal = isLocal(place);

        if (logger.isDebugEnabled()) {
            logger.debug("Starting irdAddPlaces with {} entries for {} place  - place={}, myKey={}", entryList.size(),
                    (isLocal ? "local" : "non-local"), place, myKey);
        }

        // make a defensive deep copy of the incoming list, so we
        // can safely proxy and adjust cost as needed
        final List<DirectoryEntry> entries = new ArrayList<>();
        for (final DirectoryEntry d : entryList) {
            entries.add(new DirectoryEntry(d, DirectoryEntry.PRESERVE_TIME));
        }

        // Let each directory add this non-local component to the cost
        // based on the place locality. This should be enough to
        // dwarf any cost variants among truly local places
        if (!isLocal) {
            for (final DirectoryEntry d : entries) {
                d.addCost(REMOTE_COST_OVERHEAD);
            }
        }

        logger.debug("Doing addEntries for {} new entries", entries.size());
        addEntries(entries);

        // Notify peers if entries are being added locally
        if (isLocal && !this.peerDirectories.isEmpty()) {
            // This may fail if the peer is not up yet. That is normal.
            for (final DirectoryEntry peer : this.peerDirectories) {
                if (this.heartbeat.isAlive(peer.getKey())) {
                    registerWith(peer, entries, false);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("Not registering {} with peer {}, not alive right now", entries.size(), peer.getKey());
                }
            }
        }
    }

    /**
     * Private helper to register directories. This method handles multiple directory entries, each can have separate key,
     * description, cost, and quality
     *
     * @param dir the place entry to register
     * @param entryList the new entries
     * @param propagating true if propagating back down from higher level directory
     */
    protected void registerWith(final DirectoryEntry dir, final List<DirectoryEntry> entryList, final boolean propagating) {
        if (logger.isDebugEnabled()) {
            logger.debug("registerWith({},{},{})", dir.getKey(), entryList, propagating);
        }

        try {
            new DirectoryAdapter().outboundAddPlaces(dir.getKey(), entryList, propagating);
            logger.debug("registration succeeded");
        } catch (RuntimeException ex) {
            logger.warn("DirectoryPlace.registerWith: Problem talking to directory {} to add {} entries", dir.getKey(), entryList.size(), ex);
        }
    }

    /**
     * Called by mobile agent to get a destination for a payload
     *
     * @param dataId key to entryMap, dataType::serviceType, e.g. UNKNOWN::ID
     * @param payload the payload being evaluated
     * @param lastPlace place agent visited last, this is not stateless
     * @return List of DirectoryEntry with next place to go or empty list if none
     */
    @Override
    public List<DirectoryEntry> nextKeys(final String dataId, final IBaseDataObject payload, final DirectoryEntry lastPlace) {
        // Normal lookup in public entry map
        logger.debug("nextKey called with dataId='{}', and lastPlace={}", dataId, (lastPlace == null ? "null" : lastPlace.getFullKey()));

        List<DirectoryEntry> entries = nextKeys(dataId, payload, lastPlace, this.entryMap);
        if (logger.isDebugEnabled() && (entries != null) && !entries.isEmpty()) {
            logger.debug("nextKey produced {} entries from main map {}", entries.size(), entries);
        }
        return entries;
    }

    /**
     * Get tne next logical entry based on current dataId and last place visited
     *
     * @param dataId key to entryMap, dataType::serviceType, e.g. UNKNOWN::ID
     * @param payload the payload being routed
     * @param lastPlace place agent visited last, this is not stateless
     * @param entries map of DirectoryEntry stored in this directory
     * @return List of DirectoryEntry with next place to go or empty list if none
     */
    protected List<DirectoryEntry> nextKeys(final String dataId, final IBaseDataObject payload, @Nullable final DirectoryEntry lastPlace,
            final DirectoryEntryMap entries) {
        // Find the entry list for the type being requested
        final DirectoryEntryList currentList = getWildcardedEntryList(dataId, entries);

        // Nothing for the dataId or any wildcarded versions, we are done
        if ((currentList == null) || currentList.isEmpty()) {
            logger.debug("nextKey - nothing found here for {}", dataId);
            return List.of();
        }

        // remove denied entries
        currentList.removeIf(de -> de.getLocalPlace() != null && de.getLocalPlace().isDenied(payload.currentForm()));

        if (currentList.isEmpty()) {
            logger.debug("nextKeys - no non-DENIED entries found here for {}", dataId);
            return List.of();
        }
        // The list we are building for return to the caller
        final List<DirectoryEntry> keyList = new ArrayList<>();

        // The dataId this time is different from the last place
        // visited, so we can just choose from the list of the lowest
        // expense places and get on with it
        DirectoryEntry trialEntry = currentList.getEntry(0);
        if (lastPlace == null || (!lastPlace.getDataId().equals(dataId) && !trialEntry.getServiceLocation().equals(lastPlace.getServiceLocation()))) {
            logger.debug("doing first in list for {}", trialEntry);
            keyList.add(currentList.pickOneOf(trialEntry.getExpense()));
        } else {
            // Trying a particular "dataType::serviceType" pair again
            for (int i = 0; i < currentList.size(); i++) {
                trialEntry = currentList.getEntry(i);

                // Skip entry if less/same expensive. Includes the obvious
                // test, plus evaluation of whether we would choose a
                // particular non-local place if it was here. If we wouldn't
                // choose it if it was here, we certainly aren't willing
                // to move to get it.
                final int te = trialEntry.getExpense() % REMOTE_EXPENSE_OVERHEAD;
                final int le = lastPlace.getExpense() % REMOTE_EXPENSE_OVERHEAD;

                // Always skip service cheaper than what we already did
                if (te < le) {
                    logger.debug("nextKey skip lower cost {}", trialEntry.getFullKey());
                    continue;
                }

                // If relaying, we want to be hopping closer to the target
                if ((te == le) && (trialEntry.getExpense() >= lastPlace.getExpense())
                        && !trialEntry.getServiceHostUrl().equals(lastPlace.getServiceHostUrl())) {
                    logger.debug("nextKey skip equal cost {}", trialEntry.getFullKey());
                    continue;
                }

                // If equal or lower cost, no point in using the entry
                if ((trialEntry.getExpense() <= lastPlace.getExpense()) && trialEntry.getServiceHostUrl().equals(lastPlace.getServiceHostUrl())) {
                    logger.debug("nextKey skip lower cost not relaying {}", trialEntry.getFullKey());
                    continue;
                }

                // Entry is more expense and different service
                logger.debug("nextKey - doing next in list");
                keyList.add(currentList.pickOneOf(trialEntry.getExpense()));
                break;
            }

        }

        return keyList;
    }

    /**
     * Get the possibly wildcarded DirectoryEntryList for the dataId
     *
     * @param dataId the type of data being queried
     * @param entries the entry map to use
     * @return DirectoryEntryList or null if none
     */
    protected DirectoryEntryList getWildcardedEntryList(final String dataId, final DirectoryEntryMap entries) {
        // Ids of the form FOO-BAR(ASCII)-BAZ will be wildcarded as:
        // FOO-BAR(ASCII)-BAZ
        // FOO-BAR(*)-BAZ
        // FOO-BAR(*)-*
        // FOO-*
        // See WildcardEntry for a more thorough example
        return WildcardEntry.getWildcardedEntry(dataId, entries);
    }

    /**
     * Payloads that need to traverse the relay gateway can visit here to be forwarded on to the correct destination
     * <p>
     * The payload will have the simple current form that caused this relay point to be selected replace with the full
     * four-tupled key of the place matching the request on the proper side of this relay point.
     *
     * @param d the payload to be inspected
     */
    @Override
    public void process(final IBaseDataObject d) {
        if (d.currentForm().equals(this.myKey)) {
            logger.debug("Probe routing has been removed");
        } else {
            logger.debug("Doing routing on '{}'", d.shortName());
            handleRouting(d);
        }
    }

    /**
     * Handle the routing for a payload
     *
     * @param d the visiting payload
     */
    protected void handleRouting(final IBaseDataObject d) {
        // The source entry we are interested in is the one that got us
        // here. The "lastPlaceVisited" should be my own key, so we want
        // the one before that.
        DirectoryEntry sourceEntry = d.getPenultimatePlaceVisited();

        // Source entry still null?
        if (sourceEntry == null) {
            logger.debug("Payload had no source entry and no places visited. " + "Using my own directory key, which is probably wrong.");
            sourceEntry = this.getDirectoryEntry();
        }

        // Last place visited shows the key that cause the payload
        // to arrive at this place since it is logged into the history
        // just before calling this method. The dataId on this entry
        // is the entry that finally was selected for use, so we reuse
        // it here in the proper entry map.
        final DirectoryEntry thisEntry = d.getLastPlaceVisited();
        final String dataId = thisEntry.getDataId();

        if (logger.isDebugEnabled()) {
            logger.debug("Relay payload '{}' arrived with form {} coming from {} arrival entry {} arrival dataId={}", d.shortName(), d.currentForm(),
                    sourceEntry.getKey(), thisEntry.getKey(), dataId);
        }

        // Where we want to go from here
        List<DirectoryEntry> destination = nextKeys(dataId, d, sourceEntry);

        if (logger.isDebugEnabled()) {
            logger.debug("Selected {} entries {} from incoming {} and data id {} current form={}", destination.size(), destination,
                    sourceEntry.getKey(), dataId, d.currentForm());
        }

        // Replace the current form with the full key version of same
        d.popCurrentForm();
        for (final DirectoryEntry destEntry : destination) {
            d.pushCurrentForm(destEntry.getKey());
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Leaving relay gateway with current form {}", d.getAllCurrentForms());
        }
    }

    /**
     * Make directory contents available for debug or display and analysis
     *
     * @return List of DirectoryEntry (copies)
     */
    @Override
    public List<DirectoryEntry> getEntries() {
        final List<DirectoryEntry> entries = this.entryMap.allEntries();
        return DirectoryEntryList.deepCopy(entries, true);
    }

    /**
     * Get list of DirectoryEntry that match the key pattern
     *
     * @param pattern a key pattern to match
     * @return List of DirectoryEntry (copies)
     */
    @Override
    public List<DirectoryEntry> getMatchingEntries(final String pattern) {
        final List<DirectoryEntry> entries = this.entryMap.collectAllMatching(pattern);
        return DirectoryEntryList.deepCopy(entries, true);
    }

    /**
     * Make directory contents entry keys available for display and transfer
     *
     * @return Set of String in the DataId format DATATYPE::SERVICETYPE
     */
    @Override
    public Set<String> getEntryKeys() {
        return new TreeSet<>(this.entryMap.keySet());
    }

    /**
     * Get the requested directory entry
     *
     * @param dataId the key to the entry Map set of DirectoryEntryList objects
     * @return a DirectoryEntryList object for the key or null if none
     */
    @Override
    public DirectoryEntryList getEntryList(final String dataId) {
        final DirectoryEntryList value = this.entryMap.get(dataId);
        return new DirectoryEntryList(value, DirectoryEntryList.DEEP_COPY, DirectoryEntryList.PRESERVE_TIME);
    }

    /**
     * Deregister places removing all keys for the specified places.
     *
     * @param keys four-tuple key for the place
     * @return count of how many keys removed
     */
    @Override
    public int removePlaces(final List<String> keys) {
        return irdRemovePlaces(keys, false);
    }

    /**
     * Deregister places removing all keys for the specified places. Should only be called externally from EmissaryClient
     *
     * @see #removePlaces(List)
     * @param keys four-tuple key for the place
     * @param propagating true if going down the line
     * @return count of how many keys removed
     */
    @Override
    public int irdRemovePlaces(@Nullable final List<String> keys, final boolean propagating) {
        if (this.emissaryNode.isStandalone()) {
            logger.debug("Cannot remove remote places in standalone nodes");
            return 0;
        }

        if ((keys == null) || keys.isEmpty()) {
            logger.warn("Ignoring null or empty key list for irdRemovePlaces");
            return 0;
        }

        // Validate remote input
        for (final String key : keys) {
            if (!KeyManipulator.isValid(key)) {
                logger.warn("Ignoring irdRemovePlaces called with {} keys due to invalid key {}", keys.size(), key);
                return 0;
            }
        }

        // Note we don't just pull the dataId from the map
        // because we anticipate the incoming keys will be
        // wildcarded as places go away rather than individual
        // service proxy keys
        final List<DirectoryEntry> matches = new ArrayList<>();
        for (final String key : keys) {
            final List<DirectoryEntry> m = this.entryMap.removeAllMatching(key);
            matches.addAll(m);
        }

        final int count = matches.size();
        if (logger.isDebugEnabled()) {
            logger.debug("Found {} entries for removal matching {} keys={}", count, keys.size(), keys);
        }

        // Nothing do to, nothing to propagate
        if (count == 0) {
            return 0;
        }

        int localCount = 0;

        // Count the locals and hit the observers
        for (final DirectoryEntry e : matches) {
            if (isLocal(e.getKey())) {
                localCount++;
            }
            // Notify observers of entry removal
            this.observerManager.placeRemove(e.getFullKey());
        }

        // Notify peers if local entries are being removed
        if (!this.peerDirectories.isEmpty() && (localCount > 0)) {
            // This may fail if the peer is not up. That is normal.
            for (final DirectoryEntry peer : this.peerDirectories) {
                if (this.heartbeat.isAlive(peer.getKey())) {
                    logger.debug("Deregistering {} keys from peer {}", keys.size(), peer);
                    deregisterFrom(peer, keys, false);
                }
            }
        }

        final List<String> localKeys = new ArrayList<>();
        for (final DirectoryEntry match : matches) {
            final String key = match.getKey();
            if (isLocal(key)) {
                logger.debug("Removed {} putting in local bucket", key);
                localKeys.add(key);
            }
        }

        return localKeys.size();
    }

    /**
     * Private helper to deregister entry from directories
     *
     * @param dir the remote directory to deregister from
     * @param keys the list of keys the place can handle (SERVICE_PROXY)
     * @param propagating true if propagating across levels
     */
    protected void deregisterFrom(final DirectoryEntry dir, final List<String> keys, final boolean propagating) {
        try {
            // Follow the logic to irdRemovePlaces on the remote side
            new DirectoryAdapter().outboundRemovePlaces(dir.getKey(), keys, propagating);
        } catch (RuntimeException ex) {
            logger.error("DirectoryPlace.deregisterFrom: " + "Problem talking to directory " + dir.getKey() + " to deregister keys", ex);
        }
    }

    /**
     * Shutdown this place and deregister and notify any peers and observers that this directory is closing
     */
    @Override
    public void shutDown() {
        if (this.shutdownInitiated) {
            return;
        }
        this.shutdownInitiated = true;
        this.running = false;

        logger.debug("Initiating directory shutdown");

        if (this.heartbeat != null) {
            this.heartbeat.shutDown();
        }

        if (!this.emissaryNode.isStandalone()) {

            // Notify peers of my demise
            for (final DirectoryEntry peer : this.peerDirectories) {
                logger.debug("Sending fail msg to peer {}", peer);
                sendFailMessage(peer, myKey, true);
            }
        }

        // Remove all entries and notify all observers
        final List<DirectoryEntry> matches = this.entryMap.collectAllMatching("*.*.*.*");
        for (final DirectoryEntry e : matches) {
            this.observerManager.placeRemove(e.getFullKey());
        }

        // Nuke em
        this.entryMap.clear();

        // Remove peers and Notify all observers that we are leaving the group
        this.peerDirectories.clear();
        this.observerManager.peerUpdate(this.peerDirectories);

        unbindFromNamespace();
        logger.info("Done shutting down DirectoryPlace");

    }

    /**
     * Add an observer for one of the observable activities in the directory The runtime class of the observer determines
     * what is being observed
     *
     * @param observer the new DirectoryObserver to add
     */
    @Override
    public void addObserver(final DirectoryObserver observer) {
        this.observerManager.addObserver(observer);
        logger.debug("We now have {} observers registered", this.observerManager.getObserverCount());
    }

    /**
     * Remove an observer previously registered with this directory
     *
     * @param observer the object to remove
     * @return true if it was found on the list
     */
    @Override
    public boolean deleteObserver(final DirectoryObserver observer) {
        final boolean removed = this.observerManager.deleteObserver(observer);
        logger.debug("We now have {} observers registered", this.observerManager.getObserverCount());
        return removed;
    }

    /**
     * Pull the local directory from the namespace and return it. This does not work in some test scenarios where we have
     * multiple non-local directories in a single JVM.
     *
     * @return the local directory instance
     * @throws EmissaryException when directory does not exist in namespace
     */
    public static IDirectoryPlace lookup() throws EmissaryException {
        final String name = "DirectoryPlace";

        final Object nsval = Namespace.lookup(name);
        if (nsval instanceof IDirectoryPlace) {
            return (IDirectoryPlace) nsval;
        }

        throw new EmissaryException("Bad directory place lookup found " + nsval);
    }

    /**
     * Get the sync status of a remote directory as seen from this directory. Note that this method only can return true for
     * things that the HeartbeatManager is tracking, i.e. peer directories of this instance.
     *
     * @param key the key of the remote directory
     * @return true if remote is reported as being up, false otherwise
     */
    @Override
    public boolean isRemoteDirectoryAvailable(final String key) {
        return (this.heartbeat != null) && this.heartbeat.isHealthy(key);
    }

    /**
     * Force a heartbeat with a particular directory represented by key does not necessarily need to be one that the
     * HeartbeatManager is already tracking and calling this method will not add it permanently to any list to be tracked.
     * This is a one time event and can be used at the caller's discretion. Note however,that if the key is not a peer of
     * this directory, a warning will be issued here when the success or failure action is taken by the heartbeat manager.
     * It can be ignored in this case. Note also, that a true return from this method merely means that the remote directory
     * responded to the heartbeat method, not that the remote directory is in sync yet with this one.
     *
     * @see #isRemoteDirectoryAvailable(String)
     * @param key the key of the remote directory
     * @return true if remote is up, false otherwise
     */
    @Override
    public boolean heartbeatRemoteDirectory(final String key) {
        return (this.heartbeat != null) && this.heartbeat.heartbeat(key);
    }

    /**
     * Indicate if directory is running
     *
     * @return true if running
     */
    @Override
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Indicate whether shutdown has been initiated
     *
     * @return true if shutdown initiated
     */
    @Override
    public boolean isShutdownInitiated() {
        return this.shutdownInitiated;
    }
}