RollManager.java

package emissary.roll;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
 * RollManager handles all incremental rolls for configured objects within the framework
 */
public class RollManager implements PropertyChangeListener {
    static final Logger log = LoggerFactory.getLogger(RollManager.class);
    public static final String CFG_ROLL_MANAGER_THREADS = "ROLL_MANAGER_THREADS";
    int executorThreadCount = 10;
    ScheduledThreadPoolExecutor exec;

    final HashSet<Roller> rollers = new HashSet<>();
    // SINGLETON
    @Nullable
    @SuppressWarnings("NonFinalStaticField")
    private static RollManager rollManager;

    protected RollManager() {
        init();
    }

    protected RollManager(Configurator configG) {
        init(configG);
    }

    /**
     * Load the configurator
     */
    @SuppressWarnings("SystemExitOutsideMain")
    protected void init() {
        try {
            Configurator configG = ConfigUtil.getConfigInfo(this.getClass());
            init(configG);
        } catch (IOException ex) {
            if (ex.getMessage().startsWith("No config stream available")) {
                log.info("No Rollables configured in the default configuration");
            } else {
                log.warn("Unable to configure RollManager from Configurator.", ex);
            }
            System.exit(1);
        }
    }

    protected void init(Configurator configG) {
        ArrayList<Roller> cfgRollers = new ArrayList<>();
        executorThreadCount = configG.findIntEntry(CFG_ROLL_MANAGER_THREADS, executorThreadCount);
        for (String roller : configG.findEntries("ROLLABLE")) {
            try {
                Map<String, String> map = configG.findStringMatchMap(roller + "_");
                cfgRollers.add(RollUtil.buildRoller(map));
            } catch (RuntimeException e) {
                log.warn("Unable to configure Rollable for: {}", roller);
            }
        }

        exec = new RollScheduledExecutor(executorThreadCount, new RMThreadFactory());
        for (Roller r : cfgRollers) {
            addRoller(r);
        }
    }

    public final void addRoller(Roller r) {
        boolean time = r.getTimeUnit() != null && r.getPeriod() > 0L;
        boolean progress = r.getMax() > 0;
        if (time) {
            if (log.isInfoEnabled()) {
                log.info("Scheduling Rollable {} at {} {}", r.getRollable().getClass(), r.getPeriod(), r.getTimeUnit().name());
            }
            var unused = exec.scheduleAtFixedRate(r, r.getPeriod(), r.getPeriod(), r.getTimeUnit());
        }
        if (progress) {
            r.addPropertyChangeListener(this);
        }
        if (time || progress) {
            rollers.add(r);
        } else {
            log.error("Roller not scheduled. Time or progress must be set: Class={} Max={} Interval={} {}", r.getClass().getName(), r.getMax(),
                    r.getPeriod(), r.getTimeUnit());
        }
    }

    @Override
    public void propertyChange(PropertyChangeEvent evt) {
        if (rollers.contains((Roller) evt.getNewValue())) {
            Roller r = (Roller) evt.getNewValue();
            // only schedule one time when we're notified
            if (r.setProgressScheduled()) {
                exec.execute((Roller) evt.getNewValue());
            }

        }
    }

    /**
     * Synchronized on RM to prevent multiple returns on RollManager
     */
    public static synchronized RollManager getManager() {
        if (rollManager == null) {
            rollManager = new RollManager();
        }
        return rollManager;
    }

    /**
     * Synchronized on RM to prevent multiple returns on RollManager
     * <p>
     * Used to create custom RollManager based on configs.
     */
    public static synchronized RollManager getManager(Configurator configG) {
        if (rollManager == null) {
            rollManager = new RollManager(configG);
        }
        return rollManager;
    }

    public static void shutdown() {
        rollManager.exec.shutdown();
        log.info("Closing all rollers ({})", rollManager.rollers.size());
        for (Roller roller : rollManager.rollers) {
            Rollable r = roller.getRollable();
            try {
                r.roll();
                r.close();
            } catch (IOException ex) {
                log.warn("Error while closing Rollable: {}", r.getClass(), ex);
            }
        }
        rollManager = null;
    }

    private static final class RMThreadFactory implements ThreadFactory {
        final AtomicInteger count = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "RollManager-daemon-" + count.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    }
}