001 /*
002 * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
003 * Copyright (C) 2011 NightLabs Consulting GmbH
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program. If not, see <http://www.gnu.org/licenses/>.
017 */
018 package org.cumulus4j.store;
019
020 import java.util.HashMap;
021 import java.util.Locale;
022 import java.util.Map;
023 import java.util.Set;
024
025 import javax.jdo.JDOHelper;
026 import javax.jdo.PersistenceManager;
027 import javax.jdo.PersistenceManagerFactory;
028 import javax.transaction.xa.XAException;
029 import javax.transaction.xa.XAResource;
030 import javax.transaction.xa.Xid;
031
032 import org.cumulus4j.store.model.ClassMeta;
033 import org.cumulus4j.store.model.DataEntry;
034 import org.cumulus4j.store.model.DatastoreVersion;
035 import org.cumulus4j.store.model.EmbeddedClassMeta;
036 import org.cumulus4j.store.model.EmbeddedFieldMeta;
037 import org.cumulus4j.store.model.EncryptionCoordinateSet;
038 import org.cumulus4j.store.model.FieldMeta;
039 import org.cumulus4j.store.model.IndexEntry;
040 import org.cumulus4j.store.model.IndexEntryContainerSize;
041 import org.cumulus4j.store.model.Sequence2;
042 import org.cumulus4j.store.resource.ResourceHelper;
043 import org.datanucleus.ExecutionContext;
044 import org.datanucleus.PersistenceConfiguration;
045 import org.datanucleus.store.StoreManager;
046 import org.datanucleus.store.connection.AbstractConnectionFactory;
047 import org.datanucleus.store.connection.AbstractManagedConnection;
048 import org.datanucleus.store.connection.ManagedConnection;
049 import org.datanucleus.util.NucleusLogger;
050 import org.datanucleus.util.StringUtils;
051
052 /**
053 * <p>
054 * Connection factory implementation for Cumulus4j-connections.
055 * </p><p>
056 * A "connection" in Cumulus4J is a <code>PersistenceManager</code> for the backing datastore.
057 * When the transaction in Cumulus4J is committed, the equivalent transaction is committed in the PM(s) of the
058 * backing datastore(s).
059 * </p><p>
060 * How to configure a connection factory is documented on
061 * <a href="http://cumulus4j.org/1.2.0/documentation/persistence-api.html">Persistence API</a>.
062 * </p>
063 * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
064 */
065 public class Cumulus4jConnectionFactory extends AbstractConnectionFactory
066 {
067 /** PMF for DataEntry, ClassMeta+FieldMeta, and optionally index data (if not using pmfIndex). */
068 private PersistenceManagerFactory pmf;
069
070 /** Optional PMF for index data. */
071 private PersistenceManagerFactory pmfIndex;
072
073 private String[] propertiesToForward = {
074 "datanucleus.ConnectionDriverName",
075 "datanucleus.ConnectionURL",
076 "datanucleus.ConnectionUserName",
077 "datanucleus.ConnectionFactory",
078 "datanucleus.ConnectionFactoryName",
079 "datanucleus.ConnectionFactory2",
080 "datanucleus.ConnectionFactory2Name"
081 };
082
083 private static final String CUMULUS4J_PROPERTY_PREFIX = "cumulus4j.";
084 private static final String CUMULUS4J_INDEX_PROPERTY_PREFIX = "cumulus4j.index.";
085
086 private static final String[] CUMULUS4J_FORWARD_PROPERTY_PREFIXES = {
087 CUMULUS4J_PROPERTY_PREFIX + "datanucleus.",
088 CUMULUS4J_PROPERTY_PREFIX + "javax."
089 };
090
091 private static final String[] CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES = {
092 CUMULUS4J_INDEX_PROPERTY_PREFIX + "datanucleus.",
093 CUMULUS4J_INDEX_PROPERTY_PREFIX + "javax."
094 };
095
096 public Cumulus4jConnectionFactory(StoreManager storeMgr, String resourceType) {
097 super(storeMgr, resourceType);
098
099 Map<String, Object> backendProperties = ResourceHelper.getCumulus4jBackendProperties();
100 Map<String, Object> persistenceProperties = ResourceHelper.getCumulus4jPersistenceProperties();
101 Map<String, Object> backendIndexProperties = null;
102
103 PersistenceConfiguration persistenceConfiguration = storeMgr.getNucleusContext().getPersistenceConfiguration();
104 for (Map.Entry<String, Object> me : persistenceProperties.entrySet()) {
105 if (me.getKey() == null) // can't happen, but better play safe
106 continue;
107
108 if (!persistenceConfiguration.hasProperty(me.getKey())) // we load our defaults after the user config, hence we must not override!
109 persistenceConfiguration.setProperty(me.getKey(), me.getValue());
110 }
111
112 // Copy the properties that are directly (as is) forwarded.
113 for (String propKey : propertiesToForward) {
114 Object propValue = persistenceConfiguration.getProperty(propKey);
115 if (propValue != null)
116 backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), propValue);
117 }
118
119 // Copy the properties that are prefixed with "cumulus4j." and thus forwarded.
120 for (Map.Entry<String, Object> me : persistenceConfiguration.getPersistenceProperties().entrySet()) {
121 if (me.getKey() == null) // don't know if null keys can ever occur, but better play safe
122 continue;
123
124 for (String prefix : CUMULUS4J_FORWARD_PROPERTY_PREFIXES) {
125 if (me.getKey().startsWith(prefix)) {
126 String propKey = me.getKey().substring(CUMULUS4J_PROPERTY_PREFIX.length());
127 backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
128 }
129 }
130
131 for (String prefix : CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES) {
132 if (me.getKey().startsWith(prefix)) {
133 String propKey = me.getKey().substring(CUMULUS4J_INDEX_PROPERTY_PREFIX.length());
134 if (backendIndexProperties == null) {
135 backendIndexProperties = new HashMap<String, Object>(backendProperties);
136 }
137 backendIndexProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
138 }
139 }
140 }
141
142 // The password might be encrypted, but the getConnectionPassword(...) method decrypts it.
143 String pw = storeMgr.getConnectionPassword();
144 if (pw != null) {
145 backendProperties.put("datanucleus.ConnectionPassword".toLowerCase(Locale.ENGLISH), pw);
146 }
147
148 // This block is an alternative to getting Extent of each Cumulus4j schema class
149 /* StringBuffer classNameStr = new StringBuffer();
150 classNameStr.append(ClassMeta.class.getName()).append(",");
151 classNameStr.append(DataEntry.class.getName()).append(",");
152 classNameStr.append(FieldMeta.class.getName()).append(",");
153 classNameStr.append(IndexEntryContainerSize.class.getName()).append(",");
154 classNameStr.append(Sequence.class.getName());
155 PluginManager pluginMgr = storeMgr.getNucleusContext().getPluginManager();
156 ConfigurationElement[] elems = pluginMgr.getConfigurationElementsForExtension(
157 "org.cumulus4j.store.index_mapping", null, null);
158 if (elems != null && elems.length > 0) {
159 HashSet<Class> initialisedClasses = new HashSet<Class>();
160 for (int i=0;i<elems.length;i++) {
161 String indexTypeName = elems[i].getAttribute("index-entry-type");
162 Class cls = pluginMgr.loadClass("org.cumulus4j.store.index_mapping", indexTypeName);
163 if (!initialisedClasses.contains(cls)) {
164 initialisedClasses.add(cls);
165 classNameStr.append(",").append(indexTypeName);
166 }
167 }
168 }
169 cumulus4jBackendProperties.put("datanucleus.autostartmechanism", "Classes");
170 cumulus4jBackendProperties.put("datanucleus.autostartclassnames", classNameStr.toString());*/
171
172 // PMF for data (and optionally index)
173 if (backendIndexProperties == null) {
174 NucleusLogger.GENERAL.debug("Creating PMF for Data+Index with the following properties : "+StringUtils.mapToString(backendProperties));
175 }
176 else {
177 NucleusLogger.GENERAL.debug("Creating PMF for Data with the following properties : "+StringUtils.mapToString(backendProperties));
178 }
179 pmf = JDOHelper.getPersistenceManagerFactory(backendProperties);
180
181 // initialise meta-data (which partially tests it)
182 PersistenceManager pm = pmf.getPersistenceManager();
183 try {
184 // Class structure meta-data
185 pm.getExtent(ClassMeta.class);
186 pm.getExtent(FieldMeta.class);
187 pm.getExtent(EmbeddedClassMeta.class);
188 pm.getExtent(EmbeddedFieldMeta.class);
189
190 // Data
191 pm.getExtent(DataEntry.class);
192
193 // Sequence for ID generation
194 pm.getExtent(Sequence2.class);
195
196 // Mapping for encryption settings (encryption algorithm, mode, padding, MAC, etc.
197 // are mapped to a number which reduces the size of each record)
198 pm.getExtent(EncryptionCoordinateSet.class);
199
200 // versioning of datastore structure
201 pm.getExtent(DatastoreVersion.class);
202
203 if (backendIndexProperties == null) {
204 // Index
205 initialiseIndexMetaData(pm, storeMgr);
206 }
207 } finally {
208 pm.close();
209 }
210
211 if (backendIndexProperties != null) {
212 // PMF for index data
213 NucleusLogger.GENERAL.debug("Creating PMF for Index data with the following properties : "+StringUtils.mapToString(backendIndexProperties));
214 pmfIndex = JDOHelper.getPersistenceManagerFactory(backendIndexProperties);
215
216 PersistenceManager pmIndex = pmfIndex.getPersistenceManager();
217 try {
218 // Class structure meta-data
219 pmIndex.getExtent(ClassMeta.class);
220 pmIndex.getExtent(FieldMeta.class);
221
222 // Index
223 initialiseIndexMetaData(pmIndex, storeMgr);
224
225 // versioning of datastore structure
226 pm.getExtent(DatastoreVersion.class);
227 } finally {
228 pmIndex.close();
229 }
230 }
231 }
232
233 private static void initialiseIndexMetaData(PersistenceManager pm, StoreManager storeMgr)
234 {
235 // While it is not necessary to initialise the meta-data now (can be done lazily,
236 // when the index is used), it is still better as it prevents delays when the
237 // data is persisted.
238 // Furthermore, if the underlying database uses transactional DDL (like PostgreSQL, MSSQL
239 // and others), and a separate JDBC connection is used for DDL (like it must be in
240 // a JEE server), it is essentially required to initialise the meta-data in a separate
241 // transaction before actually using the tables.
242 pm.getExtent(IndexEntryContainerSize.class);
243
244 Set<Class<? extends IndexEntry>> indexEntryClasses = ((Cumulus4jStoreManager)storeMgr).getIndexFactoryRegistry().getIndexEntryClasses();
245 for (Class<? extends IndexEntry> indexEntryClass : indexEntryClasses) {
246 pm.getExtent(indexEntryClass);
247 }
248
249 // PluginManager pluginMgr = storeMgr.getNucleusContext().getPluginManager();
250 // ConfigurationElement[] elems = pluginMgr.getConfigurationElementsForExtension(
251 // "org.cumulus4j.store.index_mapping", null, null);
252 // if (elems != null && elems.length > 0) {
253 // HashSet<Class<?>> initialisedClasses = new HashSet<Class<?>>();
254 // for (int i=0;i<elems.length;i++) {
255 // String indexTypeName = elems[i].getAttribute("index-entry-type");
256 // Class<?> cls = pluginMgr.loadClass("org.cumulus4j.store.index_mapping", indexTypeName);
257 // if (!initialisedClasses.contains(cls)) {
258 // initialisedClasses.add(cls);
259 // pm.getExtent(cls);
260 // }
261 // }
262 // }
263 }
264
265 public PersistenceManagerFactory getPMFData() {
266 return pmf;
267 }
268
269 public PersistenceManagerFactory getPMFIndex() {
270 return pmfIndex;
271 }
272
273 @Override
274 public ManagedConnection createManagedConnection(ExecutionContext ec, @SuppressWarnings("rawtypes") Map transactionOptions)
275 {
276 return new Cumulus4jManagedConnection(ec, transactionOptions);
277 }
278
279 /**
280 * Cumulus4j-specific {@link ManagedConnection} implementation. Avoid to access this specific class whenever possible!
281 * @author mschulze
282 */
283 class Cumulus4jManagedConnection extends AbstractManagedConnection
284 {
285 private ExecutionContext ec;
286
287 @SuppressWarnings({"rawtypes","unused"})
288 private Map options;
289
290 PersistenceManagerConnection pmConnection;
291
292 @Override
293 public XAResource getXAResource() {
294 return new Cumulus4jXAResource((PersistenceManagerConnection)getConnection());
295 }
296
297 public Cumulus4jManagedConnection(ExecutionContext ec, @SuppressWarnings("rawtypes") Map options) {
298 this.ec = ec;
299 this.options = options;
300 }
301
302 public ExecutionContext getExecutionContext() {
303 return ec;
304 }
305
306 @Override
307 public void close() {
308 if (pmConnection != null) {
309 PersistenceManager dataPM = pmConnection.getDataPM();
310 dataPM.close();
311 if (pmConnection.indexHasOwnPM()) {
312 PersistenceManager indexPM = pmConnection.getIndexPM();
313 indexPM.close();
314 }
315 pmConnection = null;
316 }
317 }
318
319 @Override
320 public Object getConnection() {
321 if (pmConnection == null) {
322 this.pmConnection = new PersistenceManagerConnection(pmf.getPersistenceManager(),
323 pmfIndex != null ? pmfIndex.getPersistenceManager() : null);
324 }
325 return pmConnection;
326 }
327 }
328
329 class Cumulus4jXAResource implements XAResource {
330 private PersistenceManagerConnection pmConnection;
331 // private Xid xid;
332
333 Cumulus4jXAResource(PersistenceManagerConnection pmConn) {
334 this.pmConnection = pmConn;
335 }
336
337 @Override
338 public void start(Xid xid, int arg1) throws XAException {
339 // if (this.xid != null)
340 // throw new IllegalStateException("Transaction already started! Cannot start twice!");
341
342 PersistenceManager dataPM = pmConnection.getDataPM();
343 dataPM.currentTransaction().begin();
344 if (pmConnection.indexHasOwnPM()) {
345 PersistenceManager indexPM = pmConnection.getIndexPM();
346 indexPM.currentTransaction().begin();
347 }
348 // this.xid = xid;
349 }
350
351 @Override
352 public void commit(Xid xid, boolean arg1) throws XAException {
353 // if (this.xid == null)
354 // throw new IllegalStateException("Transaction not active!");
355 //
356 // if (!this.xid.equals(xid))
357 // throw new IllegalStateException("Transaction mismatch! this.xid=" + this.xid + " otherXid=" + xid);
358
359 PersistenceManager dataPM = pmConnection.getDataPM();
360 dataPM.currentTransaction().commit();
361 if (pmConnection.indexHasOwnPM()) {
362 PersistenceManager indexPM = pmConnection.getIndexPM();
363 indexPM.currentTransaction().commit();
364 }
365
366 // this.xid = null;
367 }
368
369 @Override
370 public void rollback(Xid xid) throws XAException {
371 // if (this.xid == null)
372 // throw new IllegalStateException("Transaction not active!");
373 //
374 // if (!this.xid.equals(xid))
375 // throw new IllegalStateException("Transaction mismatch! this.xid=" + this.xid + " otherXid=" + xid);
376
377 PersistenceManager dataPM = pmConnection.getDataPM();
378 dataPM.currentTransaction().rollback();
379 if (pmConnection.indexHasOwnPM()) {
380 PersistenceManager indexPM = pmConnection.getIndexPM();
381 indexPM.currentTransaction().rollback();
382 }
383
384 // this.xid = null;
385 }
386
387 @Override
388 public void end(Xid arg0, int arg1) throws XAException {
389 //ignore
390 }
391
392 @Override
393 public void forget(Xid arg0) throws XAException {
394 //ignore
395 }
396
397 @Override
398 public int getTransactionTimeout() throws XAException {
399 return 0;
400 }
401
402 @Override
403 public boolean isSameRM(XAResource resource) throws XAException {
404 if ((resource instanceof Cumulus4jXAResource) && pmConnection.equals(((Cumulus4jXAResource)resource).pmConnection))
405 return true;
406 else
407 return false;
408 }
409
410 @Override
411 public int prepare(Xid arg0) throws XAException {
412 return 0;
413 }
414
415 @Override
416 public Xid[] recover(int arg0) throws XAException {
417 throw new XAException("Unsupported operation");
418 }
419
420 @Override
421 public boolean setTransactionTimeout(int arg0) throws XAException {
422 return false;
423 }
424 }
425 }