Startup.java
package emissary.admin;
import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.config.ServiceConfigGuide;
import emissary.core.EmissaryException;
import emissary.core.EmissaryRuntimeException;
import emissary.core.Namespace;
import emissary.directory.DirectoryEntry;
import emissary.directory.DirectoryPlace;
import emissary.directory.EmissaryNode;
import emissary.directory.IDirectoryPlace;
import emissary.directory.KeyManipulator;
import emissary.pickup.PickUpPlace;
import emissary.place.CoordinationPlace;
import emissary.place.IServiceProviderPlace;
import emissary.server.EmissaryServer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
public class Startup {
// Directory action codes, as returned by setAction(String)
public static final int DIRECTORYSTART = 0;
public static final int DIRECTORYADD = 1;
public static final int DIRECTORYDELETE = 2;

// Command line spellings of the actions; setAction(String) matches them case-insensitively
public static final String ACTIONADD = "-add";
public static final String ACTIONDELETE = "-delete";
public static final String ACTIONSTART = "-start";

// Boolean config key read in placeSetup to choose a parallel or sequential stream of places
private static final String PARALLEL_PLACE_STARTUP_CONFIG = "PARALLEL_PLACE_STARTUP";

// Action handed to placeSetup by startMapOfPlaces; main() overwrites it from the command line
@SuppressWarnings("NonFinalStaticField")
static int directoryAction = DIRECTORYADD;

// The node whose name and port decide which configured locations count as local
private final EmissaryNode node;

// Our logger
private static final Logger logger = LoggerFactory.getLogger(Startup.class);

// The startup config object
@Nullable
protected Configurator hostsConfig = null;

// Successfully started directories, keyed by host
protected final Map<String, String> localDirectories = new ConcurrentHashMap<>();

// Failed directories, looked up by host in placeSetup
protected final Map<String, String> failedLocalDirectories = new ConcurrentHashMap<>();

// Place locations whose creation returned no place
protected final Set<String> failedPlaces = ConcurrentHashMap.newKeySet();

// Collection of the places as they finish coming up
protected static final Map<String, String> places = new ConcurrentHashMap<>();

// Collection of places that are being started
protected final Set<String> placesToStart = ConcurrentHashMap.newKeySet();

// Insertion-ordered sets of the place types, grouped by hostname
protected final Map<String, Set<String>> placeLists = new ConcurrentHashMap<>();
protected final Map<String, Set<String>> pickupLists = new ConcurrentHashMap<>();

// Sets to keep track of possible invisible place startup
protected static final Set<String> activeDirPlaces = new LinkedHashSet<>();

// Written from the worker thread started by placeSetup (optionally from a parallel stream),
// so it has to tolerate concurrent adds; it is only ever read by iterating it for removals,
// which makes its iteration order irrelevant
protected static final Set<String> placeAlreadyStarted = ConcurrentHashMap.newKeySet();

// Invisible place startups occurred in strict mode
@SuppressWarnings("NonFinalStaticField")
protected static boolean invisPlacesStartedInStrictMode = false;
/**
 * Return the service host portion of a key, as extracted by {@link KeyManipulator#getServiceHost(String)}.
 *
 * @param key the place key or location to examine
 * @return the full DNS name and port without the protocol part
 */
public static String placeHost(final String key) {
    final String serviceHost = KeyManipulator.getServiceHost(key);
    return serviceHost;
}
/**
 * Return the type of place specified, i.e. everything after the last {@code /} in the key. The key is returned
 * unchanged when it contains no slash. Key manipulator does not work on this, though it seems to if the key has dots
 * in the hostname like many do.
 *
 * @param key the place key or location to examine
 * @return the text following the last slash, or the whole key if there is none
 */
public static String placeName(final String key) {
    final int lastSlash = key.lastIndexOf("/");
    return lastSlash == -1 ? key : key.substring(lastSlash + 1);
}
/**
 * Translate a command line action argument into a directory action code.
 *
 * @param optarg the argument text, compared case-insensitively with {@link #ACTIONADD} and {@link #ACTIONDELETE}
 * @return {@link #DIRECTORYADD} or {@link #DIRECTORYDELETE} on a match, otherwise {@link #DIRECTORYSTART}
 */
public static int setAction(final String optarg) {
    // anything unrecognised, including null, falls back to the default start action
    int action = DIRECTORYSTART;
    if (ACTIONADD.equalsIgnoreCase(optarg)) {
        action = DIRECTORYADD;
    } else if (ACTIONDELETE.equalsIgnoreCase(optarg)) {
        action = DIRECTORYDELETE;
    }
    return action;
}
/**
 * Resolve the startup config file name. A name starting with {@code /} that exists on disk is used as is; anything
 * else is resolved through {@link ConfigUtil#getConfigFile(String, String)}.
 *
 * @param path the config path given on the command line
 * @param file the config file name given on the command line
 * @return the config file location to load
 */
private static String makeConfig(final String path, final String file) {
    final boolean usableAsGiven = file.startsWith("/") && new File(file).exists();
    return usableAsGiven ? file : ConfigUtil.getConfigFile(path, file);
}
/**
 * The main entry point. Accepts one to three arguments: {@code [-start|-add|-delete] [config_path] config_file}.
 * With any other argument count a usage message is printed to standard error and the method returns.
 *
 * @param args the command line arguments
 * @throws IOException declared by the {@link Startup} constructor that loads the config file
 * @throws EmissaryException if the system cannot be started
 */
@SuppressWarnings("SystemOut")
public static void main(final String[] args) throws IOException, EmissaryException {
    final String startupConfigFile;
    switch (args.length) {
        case 1:
            // config file only: absolute paths and http locations are used verbatim
            directoryAction = setAction(ACTIONSTART);
            final String only = args[0];
            final boolean verbatim = only.startsWith("/") || only.toUpperCase(Locale.getDefault()).startsWith("HTTP");
            startupConfigFile = verbatim ? only : ConfigUtil.getConfigFile(only);
            break;
        case 2:
            // config path and config file
            directoryAction = setAction(ACTIONSTART);
            startupConfigFile = makeConfig(args[0], args[1]);
            break;
        case 3:
            // action, config path and config file
            directoryAction = setAction(args[0]);
            startupConfigFile = makeConfig(args[1], args[2]);
            break;
        default:
            // Need config path and startup config file on command line
            System.err.println("Usage: java emissary.admin.Startup [-start|-add|-delete] [config_path] config_file");
            return;
    }

    new Startup(startupConfigFile, new EmissaryNode()).start();
    logger.info("The system is up and running fine. All ahead Warp-7.");
}
/**
 * Start the system: bootstrap the directories and processing places, then the pickup places, and finally check
 * for places that came up without being announced.
 *
 * @throws EmissaryException if {@link #bootstrap()} reports failure
 */
public void start() throws EmissaryException {
    if (!bootstrap()) {
        throw new EmissaryException("Unable to bootstrap the system");
    }

    // bootstrap only starts the processing places, which lets derived classes
    // finish their own set-up before any pickup place comes up; the pickup
    // places are therefore started here.
    startPickUpPlaces();

    final boolean unannouncedPlacesFound = !verifyNoInvisiblePlacesStarted();
    if (unannouncedPlacesFound) {
        if (node.isStrictStartupMode()) {
            invisPlacesStartedInStrictMode = true;
        }
    }
}
// $AUTO: Constructors.
/**
 * Class constructor loads the config file
 *
 * @param startupConfigFile location of the startup config, handed to {@link ServiceConfigGuide}
 * @param node the node this startup runs for
 * @throws IOException declared for the construction of the {@link ServiceConfigGuide}
 */
public Startup(final String startupConfigFile, EmissaryNode node) throws IOException {
// Read in startup config file specifying place/host setup
this(new ServiceConfigGuide(startupConfigFile), node);
}
/**
 * Class constructor loads the config from a stream
 *
 * @param startupConfigStream stream holding the startup config, handed to {@link ServiceConfigGuide}
 * @param node the node this startup runs for
 * @throws IOException declared for the construction of the {@link ServiceConfigGuide}
 */
public Startup(final InputStream startupConfigStream, EmissaryNode node) throws IOException {
this(new ServiceConfigGuide(startupConfigStream), node);
}
/**
 * Class constructor using an already loaded config
 *
 * @param config the startup config specifying place/host setup
 * @param node the node this startup runs for
 */
public Startup(final Configurator config, EmissaryNode node) {
    this.node = node;
    this.hostsConfig = config;
}
/**
 * Bring up the local directories and every non-pickup place named by a {@code PLACE} entry in the startup config,
 * then wait for those places to be created. Pickup places are sorted out here but not started.
 *
 * @return false if the local directory setup failed, true otherwise
 */
public boolean bootstrap() {
    // Local directories come first
    if (!localDirectorySetup(this.localDirectories)) {
        logger.warn("Startup: local directory setup failed.");
        return false;
    }

    // Split the PLACE entries into processing places and pickup places
    final List<String> placeEntries = this.hostsConfig.findEntries("PLACE");
    sortPlaces(placeEntries);

    final int placeCount = hashListSize(this.placeLists);
    final int pickupCount = hashListSize(this.pickupLists);
    logger.info("Ready to start {} place(s) and {} PickUp place(s).", placeCount, pickupCount);

    // Everything except the pickups gets started now
    logger.info("Processing non-pickup places...");
    startMapOfPlaces(this.placeLists);

    // Wait for all places to get started and registered
    stopAndWaitForPlaceCreation();

    logger.debug("Done with bootstrap phase");
    return true;
}
/**
 * Kick off every pickup place collected by {@link #sortPlaces(List)} and block until place creation has finished.
 */
void startPickUpPlaces() {
    final Map<String, Set<String>> pickups = this.pickupLists;
    startMapOfPlaces(pickups);
    logger.info("Processing pickup places...");

    // Block until the places are started and registered
    this.stopAndWaitForPlaceCreation();
}
/**
 * Run {@link #placeSetup(int, Map, Map, Set)} once for each host's set of places. Stops at the first set whose
 * setup reports failure.
 *
 * @param m the places to start, grouped by host
 */
void startMapOfPlaces(final Map<String, Set<String>> m) {
    final boolean anythingToStart = hashListSize(m) > 0;
    if (anythingToStart) {
        for (final Set<String> hostPlaces : m.values()) {
            if (!placeSetup(directoryAction, this.localDirectories, places, hostPlaces)) {
                logger.warn("Startup: places setup failed!");
                return;
            }
        }
    }
    logger.debug("done with map of {} places", hashListSize(m));
}
/**
 * Count all entries in the sets of a map.
 *
 * @param m the map to total up, may be null
 * @return the summed size of every non-null value, or 0 for a null map
 */
private static int hashListSize(@Nullable final Map<String, Set<String>> m) {
    if (m == null) {
        return 0;
    }
    return m.values().stream()
            .filter(entries -> entries != null)
            .mapToInt(Set::size)
            .sum();
}
/**
 * Create a directory place for every {@code LOCAL_DIRECTORY} entry of the startup config that is local to this
 * node, then wait until each started directory is accounted for. Entries that are not local are logged and
 * skipped. A directory that could not be created is recorded in {@link #failedLocalDirectories} under its host.
 *
 * @param localDirectoriesArg receives the string form of each created directory place, keyed by host
 * @return always true
 */
protected boolean localDirectorySetup(final Map<String, String> localDirectoriesArg) {
    final List<String> hostParameters = this.hostsConfig.findEntries("LOCAL_DIRECTORY");
    final long start = System.currentTimeMillis();
    final Map<String, String> dirStarts = new HashMap<>();
    EmissaryNode emissaryNode = EmissaryServer.getInstance().getNode();

    for (final String thePlaceLocation : hostParameters) {
        final String host = placeHost(thePlaceLocation);

        if (KeyManipulator.isLocalTo(thePlaceLocation, "http://" + this.node.getNodeName() + ":" + this.node.getNodePort() + "/StartupEngine")) {
            final String thePlaceClassStr = PlaceStarter.getClassString(thePlaceLocation);
            if (logger.isInfoEnabled()) {
                logger.info("Doing local startup for directory {}({}) ", getLocationName(thePlaceLocation), thePlaceClassStr);
            }
            final IServiceProviderPlace p = PlaceStarter.createPlace(thePlaceLocation, null, thePlaceClassStr, null, emissaryNode);
            if (p != null) {
                dirStarts.put(host, thePlaceLocation);
                localDirectoriesArg.put(host, p.toString());
            } else {
                // Entries in localDirectoriesArg are keyed by host, so removing by location
                // could never match anything put there by this method. Record the failure
                // under the host instead, which is what placeSetup looks up when it reports
                // a previously failed directory.
                this.failedLocalDirectories.put(host, thePlaceLocation);
                logger.warn("Giving up on directory {}", thePlaceLocation);
            }
        } else {
            logger.warn("Directory location is not local: {}", thePlaceLocation);
        }
    }

    // All local directories must be up before proceeding
    logger.debug("Waiting for all local directories to start, expecting {}", dirStarts.size());
    int prevCount = 0;
    boolean interrupted = false;
    while (localDirectoriesArg.size() + this.failedLocalDirectories.size() < dirStarts.size()) {
        final int newCount = localDirectoriesArg.size() + this.failedLocalDirectories.size();
        if (newCount > prevCount && newCount < dirStarts.size()) {
            logger.info("Completed {} of {} local directories", localDirectoriesArg.size(), dirStarts.size());
            prevCount = newCount;
        }
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Re-interrupting here would make every following sleep throw immediately and
            // turn the wait into a busy loop; remember it and restore the flag afterwards
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Directories all up in {}s", (System.currentTimeMillis() - start) / 1000.0);
    }
    return true;
}
/**
 * Start all places on the list on a thread, return control immediately. All places in hostParameters list must be for
 * the same host:port! Every location is first added to {@link #placesToStart}; locations that are skipped or fail
 * are taken out again so that {@link #stopAndWaitForPlaceCreation()} does not wait for them.
 *
 * @param directoryActionArg the directory action; places are only created for {@link #DIRECTORYSTART} and
 *        {@link #DIRECTORYADD}
 * @param localDirectoriesArg the started local directories, keyed by host
 * @param placesArg receives the location of every place that was created
 * @param hostParameters the place locations to start
 * @return always true
 */
protected boolean placeSetup(final int directoryActionArg, final Map<String, String> localDirectoriesArg, final Map<String, String> placesArg,
        final Set<String> hostParameters) {

    // Nothing to do for an empty set; without this guard, looking up the first
    // location would throw on the worker thread
    if (hostParameters.isEmpty()) {
        return true;
    }

    // Track how many places we are trying to start
    this.placesToStart.addAll(hostParameters);

    final Thread t = new Thread(() -> {
        final String thePlaceHost = placeHost(hostParameters.iterator().next());
        final String localDirectory = localDirectoriesArg.get(thePlaceHost);

        if (localDirectory == null) {
            hostParameters.forEach(placesToStart::remove);
            if (failedLocalDirectories.get(thePlaceHost) != null) {
                logger.warn("Skipping {} due to previously failed directory", thePlaceHost);
            } else {
                logger.warn("Skipping {} due to local Directory not found", thePlaceHost);
            }
            return;
        }

        if (directoryActionArg != DIRECTORYSTART && directoryActionArg != DIRECTORYADD) {
            hostParameters.forEach(placesToStart::remove);
            return;
        }

        logger.debug("Using localDir={} to create {} places on {}", localDirectory, hostParameters.size(), thePlaceHost);

        // Create a stream of places that can be configured to start in parallel
        boolean parallelPlaceStartup = hostsConfig.findBooleanEntry(PARALLEL_PLACE_STARTUP_CONFIG, false);
        Stream<String> hostParametersStream = StreamSupport.stream(hostParameters.spliterator(), parallelPlaceStartup);
        logger.info("Using parallel place startup: {}", hostParametersStream.isParallel());

        // Start everything in hostParameters
        // (PLACE lines from cfg file for a given host)
        hostParametersStream.forEach(thePlaceLocation -> startPlace(thePlaceLocation, directoryActionArg, localDirectory, placesArg));
    });
    t.start();
    return true;
}

/**
 * Start a single place, or take it out of {@link #placesToStart} when it is skipped or fails to come up.
 */
private void startPlace(final String thePlaceLocation, final int directoryActionArg, final String localDirectory,
        final Map<String, String> placesArg) {
    // Get the class name for what we want to make
    final String thePlaceLocName = getLocationName(thePlaceLocation);
    final String thePlaceClassString = PlaceStarter.getClassString(thePlaceLocation);
    StringBuilder startupBuilder =
            new StringBuilder("Doing local startup on ")
                    .append(thePlaceLocName)
                    .append("(")
                    .append(thePlaceClassString).append(")...");

    if (thePlaceClassString == null) {
        startupBuilder.append("skipping, no class string!!");
        placesToStart.remove(thePlaceLocation);
        logger.warn(startupBuilder.toString());
        return;
    }

    logger.debug("Starting place {}", thePlaceLocation);
    if (!KeyManipulator.isLocalTo(thePlaceLocation, String.format("http://%s:%s/StartupEngine", node.getNodeName(), node.getNodePort()))) {
        // Nothing in this class records a non-local place as started, so leaving it
        // in placesToStart would keep the waiter from ever seeing the expected count
        placesToStart.remove(thePlaceLocation);
        startupBuilder.append("skipping, not local to this node");
        logger.warn(startupBuilder.toString());
        return;
    }

    if (directoryActionArg == DIRECTORYADD && Namespace.exists(thePlaceLocation)) {
        startupBuilder.append("local place already exists");
        placesToStart.remove(thePlaceLocation);
        // add place to placeAlreadyStarted list, so can be verified in verifyNoInvisiblePlacesStarted
        placeAlreadyStarted.add(placeName(thePlaceLocation));
        logger.info(startupBuilder.toString());
        return;
    }

    final IServiceProviderPlace p = PlaceStarter.createPlace(thePlaceLocation, null, thePlaceClassString, localDirectory);
    if (p != null) {
        placesArg.put(thePlaceLocation, thePlaceLocation);
        startupBuilder.append("done!");
        logger.info(startupBuilder.toString());
    } else {
        failedPlaces.add(thePlaceLocation);
        placesToStart.remove(thePlaceLocation);
        startupBuilder.append("FAILED!!");
        logger.error(startupBuilder.toString());
    }
}
/**
 * Check to see if all the places have started and been registered. This doesn't account for directories, just
 * things started with a "PLACE" tag. Polls every 500ms until the number of started places reaches the number of
 * places still expected. In strict startup mode the JVM is exited with status 1 if any place or coordination
 * place failed to start.
 * <p>
 * The wait itself is not cut short by an interrupt; an interrupt received while waiting is restored on the
 * thread once the wait is over.
 */
protected void stopAndWaitForPlaceCreation() {
    int numPlacesExpected = this.placesToStart.size();
    int numPlacesFoundPreviously = 0;
    boolean interrupted = false;

    logger.info("Waiting for {} places to start {}", placesToStart.size(),
            placesToStart.stream().map(s -> StringUtils.substringAfterLast(s, "/")).sorted().collect(Collectors.toList()));

    while (true) {
        // skipped and failed places are taken out of placesToStart while we wait
        if (this.placesToStart.size() != numPlacesExpected) {
            logger.info("Now waiting for {} places to start. (originally {} places)", this.placesToStart.size(), numPlacesExpected);
            numPlacesExpected = this.placesToStart.size();
        }

        final int numPlacesFound = places.size();

        if (numPlacesFound >= numPlacesExpected) {
            final boolean failedPlaceStartups = reportFailedPlaceStartups();

            // check if strict startup & places/coordination places failed, if yes, shut down server
            if (this.node.isStrictStartupMode() && failedPlaceStartups) {
                logger.error("Server failed to start due to Strict mode being enabled. To disable strict mode, " +
                        "run server start command without the --strict flag");
                logger.error("Server shutting down");
                System.exit(1);
            }

            // normal termination of the loop
            logger.debug("Woohoo! {} of {} places are up and running.", numPlacesFound, numPlacesExpected);
            break;
        }

        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Re-interrupting here would make every following sleep throw immediately and
            // turn the wait into a busy loop; remember it and restore the flag afterwards
            interrupted = true;
        }

        if (numPlacesFound != numPlacesFoundPreviously) {
            numPlacesFoundPreviously = numPlacesFound;
            logger.debug("{}{} of {} places are up and running.", progressLead(numPlacesFound, numPlacesExpected), numPlacesFound,
                    numPlacesExpected);
        }
    }

    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}

