001package co.codewizards.cloudstore.local;
002
003import static java.util.Objects.*;
004
005import java.util.HashMap;
006import java.util.Map;
007import java.util.concurrent.CopyOnWriteArrayList;
008import java.util.concurrent.TimeUnit;
009import java.util.concurrent.locks.Lock;
010
011import javax.jdo.PersistenceManager;
012import javax.jdo.PersistenceManagerFactory;
013import javax.jdo.Transaction;
014
015import org.slf4j.Logger;
016import org.slf4j.LoggerFactory;
017
018import co.codewizards.cloudstore.core.context.ExtensibleContextSupport;
019import co.codewizards.cloudstore.core.io.TimeoutException;
020import co.codewizards.cloudstore.core.repo.local.ContextWithLocalRepoManager;
021import co.codewizards.cloudstore.core.repo.local.LocalRepoManager;
022import co.codewizards.cloudstore.core.repo.local.LocalRepoTransaction;
023import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionListenerRegistry;
024import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseEvent;
025import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseListener;
026import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseEvent;
027import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseListener;
028import co.codewizards.cloudstore.local.persistence.Dao;
029import co.codewizards.cloudstore.local.persistence.LocalRepository;
030import co.codewizards.cloudstore.local.persistence.LocalRepositoryDao;
031
032public class LocalRepoTransactionImpl implements LocalRepoTransaction, ContextWithLocalRepoManager, ContextWithPersistenceManager {
033        private static final Logger logger = LoggerFactory.getLogger(LocalRepoTransactionImpl.class);
034
035        public static final long LOCK_TIMEOUT = 5 * 60 * 1000;
036
037        private final LocalRepoManager localRepoManager;
038        private final PersistenceManagerFactory persistenceManagerFactory;
039        private final boolean write;
040        private PersistenceManager persistenceManager;
041        private Transaction jdoTransaction;
042        private final Lock lock;
043        private long localRevision = -1;
044        private final Map<Class<?>, Object> daoClass2Dao = new HashMap<>();
045        private final ExtensibleContextSupport extensibleContextSupport = new ExtensibleContextSupport();
046
047        private final LocalRepoTransactionListenerRegistry listenerRegistry = new LocalRepoTransactionListenerRegistry(this);
048
049        private final CopyOnWriteArrayList<LocalRepoTransactionPreCloseListener> preCloseListeners = new CopyOnWriteArrayList<>();
050        private final CopyOnWriteArrayList<LocalRepoTransactionPostCloseListener> postCloseListeners = new CopyOnWriteArrayList<>();
051
052        public LocalRepoTransactionImpl(final LocalRepoManagerImpl localRepoManager, final boolean write) {
053                this.localRepoManager = requireNonNull(localRepoManager, "localRepoManager");
054                this.persistenceManagerFactory = requireNonNull(localRepoManager.getPersistenceManagerFactory(), "localRepoManager.persistenceManagerFactory");
055                this.lock = localRepoManager.getLock();
056                this.write = write;
057                begin();
058        }
059
060        private void begin() {
061                boolean locked = false;
062                try {
063                        locked = lock.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
064                } catch (InterruptedException e) {
065                        // ignore
066                }
067                if (! locked)
068                        throw new TimeoutException(String.format("Starting %s transaction on '%s' within timeout (%s ms) failed! ", write ? "write" : "read", localRepoManager.getLocalRoot(), LOCK_TIMEOUT));
069                try {
070                        if (isActive())
071                                throw new IllegalStateException("Transaction is already active!");
072
073                        lockIfWrite();
074
075                        persistenceManager = persistenceManagerFactory.getPersistenceManager();
076                        jdoTransaction = persistenceManager.currentTransaction();
077                        jdoTransaction.begin();
078                        listenerRegistry.onBegin();
079                } finally {
080                        lock.unlock();
081                }
082        }
083
084        private final void lockIfWrite() {
085                if (write)
086                        lock.lock(); // UNbalance lock to keep it after method returns!
087        }
088
089        private final void unlockIfWrite() {
090                if (write)
091                        lock.unlock(); // UNbalance unlock to counter the unbalanced lock in lockIfWrite().
092        }
093
094        @Override
095        public void commit() {
096                lock.lock();
097                try {
098                        if (!isActive())
099                                throw new IllegalStateException("Transaction is not active!");
100
101                        listenerRegistry.onCommit();
102                        firePreCloseListeners(true);
103                        daoClass2Dao.clear();
104                        jdoTransaction.commit();
105                        persistenceManager.close();
106                        jdoTransaction = null;
107                        persistenceManager = null;
108                        localRevision = -1;
109
110                        unlockIfWrite();
111                } finally {
112                        lock.unlock();
113                }
114                firePostCloseListeners(true);
115        }
116
117        @Override
118        public boolean isActive() {
119                lock.lock();
120                try {
121                        return jdoTransaction != null && jdoTransaction.isActive();
122                } finally {
123                        lock.unlock();
124                }
125        }
126
127        @Override
128        public void rollback() {
129                _rollback();
130                firePostCloseListeners(false);
131        }
132
133        @Override
134        public void rollbackIfActive() {
135                boolean active;
136                lock.lock();
137                try {
138                        active = isActive();
139                        if (active) {
140                                _rollback();
141                        }
142                } finally {
143                        lock.unlock();
144                }
145                if (active) {
146                        firePostCloseListeners(false);
147                }
148        }
149
150        protected void _rollback() {
151                lock.lock();
152                try {
153                        if (!isActive())
154                                throw new IllegalStateException("Transaction is not active!");
155
156                        listenerRegistry.onRollback();
157                        firePreCloseListeners(false);
158                        daoClass2Dao.clear();
159                        jdoTransaction.rollback();
160                        persistenceManager.close();
161                        jdoTransaction = null;
162                        persistenceManager = null;
163                        localRevision = -1;
164
165                        unlockIfWrite();
166                } finally {
167                        lock.unlock();
168                }
169        }
170
171        @Override
172        public void close() {
173                rollbackIfActive();
174        }
175
176        @Override
177        public PersistenceManager getPersistenceManager() {
178                if (!isActive()) {
179                        throw new IllegalStateException("Transaction is not active!");
180                }
181                return persistenceManager;
182        }
183
184        @Override
185        public long getLocalRevision() {
186                if (localRevision < 0) {
187                        if (!write)
188                                throw new IllegalStateException("This is a read-only transaction!");
189
190                        jdoTransaction.setSerializeRead(true);
191                        final LocalRepository lr = getDao(LocalRepositoryDao.class).getLocalRepositoryOrFail();
192                        jdoTransaction.setSerializeRead(null);
193                        localRevision = lr.getRevision() + 1;
194                        lr.setRevision(localRevision);
195                        persistenceManager.flush();
196                }
197                return localRevision;
198        }
199
200        @Override
201        public LocalRepoManager getLocalRepoManager() {
202                return localRepoManager;
203        }
204
205        @Override
206        public <D> D getDao(final Class<D> daoClass) {
207                requireNonNull(daoClass, "daoClass");
208
209                @SuppressWarnings("unchecked")
210                D dao = (D) daoClass2Dao.get(daoClass);
211
212                if (dao == null) {
213                        final PersistenceManager pm = getPersistenceManager();
214                        try {
215                                dao = daoClass.newInstance();
216                        } catch (final InstantiationException e) {
217                                throw new RuntimeException(e);
218                        } catch (final IllegalAccessException e) {
219                                throw new RuntimeException(e);
220                        }
221
222                        if (!(dao instanceof Dao))
223                                throw new IllegalStateException(String.format("dao class %s does not extend Dao!", daoClass.getName()));
224
225                        ((Dao<?, ?>)dao).setPersistenceManager(pm);
226                        ((Dao<?, ?>)dao).setDaoProvider(this);
227
228                        daoClass2Dao.put(daoClass, dao);
229                }
230                return dao;
231        }
232
233        @Override
234        public void flush() {
235                final PersistenceManager pm = getPersistenceManager();
236                pm.flush();
237        }
238
239        @Override
240        public void setContextObject(final Object object) {
241                extensibleContextSupport.setContextObject(object);
242        }
243
244        @Override
245        public <T> T getContextObject(final Class<T> clazz) {
246                return extensibleContextSupport.getContextObject(clazz);
247        }
248
249        @Override
250        public void removeContextObject(Object object) {
251                extensibleContextSupport.removeContextObject(object);
252        }
253
254        @Override
255        public void removeContextObject(Class<?> clazz) {
256                extensibleContextSupport.removeContextObject(clazz);
257        }
258
259        @Override
260        public void addPreCloseListener(LocalRepoTransactionPreCloseListener listener) {
261                preCloseListeners.add(requireNonNull(listener, "listener"));
262        }
263        @Override
264        public void addPostCloseListener(LocalRepoTransactionPostCloseListener listener) {
265                postCloseListeners.add(requireNonNull(listener, "listener"));
266        }
267
268        protected void firePreCloseListeners(final boolean commit) {
269                LocalRepoTransactionPreCloseEvent event = null;
270                for (final LocalRepoTransactionPreCloseListener listener : preCloseListeners) {
271                        try {
272                                if (event == null)
273                                        event = new LocalRepoTransactionPreCloseEvent(this);
274
275                                if (commit)
276                                        listener.preCommit(event);
277                                else
278                                        listener.preRollback(event);
279                        } catch (Exception x) {
280                                logger.error("firePreCloseListeners: " + x, x);
281                        }
282                }
283        }
284        protected void firePostCloseListeners(final boolean commit) {
285                LocalRepoTransactionPostCloseEvent event = null;
286                for (final LocalRepoTransactionPostCloseListener listener : postCloseListeners) {
287                        try {
288                                if (event == null)
289                                        event = new LocalRepoTransactionPostCloseEvent(this);
290
291                                if (commit)
292                                        listener.postCommit(event);
293                                else
294                                        listener.postRollback(event);
295                        } catch (Exception x) {
296                                logger.error("firePostCloseListeners: " + x, x);
297                        }
298                }
299        }
300}