001package co.codewizards.cloudstore.local; 002 003import static java.util.Objects.*; 004 005import java.util.HashMap; 006import java.util.Map; 007import java.util.concurrent.CopyOnWriteArrayList; 008import java.util.concurrent.TimeUnit; 009import java.util.concurrent.locks.Lock; 010 011import javax.jdo.PersistenceManager; 012import javax.jdo.PersistenceManagerFactory; 013import javax.jdo.Transaction; 014 015import org.slf4j.Logger; 016import org.slf4j.LoggerFactory; 017 018import co.codewizards.cloudstore.core.context.ExtensibleContextSupport; 019import co.codewizards.cloudstore.core.io.TimeoutException; 020import co.codewizards.cloudstore.core.repo.local.ContextWithLocalRepoManager; 021import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 022import co.codewizards.cloudstore.core.repo.local.LocalRepoTransaction; 023import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionListenerRegistry; 024import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseEvent; 025import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPostCloseListener; 026import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseEvent; 027import co.codewizards.cloudstore.core.repo.local.LocalRepoTransactionPreCloseListener; 028import co.codewizards.cloudstore.local.persistence.Dao; 029import co.codewizards.cloudstore.local.persistence.LocalRepository; 030import co.codewizards.cloudstore.local.persistence.LocalRepositoryDao; 031 032public class LocalRepoTransactionImpl implements LocalRepoTransaction, ContextWithLocalRepoManager, ContextWithPersistenceManager { 033 private static final Logger logger = LoggerFactory.getLogger(LocalRepoTransactionImpl.class); 034 035 public static final long LOCK_TIMEOUT = 5 * 60 * 1000; 036 037 private final LocalRepoManager localRepoManager; 038 private final PersistenceManagerFactory persistenceManagerFactory; 039 private final boolean write; 040 private PersistenceManager persistenceManager; 041 private Transaction jdoTransaction; 042 private final Lock lock; 043 private long localRevision = -1; 044 private final Map<Class<?>, Object> daoClass2Dao = new HashMap<>(); 045 private final ExtensibleContextSupport extensibleContextSupport = new ExtensibleContextSupport(); 046 047 private final LocalRepoTransactionListenerRegistry listenerRegistry = new LocalRepoTransactionListenerRegistry(this); 048 049 private final CopyOnWriteArrayList<LocalRepoTransactionPreCloseListener> preCloseListeners = new CopyOnWriteArrayList<>(); 050 private final CopyOnWriteArrayList<LocalRepoTransactionPostCloseListener> postCloseListeners = new CopyOnWriteArrayList<>(); 051 052 public LocalRepoTransactionImpl(final LocalRepoManagerImpl localRepoManager, final boolean write) { 053 this.localRepoManager = requireNonNull(localRepoManager, "localRepoManager"); 054 this.persistenceManagerFactory = requireNonNull(localRepoManager.getPersistenceManagerFactory(), "localRepoManager.persistenceManagerFactory"); 055 this.lock = localRepoManager.getLock(); 056 this.write = write; 057 begin(); 058 } 059 060 private void begin() { 061 boolean locked = false; 062 try { 063 locked = lock.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS); 064 } catch (InterruptedException e) { 065 // ignore 066 } 067 if (! locked) 068 throw new TimeoutException(String.format("Starting %s transaction on '%s' within timeout (%s ms) failed! ", write ? "write" : "read", localRepoManager.getLocalRoot(), LOCK_TIMEOUT)); 069 try { 070 if (isActive()) 071 throw new IllegalStateException("Transaction is already active!"); 072 073 lockIfWrite(); 074 075 persistenceManager = persistenceManagerFactory.getPersistenceManager(); 076 jdoTransaction = persistenceManager.currentTransaction(); 077 jdoTransaction.begin(); 078 listenerRegistry.onBegin(); 079 } finally { 080 lock.unlock(); 081 } 082 } 083 084 private final void lockIfWrite() { 085 if (write) 086 lock.lock(); // UNbalance lock to keep it after method returns! 087 } 088 089 private final void unlockIfWrite() { 090 if (write) 091 lock.unlock(); // UNbalance unlock to counter the unbalanced lock in lockIfWrite(). 092 } 093 094 @Override 095 public void commit() { 096 lock.lock(); 097 try { 098 if (!isActive()) 099 throw new IllegalStateException("Transaction is not active!"); 100 101 listenerRegistry.onCommit(); 102 firePreCloseListeners(true); 103 daoClass2Dao.clear(); 104 jdoTransaction.commit(); 105 persistenceManager.close(); 106 jdoTransaction = null; 107 persistenceManager = null; 108 localRevision = -1; 109 110 unlockIfWrite(); 111 } finally { 112 lock.unlock(); 113 } 114 firePostCloseListeners(true); 115 } 116 117 @Override 118 public boolean isActive() { 119 lock.lock(); 120 try { 121 return jdoTransaction != null && jdoTransaction.isActive(); 122 } finally { 123 lock.unlock(); 124 } 125 } 126 127 @Override 128 public void rollback() { 129 _rollback(); 130 firePostCloseListeners(false); 131 } 132 133 @Override 134 public void rollbackIfActive() { 135 boolean active; 136 lock.lock(); 137 try { 138 active = isActive(); 139 if (active) { 140 _rollback(); 141 } 142 } finally { 143 lock.unlock(); 144 } 145 if (active) { 146 firePostCloseListeners(false); 147 } 148 } 149 150 protected void _rollback() { 151 lock.lock(); 152 try { 153 if (!isActive()) 154 throw new IllegalStateException("Transaction is not active!"); 155 156 listenerRegistry.onRollback(); 157 firePreCloseListeners(false); 158 daoClass2Dao.clear(); 159 jdoTransaction.rollback(); 160 persistenceManager.close(); 161 jdoTransaction = null; 162 persistenceManager = null; 163 localRevision = -1; 164 165 unlockIfWrite(); 166 } finally { 167 lock.unlock(); 168 } 169 } 170 171 @Override 172 public void close() { 173 rollbackIfActive(); 174 } 175 176 @Override 177 public PersistenceManager getPersistenceManager() { 178 if (!isActive()) { 179 throw new IllegalStateException("Transaction is not active!"); 180 } 181 return persistenceManager; 182 } 183 184 @Override 185 public long getLocalRevision() { 186 if (localRevision < 0) { 187 if (!write) 188 throw new IllegalStateException("This is a read-only transaction!"); 189 190 jdoTransaction.setSerializeRead(true); 191 final LocalRepository lr = getDao(LocalRepositoryDao.class).getLocalRepositoryOrFail(); 192 jdoTransaction.setSerializeRead(null); 193 localRevision = lr.getRevision() + 1; 194 lr.setRevision(localRevision); 195 persistenceManager.flush(); 196 } 197 return localRevision; 198 } 199 200 @Override 201 public LocalRepoManager getLocalRepoManager() { 202 return localRepoManager; 203 } 204 205 @Override 206 public <D> D getDao(final Class<D> daoClass) { 207 requireNonNull(daoClass, "daoClass"); 208 209 @SuppressWarnings("unchecked") 210 D dao = (D) daoClass2Dao.get(daoClass); 211 212 if (dao == null) { 213 final PersistenceManager pm = getPersistenceManager(); 214 try { 215 dao = daoClass.newInstance(); 216 } catch (final InstantiationException e) { 217 throw new RuntimeException(e); 218 } catch (final IllegalAccessException e) { 219 throw new RuntimeException(e); 220 } 221 222 if (!(dao instanceof Dao)) 223 throw new IllegalStateException(String.format("dao class %s does not extend Dao!", daoClass.getName())); 224 225 ((Dao<?, ?>)dao).setPersistenceManager(pm); 226 ((Dao<?, ?>)dao).setDaoProvider(this); 227 228 daoClass2Dao.put(daoClass, dao); 229 } 230 return dao; 231 } 232 233 @Override 234 public void flush() { 235 final PersistenceManager pm = getPersistenceManager(); 236 pm.flush(); 237 } 238 239 @Override 240 public void setContextObject(final Object object) { 241 extensibleContextSupport.setContextObject(object); 242 } 243 244 @Override 245 public <T> T getContextObject(final Class<T> clazz) { 246 return extensibleContextSupport.getContextObject(clazz); 247 } 248 249 @Override 250 public void removeContextObject(Object object) { 251 extensibleContextSupport.removeContextObject(object); 252 } 253 254 @Override 255 public void removeContextObject(Class<?> clazz) { 256 extensibleContextSupport.removeContextObject(clazz); 257 } 258 259 @Override 260 public void addPreCloseListener(LocalRepoTransactionPreCloseListener listener) { 261 preCloseListeners.add(requireNonNull(listener, "listener")); 262 } 263 @Override 264 public void addPostCloseListener(LocalRepoTransactionPostCloseListener listener) { 265 postCloseListeners.add(requireNonNull(listener, "listener")); 266 } 267 268 protected void firePreCloseListeners(final boolean commit) { 269 LocalRepoTransactionPreCloseEvent event = null; 270 for (final LocalRepoTransactionPreCloseListener listener : preCloseListeners) { 271 try { 272 if (event == null) 273 event = new LocalRepoTransactionPreCloseEvent(this); 274 275 if (commit) 276 listener.preCommit(event); 277 else 278 listener.preRollback(event); 279 } catch (Exception x) { 280 logger.error("firePreCloseListeners: " + x, x); 281 } 282 } 283 } 284 protected void firePostCloseListeners(final boolean commit) { 285 LocalRepoTransactionPostCloseEvent event = null; 286 for (final LocalRepoTransactionPostCloseListener listener : postCloseListeners) { 287 try { 288 if (event == null) 289 event = new LocalRepoTransactionPostCloseEvent(this); 290 291 if (commit) 292 listener.postCommit(event); 293 else 294 listener.postRollback(event); 295 } catch (Exception x) { 296 logger.error("firePostCloseListeners: " + x, x); 297 } 298 } 299 } 300}