1 package emissary.pool;
2
3 import emissary.core.IMobileAgent;
4 import emissary.core.Namespace;
5 import emissary.core.NamespaceException;
6
7 import jakarta.annotation.Nullable;
8 import org.apache.commons.pool2.impl.GenericObjectPool;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import java.time.Duration;
13
14
15
16
17 public class AgentPool extends GenericObjectPool<IMobileAgent> {
18
19 private static final int MAX_CALCULATED_AGENT_COUNT = 50;
20 private static final int BYTES_IN_GIGABYTES = 1073741824;
21
22
23
24
25 protected static final String DEFAULT_NAMESPACE_NAME = "AgentPool";
26
27
28
29
30 protected MobileAgentFactory factory;
31
32
33
34
35 protected static final Logger logger = LoggerFactory.getLogger(AgentPool.class);
36
37
38
39
40 protected String namespaceName;
41
42 private final int initialPoolSize;
43
44
45
46
47
48
49
50 protected static int computePoolSize(final long maxMemoryInBytes, @Nullable final Integer poolSizeOverride) {
51
52
53 if (poolSizeOverride != null && poolSizeOverride > 0) {
54 logger.debug("Default pool size from properties {}", poolSizeOverride);
55 return poolSizeOverride;
56 }
57
58 if (maxMemoryInBytes <= 0) {
59 throw new IllegalArgumentException("Must be greater then zero.");
60 }
61
62
63
64 int size = (((int) (maxMemoryInBytes / BYTES_IN_GIGABYTES) - 1) * 5) + 20;
65 size = Math.min(size, MAX_CALCULATED_AGENT_COUNT);
66 logger.debug("Computed default pool size of {}", size);
67
68 return size;
69 }
70
71
72
73
74 public static int computePoolSize() {
75 final Integer poolSizeProperty = Integer.getInteger("agent.poolsize", null);
76 final long maxMemoryInBytes = Runtime.getRuntime().maxMemory();
77 return computePoolSize(maxMemoryInBytes, poolSizeProperty);
78 }
79
80
81
82
83
84
85 public AgentPool(MobileAgentFactory factory) {
86 this(factory, AgentPool.computePoolSize(), DEFAULT_NAMESPACE_NAME);
87 }
88
89
90
91
92
93
94
95 public AgentPool(MobileAgentFactory factory, int maxActive) {
96 this(factory, maxActive, DEFAULT_NAMESPACE_NAME);
97 }
98
99
100
101
102
103
104
105
106
107 public AgentPool(MobileAgentFactory factory, int maxActive, String name) {
108 super(factory);
109 this.factory = factory;
110 initialPoolSize = maxActive;
111 configurePool(name);
112 }
113
114
115
116
117
118
119 protected void configurePool(String name) {
120 namespaceName = name;
121
122
123 setBlockWhenExhausted(true);
124
125
126 setMaxWait(Duration.ofMinutes(50));
127
128 logger.debug("Configuring AgentPool to use {} agents", initialPoolSize);
129
130 setMaxTotal(initialPoolSize);
131 setMinIdle(initialPoolSize);
132 setMaxIdle(initialPoolSize);
133
134 bindPool();
135 fillPool();
136 }
137
138
139
140
141 protected void fillPool() {
142 int level = getMaxTotal();
143
144 for (int i = 0; i < level; i++) {
145 try {
146 addObject();
147 } catch (Exception e) {
148 logger.error("Cannot fill AgentPool", e);
149 }
150 }
151 }
152
153
154
155
156
157
158 public void resetFactory(MobileAgentFactory factory) {
159
160
161
162
163
164 this.factory = factory;
165 emptyPool();
166 fillPool();
167 }
168
169
170
171
172 protected void bindPool() {
173
174 Namespace.bind(namespaceName, this);
175 }
176
177
178
179
180 public String getPoolName() {
181 return namespaceName;
182 }
183
184
185
186
187 public IMobileAgent borrowAgent() throws Exception {
188 try {
189 IMobileAgent a = borrowObject();
190 logger.trace("POOL borrow active={}", getNumActive());
191 return a;
192 } catch (Exception e) {
193 logger.info("AgentPool.borrowAgent did not work, stats={}", this);
194 throw e;
195 }
196 }
197
198
199
200
201 public synchronized int getCurrentPoolSize() {
202 return getNumIdle() + getNumActive();
203 }
204
205 protected void emptyPool() {
206 int numberKilled = 0;
207 int numberToKill = getCurrentPoolSize();
208 long waitTil = System.currentTimeMillis() + (30 * 60 * 1000);
209 logger.debug("Going to kill {} agents", numberToKill);
210 try {
211 while (getCurrentPoolSize() != 0) {
212 if (System.currentTimeMillis() > waitTil) {
213 throw new InterruptedException("Too long, tired of waiting. Some MobileAgents are going to die poorly");
214 }
215
216 logger.debug("Emptying pool, {} active, {} idle", getNumActive(), getNumIdle());
217 int currentIdle = getNumIdle();
218 int killedThisRound = 0;
219 setMaxIdle(0);
220 for (int i = 0; i < currentIdle; i++) {
221 IMobileAgent a;
222 try {
223 a = borrowAgent();
224 } catch (Exception e) {
225 logger.error("Error trying to borrowAgent", e);
226 continue;
227 }
228
229 a.killAgent();
230 numberKilled++;
231 killedThisRound++;
232
233 try {
234
235 returnAgent(a);
236 } catch (RuntimeException e) {
237 logger.error("Error trying to returnAgent: {}", a.getName(), e);
238 }
239 }
240 logger.debug("Killed {} agents this round, {} total killed", killedThisRound, numberKilled);
241
242 setMaxIdle(numberToKill - numberKilled);
243 Thread.sleep(5000);
244 }
245 logger.info("Pool is now empty");
246 } catch (InterruptedException e) {
247 logger.error("emptyPool interrupted", e);
248 Thread.currentThread().interrupt();
249 } finally {
250 setMaxIdle(0);
251 }
252 }
253
254
255
256
257 @Override
258 public void close() {
259 logger.info("Closing the agent pool");
260 setMaxTotal(0);
261 emptyPool();
262 super.close();
263 Namespace.unbind(getPoolName());
264 logger.info("Done closing the agent pool");
265 }
266
267
268
269
270 public void kill() {
271 logger.info("Killing the agent pool");
272 super.close();
273 Namespace.unbind(getPoolName());
274 logger.info("Done killing the agent pool");
275 }
276
277
278
279
280 public void returnAgent(IMobileAgent agent) {
281 logger.trace("Returning {}", agent.getName());
282 returnObject(agent);
283 logger.trace("POOL return active={}", getNumActive());
284 }
285
286
287
288
289 public static AgentPool lookup() throws NamespaceException {
290 return (AgentPool) Namespace.lookup(DEFAULT_NAMESPACE_NAME);
291 }
292
293
294
295
296 public static AgentPool lookup(String name) throws NamespaceException {
297 return (AgentPool) Namespace.lookup(name);
298 }
299
300
301
302
303 @Override
304 public synchronized String toString() {
305 return "Poolsize active/idle = " + getNumActive() + "/" + getNumIdle() + " - " + getPoolName();
306 }
307
308
309
310
311
312
313 public String getClassName() {
314 return factory.getClassString();
315 }
316
317
318
319
320 public boolean isAgentAvailable() {
321 return getNumIdle() > 0;
322 }
323 }