001 /*
002 * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
003 * Copyright (C) 2011 NightLabs Consulting GmbH
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program. If not, see <http://www.gnu.org/licenses/>.
017 */
018 package org.cumulus4j.store.crypto.keymanager.messagebroker.pmf;
019
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.lang.ref.WeakReference;
023 import java.util.Collection;
024 import java.util.Date;
025 import java.util.HashMap;
026 import java.util.LinkedList;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Properties;
030 import java.util.Random;
031 import java.util.Timer;
032 import java.util.TimerTask;
033 import java.util.concurrent.TimeoutException;
034
035 import javax.jdo.FetchPlan;
036 import javax.jdo.JDOHelper;
037 import javax.jdo.PersistenceManager;
038 import javax.jdo.PersistenceManagerFactory;
039
040 import org.cumulus4j.keymanager.back.shared.GetKeyRequest;
041 import org.cumulus4j.keymanager.back.shared.IdentifierUtil;
042 import org.cumulus4j.keymanager.back.shared.Message;
043 import org.cumulus4j.keymanager.back.shared.Request;
044 import org.cumulus4j.keymanager.back.shared.Response;
045 import org.cumulus4j.keymanager.back.shared.SystemPropertyUtil;
046 import org.cumulus4j.store.crypto.AbstractCryptoManager;
047 import org.cumulus4j.store.crypto.keymanager.messagebroker.AbstractMessageBroker;
048 import org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBroker;
049 import org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBrokerRegistry;
050 import org.cumulus4j.store.crypto.keymanager.rest.ErrorResponseException;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 /**
055 * <p>
056 * {@link PersistenceManagerFactory}-backed implementation of {@link MessageBroker}.
057 * </p>
058 * <p>
059 * All {@link Message messages} are transferred via a shared database. Which database to be used can be
060 * configured by {@link #SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX system properties}.
061 * </p>
062 *
063 * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
064 */
065 public class MessageBrokerPMF extends AbstractMessageBroker
066 {
067 private static final Logger logger = LoggerFactory.getLogger(MessageBrokerPMF.class);
068
069 /**
070 * Prefix for system properties used to configure the {@link PersistenceManagerFactory}.
071 * <p>
072 * Every system property that begins with {@value #SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX}
073 * is passed (after truncating this prefix, of course) to the {@link JDOHelper#getPersistenceManagerFactory(Map)}.
074 * </p>
075 * <p>
076 * For example, to set the property "javax.jdo.option.ConnectionURL", you have to define the system
077 * property "cumulus4j.MessageBrokerPMF.persistenceProperties.javax.jdo.option.ConnectionURL".
078 * </p>
079 * <p>
080 * A set of defaults is loaded from a resource file, hence you do not need to configure everything, but
081 * without setting some basic coordinates (e.g. the JDBC URL), it is unlikely that your database server can be
082 * contacted. Of course, you could add an appropriate host record to your "/etc/hosts"
083 * and create a database with the name from our defaults on this host, but very likely you want to override these default
084 * coordinates:
085 * </p>
086 * <ul>
087 * <li>javax.jdo.option.ConnectionDriverName=com.mysql.jdbc.Driver</li>
088 * <li>javax.jdo.option.ConnectionURL=jdbc:mysql://cumulus4j-db/cumulus4jmessagebroker</li>
089 * </ul>
090 * <p>
091 * These defaults might be changed with a future version.
092 * </p>
093 */
094 public static final String SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX = "cumulus4j.MessageBrokerPMF.persistenceProperties.";
095
096 /**
097 * <p>
098 * System property to control when the timer for cleaning up old {@link PendingRequest}s is called. The
099 * value configured here is a period, i.e. the timer will be triggered every X ms (roughly).
100 * </p><p>
101 * If this system property is not present (or not a valid number), the default is 3600000 (1 hour), which means
102 * the timer will wake up once every hour and call {@link #removeExpiredPendingRequests(boolean)} with <code>force = true</code>.
103 * </p><p>
104 * All <code>PendingRequest</code>s with a {@link PendingRequest#getLastStatusChangeTimestamp() lastStatusChangeTimestamp}
105 * being older than the {@link AbstractMessageBroker#getQueryTimeout() queryTimeout} (plus a safety margin of currently
106 * this period) are deleted.
107 * </p>
108 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED
109 */
110 public static final String SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD = "cumulus4j.MessageBrokerPMF.cleanupTimer.period";
111
112 /**
113 * <p>
114 * System property to control whether the timer for cleaning up old {@link PendingRequest}s should be enabled. The
115 * value configured here is a boolean value, i.e. it can be "true" or "false".
116 * </p><p>
117 * If it is disabled, the "normal" threads will do the clean-up-work periodically, when they run through
118 * {@link #_query(Class, Request)} or {@link #_pollRequest(String)}.
119 * </p>
120 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD
121 */
122 public static final String SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED = "cumulus4j.MessageBrokerPMF.cleanupTimer.enabled";
123
124 private long cleanupTimerPeriod = Long.MIN_VALUE;
125
126 private Boolean cleanupTimerEnabled = null;
127
128 protected long getCleanupTimerPeriod()
129 {
130 if (cleanupTimerPeriod < 0) {
131 final String propName = SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD;
132 String property = System.getProperty(propName);
133 long timeout = -1;
134 if (property != null && !property.isEmpty()) {
135 try {
136 timeout = Long.parseLong(property);
137 } catch (NumberFormatException x) {
138 logger.warn("Value \"{}\" of system property '{}' is not valid, because it cannot be parsed as number!", property, propName);
139 }
140 if (timeout <= 0)
141 logger.warn("Value \"{}\" of system property '{}' is not valid, because it is less than or equal to 0!", property, propName);
142 else {
143 logger.info("System property '{}' is specified with value {}.", propName, timeout);
144 cleanupTimerPeriod = timeout;
145 }
146 }
147
148 if (cleanupTimerPeriod < 0) {
149 timeout = 60L * 60L * 1000L;
150 cleanupTimerPeriod = timeout;
151 logger.info("System property '{}' is not specified; using default value {}.", propName, timeout);
152 }
153 }
154
155 return cleanupTimerPeriod;
156 }
157
158 /**
159 * <p>
160 * Get the enabled status of the timer used to cleanup.
161 * </p>
162 * <p>
163 * This value can be configured using the system property {@value #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED}.
164 * </p>
165 *
166 * @return the enabled status.
167 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD
168 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED
169 */
170 protected boolean getCleanupTimerEnabled()
171 {
172 Boolean val = cleanupTimerEnabled;
173 if (val == null) {
174 String propName = SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED;
175 String propVal = System.getProperty(propName);
176 propVal = propVal == null ? null : propVal.trim();
177 if (propVal != null && !propVal.isEmpty()) {
178 if (propVal.equalsIgnoreCase(Boolean.TRUE.toString()))
179 val = Boolean.TRUE;
180 else if (propVal.equalsIgnoreCase(Boolean.FALSE.toString()))
181 val = Boolean.FALSE;
182
183 if (val == null)
184 logger.warn("System property '{}' is set to '{}', which is an ILLEGAL value. Falling back to default value.", propName, propVal);
185 else
186 logger.info("System property '{}' is set to '{}'.", propName, val);
187 }
188
189 if (val == null) {
190 val = Boolean.TRUE;
191 logger.info("System property '{}' is not set. Using default value '{}'.", propName, val);
192 }
193
194 cleanupTimerEnabled = val;
195 }
196 return val;
197 }
198
199 private static volatile Timer cleanupTimer = null;
200 private static volatile boolean cleanupTimerInitialised = false;
201 private volatile boolean cleanupTaskInitialised = false;
202
203 private static class CleanupTask extends TimerTask
204 {
205 private final Logger logger = LoggerFactory.getLogger(CleanupTask.class);
206
207 private final String thisID;
208 private WeakReference<MessageBrokerPMF> messageBrokerPMFRef;
209 private final long expiryTimerPeriodMSec;
210
211 public CleanupTask(MessageBrokerPMF messageBrokerPMF, long expiryTimerPeriodMSec)
212 {
213 if (messageBrokerPMF == null)
214 throw new IllegalArgumentException("messageBrokerPMF == null");
215
216 this.thisID = messageBrokerPMF.thisID + '.' + Long.toString(System.identityHashCode(this), 36);
217 this.messageBrokerPMFRef = new WeakReference<MessageBrokerPMF>(messageBrokerPMF);
218 this.expiryTimerPeriodMSec = expiryTimerPeriodMSec;
219 }
220
221 @Override
222 public void run() {
223 try {
224 logger.debug("[{}] run: entered", thisID);
225 final MessageBrokerPMF messageBrokerPMF = messageBrokerPMFRef.get();
226 if (messageBrokerPMF == null) {
227 logger.info("[{}] run: MessageBrokerPMF was garbage-collected. Cancelling this TimerTask.", thisID);
228 this.cancel();
229 return;
230 }
231
232 messageBrokerPMF.removeExpiredPendingRequests(true);
233
234 long currentPeriodMSec = messageBrokerPMF.getCleanupTimerPeriod();
235 if (currentPeriodMSec != expiryTimerPeriodMSec) {
236 logger.info(
237 "[{}] run: The expiryTimerPeriodMSec changed (oldValue={}, newValue={}). Re-scheduling this task.",
238 new Object[] { thisID, expiryTimerPeriodMSec, currentPeriodMSec }
239 );
240 this.cancel();
241
242 cleanupTimer.schedule(new CleanupTask(messageBrokerPMF, currentPeriodMSec), currentPeriodMSec, currentPeriodMSec);
243 }
244 } catch (Throwable x) {
245 // The TimerThread is cancelled, if a task throws an exception. Furthermore, they are not logged at all.
246 // Since we do not want the TimerThread to die, we catch everything (Throwable - not only Exception) and log
247 // it here. IMHO there's nothing better we can do. Marco :-)
248 logger.error("[" + thisID + "] run: " + x, x);
249 }
250 }
251 };
252
253 private final void initTimerTaskOrRemoveExpiredPendingRequestsPeriodically()
254 {
255 if (!cleanupTimerInitialised) {
256 synchronized (AbstractCryptoManager.class) {
257 if (!cleanupTimerInitialised) {
258 if (getCleanupTimerEnabled())
259 cleanupTimer = new Timer(MessageBrokerPMF.class.getSimpleName(), true);
260
261 cleanupTimerInitialised = true;
262 }
263 }
264 }
265
266 if (!cleanupTaskInitialised) {
267 synchronized (this) {
268 if (!cleanupTaskInitialised) {
269 if (cleanupTimer != null) {
270 long periodMSec = getCleanupTimerPeriod();
271 cleanupTimer.schedule(new CleanupTask(this, periodMSec), periodMSec, periodMSec);
272 }
273 cleanupTaskInitialised = true;
274 }
275 }
276 }
277
278 if (cleanupTimer == null) {
279 logger.trace("[{}] initTimerTaskOrRemoveExpiredPendingRequestsPeriodically: No timer enabled => calling removeExpiredEntries(false) now.", thisID);
280 removeExpiredPendingRequests(false);
281 }
282 }
283
284 private Date lastRemoveExpiredPendingRequestsTimestamp = null;
285
286 private void removeExpiredPendingRequests(boolean force)
287 {
288 synchronized (this) {
289 if (
290 !force && (
291 lastRemoveExpiredPendingRequestsTimestamp != null &&
292 lastRemoveExpiredPendingRequestsTimestamp.after(new Date(System.currentTimeMillis() - getCleanupTimerPeriod()))
293 )
294 )
295 {
296 logger.trace("[{}] removeExpiredPendingRequests: force == false and period not yet elapsed. Skipping.", thisID);
297 return;
298 }
299
300 lastRemoveExpiredPendingRequestsTimestamp = new Date();
301 }
302
303 Date removePendingRequestsBeforeThisTimestamp = new Date(
304 System.currentTimeMillis() - getQueryTimeout()
305 // We use this cleanupTimerPeriod as a margin to prevent collisions with the code that still uses a PendingRequest
306 // and might right now (after the query-timeout) be about to delete it. Even though this time might thus
307 // be pretty long, it doesn't matter, if entries linger in the DB for a while as most are immediately cleaned up, anyway.
308 // This cleanup is only required for rare situations (e.g. when a JVM crashes). Otherwise our code should already
309 // ensure that objects are deleted immediately when they're not needed anymore.
310 // We might in the future replace the 'getCleanupTimerPeriod()' by a new system-property-controllable
311 // value (e.g. 'getCleanupDelay()'), though, to make it really nice & clean. But that's not important at all, IMHO.
312 // Marco :-)
313 - getCleanupTimerPeriod()
314 );
315
316 try {
317
318 Integer deletedCount = null;
319
320 PersistenceManager pm = createTransactionalPersistenceManager();
321 try {
322 Collection<PendingRequest> c = PendingRequest.getPendingRequestsWithLastStatusChangeTimestampOlderThanTimestamp(
323 pm, removePendingRequestsBeforeThisTimestamp
324 );
325
326 if (logger.isDebugEnabled())
327 deletedCount = c.size();
328
329 pm.deletePersistentAll(c);
330
331 pm.currentTransaction().commit();
332 } finally {
333 if (pm.currentTransaction().isActive())
334 pm.currentTransaction().rollback();
335
336 pm.close();
337 }
338
339 logger.debug("[{}] removeExpiredPendingRequests: Deleted {} expired PendingRequest instances.", thisID, deletedCount);
340
341 } catch (Exception x) {
342 String errMsg = "[" + thisID + "] removeExpiredPendingRequests: Deleting the expired pending requests failed. This might *occasionally* happen due to the optimistic transaction handling (=> collisions). ";
343 if (logger.isDebugEnabled())
344 logger.warn(errMsg + x, x);
345 else
346 logger.warn(errMsg + "Enable DEBUG logging to see the stack trace. " + x);
347 }
348 }
349
350 private PersistenceManagerFactory pmf;
351
352 private Random random = new Random();
353
354 private final String thisID = Long.toString(System.identityHashCode(this), 36);
355
356 /**
357 * Create an instance of <code>MessageBrokerPMF</code>. You should not call this constructor directly, but
358 * instead use {@link MessageBrokerRegistry#getActiveMessageBroker()} to obtain the currently active {@link MessageBroker}.
359 */
360 public MessageBrokerPMF()
361 {
362 logger.info("[{}] Instantiating MessageBrokerPMF.", thisID);
363 Properties propertiesRaw = new Properties();
364 InputStream in = MessageBrokerPMF.class.getResourceAsStream("messagebroker-datanucleus.properties");
365 try {
366 propertiesRaw.load(in);
367 in.close();
368 } catch (IOException e) {
369 throw new RuntimeException(e);
370 }
371
372 for (Map.Entry<?, ?> me : System.getProperties().entrySet()) {
373 String key = String.valueOf(me.getKey());
374 if (key.startsWith(SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX))
375 propertiesRaw.setProperty(key.substring(SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX.length()), String.valueOf(me.getValue()));
376 }
377
378 Map<String, Object> properties = new HashMap<String, Object>(propertiesRaw.size());
379 for (Map.Entry<?, ?> me : propertiesRaw.entrySet())
380 properties.put(String.valueOf(me.getKey()), SystemPropertyUtil.resolveSystemProperties(String.valueOf(me.getValue())));
381
382 Object connectionDriverNameObj = properties.get("javax.jdo.option.ConnectionDriverName");
383 String connectionDriverName = connectionDriverNameObj == null ? null : connectionDriverNameObj.toString();
384 logger.info("[{}] javax.jdo.option.ConnectionDriverName={}", thisID, connectionDriverName);
385 logger.info("[{}] javax.jdo.option.ConnectionURL={}", thisID, properties.get("javax.jdo.option.ConnectionURL"));
386
387 // With JDBC 4, it is not necessary anymore to load the driver:
388 // http://onjava.com/pub/a/onjava/2006/08/02/jjdbc-4-enhancements-in-java-se-6.html
389 // And since DN might use some other class loader, anyway, it does not even make any sense at all.
390 // Hence, I commented this out, again. Marco :-)
391 // ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
392 // if (contextClassLoader != null) { // I think this is never null, but better check to play safe. Marco :-)
393 // try {
394 // Class.forName(connectionDriverName, true, contextClassLoader);
395 // logger.info("[{}] Loaded class \"" + connectionDriverName + "\" with contextClassLoader=\"" + contextClassLoader + "\" successfully!");
396 // } catch (ClassNotFoundException e) {
397 // logger.warn("[{}] Loading class \"" + connectionDriverName + "\" with contextClassLoader=\"" + contextClassLoader + "\" failed: " + e, e);
398 // }
399 // }
400
401 pmf = JDOHelper.getPersistenceManagerFactory(properties);
402 // First create the structure in a separate tx (in case, the underlying DB/configuration requires this.
403 PersistenceManager pm = pmf.getPersistenceManager();
404 try {
405 pm.currentTransaction().begin();
406 pm.getExtent(PendingRequest.class);
407 pm.currentTransaction().commit();
408 } finally {
409 if (pm.currentTransaction().isActive())
410 pm.currentTransaction().rollback();
411
412 pm.close();
413 }
414
415 // Now test the DB access.
416 pm = pmf.getPersistenceManager();
417 try {
418 pm.currentTransaction().begin();
419 // Testing WRITE and READ access.
420 String cryptoSessionIDPrefix = IdentifierUtil.createRandomID(50); // using a length that is not used normally to prevent collisions with absolute certainty.
421 String cryptoSessionID = cryptoSessionIDPrefix + '*' + IdentifierUtil.createRandomID(10);
422 GetKeyRequest dummyRequest = new GetKeyRequest(cryptoSessionID, 1, "RSA", new byte[16]);
423 PendingRequest pendingRequest = new PendingRequest(dummyRequest);
424 pendingRequest = pm.makePersistent(pendingRequest);
425 pm.flush(); // Make sure, things are written NOW.
426
427 PendingRequest queriedPendingRequest = PendingRequest.getOldestPendingRequest(pm, cryptoSessionIDPrefix, PendingRequestStatus.waitingForProcessing);
428 if (!pendingRequest.equals(queriedPendingRequest))
429 throw new IllegalStateException("Query did not find the PendingRequest instance, we just persisted for testing!");
430
431 // And delete the garbage immediately again.
432 pm.deletePersistent(pendingRequest);
433
434 pm.currentTransaction().commit();
435 } finally {
436 if (pm.currentTransaction().isActive())
437 pm.currentTransaction().rollback();
438
439 pm.close();
440 }
441 logger.info("[{}] Successfully instantiated and tested MessageBrokerPMF.", thisID);
442 }
443
444 protected PersistenceManager createTransactionalPersistenceManager()
445 {
446 PersistenceManager pm = pmf.getPersistenceManager();
447 pm.currentTransaction().begin();
448 return pm;
449 }
450
451 @Override
452 protected Response _query(Class<? extends Response> responseClass, Request request)
453 throws TimeoutException, ErrorResponseException
454 {
455 String requestID = request.getRequestID();
456
457 logger.debug("[{}] _query[requestID={}]: Entered with request: {}", new Object[] { thisID , requestID, request });
458
459 initTimerTaskOrRemoveExpiredPendingRequestsPeriodically();
460
461 PersistenceManager pm = createTransactionalPersistenceManager();
462 try {
463 pm.makePersistent(new PendingRequest(request));
464 pm.currentTransaction().commit();
465 } finally {
466 if (pm.currentTransaction().isActive())
467 pm.currentTransaction().rollback();
468
469 pm.close();
470 }
471 request = null;
472
473 logger.debug("[{}] _query[requestID={}]: Request persisted.", thisID, requestID);
474
475 // it would be nice if we could notify here, but this is not possible
476
477
478 // // BEGIN trying to produce collisions.
479 // try {
480 // Thread.sleep(1000L);
481 // } catch (InterruptedException e) {
482 // // ignore - only log - and break loop.
483 // logger.warn("_query: Thread.sleep(...) was interrupted with an InterruptedException.");
484 // }
485 // // END trying to produce collisions.
486
487
488 long beginTimestamp = System.currentTimeMillis();
489 Response response = null;
490 do {
491
492 try {
493 Thread.sleep(100L);
494 // Thread.sleep(100L + random.nextInt(900)); // TODO make configurable?!
495 } catch (InterruptedException e) {
496 // ignore
497 }
498
499 logger.trace("[{}] _query[requestID={}]: Beginning tx.", thisID, requestID);
500
501 pm = createTransactionalPersistenceManager();
502 try {
503 // We now use optimistic tx, hence setSerializeRead makes no sense anymore.
504 // pm.currentTransaction().setSerializeRead(true);
505
506 pm.getFetchPlan().setGroups(new String[] { FetchPlan.DEFAULT, PendingRequest.FetchGroup.response });
507 PendingRequest pendingRequest = PendingRequest.getPendingRequest(pm, requestID);
508 if (pendingRequest == null)
509 logger.warn("_query[requestID={}]: Request is not found in the list of table of PendingRequest objects anymore.", requestID);
510 else {
511 switch (pendingRequest.getStatus()) {
512 case waitingForProcessing:
513 // nothing to do => wait!
514 break;
515 case currentlyBeingProcessed:
516 // nothing to do => wait!
517 break;
518 case completed:
519 response = pendingRequest.getResponse();
520 if (response == null)
521 throw new IllegalStateException("pending.response is null, even though status is 'completed'!!!");
522 break;
523 default:
524 throw new IllegalStateException("Unknown status: " + pendingRequest.getStatus());
525 }
526
527 if (response != null)
528 pm.deletePersistent(pendingRequest);
529 }
530
531 if (response == null && System.currentTimeMillis() - beginTimestamp > getQueryTimeout()) {
532 logger.warn(
533 "[{}] _query[requestID={}]: Request for session {} was not answered within timeout. Current status is {}.",
534 new Object[] {
535 thisID,
536 requestID,
537 (pendingRequest == null ? null : pendingRequest.getRequest().getCryptoSessionID()),
538 (pendingRequest == null ? null : pendingRequest.getStatus())
539 }
540 );
541
542 if (pendingRequest != null)
543 pm.deletePersistent(pendingRequest);
544
545 pm.currentTransaction().commit();
546
547 throw new TimeoutException("Request was not answered within timeout! requestID=" + requestID);
548 }
549
550 pm.currentTransaction().commit();
551 } catch (Exception x) {
552 response = null;
553 logger.warn("[{}] _query[requestID={}]: {}", new Object[] { thisID, requestID, x.toString() });
554 } finally {
555 if (pm.currentTransaction().isActive())
556 pm.currentTransaction().rollback();
557
558 pm.close();
559 }
560
561 logger.trace("[{}] _query[requestID={}]: Ended tx. response={}", new Object[] { thisID, requestID, response });
562
563 } while (response == null);
564
565 return response;
566 }
567
568 // private Set<String> testCollisionDetection = Collections.synchronizedSet(new HashSet<String>());
569
570 @Override
571 protected Request _pollRequest(String cryptoSessionIDPrefix)
572 {
573 logger.debug("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Entered.", thisID, cryptoSessionIDPrefix);
574
575 long beginTimestamp = System.currentTimeMillis();
576
577 initTimerTaskOrRemoveExpiredPendingRequestsPeriodically();
578
579 Request request = null;
580 do {
581 logger.trace("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Beginning tx.", thisID, cryptoSessionIDPrefix);
582
583 PersistenceManager pm = createTransactionalPersistenceManager();
584 try {
585 // We now use optimistic tx, hence the following makes no sense anymore.
586 // pm.currentTransaction().setSerializeRead(true);
587
588 PendingRequest pendingRequest = PendingRequest.getOldestPendingRequest(
589 pm, cryptoSessionIDPrefix, PendingRequestStatus.waitingForProcessing
590 );
591
592
593 // // BEGIN trying to produce collisions.
594 // try {
595 // Thread.sleep(500L);
596 // } catch (InterruptedException e) {
597 // // ignore - only log - and break loop.
598 // logger.warn("_pollRequest[cryptoSessionIDPrefix={}]: Thread.sleep(...) was interrupted with an InterruptedException.");
599 // }
600 // // END trying to produce collisions.
601
602
603 if (pendingRequest != null) {
604 pendingRequest.setStatus(PendingRequestStatus.currentlyBeingProcessed);
605 request = pendingRequest.getRequest();
606 }
607
608 pm.currentTransaction().commit();
609 } catch (Exception x) {
610 request = null;
611 logger.warn("[{}] _pollRequest[cryptoSessionIDPrefix={}]: {}", new Object[] { thisID, cryptoSessionIDPrefix, x.toString() });
612 } finally {
613 if (pm.currentTransaction().isActive())
614 pm.currentTransaction().rollback();
615
616 pm.close();
617 }
618
619 logger.trace("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Ended tx. request={}", new Object[] { thisID, cryptoSessionIDPrefix, request });
620
621 if (request == null) {
622 if (System.currentTimeMillis() - beginTimestamp > getPollRequestTimeout())
623 break;
624
625 try {
626 Thread.sleep(50L + random.nextInt(50)); // TODO make configurable?!
627 } catch (InterruptedException e) {
628 // ignore - only log - and break loop.
629 logger.warn("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Thread.sleep(...) was interrupted with an InterruptedException.", thisID);
630 break;
631 }
632 }
633 } while (request == null);
634
635 // if (request != null && !testCollisionDetection.add(request.getRequestID()))
636 // logger.error("_pollRequest[cryptoSessionIDPrefix={}]: COLLISION!!! At least two threads process the same request! requestID={}", request.getRequestID());
637
638 logger.debug("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Returning request: {}", new Object[] { thisID, cryptoSessionIDPrefix, request });
639
640 return request;
641 }
642
643 @Override
644 protected void _pushResponse(Response response)
645 {
646 if (response == null)
647 throw new IllegalArgumentException("response == null");
648
649 if (response.getRequestID() == null)
650 throw new IllegalArgumentException("response.requestID == null");
651
652 String requestID = response.getRequestID();
653
654 logger.debug("[{}] _pushResponse[requestID={}]: Entered.", thisID, requestID);
655
656 List<Throwable> errors = new LinkedList<Throwable>();
657 boolean successful;
658 for (int tryCounter = 0; tryCounter < 10; ++tryCounter) {
659 successful = false;
660 PersistenceManager pm = createTransactionalPersistenceManager();
661 try {
662 // pm.currentTransaction().setSerializeRead(true); // Now using optimistic TX instead.
663
664 PendingRequest pendingRequest = PendingRequest.getPendingRequest(pm, response.getRequestID());
665 if (pendingRequest == null || pendingRequest.getStatus() != PendingRequestStatus.currentlyBeingProcessed)
666 logger.warn("[{}] _pushResponse[requestID={}]: There is no request currently being processed with this requestID!!!", thisID, requestID);
667 else {
668 pendingRequest.setResponse(response);
669 pendingRequest.setStatus(PendingRequestStatus.completed);
670 }
671
672 pm.currentTransaction().commit(); successful = true;
673 } catch (Exception x) {
674 errors.add(x);
675 logger.warn("[{}] _pushResponse[requestID={}]: {}", new Object[] { thisID, requestID, x.toString() });
676 } finally {
677 if (pm.currentTransaction().isActive())
678 pm.currentTransaction().rollback();
679
680 pm.close();
681 }
682
683 if (successful) {
684 errors.clear();
685 break;
686 }
687 else {
688 // In case of an error, we wait a bit before trying it again.
689 try {
690 Thread.sleep(500L);
691 } catch (InterruptedException e) {
692 // ignore - only log - and break loop.
693 logger.warn("[{}] _pushResponse[requestID={}]: Thread.sleep(...) was interrupted with an InterruptedException.", thisID, requestID);
694 break;
695 }
696 }
697 }
698
699 if (!errors.isEmpty()) {
700 Throwable lastError = null;
701 for (Throwable e : errors) {
702 lastError = e;
703 logger.warn("[" + thisID + "] _pushResponse[requestID=" + requestID + "]: " + e, e);
704 }
705 if (lastError instanceof RuntimeException)
706 throw (RuntimeException)lastError;
707 else
708 throw new RuntimeException(lastError);
709 }
710 }
711
712 }