Roller.java
package emissary.roll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
* Stateful object for the RollManager to track progress of a provided Rollable.
*/
public class Roller implements Runnable {
    static final Logger log = LoggerFactory.getLogger(Roller.class);

    /** Constant to refer to roll interval in config files **/
    public static final String CFG_ROLL_INTERVAL = "ROLL_INTERVAL";

    /** Dispatches the "roll" property change event to registered listeners. */
    private final PropertyChangeSupport support;
    /** Progress threshold; reaching it makes {@link #incrementProgress(long)} fire a "roll" event. */
    private final long max;
    /**
     * Progress counter. The reference is final and the counter is reset in place, so lock-free readers
     * ({@link #getProgress()}, {@code shouldRoll}) always observe the current value.
     */
    private final AtomicLong progress = new AtomicLong();
    /** Unit of {@link #period}. */
    private final TimeUnit t;
    /** Length of the time based roll interval, expressed in {@link #t}. */
    private final long period;
    /** The target that gets rolled. */
    private final Rollable r;
    /** Serializes progress increments against progress resets. */
    private final ReentrantLock lock = new ReentrantLock();
    /**
     * Start time (epoch millis) of the most recent completed roll, 0 until the first one. Volatile because it is
     * written under {@link #lock} but read without it.
     */
    private volatile long lastRun;
    /** True while a progress triggered execution has been claimed and has not finished running. */
    private final AtomicBoolean progressSchedule = new AtomicBoolean(false);

    /**
     * Creates a Roller.
     *
     * @param max progress threshold
     * @param t unit of {@code period}
     * @param period roll interval
     * @param r the target to roll
     * @deprecated use {@link #Roller(TimeUnit, long, Rollable, long)}, which takes the same values in an order
     *             consistent with the other constructors
     */
    @Deprecated
    @SuppressWarnings("InconsistentOverloads")
    public Roller(long max, TimeUnit t, long period, Rollable r) {
        this(t, period, r, max);
    }

    /**
     * Creates a Roller with a progress threshold of 0.
     *
     * @param t unit of {@code period}
     * @param period roll interval
     * @param r the target to roll
     */
    public Roller(TimeUnit t, long period, Rollable r) {
        this(t, period, r, 0);
    }

    /**
     * Creates a Roller.
     *
     * @param t unit of {@code period}
     * @param period roll interval
     * @param r the target to roll
     * @param max progress threshold
     */
    public Roller(TimeUnit t, long period, Rollable r, long max) {
        this.t = t;
        this.period = period;
        this.r = r;
        this.max = max;
        this.support = new PropertyChangeSupport(this);
    }

    /**
     * Registers a listener for the "roll" property change event.
     *
     * @param pcl the listener to add
     */
    public void addPropertyChangeListener(PropertyChangeListener pcl) {
        support.addPropertyChangeListener(pcl);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param pcl the listener to remove
     */
    public void removePropertyChangeListener(PropertyChangeListener pcl) {
        support.removePropertyChangeListener(pcl);
    }

    /**
     * Adds 1 to the progress counter.
     *
     * @return the progress value after the increment
     * @see #incrementProgress(long)
     */
    public final long incrementProgress() {
        return incrementProgress(1L);
    }

    /**
     * Adds {@code val} to the progress counter and fires a "roll" property change event (old value {@code null}, new
     * value this Roller) when the resulting progress is at or above {@link #getMax()}. Listeners are notified while
     * the internal lock is held.
     *
     * @param val amount to add
     * @return the progress value after the increment
     */
    public final long incrementProgress(long val) {
        lock.lock();
        try {
            long progressValue = progress.addAndGet(val);
            // NOTE(review): with max == 0 (the three-argument constructor) this holds for every non-negative
            // progress value, so each call fires the event - confirm that is intended.
            if (progressValue >= max) {
                support.firePropertyChange("roll", null, this);
            }
            return progressValue;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the progress threshold
     */
    public final long getMax() {
        return max;
    }

    /**
     * @return the current progress value
     */
    public final long getProgress() {
        return progress.get();
    }

    /**
     * @return the unit of the roll interval
     */
    public final TimeUnit getTimeUnit() {
        return t;
    }

    /**
     * @return the roll interval, expressed in {@link #getTimeUnit()}
     */
    public final long getPeriod() {
        return period;
    }

    /**
     * @return the target being rolled
     */
    public final Rollable getRollable() {
        return r;
    }

    /*
     * There is the potential that we could lose some progress during a roll since this method is called immediately
     * after a roll. Places should control that behavior via internal locking if necessary to maintain exact progress.
     */
    /**
     * Resets the progress counter to zero and records {@code start} as the time of the last run.
     *
     * @param start start time of the roll that just completed, in epoch millis
     */
    protected void resetProgress(long start) {
        lock.lock();
        try {
            // Reset in place rather than swapping in a new counter so lock-free readers see the reset.
            progress.set(0L);
            lastRun = start;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the start time (epoch millis) recorded by the most recent completed roll, or 0 if none has completed
     */
    public final long getLastRun() {
        return lastRun;
    }

    /**
     * Attempts to claim the progress-execution flag.
     *
     * @return true if we set this flag for progress execution, false if it was already set
     */
    protected boolean setProgressScheduled() {
        return progressSchedule.compareAndSet(false, true);
    }

    /**
     * Rolls the target unless it is already rolling or {@code shouldRoll} vetoes this execution. After a roll the
     * progress counter is reset and the start time recorded. The progress-execution flag is always cleared on exit,
     * including when the roll throws.
     */
    @Override
    public void run() {
        try {
            long start = System.currentTimeMillis();
            if (r.isRolling()) {
                log.debug("Rollable target {} already rolling", r.getClass());
                return;
            }
            if (!shouldRoll(start)) {
                return;
            }
            log.debug("Beginning roll for {}", r.getClass());
            r.roll();
            long time = (System.currentTimeMillis() - start) / 1000L;
            resetProgress(start);
            log.info("Completed roll for {} in {} seconds", r.getClass(), time);
        } finally {
            progressSchedule.compareAndSet(true, false);
        }
    }

    /* Convert our scheduled interval to millis */
    private long getIntervalInMillis() {
        return TimeUnit.MILLISECONDS.convert(period, t);
    }

    /*
     * There are a couple of conditions where we would not want to execute when both a progress value and time schedule
     * are configured: - Time based roll happened and a progress run is scheduled - Progress roll happened between
     * scheduled runs
     */
    private boolean shouldRoll(long start) {
        // verify both time and progress are set
        // if we're below max we'll check the interval
        if ((period > 0 && max > 0) && progress.get() < max) {
            // we fired a progress run or delayed start due to starvation. add 100 for clock skew
            if ((start - lastRun + 100) < getIntervalInMillis()) {
                return false;
            }
        }
        return true;
    }
}