/**
 * Log the places and coordination places that failed to start; returns true if there were any.
 */
private boolean reportFailedPlaceStartups() {
    boolean failedPlaceStartups = false;
    if (!this.failedPlaces.isEmpty()) {
        failedPlaceStartups = true;
        String failedPlaceList = String.join("; ", this.failedPlaces);
        logger.warn("The following places have failed to start: {}", failedPlaceList);
    }
    if (!CoordinationPlace.getFailedCoordinationPlaces().isEmpty()) {
        failedPlaceStartups = true;
        String failedCoordPlaceList = String.join("; ", CoordinationPlace.getFailedCoordinationPlaces());
        logger.warn("The following coordination places have failed to start: {}", failedCoordPlaceList);
    }
    return failedPlaceStartups;
}

/**
 * Pick the prefix of the progress message from the fraction of expected places that are up.
 */
private static String progressLead(final int numPlacesFound, final int numPlacesExpected) {
    final float percentageUp = (float) numPlacesFound / (float) numPlacesExpected;
    if (percentageUp < 0.20) {
        return "Hmmm... only ";
    }
    if (percentageUp < 0.40) {
        return "Ok, now ";
    }
    if (percentageUp < 0.60) {
        return "Making progress, ";
    }
    if (percentageUp < 0.80) {
        return "Over half way there! ";
    }
    if (percentageUp < 0.95) {
        return "Almost ready! ";
    }
    if (numPlacesFound + 1 == numPlacesExpected) {
        return "One more to go... ";
    }
    return "Yeah! ";
}
/**
 * Sort all the PLACE entries into either a processing place or a pickup place. Entries without a class string are
 * ignored; entries whose class cannot be found are logged and left out.
 *
 * @param placeList the place locations to sort
 */
