KffMemcached.java
package emissary.kff;
import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.util.Hexl;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.ConnectionFactoryBuilder.Protocol;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* KffMemcached checks Emissary hashes against a set of external memcached servers. If a given Emissary hash does not
* hit in memcached, it is added to the memcached. The value stored is however the input is identified (notionally, some
* type of id). Only one type of hash can be checked against memcached (i.e. SHA-1...not SHA-1 <i>and</i> SHA-256). See
* the PREF_ALG configuration option.
*
* If the option MEMCACHED_STORE_ID_DUPE is set to true, if an Emissary hash already exists in memcached, the id will
* also be loaded into memcached as a <i>key</i>. The purpose of this is for other follow-on processes (non-Emissary) to
* query memcached and determine if a given id is a duplicate (i.e. if it is present).
*
* If a server goes down, this code will wait and try to reconnect with increasingly long intervals between retries.
* Under this mode of operation, it is expected someone will bring the server back online.
*
* Configuration file options are:
*
* MEMCACHED_SERVER: one line for each server, optional : separated port MEMCACHED_AGEOFF: how long to hold an object in
* memcached before expiring MEMCACHED_OP_TIMEOUT_MILLIS: how long to wait before timing out a memcached operation
* MEMCACHED_IGNORE_VALUE_PATTERN: do not store values that contain this pattern (non-regexO MEMCACHED_FAILURE_MODE:
* what to do in case of server failure MEMCACHED_STORE_ID_DUPE: boolean to store the id if it's hash is already
* contained in memcached PREF_ALG: Which Emissary hash to use as the key stored in memcached
*/
public class KffMemcached implements KffFilter {
/**
* Logger
*/
private final Logger logger;
/**
* The hash to use as the key
*/
protected String preferredAlgorithm = "SHA-1";
/**
* String logical name for this filter
*/
protected String filterName = "UNKNOWN";
/**
* Filter type
*/
protected FilterType ftype = FilterType.UNKNOWN;
/**
* The age-off in the memcached client
*/
protected int ageoff = 86400;
/**
* The timeout on any given network operation in milliseconds
*/
protected long opTimeoutMillis = 2500L;
/**
* What to do in case there is a failure contacting a given server
*/
protected FailureMode failMode = FailureMode.Cancel;
/**
* Do not store values that contain these substrings exactly (this is not treated as a regex)
*/
@Nullable
protected Set<String> ignorePatterns = null;
/**
* If this is set to true, if an Emissary hash already exists in memcached, the id will also be loaded into memcached as
* a <i>key</i>. The purpose of this is for other follow-on processes (non-Emissary) to query memcached and determine if
* a given id is a duplicate (i.e. if it is present).
*/
protected boolean storeIdDupe = false;
/**
* Whether or not to use the memcached binary protocol
*/
protected boolean useBinaryProtocol = false;
/**
* A handle to the set of servers
*/
protected MemcachedClient client;
/**
*
* @param filename Unused
* @param filterName Name of the filter (typically sent in by KffChainLoader)
* @param ftype Filter type (again, sent in by KffChainLoader)
* @throws IOException is thrown if either the file cannot be read of memcached cannot be contacted
*/
public KffMemcached(String filename, String filterName, FilterType ftype) throws IOException {
this(filename, filterName, ftype, null);
}
/**
*
* @param testIdWithSpaces Unused
* @param filterName Name of the filter (typically sent in by KffChainLoader)
* @param duplicate Filter type (again, sent in by KffChainLoader)
* @param testClient Memcached client to be used if specified (will instantiate a client if null)
* @throws IOException is thrown if either the file cannot be read of memcached cannot be contacted
*/
public KffMemcached(String testIdWithSpaces, String filterName, FilterType duplicate, @Nullable MemcachedClient testClient) throws IOException {
// Set logger to run time class
logger = LoggerFactory.getLogger(this.getClass().getName());
// Set the logger impl to use log4j
System.setProperty("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.Log4JLogger");
// testIdWithSpaces is not used
this.ftype = duplicate;
this.filterName = filterName;
Configurator configG = ConfigUtil.getConfigInfo(KffMemcached.class);
// Load up the list of servers
Set<String> serversFromConfig = configG.findEntriesAsSet("MEMCACHED_SERVER");
List<InetSocketAddress> servers = new ArrayList<>();
for (String serverFromConfig : serversFromConfig) {
// Transform to an InetSocketAddress
if (serverFromConfig.contains(":")) {
String[] serverTokens = serverFromConfig.split(":");
String host = serverTokens[0];
int port = Integer.parseInt(serverTokens[1]);
servers.add(new InetSocketAddress(host, port));
} else {
// In this case, assume port is 11211
servers.add(new InetSocketAddress(serverFromConfig, 11211));
}
}
logger.debug("The following memcached servers are configured:");
for (InetSocketAddress server : servers) {
logger.debug("Server configured: {}", server);
}
// Default to 24 hours timeout
ageoff = configG.findIntEntry("MEMCACHED_AGEOFF", 86400);
// Set the preferred algorithm
preferredAlgorithm = configG.findStringEntry("PREF_ALG");
// Set the preferred algorithm
ignorePatterns = configG.findEntriesAsSet("MEMCACHED_IGNORE_VALUE_PATTERN");
// Whether or not to keep track of dupe IDs in memcached
storeIdDupe = configG.findBooleanEntry("MEMCACHED_STORE_ID_DUPE", false);
// Set whether to use the binary protocol or not
useBinaryProtocol = configG.findBooleanEntry("MEMCACHED_USE_BINARY_PROTOCOL", useBinaryProtocol);
// Set the operation timeout
opTimeoutMillis = configG.findLongEntry("MEMCACHED_OP_TIMEOUT_MILLIS", opTimeoutMillis);
String failModeAsString = configG.findStringEntry("MEMCACHED_FAILURE_MODE", "Cancel");
if (failModeAsString.equalsIgnoreCase("cancel")) {
failMode = FailureMode.Cancel;
} else if (failModeAsString.equalsIgnoreCase("retry")) {
failMode = FailureMode.Retry;
}
// Finally, setup the client. ConnectionFactoryBuilder ultimately
// creates a DefaultConnectionFactory with the values set below
ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder();
cfb.setDaemon(true); // Just to keep the process from hanging
cfb.setFailureMode(failMode); // How to handle operations when they fail
cfb.setMaxReconnectDelay(60); // At most, wait 1 minute for attempting to reconnect to a server
cfb.setOpTimeout(opTimeoutMillis); // Use the same for the connection as the concurrent Future object
if (useBinaryProtocol) {
cfb.setProtocol(Protocol.BINARY);
}
if (testClient == null) {
client = new MemcachedClient(cfb.build(), servers);
} else {
client = testClient;
}
// logger.debug(client.toString());
}
/**
* Contact the memcached server and lookup the hash. If it is found, then return true. If it is not found, store it and
* return false. If it matches a special ignore pattern, return false. If the server is down or any other problems throw
* an exception
*/
@Override
public boolean check(String id, ChecksumResults sums) throws Exception {
if (sums == null) {
throw new Exception("Poorly formed input to check() in sums");
}
if ((id == null) || (id.length() == 0)) {
throw new Exception("Poorly formed input to check() in fname");
}
// Ignore any IDs that contain the ignorePatterns string verbatim
if (ignorePatterns != null) {
// Loop through all patterns to see if any match
for (String ignorePattern : ignorePatterns) {
if (id.contains(ignorePattern)) {
return false;
}
}
}
byte[] hash = sums.getHash(preferredAlgorithm);
if ((hash == null) || (hash.length == 0)) {
throw new Exception("Poorly formed input to check() in hash");
}
String key = Hexl.toUnformattedHexString(hash);
// Send the query
Future<Object> future = client.asyncGet(key);
// Let the TimeoutException propagate up
Object result = future.get(opTimeoutMillis, TimeUnit.MILLISECONDS);
if (result != null) {
if (storeIdDupe) {
if (!((String) result).equals(id)) {
// As long as the id is not the same as what was already stored, then
// store it on its own
var unused = client.set(id, ageoff, key);
// logger.debug("Storing duplicate Id: {} with value (hash) {}", id, key);
}
}
// logger.debug("Found key: {} with value {}", key, (String) result);
// Found the key
return true;
}
// logger.debug("Did not find key: {}", key);
// Did not find the key...store it and move on
var unused = client.set(key, ageoff, id);
return false;
}
public String getPreferredAlgorithm() {
return preferredAlgorithm;
}
public void setPreferredAlgorithm(String preferredAlgorithm) {
this.preferredAlgorithm = preferredAlgorithm;
}
@Override
public String getName() {
return filterName;
}
@Override
public FilterType getFilterType() {
return ftype;
}
}