CoordinationPlace.java
package emissary.place;
import emissary.admin.PlaceStarter;
import emissary.core.EmissaryException;
import emissary.core.Form;
import emissary.core.IBaseDataObject;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.core.ResourceException;
import emissary.core.ResourceWatcher;
import emissary.core.TimedResource;
import emissary.directory.DirectoryEntry;
import emissary.directory.KeyManipulator;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import static emissary.core.constants.Configurations.OUTPUT_FORM;
/**
* This place will coordinate service to several lower level service places. We have a list and will execute each place
* in turn. This is good for service orchestration and there is an override point for derived classed to determine
* whether to continue processing at each step. It is good for these types of places to register as "STUDY" places so
* that current forms of the requested type will come here first instead of going to the individual places for ID, or as
* "COORDINATE" if coordinating for TRANSFORM places, or as "VERIFY" if coordinating for IO places (OUTPUT, that is).
*
* We only coordinate among places in the local Namespace. If the place specified is not initially in the local
* namespace we attempt to create it. If it cannot be created it is not used.
*/
public class CoordinationPlace extends ServiceProviderPlace {
// The list of place keys we coordinate for
protected List<String> placeKeys;
// The list of place references we coordinate for
protected List<IServiceProviderPlace> placeRefs;
@Nullable
protected String outputForm = null; // What we call it when we are finished
protected boolean pushForm = true; // push or set on the form
protected boolean updateTransformHistory = false;
// set of coordination places that failed to be created/did not exist
protected static final Set<String> failedCoordPlaceCreation = new LinkedHashSet<>();
/**
* Create the place using the supplied configuration and location
*
* @param cfgInfo the configuration resource to use
* @param dir the controlling directory
* @param placeLoc binding information for this instance
*/
public CoordinationPlace(String cfgInfo, String dir, String placeLoc) throws IOException {
super(cfgInfo, dir, placeLoc);
configurePlace();
}
/**
* Create the place using the supplied configuration
*
* @param cfgInfo the configuration resource to use
*/
public CoordinationPlace(String cfgInfo) throws IOException {
super(cfgInfo, "TestCoordinationPlace.foo.com:8003");
configurePlace();
}
/**
* Create the place using the supplied configuration
*
* @param cfgInfo the configuration stream to use
*/
public CoordinationPlace(InputStream cfgInfo) throws IOException {
super(cfgInfo);
configurePlace();
}
/**
* Create the place taking all defaults for location and config
*/
public CoordinationPlace() throws IOException {
super();
configurePlace();
}
/**
* Set up the place specific information Config items read here are:
*
* <ul>
* <li>OUTPUT_FORM: default=null, output form for final step of coordination</li>
* <li>PUSH_OUTPUT_FORM: default=true, calls pushOutputForm when true, setOutputForm otherwise</li>
* <li>SERVICE_COORDINATION: place entries to use for this coordination place, an ordered list of places that must
* already be constructed</li>
* </ul>
*/
protected void configurePlace() {
outputForm = configG.findStringEntry(OUTPUT_FORM, null);
pushForm = configG.findBooleanEntry("PUSH_OUTPUT_FORM", true);
updateTransformHistory = configG.findBooleanEntry("UPDATE_TRANSFORM_HISTORY", false);
placeKeys = configG.findEntries("SERVICE_COORDINATION");
logger.debug("We got {} entries to coordinate", placeKeys.size());
placeRefs = new ArrayList<>();
for (String s : placeKeys) {
try {
// See if the place already exists
Object ref = Namespace.lookup(s);
if (ref instanceof IServiceProviderPlace) {
placeRefs.add((IServiceProviderPlace) ref);
logger.debug("Added reference for {}:{}", s, ref);
} else {
logger.error("Referenced place {} is of the wrong type: {}", s, ref.getClass().getName());
}
} catch (NamespaceException ex) {
// Try creating the place
try {
String skey = KeyManipulator.getServiceHostUrl(keys.get(0)) + s;
logger.debug("No such place {}, creating as {}", s, skey, ex);
String sclz = PlaceStarter.getClassString(skey);
IServiceProviderPlace p = PlaceStarter.createPlace(skey, null, sclz, dirPlace);
if (p != null) {
placeRefs.add(p);
logger.debug("Place created: {}", p);
} else {
failedCoordPlaceCreation.add(s + " in " + configG.findStringEntry("PLACE_NAME"));
logger.error("Place does not exist and cannot be created: {}", s);
}
} catch (RuntimeException e) {
failedCoordPlaceCreation.add(s + " in " + configG.findStringEntry("PLACE_NAME"));
logger.error("Place does not exist and cannot be created: {}", s, e);
}
}
}
}
/**
* Evaluate whether to continue processing. Classes can override this method to provide any additional logic during
* coordination
*/
protected boolean shouldContinue(IBaseDataObject d, IServiceProviderPlace p) {
logger.debug("Continuing with currentForm {} to place {}", d.currentForm(), p);
return true;
}
/**
* Return whether to continue traversing the list of coordinated places when an error occurs in one of them
*
* @param p place that is currently processing the ibdo
* @param errorOccurred true if an error occurred
*
* @return false if processing should not continue
*/
protected boolean shouldContinue(IServiceProviderPlace p, boolean errorOccurred) {
if (!continueOnError() && errorOccurred) {
logger.info("Error terminating coordination step at {}", p);
return false;
}
return true;
}
/**
* Evaluate whether to skip processing. This will allow the coordination place to continue to the next configured place.
* Note that shouldContinue method takes precedence. Use one or the other and be cautious when using both Classes can
* override this method to provide any additional logic during coordination
*/
protected boolean shouldSkip(IBaseDataObject d, IServiceProviderPlace p) {
logger.debug("Skipping with currentForm {} to place {}", d.currentForm(), p);
return false;
}
/**
* Classes can override this method to do any last things to the data object before closing out the job
*/
protected void cleanUpHook(IBaseDataObject d) {
logger.debug("In base cleanUpHook for coordination {}", d.currentForm());
}
/**
* Classes can override this method to do anything to the list of sprouted data objects from this coordination
*/
protected void sproutHook(List<IBaseDataObject> sproutCollection, IBaseDataObject d) {
logger.debug("In base sproutHook with {}", sproutCollection.size());
}
private void sproutHook(List<IBaseDataObject> sproutCollection, IBaseDataObject d, boolean hd) {
// Allow derived classes a shot at the sprouts
if (hd) {
sproutHook(sproutCollection, d);
}
}
/**
* Consume a data object and coordinate its processing
*
* @param d the payload to process
* @param hd true if doing heavy-duty processing
* @return the list of sprouted data objects
*/
protected List<IBaseDataObject> coordinate(IBaseDataObject d, boolean hd) {
List<IBaseDataObject> sproutCollection = new ArrayList<>();
boolean errorOccurred = false;
// Iterate over the configured places
for (IServiceProviderPlace p : placeRefs) {
// Let derived classed decide to quit or continue this loop
if (!shouldContinue(d, p)) {
break;
} else if (shouldSkip(d, p)) {
continue;
}
updateTransformHistory(d, p);
// Collect attachments for hd processing
List<IBaseDataObject> sprouts = null;
// Like an agent would do it
try (TimedResource tr = resourceWatcherStart(p)) {
assert tr != null; // to silence an unused resource warning
if (hd) {
// Do the normal HD processing
sprouts = p.agentProcessHeavyDuty(d);
} else {
// Do the normal Non-HD processing
p.agentProcessCall(d);
}
errorOccurred = d.currentForm().equals(Form.ERROR);
} catch (Exception ex) {
errorOccurred = handlePlaceException(p, hd, ex);
} finally {
if (Thread.interrupted()) {
logger.warn("Place {} was interrupted during execution.", p);
}
}
if (!shouldContinue(p, errorOccurred)) {
break;
}
// Track any new attachments
if (CollectionUtils.isNotEmpty(sprouts)) {
sproutCollection.addAll(sprouts);
}
}
applyForm(d, errorOccurred);
sproutHook(sproutCollection, d, hd);
// Allow derived classes a shot to clean up the parent
cleanUpHook(d);
return sproutCollection;
}
/**
* Allow derived classes a shot to handle a place exception
*
* @param p place that was processing when the exception was thrown
* @param hd true if doing heavy-duty processing
* @param ex exception thrown by the place
*
* @return if an error occurred
*/
protected boolean handlePlaceException(IServiceProviderPlace p, boolean hd, Exception ex) {
logger.warn("agentProcess{} called from Coordinate problem", (hd ? "HeavyDuty" : "Call"), ex);
return true;
}
private void updateTransformHistory(IBaseDataObject d, IServiceProviderPlace p) {
if (updateTransformHistory) {
DirectoryEntry de = p.getDirectoryEntry();
de.setDataType(d.currentForm());
// append to the transform history, with flag indicating that the visit was coordinated
d.appendTransformHistory(de.getKey(), true);
}
}
/**
* How to handle applying the output form if an error occurred during processing
*
* @param d the ibdo to process
* @param errorOccurred true if an error occurred
*/
protected void applyForm(IBaseDataObject d, boolean errorOccurred) {
if (!errorOccurred || shouldApplyOutputFormOnError()) {
applyOutputForm(d);
// Clean up my proxies
nukeMyProxies(d);
}
}
/**
* If true, process the output form the same for the default and error condition
*
* @return boolean to allow processing of output for when an error occurs
*/
protected boolean shouldApplyOutputFormOnError() {
return false;
}
/**
* Apply the output form according to configuration
*
* @param d the ibdo to process
*/
protected void applyOutputForm(IBaseDataObject d) {
if (outputForm != null) {
if (pushForm) {
d.pushCurrentForm(outputForm);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Object has {} number of forms in the Stack. CoordinationPlace Setting top form from {} to {}", d.currentFormSize(),
d.currentForm(), outputForm);
}
d.setCurrentForm(outputForm);
}
}
}
/**
* If false, do not continue processing other places after an error occurs
*
* @return boolean to not continue processing after an error
*/
protected boolean continueOnError() {
return false;
}
protected TimedResource resourceWatcherStart(final IServiceProviderPlace place) {
TimedResource tr = TimedResource.EMPTY;
try {
tr = ResourceWatcher.lookup().starting(getAgent(), place);
} catch (EmissaryException ex) {
logger.debug("No resource monitoring enabled");
}
return (tr == null) ? TimedResource.EMPTY : tr;
}
/**
* Process point when not using HDMobileAgent
*
* @param d the payload to process
*/
@Override
public void process(IBaseDataObject d) throws ResourceException {
List<IBaseDataObject> l = coordinate(d, false);
if (CollectionUtils.isNotEmpty(l)) {
logger.error("Non-sprouted documents are being lost {}", l.size());
}
}
/**
* Process point for HDMobileAgent
*
* @param d the payload to process
* @return the list of sprouted data objects
*/
@Override
public List<IBaseDataObject> processHeavyDuty(IBaseDataObject d) throws ResourceException {
return coordinate(d, true);
}
/**
* Get method for the set of failed coordination places
*
* @return the names of the failed coordination places
*/
public static Set<String> getFailedCoordinationPlaces() {
return failedCoordPlaceCreation;
}
}