001package co.codewizards.cloudstore.core.repo.sync;
002
003import static java.util.Objects.*;
004
005import java.beans.PropertyChangeListener;
006import java.beans.PropertyChangeSupport;
007import java.net.URL;
008import java.util.ArrayList;
009import java.util.Collections;
010import java.util.HashMap;
011import java.util.HashSet;
012import java.util.Iterator;
013import java.util.LinkedHashSet;
014import java.util.LinkedList;
015import java.util.List;
016import java.util.Map;
017import java.util.Set;
018import java.util.UUID;
019import java.util.concurrent.ExecutorService;
020import java.util.concurrent.Executors;
021import java.util.concurrent.ThreadFactory;
022import java.util.concurrent.atomic.AtomicInteger;
023
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import co.codewizards.cloudstore.core.Severity;
028import co.codewizards.cloudstore.core.config.Config;
029import co.codewizards.cloudstore.core.config.ConfigImpl;
030import co.codewizards.cloudstore.core.dto.Error;
031import co.codewizards.cloudstore.core.oio.File;
032import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper;
033import co.codewizards.cloudstore.core.repo.local.LocalRepoManager;
034import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory;
035
036public class RepoSyncDaemonImpl implements RepoSyncDaemon {
037        private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
038
039        private Set<RepoSyncQueueItem> syncQueue = new LinkedHashSet<>();
040        private Map<UUID, RepoSyncRunner> repositoryId2SyncRunner = new HashMap<>();
041        private final ExecutorService executorService;
042        private Map<UUID, Set<RepoSyncActivity>> repositoryId2SyncActivities = new HashMap<>();
043        private Map<UUID, List<RepoSyncState>> repositoryId2SyncStates = new HashMap<>();
044        private static final AtomicInteger threadGroupIndex = new AtomicInteger();
045        private final AtomicInteger threadIndex = new AtomicInteger();
046
047        private static final class Holder {
048                public static final RepoSyncDaemonImpl instance = new RepoSyncDaemonImpl();
049        }
050
051        protected RepoSyncDaemonImpl() {
052                final int tgi = threadGroupIndex.getAndIncrement();
053                final ThreadGroup threadGroup = new ThreadGroup("RepoSyncDaemonThreadGroup_" + tgi);
054                executorService = Executors.newCachedThreadPool(new ThreadFactory() {
055                        @Override
056                        public Thread newThread(Runnable r) {
057                                return new Thread(threadGroup, r, "RepoSyncDaemonThread_" + tgi + "_" + threadIndex.getAndIncrement());
058                        }
059                });
060        }
061
062        public static RepoSyncDaemon getInstance() {
063                return Holder.instance;
064        }
065
066        @Override
067        public UUID startSync(final File file) {
068                requireNonNull(file, "file");
069                final File localRoot = LocalRepoHelper.getLocalRootContainingFile(file);
070                if (localRoot == null)
071                        throw new IllegalArgumentException("File is not located inside a local repository: " + file);
072
073                final UUID repositoryId;
074                try (final LocalRepoManager localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRoot);) {
075                        repositoryId = localRepoManager.getRepositoryId();
076                }
077
078                final RepoSyncQueueItem repoSyncQueueItem = new RepoSyncQueueItem(repositoryId, localRoot);
079                enqueue(repoSyncQueueItem);
080                startSyncWithNextSyncQueueItem(repositoryId);
081                updateActivities(repositoryId);
082                return repositoryId;
083        }
084
085        private synchronized void enqueue(final RepoSyncQueueItem repoSyncQueueItem) {
086                syncQueue.add(repoSyncQueueItem);
087        }
088
089        private synchronized void startSyncWithNextSyncQueueItem(final UUID repositoryId) {
090                requireNonNull(repositoryId, "repositoryId");
091                if (!repositoryId2SyncRunner.containsKey(repositoryId)) {
092                        final RepoSyncQueueItem nextSyncQueueItem = pollSyncQueueItem(repositoryId);
093                        if (nextSyncQueueItem != null) {
094                                final RepoSyncRunner repoSyncRunner = new RepoSyncRunner(nextSyncQueueItem);
095                                repositoryId2SyncRunner.put(nextSyncQueueItem.repositoryId, repoSyncRunner);
096                                submitToExecutorService(nextSyncQueueItem);
097                        }
098                }
099        }
100
101        private void submitToExecutorService(final RepoSyncQueueItem repoSyncQueueItem) {
102                final RepoSyncRunner repoSyncRunner = new RepoSyncRunner(repoSyncQueueItem);
103                synchronized (this) {
104                        repositoryId2SyncRunner.put(repoSyncQueueItem.repositoryId, repoSyncRunner);
105                }
106                executorService.submit(new WrapperRunnable(repoSyncRunner));
107        }
108
109        private class WrapperRunnable implements Runnable {
110                private final Logger logger = LoggerFactory.getLogger(RepoSyncDaemonImpl.WrapperRunnable.class);
111
112                private final UUID repositoryId;
113                private final RepoSyncRunner repoSyncRunner;
114
115                public WrapperRunnable(final RepoSyncRunner repoSyncRunner) {
116                        this.repoSyncRunner = requireNonNull(repoSyncRunner, "repoSyncRunner");
117                        this.repositoryId = repoSyncRunner.getSyncQueueItem().repositoryId;
118                }
119
120                @Override
121                public void run() {
122                        try {
123                                repoSyncRunner.run();
124                                registerSyncSuccess(repoSyncRunner);
125                        } catch (final Throwable x) {
126                                logger.error("run: " + x, x);
127                                registerSyncError(repoSyncRunner, x);
128                        }
129                        synchronized (RepoSyncDaemonImpl.this) {
130                                final RepoSyncRunner removed = repositoryId2SyncRunner.remove(repositoryId);
131                                if (removed != repoSyncRunner)
132                                        logger.error("run: removed != repoSyncRunner");
133
134                                startSyncWithNextSyncQueueItem(repositoryId);
135                        }
136                        updateActivities(repositoryId);
137                }
138        }
139
140        private void registerSyncSuccess(final RepoSyncRunner repoSyncRunner) {
141                requireNonNull(repoSyncRunner, "repoSyncRunner");
142
143                final List<RepoSyncState> statesAdded = new ArrayList<RepoSyncState>();
144                final List<RepoSyncState> statesRemoved;
145                final UUID localRepositoryId = repoSyncRunner.getSyncQueueItem().repositoryId;
146                final File localRoot = repoSyncRunner.getSyncQueueItem().localRoot;
147                synchronized (this) {
148                        final List<RepoSyncState> list = _getRepoSyncStates(localRepositoryId);
149                        for (final Map.Entry<UUID, URL> me : repoSyncRunner.getRemoteRepositoryId2RemoteRootMap().entrySet()) {
150                                final UUID remoteRepositoryId = me.getKey();
151                                final URL remoteRoot = me.getValue();
152                                final RepoSyncState state = new RepoSyncState(localRepositoryId, remoteRepositoryId, localRoot, remoteRoot,
153                                                Severity.INFO, "Sync OK.", null,
154                                                repoSyncRunner.getSyncStarted(), repoSyncRunner.getSyncFinished());
155                                list.add(state);
156                                statesAdded.add(state);
157                        }
158                        statesRemoved = evictOldStates(localRepositoryId, localRoot);
159                }
160
161                firePropertyChange(PropertyEnum.states_added, null, Collections.unmodifiableList(statesAdded));
162
163                if (! statesRemoved.isEmpty())
164                        firePropertyChange(PropertyEnum.states_removed, null, Collections.unmodifiableList(statesRemoved));
165
166                firePropertyChange(PropertyEnum.states, null, getStates(localRepositoryId));
167        }
168
169        private void registerSyncError(final RepoSyncRunner repoSyncRunner, final Throwable exception) {
170                requireNonNull(repoSyncRunner, "repoSyncRunner");
171                requireNonNull(exception, "exception");
172
173                final List<RepoSyncState> statesAdded = new ArrayList<RepoSyncState>();
174                final List<RepoSyncState> statesRemoved;
175                final UUID localRepositoryId = repoSyncRunner.getSyncQueueItem().repositoryId;
176                final File localRoot = repoSyncRunner.getSyncQueueItem().localRoot;
177                synchronized (this) {
178                        final List<RepoSyncState> list = _getRepoSyncStates(localRepositoryId);
179                        UUID remoteRepositoryId = repoSyncRunner.getRemoteRepositoryId();
180                        URL remoteRoot = repoSyncRunner.getRemoteRoot();
181                        final RepoSyncState state = new RepoSyncState(localRepositoryId, remoteRepositoryId, localRoot, remoteRoot,
182                                        Severity.ERROR, exception.getMessage(), new Error(exception),
183                                        repoSyncRunner.getSyncStarted(), repoSyncRunner.getSyncFinished());
184                        if (remoteRepositoryId != null && remoteRoot != null) {
185                                list.add(state);
186                                statesAdded.add(state);
187                        }
188                        else {
189                                for (Map.Entry<UUID, URL> me : repoSyncRunner.getRemoteRepositoryId2RemoteRootMap().entrySet()) {
190                                        remoteRepositoryId = me.getKey();
191                                        remoteRoot = me.getValue();
192                                        list.add(state);
193                                        statesAdded.add(state);
194                                }
195                        }
196                        statesRemoved = evictOldStates(localRepositoryId, localRoot);
197                }
198
199                firePropertyChange(PropertyEnum.states_added, null, Collections.unmodifiableList(statesAdded));
200
201                if (! statesRemoved.isEmpty())
202                        firePropertyChange(PropertyEnum.states_removed, null, Collections.unmodifiableList(statesRemoved));
203
204                firePropertyChange(PropertyEnum.states, null, getStates(localRepositoryId));
205        }
206
207        private List<RepoSyncState> _getRepoSyncStates(final UUID localRepositoryId) {
208                List<RepoSyncState> list = repositoryId2SyncStates.get(localRepositoryId);
209                if (list == null) {
210                        list = new LinkedList<>();
211                        repositoryId2SyncStates.put(localRepositoryId, list);
212                }
213                return list;
214        }
215
216        private synchronized List<RepoSyncState> evictOldStates(final UUID localRepositoryId, final File localRoot) {
217                requireNonNull(localRepositoryId, "localRepositoryId");
218                requireNonNull(localRoot, "localRoot");
219                final List<RepoSyncState> evicted = new ArrayList<RepoSyncState>();
220                final Config config = ConfigImpl.getInstanceForDirectory(localRoot);
221                final int syncStatesMaxSize = config.getPropertyAsPositiveOrZeroInt(CONFIG_KEY_SYNC_STATES_MAX_SIZE, DEFAULT_SYNC_STATES_MAX_SIZE);
222                final List<RepoSyncState> list = repositoryId2SyncStates.get(localRepositoryId);
223                if (list != null) {
224                        // Note: This implementation is not very efficient, but the list usually has a size of only a few
225                        // entries - rarely ever more than a few dozen. Thus, this algorithm is certainly fast enough ;-)
226                        for (final Iterator<RepoSyncState> it = list.iterator(); it.hasNext();) {
227                                final RepoSyncState repoSyncState = it.next();
228                                if (getSyncStatesSizeForServerRepositoryId(list, repoSyncState.getServerRepositoryId()) > syncStatesMaxSize) {
229                                        evicted.add(repoSyncState);
230                                        it.remove();
231                                }
232                        }
233                }
234                return evicted;
235        }
236
237        private int getSyncStatesSizeForServerRepositoryId(final List<RepoSyncState> repoSyncStates, final UUID serverRepositoryId) {
238                requireNonNull(serverRepositoryId, "serverRepositoryId");
239                int result = 0;
240                for (RepoSyncState repoSyncState : repoSyncStates) {
241                        if (serverRepositoryId.equals(repoSyncState.getServerRepositoryId()))
242                                ++result;
243                }
244                return result;
245        }
246
247        private synchronized RepoSyncQueueItem pollSyncQueueItem(UUID repositoryId) {
248                requireNonNull(repositoryId, "repositoryId");
249                for (Iterator<RepoSyncQueueItem> it = syncQueue.iterator(); it.hasNext(); ) {
250                        final RepoSyncQueueItem repoSyncQueueItem = it.next();
251                        if (repositoryId.equals(repoSyncQueueItem.repositoryId)) {
252                                it.remove();
253                                return repoSyncQueueItem;
254                        }
255                }
256                return null;
257        }
258
259        @Override
260        public synchronized List<RepoSyncState> getStates(final UUID localRepositoryId) {
261                requireNonNull(localRepositoryId, "localRepositoryId");
262                final List<RepoSyncState> list = repositoryId2SyncStates.get(localRepositoryId);
263                if (list == null)
264                        return Collections.emptyList();
265                else
266                        return Collections.unmodifiableList(new ArrayList<>(list));
267        }
268
269        @Override
270        public synchronized Set<RepoSyncActivity> getActivities(final UUID localRepositoryId) {
271                requireNonNull(localRepositoryId, "localRepositoryId");
272                final Set<RepoSyncActivity> activities = repositoryId2SyncActivities.get(localRepositoryId);
273                if (activities == null)
274                        return Collections.emptySet();
275
276                return Collections.unmodifiableSet(new HashSet<RepoSyncActivity>(activities));
277        }
278
279        private void updateActivities(final UUID localRepositoryId) {
280                requireNonNull(localRepositoryId, "localRepositoryId");
281
282                final List<RepoSyncActivity> activitiesAdded = new ArrayList<RepoSyncActivity>();
283                final List<RepoSyncActivity> activitiesRemoved = new ArrayList<RepoSyncActivity>();
284
285                synchronized (this) {
286                        Set<RepoSyncActivity> activities = repositoryId2SyncActivities.get(localRepositoryId);
287                        if (activities == null) {
288                                activities = new HashSet<RepoSyncActivity>(2);
289                                repositoryId2SyncActivities.put(localRepositoryId, activities);
290                        }
291
292                        final RepoSyncRunner repoSyncRunner = repositoryId2SyncRunner.get(localRepositoryId);
293                        if (repoSyncRunner == null) {
294                                final List<RepoSyncActivity> activitiesStale = _findActivities(localRepositoryId, RepoSyncActivityType.IN_PROGRESS);
295                                activitiesRemoved.addAll(activitiesStale);
296                                activities.removeAll(activitiesStale);
297                        }
298                        else {
299                                final RepoSyncActivity activity = new RepoSyncActivity(
300                                                repoSyncRunner.getSyncQueueItem().repositoryId,
301                                                repoSyncRunner.getSyncQueueItem().localRoot, RepoSyncActivityType.IN_PROGRESS);
302
303                                if (activities.add(activity))
304                                        activitiesAdded.add(activity);
305                        }
306
307                        final List<RepoSyncQueueItem> queueItems = _findQueueItems(localRepositoryId);
308                        if (queueItems.isEmpty()) {
309                                final List<RepoSyncActivity> activitiesStale = _findActivities(localRepositoryId, RepoSyncActivityType.QUEUED);
310                                activitiesRemoved.addAll(activitiesStale);
311                                activities.removeAll(activitiesStale);
312                        }
313                        else {
314                                final RepoSyncActivity activity = new RepoSyncActivity(
315                                                repoSyncRunner.getSyncQueueItem().repositoryId,
316                                                repoSyncRunner.getSyncQueueItem().localRoot, RepoSyncActivityType.QUEUED);
317
318                                if (activities.add(activity))
319                                        activitiesAdded.add(activity);
320                        }
321                }
322
323                if (! activitiesRemoved.isEmpty())
324                        firePropertyChange(PropertyEnum.activities_removed, null, activitiesRemoved);
325
326                if (! activitiesAdded.isEmpty())
327                        firePropertyChange(PropertyEnum.activities_added, null, activitiesAdded);
328
329                if (! activitiesAdded.isEmpty() || ! activitiesRemoved.isEmpty())
330                        firePropertyChange(PropertyEnum.activities, null, getActivities(localRepositoryId));
331        }
332
333        private synchronized List<RepoSyncActivity> _findActivities(final UUID localRepositoryId, final RepoSyncActivityType activityType) {
334                requireNonNull(localRepositoryId, "localRepositoryId");
335                final Set<RepoSyncActivity> activities = repositoryId2SyncActivities.get(localRepositoryId);
336                if (activities == null)
337                        return Collections.emptyList();
338
339                final List<RepoSyncActivity> result = new ArrayList<>(1);
340                for (RepoSyncActivity activity : activities) {
341                        if (activity.getActivityType() == activityType)
342                                result.add(activity);
343                }
344
345                return Collections.unmodifiableList(result);
346        }
347
348        private synchronized List<RepoSyncQueueItem> _findQueueItems(final UUID localRepositoryId) {
349                requireNonNull(localRepositoryId, "localRepositoryId");
350                final List<RepoSyncQueueItem> result = new ArrayList<RepoSyncQueueItem>(2);
351                for (final RepoSyncQueueItem queueItem : syncQueue) {
352                        if (localRepositoryId.equals(queueItem.repositoryId))
353                                result.add(queueItem);
354                }
355                return Collections.unmodifiableList(result);
356        }
357
358        @Override
359        public void shutdown() {
360                executorService.shutdown();
361        }
362
363        @Override
364        public void shutdownNow() {
365                executorService.shutdownNow();
366        }
367
368        @Override
369        public void addPropertyChangeListener(PropertyChangeListener listener) {
370                propertyChangeSupport.addPropertyChangeListener(listener);
371        }
372
373        @Override
374        public void addPropertyChangeListener(Property property, PropertyChangeListener listener) {
375                propertyChangeSupport.addPropertyChangeListener(property.name(), listener);
376        }
377
378        @Override
379        public void removePropertyChangeListener(PropertyChangeListener listener) {
380                propertyChangeSupport.removePropertyChangeListener(listener);
381        }
382
383        @Override
384        public void removePropertyChangeListener(Property property, PropertyChangeListener listener) {
385                propertyChangeSupport.removePropertyChangeListener(property.name(), listener);
386        }
387
388        protected void firePropertyChange(Property property, Object oldValue, Object newValue) {
389                propertyChangeSupport.firePropertyChange(property.name(), oldValue, newValue);
390        }
391}