protected void sortPlaces(final List<String> placeList) {
    for (final String location : placeList) {
        final String className = PlaceStarter.getClassString(location);
        if (className != null) {
            try {
                final Map<String, Set<String>> target;
                if (PickUpPlace.implementsPickUpPlace(Class.forName(className))) {
                    target = this.pickupLists;
                } else {
                    target = this.placeLists;
                }
                sortPickupOrPlace(location, target);
            } catch (ClassNotFoundException e) {
                logger.error("Could not create place {}", className, e);
            }
        }
    }
}
/**
 * File a location under its host in the given map, creating the host's set on first use. A location that is
 * already present is reported and not added again.
 *
 * @param theLocation the place location to file
 * @param placeList the per-host sets to file it in
 */
private static void sortPickupOrPlace(String theLocation, Map<String, Set<String>> placeList) {
    final Set<String> hostPlaces = placeList.computeIfAbsent(placeHost(theLocation), k -> new LinkedHashSet<>());
    // add() leaves the set untouched and answers false when the location is already there
    final boolean added = hostPlaces.add(theLocation);
    if (!added) {
        logger.warn("Sorting places found duplicate {}({}), skipping!", getLocationName(theLocation), PlaceStarter.getClassString(theLocation));
    }
}
/**
 * Return the part of a location after its last {@code /}, as computed by
 * {@link StringUtils#substringAfterLast(String, String)}.
 *
 * @param location the place location to examine
 * @return the text after the last slash
 */
