// RollScheduledExecutor.java
package emissary.roll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Scheduled Executor to support unhandled execution Exceptions. The intention of Rollable tasks is that they run for
 * the life of the process without exception. We don't track the individual futures to reduce complexity and instead
 * will log uncaught errors here, in {@link #afterExecute(Runnable, Throwable)}.
 * <p>
 * Only tasks submitted as a {@link Roller} are wrapped and inspected; every other task is handled exactly as the
 * parent {@link ScheduledThreadPoolExecutor} would handle it.
 */
public class RollScheduledExecutor extends ScheduledThreadPoolExecutor {
    static final Logger log = LoggerFactory.getLogger(RollScheduledExecutor.class);

    /**
     * Creates the executor; both arguments are handed straight to {@link ScheduledThreadPoolExecutor}.
     *
     * @param corePoolSize the core pool size passed to the parent constructor
     * @param threadFactory the factory passed to the parent constructor
     */
    public RollScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    /**
     * Wraps the scheduled future of a {@link Roller} in a {@link RollFuture} so that
     * {@link #afterExecute(Runnable, Throwable)} can get back to the originating Roller. Any other runnable is
     * delegated to the parent implementation.
     *
     * @param runnable the submitted task
     * @param task the future the executor created for {@code runnable}
     * @param <V> the result type of the task
     * @return a {@link RollFuture} for Roller tasks, otherwise whatever the parent returns
     */
    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
        if (runnable instanceof Roller) {
            return new RollFuture<>(task, (Roller) runnable);
        }
        return super.decorateTask(runnable, task);
    }

    /**
     * Logs the failure cause of a completed {@link RollFuture}. Futures that are not done are left alone so this
     * hook never blocks waiting on a future run.
     * <p>
     * A cancelled future also reports {@code isDone() == true}, and its {@code get()} throws the unchecked
     * {@link CancellationException}. That case is handled explicitly so the exception cannot escape this hook on the
     * worker thread.
     *
     * @param r the runnable that has completed
     * @param t the exception that caused termination, or null if execution completed normally (passed to the parent
     *        hook; not otherwise inspected here)
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (!(r instanceof RollFuture)) {
            return;
        }
        RollFuture<?> f = (RollFuture<?>) r;
        // we don't want to block if we're scheduled for another run
        if (!f.isDone()) {
            return;
        }
        try {
            f.get();
        } catch (CancellationException ce) {
            // Cancellation is not a Rollable failure; there is no result or cause to report.
            log.debug("Roll task was cancelled, nothing to report");
        } catch (InterruptedException ex) {
            // shouldn't happen since the future is already done
            log.warn("Thread Interrupted", ex);
            Thread.currentThread().interrupt();
        } catch (ExecutionException ee) {
            Throwable ex = ee.getCause();
            Rollable rollable = f.r.getRollable();
            // Defensive: getRollable() is not visible from here, so avoid an NPE that would hide the real failure.
            Object rollableType = rollable == null ? null : rollable.getClass();
            log.error("Unhandled Throwable in Rollable, {}. To String: {}", rollableType, rollable, ex);
        }
    }

    /**
     * Wrapper for the Future to give us a handle to our Rollable object. Every {@link RunnableScheduledFuture}
     * method is delegated unchanged to the wrapped future.
     */
    static class RollFuture<V> implements RunnableScheduledFuture<V> {
        /** The executor-created future that does the real work. */
        private final RunnableScheduledFuture<V> rsf;
        /** The Roller this future was created for; read by the enclosing executor when logging failures. */
        final Roller r;

        /**
         * @param rsf the future to delegate to
         * @param r the Roller that was submitted for execution
         */
        public RollFuture(RunnableScheduledFuture<V> rsf, Roller r) {
            this.rsf = rsf;
            this.r = r;
        }

        @Override
        public boolean isPeriodic() {
            return rsf.isPeriodic();
        }

        @Override
        public void run() {
            rsf.run();
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return rsf.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return rsf.isCancelled();
        }

        @Override
        public boolean isDone() {
            return rsf.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return rsf.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return rsf.get(timeout, unit);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return rsf.getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o) {
            return rsf.compareTo(o);
        }
    }
}