001package co.codewizards.cloudstore.core.repo.sync; 002 003import static java.util.Objects.*; 004 005import java.beans.PropertyChangeListener; 006import java.beans.PropertyChangeSupport; 007import java.net.URL; 008import java.util.ArrayList; 009import java.util.Collections; 010import java.util.HashMap; 011import java.util.HashSet; 012import java.util.Iterator; 013import java.util.LinkedHashSet; 014import java.util.LinkedList; 015import java.util.List; 016import java.util.Map; 017import java.util.Set; 018import java.util.UUID; 019import java.util.concurrent.ExecutorService; 020import java.util.concurrent.Executors; 021import java.util.concurrent.ThreadFactory; 022import java.util.concurrent.atomic.AtomicInteger; 023 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import co.codewizards.cloudstore.core.Severity; 028import co.codewizards.cloudstore.core.config.Config; 029import co.codewizards.cloudstore.core.config.ConfigImpl; 030import co.codewizards.cloudstore.core.dto.Error; 031import co.codewizards.cloudstore.core.oio.File; 032import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper; 033import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 034import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory; 035 036public class RepoSyncDaemonImpl implements RepoSyncDaemon { 037 private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this); 038 039 private Set<RepoSyncQueueItem> syncQueue = new LinkedHashSet<>(); 040 private Map<UUID, RepoSyncRunner> repositoryId2SyncRunner = new HashMap<>(); 041 private final ExecutorService executorService; 042 private Map<UUID, Set<RepoSyncActivity>> repositoryId2SyncActivities = new HashMap<>(); 043 private Map<UUID, List<RepoSyncState>> repositoryId2SyncStates = new HashMap<>(); 044 private static final AtomicInteger threadGroupIndex = new AtomicInteger(); 045 private final AtomicInteger threadIndex = new AtomicInteger(); 046 047 private static final class Holder { 048 public static final RepoSyncDaemonImpl instance = new RepoSyncDaemonImpl(); 049 } 050 051 protected RepoSyncDaemonImpl() { 052 final int tgi = threadGroupIndex.getAndIncrement(); 053 final ThreadGroup threadGroup = new ThreadGroup("RepoSyncDaemonThreadGroup_" + tgi); 054 executorService = Executors.newCachedThreadPool(new ThreadFactory() { 055 @Override 056 public Thread newThread(Runnable r) { 057 return new Thread(threadGroup, r, "RepoSyncDaemonThread_" + tgi + "_" + threadIndex.getAndIncrement()); 058 } 059 }); 060 } 061 062 public static RepoSyncDaemon getInstance() { 063 return Holder.instance; 064 } 065 066 @Override 067 public UUID startSync(final File file) { 068 requireNonNull(file, "file"); 069 final File localRoot = LocalRepoHelper.getLocalRootContainingFile(file); 070 if (localRoot == null) 071 throw new IllegalArgumentException("File is not located inside a local repository: " + file); 072 073 final UUID repositoryId; 074 try (final LocalRepoManager localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRoot);) { 075 repositoryId = localRepoManager.getRepositoryId(); 076 } 077 078 final RepoSyncQueueItem repoSyncQueueItem = new RepoSyncQueueItem(repositoryId, localRoot); 079 enqueue(repoSyncQueueItem); 080 startSyncWithNextSyncQueueItem(repositoryId); 081 updateActivities(repositoryId); 082 return repositoryId; 083 } 084 085 private synchronized void enqueue(final RepoSyncQueueItem repoSyncQueueItem) { 086 syncQueue.add(repoSyncQueueItem); 087 } 088 089 private synchronized void startSyncWithNextSyncQueueItem(final UUID repositoryId) { 090 requireNonNull(repositoryId, "repositoryId"); 091 if (!repositoryId2SyncRunner.containsKey(repositoryId)) { 092 final RepoSyncQueueItem nextSyncQueueItem = pollSyncQueueItem(repositoryId); 093 if (nextSyncQueueItem != null) { 094 final RepoSyncRunner repoSyncRunner = new RepoSyncRunner(nextSyncQueueItem); 095 repositoryId2SyncRunner.put(nextSyncQueueItem.repositoryId, repoSyncRunner); 096 submitToExecutorService(nextSyncQueueItem); 097 } 098 } 099 } 100 101 private void submitToExecutorService(final RepoSyncQueueItem repoSyncQueueItem) { 102 final RepoSyncRunner repoSyncRunner = new RepoSyncRunner(repoSyncQueueItem); 103 synchronized (this) { 104 repositoryId2SyncRunner.put(repoSyncQueueItem.repositoryId, repoSyncRunner); 105 } 106 executorService.submit(new WrapperRunnable(repoSyncRunner)); 107 } 108 109 private class WrapperRunnable implements Runnable { 110 private final Logger logger = LoggerFactory.getLogger(RepoSyncDaemonImpl.WrapperRunnable.class); 111 112 private final UUID repositoryId; 113 private final RepoSyncRunner repoSyncRunner; 114 115 public WrapperRunnable(final RepoSyncRunner repoSyncRunner) { 116 this.repoSyncRunner = requireNonNull(repoSyncRunner, "repoSyncRunner"); 117 this.repositoryId = repoSyncRunner.getSyncQueueItem().repositoryId; 118 } 119 120 @Override 121 public void run() { 122 try { 123 repoSyncRunner.run(); 124 registerSyncSuccess(repoSyncRunner); 125 } catch (final Throwable x) { 126 logger.error("run: " + x, x); 127 registerSyncError(repoSyncRunner, x); 128 } 129 synchronized (RepoSyncDaemonImpl.this) { 130 final RepoSyncRunner removed = repositoryId2SyncRunner.remove(repositoryId); 131 if (removed != repoSyncRunner) 132 logger.error("run: removed != repoSyncRunner"); 133 134 startSyncWithNextSyncQueueItem(repositoryId); 135 } 136 updateActivities(repositoryId); 137 } 138 } 139 140 private void registerSyncSuccess(final RepoSyncRunner repoSyncRunner) { 141 requireNonNull(repoSyncRunner, "repoSyncRunner"); 142 143 final List<RepoSyncState> statesAdded = new ArrayList<RepoSyncState>(); 144 final List<RepoSyncState> statesRemoved; 145 final UUID localRepositoryId = repoSyncRunner.getSyncQueueItem().repositoryId; 146 final File localRoot = repoSyncRunner.getSyncQueueItem().localRoot; 147 synchronized (this) { 148 final List<RepoSyncState> list = _getRepoSyncStates(localRepositoryId); 149 for (final Map.Entry<UUID, URL> me : repoSyncRunner.getRemoteRepositoryId2RemoteRootMap().entrySet()) { 150 final UUID remoteRepositoryId = me.getKey(); 151 final URL remoteRoot = me.getValue(); 152 final RepoSyncState state = new RepoSyncState(localRepositoryId, remoteRepositoryId, localRoot, remoteRoot, 153 Severity.INFO, "Sync OK.", null, 154 repoSyncRunner.getSyncStarted(), repoSyncRunner.getSyncFinished()); 155 list.add(state); 156 statesAdded.add(state); 157 } 158 statesRemoved = evictOldStates(localRepositoryId, localRoot); 159 } 160 161 firePropertyChange(PropertyEnum.states_added, null, Collections.unmodifiableList(statesAdded)); 162 163 if (! statesRemoved.isEmpty()) 164 firePropertyChange(PropertyEnum.states_removed, null, Collections.unmodifiableList(statesRemoved)); 165 166 firePropertyChange(PropertyEnum.states, null, getStates(localRepositoryId)); 167 } 168 169 private void registerSyncError(final RepoSyncRunner repoSyncRunner, final Throwable exception) { 170 requireNonNull(repoSyncRunner, "repoSyncRunner"); 171 requireNonNull(exception, "exception"); 172 173 final List<RepoSyncState> statesAdded = new ArrayList<RepoSyncState>(); 174 final List<RepoSyncState> statesRemoved; 175 final UUID localRepositoryId = repoSyncRunner.getSyncQueueItem().repositoryId; 176 final File localRoot = repoSyncRunner.getSyncQueueItem().localRoot; 177 synchronized (this) { 178 final List<RepoSyncState> list = _getRepoSyncStates(localRepositoryId); 179 UUID remoteRepositoryId = repoSyncRunner.getRemoteRepositoryId(); 180 URL remoteRoot = repoSyncRunner.getRemoteRoot(); 181 final RepoSyncState state = new RepoSyncState(localRepositoryId, remoteRepositoryId, localRoot, remoteRoot, 182 Severity.ERROR, exception.getMessage(), new Error(exception), 183 repoSyncRunner.getSyncStarted(), repoSyncRunner.getSyncFinished()); 184 if (remoteRepositoryId != null && remoteRoot != null) { 185 list.add(state); 186 statesAdded.add(state); 187 } 188 else { 189 for (Map.Entry<UUID, URL> me : repoSyncRunner.getRemoteRepositoryId2RemoteRootMap().entrySet()) { 190 remoteRepositoryId = me.getKey(); 191 remoteRoot = me.getValue(); 192 list.add(state); 193 statesAdded.add(state); 194 } 195 } 196 statesRemoved = evictOldStates(localRepositoryId, localRoot); 197 } 198 199 firePropertyChange(PropertyEnum.states_added, null, Collections.unmodifiableList(statesAdded)); 200 201 if (! statesRemoved.isEmpty()) 202 firePropertyChange(PropertyEnum.states_removed, null, Collections.unmodifiableList(statesRemoved)); 203 204 firePropertyChange(PropertyEnum.states, null, getStates(localRepositoryId)); 205 } 206 207 private List<RepoSyncState> _getRepoSyncStates(final UUID localRepositoryId) { 208 List<RepoSyncState> list = repositoryId2SyncStates.get(localRepositoryId); 209 if (list == null) { 210 list = new LinkedList<>(); 211 repositoryId2SyncStates.put(localRepositoryId, list); 212 } 213 return list; 214 } 215 216 private synchronized List<RepoSyncState> evictOldStates(final UUID localRepositoryId, final File localRoot) { 217 requireNonNull(localRepositoryId, "localRepositoryId"); 218 requireNonNull(localRoot, "localRoot"); 219 final List<RepoSyncState> evicted = new ArrayList<RepoSyncState>(); 220 final Config config = ConfigImpl.getInstanceForDirectory(localRoot); 221 final int syncStatesMaxSize = config.getPropertyAsPositiveOrZeroInt(CONFIG_KEY_SYNC_STATES_MAX_SIZE, DEFAULT_SYNC_STATES_MAX_SIZE); 222 final List<RepoSyncState> list = repositoryId2SyncStates.get(localRepositoryId); 223 if (list != null) { 224 // Note: This implementation is not very efficient, but the list usually has a size of only a few 225 // entries - rarely ever more than a few dozen. Thus, this algorithm is certainly fast enough ;-) 226 for (final Iterator<RepoSyncState> it = list.iterator(); it.hasNext();) { 227 final RepoSyncState repoSyncState = it.next(); 228 if (getSyncStatesSizeForServerRepositoryId(list, repoSyncState.getServerRepositoryId()) > syncStatesMaxSize) { 229 evicted.add(repoSyncState); 230 it.remove(); 231 } 232 } 233 } 234 return evicted; 235 } 236 237 private int getSyncStatesSizeForServerRepositoryId(final List<RepoSyncState> repoSyncStates, final UUID serverRepositoryId) { 238 requireNonNull(serverRepositoryId, "serverRepositoryId"); 239 int result = 0; 240 for (RepoSyncState repoSyncState : repoSyncStates) { 241 if (serverRepositoryId.equals(repoSyncState.getServerRepositoryId())) 242 ++result; 243 } 244 return result; 245 } 246 247 private synchronized RepoSyncQueueItem pollSyncQueueItem(UUID repositoryId) { 248 requireNonNull(repositoryId, "repositoryId"); 249 for (Iterator<RepoSyncQueueItem> it = syncQueue.iterator(); it.hasNext(); ) { 250 final RepoSyncQueueItem repoSyncQueueItem = it.next(); 251 if (repositoryId.equals(repoSyncQueueItem.repositoryId)) { 252 it.remove(); 253 return repoSyncQueueItem; 254 } 255 } 256 return null; 257 } 258 259 @Override 260 public synchronized List<RepoSyncState> getStates(final UUID localRepositoryId) { 261 requireNonNull(localRepositoryId, "localRepositoryId"); 262 final List<RepoSyncState> list = repositoryId2SyncStates.get(localRepositoryId); 263 if (list == null) 264 return Collections.emptyList(); 265 else 266 return Collections.unmodifiableList(new ArrayList<>(list)); 267 } 268 269 @Override 270 public synchronized Set<RepoSyncActivity> getActivities(final UUID localRepositoryId) { 271 requireNonNull(localRepositoryId, "localRepositoryId"); 272 final Set<RepoSyncActivity> activities = repositoryId2SyncActivities.get(localRepositoryId); 273 if (activities == null) 274 return Collections.emptySet(); 275 276 return Collections.unmodifiableSet(new HashSet<RepoSyncActivity>(activities)); 277 } 278 279 private void updateActivities(final UUID localRepositoryId) { 280 requireNonNull(localRepositoryId, "localRepositoryId"); 281 282 final List<RepoSyncActivity> activitiesAdded = new ArrayList<RepoSyncActivity>(); 283 final List<RepoSyncActivity> activitiesRemoved = new ArrayList<RepoSyncActivity>(); 284 285 synchronized (this) { 286 Set<RepoSyncActivity> activities = repositoryId2SyncActivities.get(localRepositoryId); 287 if (activities == null) { 288 activities = new HashSet<RepoSyncActivity>(2); 289 repositoryId2SyncActivities.put(localRepositoryId, activities); 290 } 291 292 final RepoSyncRunner repoSyncRunner = repositoryId2SyncRunner.get(localRepositoryId); 293 if (repoSyncRunner == null) { 294 final List<RepoSyncActivity> activitiesStale = _findActivities(localRepositoryId, RepoSyncActivityType.IN_PROGRESS); 295 activitiesRemoved.addAll(activitiesStale); 296 activities.removeAll(activitiesStale); 297 } 298 else { 299 final RepoSyncActivity activity = new RepoSyncActivity( 300 repoSyncRunner.getSyncQueueItem().repositoryId, 301 repoSyncRunner.getSyncQueueItem().localRoot, RepoSyncActivityType.IN_PROGRESS); 302 303 if (activities.add(activity)) 304 activitiesAdded.add(activity); 305 } 306 307 final List<RepoSyncQueueItem> queueItems = _findQueueItems(localRepositoryId); 308 if (queueItems.isEmpty()) { 309 final List<RepoSyncActivity> activitiesStale = _findActivities(localRepositoryId, RepoSyncActivityType.QUEUED); 310 activitiesRemoved.addAll(activitiesStale); 311 activities.removeAll(activitiesStale); 312 } 313 else { 314 final RepoSyncActivity activity = new RepoSyncActivity( 315 repoSyncRunner.getSyncQueueItem().repositoryId, 316 repoSyncRunner.getSyncQueueItem().localRoot, RepoSyncActivityType.QUEUED); 317 318 if (activities.add(activity)) 319 activitiesAdded.add(activity); 320 } 321 } 322 323 if (! activitiesRemoved.isEmpty()) 324 firePropertyChange(PropertyEnum.activities_removed, null, activitiesRemoved); 325 326 if (! activitiesAdded.isEmpty()) 327 firePropertyChange(PropertyEnum.activities_added, null, activitiesAdded); 328 329 if (! activitiesAdded.isEmpty() || ! activitiesRemoved.isEmpty()) 330 firePropertyChange(PropertyEnum.activities, null, getActivities(localRepositoryId)); 331 } 332 333 private synchronized List<RepoSyncActivity> _findActivities(final UUID localRepositoryId, final RepoSyncActivityType activityType) { 334 requireNonNull(localRepositoryId, "localRepositoryId"); 335 final Set<RepoSyncActivity> activities = repositoryId2SyncActivities.get(localRepositoryId); 336 if (activities == null) 337 return Collections.emptyList(); 338 339 final List<RepoSyncActivity> result = new ArrayList<>(1); 340 for (RepoSyncActivity activity : activities) { 341 if (activity.getActivityType() == activityType) 342 result.add(activity); 343 } 344 345 return Collections.unmodifiableList(result); 346 } 347 348 private synchronized List<RepoSyncQueueItem> _findQueueItems(final UUID localRepositoryId) { 349 requireNonNull(localRepositoryId, "localRepositoryId"); 350 final List<RepoSyncQueueItem> result = new ArrayList<RepoSyncQueueItem>(2); 351 for (final RepoSyncQueueItem queueItem : syncQueue) { 352 if (localRepositoryId.equals(queueItem.repositoryId)) 353 result.add(queueItem); 354 } 355 return Collections.unmodifiableList(result); 356 } 357 358 @Override 359 public void shutdown() { 360 executorService.shutdown(); 361 } 362 363 @Override 364 public void shutdownNow() { 365 executorService.shutdownNow(); 366 } 367 368 @Override 369 public void addPropertyChangeListener(PropertyChangeListener listener) { 370 propertyChangeSupport.addPropertyChangeListener(listener); 371 } 372 373 @Override 374 public void addPropertyChangeListener(Property property, PropertyChangeListener listener) { 375 propertyChangeSupport.addPropertyChangeListener(property.name(), listener); 376 } 377 378 @Override 379 public void removePropertyChangeListener(PropertyChangeListener listener) { 380 propertyChangeSupport.removePropertyChangeListener(listener); 381 } 382 383 @Override 384 public void removePropertyChangeListener(Property property, PropertyChangeListener listener) { 385 propertyChangeSupport.removePropertyChangeListener(property.name(), listener); 386 } 387 388 protected void firePropertyChange(Property property, Object oldValue, Object newValue) { 389 propertyChangeSupport.firePropertyChange(property.name(), oldValue, newValue); 390 } 391}