protected static String getLocationName(String location) {
    final String separator = "/";
    return StringUtils.substringAfterLast(location, separator);
}
/**
 * Verifies the active directory places vs places started up. Log if any places are started without being announced in
 * start-up. The names left over after the comparison stay in the set returned by {@link #getInvisPlaces()}.
 *
 * @return true if no invisible places started, false if yes
 * @throws EmissaryRuntimeException wrapping any {@link EmissaryException} raised while reading the directory
 */
public static boolean verifyNoInvisiblePlacesStarted() {
    try {
        // collect the names of active places; getLocalPlace() returns null for any place that failed to start
        DirectoryPlace.lookup().getEntries().stream()
                .map(DirectoryEntry::getLocalPlace)
                .filter(localPlace -> localPlace != null)
                .map(localPlace -> localPlace.getPlaceName())
                .forEach(activeDirPlaces::add);

        // DirectoryPlace is started up automatically in order to start all other places, so it
        // isn't per se "announced", but it is known and logged
        activeDirPlaces.removeIf(dir -> dir.equalsIgnoreCase("DirectoryPlace"));
    } catch (EmissaryException e) {
        throw new EmissaryRuntimeException(e);
    }

    // drop every name that matches a place started by this class
    places.values().forEach(startedLocation -> activeDirPlaces.removeIf(dir -> dir.equalsIgnoreCase(placeName(startedLocation))));

    // places that were already up when their startup was attempted are tracked separately
    placeAlreadyStarted.forEach(existingName -> activeDirPlaces.removeIf(dir -> dir.equalsIgnoreCase(existingName)));

    // whatever is left was not announced on startup
    final boolean allAnnounced = activeDirPlaces.isEmpty();
    if (!allAnnounced) {
        logger.warn("{} place(s) started up without being announced! {}", activeDirPlaces.size(), activeDirPlaces);
    }
    return allAnnounced;
}
/**
 * Get invisibly started places. This hands out the static {@code activeDirPlaces} set itself rather than a copy;
 * {@link #verifyNoInvisiblePlacesStarted()} is what fills it and prunes it.
 *
 * @return the names of the places left in {@code activeDirPlaces}
 */
public static Set<String> getInvisPlaces() {
return activeDirPlaces;
}
/**
 * Get if invisible places are started while in strict mode. The flag is set by {@link #start()} when
 * {@link #verifyNoInvisiblePlacesStarted()} returns false and the node is in strict startup mode.
 *
 * @return the current value of the static flag
 */
public static boolean isInvisPlacesStartedInStrictMode() {
return invisPlacesStartedInStrictMode;
}
}