PickupQueue.java

package emissary.pickup;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Deque;
import javax.annotation.Nullable;

/**
 * A size limited queue for holding data to process.
 */
public class PickupQueue {
    // Data structure for the bundles
    protected final Deque<WorkBundle> queue = new ArrayDeque<>();

    // Our logger
    private static final Logger logger = LoggerFactory.getLogger(PickupQueue.class);

    /**
     * These parameters determine the enqueing behavior. The desire is to minimize the number of remote calls from main to
     * and instance of this class with the getQueSize method, and at the same keep all of the places busy. We do this by
     * making the MAX_QUE_SIZE large enough to hold enough files to be processed in SLEEP_TIME. BUT we don't just make the
     * MAX_QUE_SIZE huge because then we use too much memory. Some feeds put stuff on the Que in blocks. If our que is a
     * prime numbered size they cannot fill it completely, which will help prevent blocking maybe.
     */
    private int maxQueSize = 19;

    /**
     * Normal pickup queue creation
     */
    public PickupQueue() {}

    /**
     * Create a queue with the given max size
     *
     * @param maxSize the maximum size the queue can grow to
     */
    public PickupQueue(int maxSize) {
        maxQueSize = maxSize;
    }

    /**
     * Add incoming information to the queue of file names to process and notify anyone waiting on the queue
     *
     * @param paths the WorkBundle object containing files to queue up
     * @return true if it was enqueued, false if we are too busy to handle it
     */
    public boolean enque(@Nullable WorkBundle paths) {
        if (paths == null || paths.size() == 0) {
            logger.warn("Enque of a null or empty WorkBundle structure!");
            return true;
        }

        synchronized (queue) {
            // Add it to the queue
            queue.addFirst(paths);
            queue.notifyAll();
        }

        synchronized (this) {
            // notify waiters every time paths added
            this.notifyAll();
        }

        return true;
    }

    /**
     * Return size of queue
     */
    public int size() {
        return getQueSize();
    }

    /**
     * Getter for the size of the queue
     *
     * @return the size of the queue
     */
    public int getQueSize() {
        int size = -1;
        synchronized (queue) {
            size = queue.size();
        }
        return size;
    }

    /**
     * Get one data object from the queue.
     *
     * @return the dequeued WorkBundle or null if none
     */
    public synchronized WorkBundle deque() {
        WorkBundle nextFile = null;
        int size = -1;
        synchronized (queue) {
            size = queue.size();
            if (size > 0) {
                nextFile = queue.removeLast();
            }
        }
        return nextFile;
    }

    /**
     * Tell caller if we can hold this many more items
     *
     * @return true iff there is room for num items
     */
    public boolean canHold(int num) {
        return getQueSize() + num <= maxQueSize;
    }
}