View Javadoc
1   package emissary.pool;
2   
3   import emissary.core.IMobileAgent;
4   import emissary.core.Namespace;
5   import emissary.core.NamespaceException;
6   
7   import jakarta.annotation.Nullable;
8   import org.apache.commons.pool2.impl.GenericObjectPool;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import java.time.Duration;
13  
14  /**
15   * Extends the GenericObjectPool to hold MobileAgents, each on it's own thread.
16   */
17  public class AgentPool extends GenericObjectPool<IMobileAgent> {
18  
19      private static final int MAX_CALCULATED_AGENT_COUNT = 50;
20      private static final int BYTES_IN_GIGABYTES = 1073741824;
21  
22      /**
23       * The default name by which we register into the namespace
24       */
25      protected static final String DEFAULT_NAMESPACE_NAME = "AgentPool";
26  
27      /**
28       * Super class has private access on factory, so save here
29       */
30      protected MobileAgentFactory factory;
31  
32      /**
33       * Our logger
34       */
35      protected static final Logger logger = LoggerFactory.getLogger(AgentPool.class);
36  
37      /**
38       * The name used by this pool
39       */
40      protected String namespaceName;
41  
42      private final int initialPoolSize;
43  
44      /**
45       * Compute the default size for the pool
46       * 
47       * @param maxMemoryInBytes System max memory used in calculating pool size
48       * @param poolSizeOverride User set property for pool size
49       */
50      protected static int computePoolSize(final long maxMemoryInBytes, @Nullable final Integer poolSizeOverride) {
51  
52          // Override based on property
53          if (poolSizeOverride != null && poolSizeOverride > 0) {
54              logger.debug("Default pool size from properties {}", poolSizeOverride);
55              return poolSizeOverride;
56          }
57          // Check that maxMemoryInBytes is a valid argument
58          if (maxMemoryInBytes <= 0) {
59              throw new IllegalArgumentException("Must be greater then zero.");
60          }
61  
62          // 15 if less than 1 Gb
63          // 20 for first Gb, +5 for each additional Gb, no more then 50 when calculated
64          int size = (((int) (maxMemoryInBytes / BYTES_IN_GIGABYTES) - 1) * 5) + 20;
65          size = Math.min(size, MAX_CALCULATED_AGENT_COUNT);
66          logger.debug("Computed default pool size of {}", size);
67  
68          return size;
69      }
70  
71      /**
72       * Compute the default size for the pool
73       */
74      public static int computePoolSize() {
75          final Integer poolSizeProperty = Integer.getInteger("agent.poolsize", null);
76          final long maxMemoryInBytes = Runtime.getRuntime().maxMemory();
77          return computePoolSize(maxMemoryInBytes, poolSizeProperty);
78      }
79  
80      /**
81       * Create and configure the pool using the default name and size
82       * 
83       * @param factory pool object producer
84       */
85      public AgentPool(MobileAgentFactory factory) {
86          this(factory, AgentPool.computePoolSize(), DEFAULT_NAMESPACE_NAME);
87      }
88  
89      /**
90       * Create and configure the pool using the default name
91       * 
92       * @param maxActive max pool size
93       * @param factory pool object producer
94       */
95      public AgentPool(MobileAgentFactory factory, int maxActive) {
96          this(factory, maxActive, DEFAULT_NAMESPACE_NAME);
97      }
98  
99  
100     /**
101      * Create and configure the pool using the specified name
102      * 
103      * @param factory pool object producer
104      * @param maxActive max pool size
105      * @param name name of the pool in the namespace
106      */
107     public AgentPool(MobileAgentFactory factory, int maxActive, String name) {
108         super(factory);
109         this.factory = factory;
110         initialPoolSize = maxActive;
111         configurePool(name);
112     }
113 
114     /**
115      * Configure the commons pool stuff based on our requirements
116      * 
117      * @param name name of the pool in the namespace
118      */
119     protected void configurePool(String name) {
120         namespaceName = name;
121 
122         // Set blocking policy
123         setBlockWhenExhausted(true);
124 
125         // Set maximum wait time when blocking on exhausted pool
126         setMaxWait(Duration.ofMinutes(50));
127 
128         logger.debug("Configuring AgentPool to use {} agents", initialPoolSize);
129 
130         setMaxTotal(initialPoolSize);
131         setMinIdle(initialPoolSize);
132         setMaxIdle(initialPoolSize);
133 
134         bindPool();
135         fillPool();
136     }
137 
138     /**
139      * Ensure the pool is full
140      */
141     protected void fillPool() {
142         int level = getMaxTotal();
143         // fill in the pool
144         for (int i = 0; i < level; i++) {
145             try {
146                 addObject();
147             } catch (Exception e) {
148                 logger.error("Cannot fill AgentPool", e);
149             }
150         }
151     }
152 
153     /**
154      * Reset the factory. Pool will be emptied and refilled
155      * 
156      * @param factory the new factory
157      */
158     public void resetFactory(MobileAgentFactory factory) {
159         // Ideally we will need to drop and recreate the entire pool
160         // in order to get around this deprecated method, but that has
161         // impact on the global namespace, most weirdly for the caller
162         // of this method since the reference they hold is obsoleted by
163         // making this call
164         this.factory = factory;
165         emptyPool();
166         fillPool();
167     }
168 
169     /**
170      * Bind the pool into the namespace
171      */
172     protected void bindPool() {
173         // register this pool in the namespace
174         Namespace.bind(namespaceName, this);
175     }
176 
177     /**
178      * Get the name used to register this pool
179      */
180     public String getPoolName() {
181         return namespaceName;
182     }
183 
184     /**
185      * Get an agent from the pool
186      */
187     public IMobileAgent borrowAgent() throws Exception {
188         try {
189             IMobileAgent a = borrowObject();
190             logger.trace("POOL borrow active={}", getNumActive());
191             return a;
192         } catch (Exception e) {
193             logger.info("AgentPool.borrowAgent did not work, stats={}", this);
194             throw e;
195         }
196     }
197 
198     /*
199      * Get the total current agents in the pool
200      */
201     public synchronized int getCurrentPoolSize() {
202         return getNumIdle() + getNumActive();
203     }
204 
205     protected void emptyPool() {
206         int numberKilled = 0;
207         int numberToKill = getCurrentPoolSize();
208         long waitTil = System.currentTimeMillis() + (30 * 60 * 1000); // 30 min
209         logger.debug("Going to kill {} agents", numberToKill);
210         try {
211             while (getCurrentPoolSize() != 0) {
212                 if (System.currentTimeMillis() > waitTil) {
213                     throw new InterruptedException("Too long, tired of waiting. Some MobileAgents are going to die poorly");
214                 }
215 
216                 logger.debug("Emptying pool, {} active, {} idle", getNumActive(), getNumIdle());
217                 int currentIdle = getNumIdle();
218                 int killedThisRound = 0;
219                 setMaxIdle(0); // so the returnAgent call below destroys the agent
220                 for (int i = 0; i < currentIdle; i++) {
221                     IMobileAgent a;
222                     try {
223                         a = borrowAgent();
224                     } catch (Exception e) {
225                         logger.error("Error trying to borrowAgent", e);
226                         continue;
227                     }
228 
229                     a.killAgent();
230                     numberKilled++;
231                     killedThisRound++;
232 
233                     try {
234                         // destroys the object, needed to decrement the numIdle
235                         returnAgent(a);
236                     } catch (RuntimeException e) {
237                         logger.error("Error trying to returnAgent: {}", a.getName(), e);
238                     }
239                 }
240                 logger.debug("Killed {} agents this round, {} total killed", killedThisRound, numberKilled);
241                 // give some space for working agents to be returned
242                 setMaxIdle(numberToKill - numberKilled);
243                 Thread.sleep(5000);
244             }
245             logger.info("Pool is now empty");
246         } catch (InterruptedException e) {
247             logger.error("emptyPool interrupted", e);
248             Thread.currentThread().interrupt();
249         } finally {
250             setMaxIdle(0); // just in case
251         }
252     }
253 
254     /**
255      * Gracefully close down all agents and unbind the pool
256      */
257     @Override
258     public void close() {
259         logger.info("Closing the agent pool");
260         setMaxTotal(0);
261         emptyPool();
262         super.close();
263         Namespace.unbind(getPoolName());
264         logger.info("Done closing the agent pool");
265     }
266 
267     /**
268      * Forcibly stop all agents and unbind the pool
269      */
270     public void kill() {
271         logger.info("Killing the agent pool");
272         super.close();
273         Namespace.unbind(getPoolName());
274         logger.info("Done killing the agent pool");
275     }
276 
277     /**
278      * Return an agent to the pool
279      */
280     public void returnAgent(IMobileAgent agent) {
281         logger.trace("Returning {}", agent.getName());
282         returnObject(agent);
283         logger.trace("POOL return active={}", getNumActive());
284     }
285 
286     /**
287      * Return the default named agent pool instance from the namespace
288      */
289     public static AgentPool lookup() throws NamespaceException {
290         return (AgentPool) Namespace.lookup(DEFAULT_NAMESPACE_NAME);
291     }
292 
293     /**
294      * Return the specified agent pool instance from the Namespace
295      */
296     public static AgentPool lookup(String name) throws NamespaceException {
297         return (AgentPool) Namespace.lookup(name);
298     }
299 
300     /**
301      * To string for lightweight reporting
302      */
303     @Override
304     public synchronized String toString() {
305         return "Poolsize active/idle = " + getNumActive() + "/" + getNumIdle() + " - " + getPoolName();
306     }
307 
308     /**
309      * Get the name of the class being used from the factory
310      * 
311      * @return class name for the agents
312      */
313     public String getClassName() {
314         return factory.getClassString();
315     }
316 
317     /**
318      * Try to predict whether a borrow will block/grow the pool
319      */
320     public boolean isAgentAvailable() {
321         return getNumIdle() > 0;
322     }
323 }