001package co.codewizards.cloudstore.core.repo.sync;
002
003import static co.codewizards.cloudstore.core.objectfactory.ObjectFactoryUtil.*;
004import static co.codewizards.cloudstore.core.oio.OioFileFactory.*;
005import static co.codewizards.cloudstore.core.util.HashUtil.*;
006import static co.codewizards.cloudstore.core.util.Util.*;
007import static java.util.Objects.*;
008
009import java.io.IOException;
010import java.net.MalformedURLException;
011import java.net.URL;
012import java.util.ArrayList;
013import java.util.Collection;
014import java.util.HashMap;
015import java.util.HashSet;
016import java.util.Iterator;
017import java.util.List;
018import java.util.Map;
019import java.util.Set;
020import java.util.SortedMap;
021import java.util.TreeMap;
022import java.util.UUID;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Executors;
026import java.util.concurrent.Future;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import co.codewizards.cloudstore.core.dto.ChangeSetDto;
032import co.codewizards.cloudstore.core.dto.ConfigPropSetDto;
033import co.codewizards.cloudstore.core.dto.CopyModificationDto;
034import co.codewizards.cloudstore.core.dto.DeleteModificationDto;
035import co.codewizards.cloudstore.core.dto.DirectoryDto;
036import co.codewizards.cloudstore.core.dto.FileChunkDto;
037import co.codewizards.cloudstore.core.dto.ModificationDto;
038import co.codewizards.cloudstore.core.dto.NormalFileDto;
039import co.codewizards.cloudstore.core.dto.RepoFileDto;
040import co.codewizards.cloudstore.core.dto.RepoFileDtoTreeNode;
041import co.codewizards.cloudstore.core.dto.RepositoryDto;
042import co.codewizards.cloudstore.core.dto.SymlinkDto;
043import co.codewizards.cloudstore.core.dto.VersionInfoDto;
044import co.codewizards.cloudstore.core.oio.File;
045import co.codewizards.cloudstore.core.progress.ProgressMonitor;
046import co.codewizards.cloudstore.core.progress.SubProgressMonitor;
047import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper;
048import co.codewizards.cloudstore.core.repo.local.LocalRepoManager;
049import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory;
050import co.codewizards.cloudstore.core.repo.transport.CollisionException;
051import co.codewizards.cloudstore.core.repo.transport.LocalRepoTransport;
052import co.codewizards.cloudstore.core.repo.transport.RepoTransport;
053import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactory;
054import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactoryRegistry;
055import co.codewizards.cloudstore.core.util.UrlUtil;
056import co.codewizards.cloudstore.core.version.VersionCompatibilityValidator;
057
058/**
059 * Logic for synchronising a local with a remote repository.
060 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co
061 */
062public class RepoToRepoSync implements AutoCloseable {
063        private static final Logger logger = LoggerFactory.getLogger(RepoToRepoSync.class);
064
065        /**
066         * Sync in the inverse direction. This is only for testing whether the RepoTransport implementations
067         * are truly symmetric. It is less efficient! Therefore, this must NEVER be true in production!!!
068         */
069        private static final boolean TEST_INVERSE = false;
070
071        protected File localRoot;
072        protected URL remoteRoot;
073        protected final LocalRepoManager localRepoManager;
074        protected final LocalRepoTransport localRepoTransport;
075        protected final RepoTransport remoteRepoTransport;
076        protected UUID localRepositoryId;
077        protected UUID remoteRepositoryId;
078
079        private ExecutorService localSyncExecutor;
080        private Future<Void> localSyncFuture;
081        private final Set<UUID> lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds = new HashSet<>();
082        private File localRepoTmpDir;
083
084        public static final String FILE_DONE_DIR_NAME_PREFIX = "File.";
085        public static final String MODIFICATION_DONE_DIR_NAME_PREFIX = "Modification.";
086        public static final String DONE_DIR_NAME_SUFFIX = ".done";
087
088        private DoneMarker doneMarker;
089
090        /**
091         * Create an instance.
092         * @param localRoot the root of the local repository or any file/directory inside it. This is
093         * automatically adjusted to fit the connection-point to the remote repository (the remote
094         * repository might be connected to a sub-directory).
095         * @param remoteRoot the root of the remote repository. This must exactly match the connection point.
096         * If a sub-directory of the remote repository is connected to the local repository, this sub-directory
097         * must be referenced here.
098         */
099        protected RepoToRepoSync(File localRoot, final URL remoteRoot) {
100                this.localRoot = requireNonNull(localRoot, "localRoot");
101                this.remoteRoot = UrlUtil.canonicalizeURL(requireNonNull(remoteRoot, "remoteRoot"));
102
103                final File localRootWithoutPathPrefix = LocalRepoHelper.getLocalRootContainingFile(requireNonNull(localRoot, "localRoot"));
104                localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRootWithoutPathPrefix);
105                localRoot = createFile(localRootWithoutPathPrefix, localRepoManager.getLocalPathPrefixOrFail(remoteRoot));
106
107                localRepositoryId = localRepoManager.getRepositoryId();
108                if (localRepositoryId == null)
109                        throw new IllegalStateException("localRepoManager.getRepositoryId() returned null!");
110
111                remoteRepositoryId = localRepoManager.getRemoteRepositoryIdOrFail(remoteRoot);
112
113                remoteRepoTransport = createRepoTransport(remoteRoot, localRepositoryId);
114                localRepoTransport = (LocalRepoTransport) createRepoTransport(localRoot, remoteRepositoryId);
115        }
116
117        public static RepoToRepoSync create(final File localRoot, final URL remoteRoot) {
118                return createObject(RepoToRepoSync.class, localRoot, remoteRoot);
119        }
120
121        public void sync(final ProgressMonitor monitor) {
122                requireNonNull(monitor, "monitor");
123                monitor.beginTask("Synchronising...", 201);
124                try {
125                        lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds.clear();
126                        final VersionInfoDto clientVersionInfoDto = localRepoTransport.getVersionInfoDto();
127                        final VersionInfoDto serverVersionInfoDto = remoteRepoTransport.getVersionInfoDto();
128                        VersionCompatibilityValidator.getInstance().validate(clientVersionInfoDto, serverVersionInfoDto);
129
130                        readRemoteRepositoryIdFromRepoTransport();
131                        monitor.worked(1);
132
133                        if (localSyncExecutor != null)
134                                throw new IllegalStateException("localSyncExecutor != null");
135
136                        if (localSyncFuture != null)
137                                throw new IllegalStateException("localSyncFuture != null");
138
139                        localSyncExecutor = Executors.newFixedThreadPool(1);
140                        localSyncFuture = localSyncExecutor.submit(new Callable<Void>() {
141                                @Override
142                                public Void call() throws Exception {
143                                        logger.info("sync: locally syncing {} ('{}')", localRepositoryId, localRoot);
144                                        localRepoManager.localSync(new SubProgressMonitor(monitor, 50));
145                                        return null;
146                                }
147                        });
148
149                        if (!TEST_INVERSE) { // This is the normal sync (NOT test).
150                                syncDown(true, new SubProgressMonitor(monitor, 50));
151
152                                if (localSyncExecutor != null)
153                                        throw new IllegalStateException("localSyncExecutor != null");
154
155                                if (localSyncFuture != null)
156                                        throw new IllegalStateException("localSyncFuture != null");
157
158                                syncUp(new SubProgressMonitor(monitor, 50));
159                                // Immediately sync back to make sure the changes we caused don't cause problems later
160                                // (right now there's very likely no collision and this should be very fast).
161                                syncDown(false, new SubProgressMonitor(monitor, 50));
162                        }
163                        else { // THIS IS FOR TESTING ONLY!
164                                logger.info("sync: locally syncing on *remote* side {} ('{}')", localRepositoryId, localRoot);
165                                remoteRepoTransport.getChangeSetDto(true, null); // trigger the local sync on the remote side (we don't need the change set)
166
167                                waitForAndCheckLocalSyncFuture();
168
169                                syncUp(new SubProgressMonitor(monitor, 50));
170                                syncDown(false, new SubProgressMonitor(monitor, 50));
171                                syncUp(new SubProgressMonitor(monitor, 50));
172                        }
173                } finally {
174                        monitor.done();
175                }
176        }
177
178        protected void syncUp(final ProgressMonitor monitor) {
179                logger.info("syncUp: fromID={} from='{}' toID={} to='{}'",
180                                localRepositoryId, localRoot, remoteRepositoryId, remoteRoot);
181                sync(localRepoTransport, false, remoteRepoTransport, monitor);
182        }
183
184        protected void syncDown(final boolean fromRepoLocalSync, final ProgressMonitor monitor) {
185                logger.info("syncDown: fromID={} from='{}' toID={} to='{}', fromRepoLocalSync={}",
186                                remoteRepositoryId, remoteRoot, localRepositoryId, localRoot, fromRepoLocalSync);
187                sync(remoteRepoTransport, fromRepoLocalSync, localRepoTransport, monitor);
188        }
189
190        private void waitForAndCheckLocalSyncFutureIfExists() {
191                if (localSyncFuture != null)
192                        waitForAndCheckLocalSyncFuture();
193        }
194
195        private void waitForAndCheckLocalSyncFuture() {
196                try {
197                        requireNonNull(localSyncFuture, "localSyncFuture").get();
198                } catch (final RuntimeException e) {
199                        throw e;
200                } catch (final Exception e) {
201                        throw new RuntimeException(e);
202                }
203                requireNonNull(localSyncExecutor, "localSyncExecutor").shutdown();
204                localSyncFuture = null;
205                localSyncExecutor = null;
206        }
207
208        private void readRemoteRepositoryIdFromRepoTransport() {
209                final UUID repositoryId = remoteRepoTransport.getRepositoryId();
210                if (repositoryId == null)
211                        throw new IllegalStateException("remoteRepoTransport.getRepositoryId() returned null!");
212
213                if (!repositoryId.equals(remoteRepositoryId))
214                        throw new IllegalStateException(
215                                        String.format("remoteRepoTransport.getRepositoryId() does not match repositoryId in local DB! %s != %s", repositoryId, remoteRepositoryId));
216        }
217
218        private RepoTransport createRepoTransport(final File rootFile, final UUID clientRepositoryId) {
219                URL rootURL;
220                try {
221                        rootURL = rootFile.toURI().toURL();
222                } catch (final MalformedURLException e) {
223                        throw new RuntimeException(e);
224                }
225                return createRepoTransport(rootURL, clientRepositoryId);
226        }
227
228        private RepoTransport createRepoTransport(final URL remoteRoot, final UUID clientRepositoryId) {
229                final RepoTransportFactory repoTransportFactory = RepoTransportFactoryRegistry.getInstance().getRepoTransportFactoryOrFail(remoteRoot);
230                return repoTransportFactory.createRepoTransport(remoteRoot, clientRepositoryId);
231        }
232
233        protected void sync(final RepoTransport fromRepoTransport, final boolean fromRepoLocalSync, final RepoTransport toRepoTransport, final ProgressMonitor monitor) {
234                monitor.beginTask("Synchronising...", 100);
235                try {
236                        Long lastSyncToRemoteRepoLocalRepositoryRevisionSynced = null;
237                        if (lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds.add(fromRepoTransport.getRepositoryId())) {
238                                RepositoryDto clientRepositoryDto = toRepoTransport.getClientRepositoryDto();
239                                requireNonNull(clientRepositoryDto, "clientRepositoryDto");
240                                lastSyncToRemoteRepoLocalRepositoryRevisionSynced = clientRepositoryDto.getRevision() == Long.MIN_VALUE ? null : clientRepositoryDto.getRevision();
241                        }
242
243                        final ChangeSetDto changeSetDto = fromRepoTransport.getChangeSetDto(fromRepoLocalSync, lastSyncToRemoteRepoLocalRepositoryRevisionSynced);
244                        monitor.worked(8);
245
246                        waitForAndCheckLocalSyncFutureIfExists();
247                        toRepoTransport.prepareForChangeSetDto(changeSetDto);
248                        sync(fromRepoTransport, toRepoTransport, changeSetDto, new SubProgressMonitor(monitor, 90));
249
250                        fromRepoTransport.endSyncFromRepository();
251                        toRepoTransport.endSyncToRepository(changeSetDto.getRepositoryDto().getRevision());
252                        deleteDoneDirs();
253                        monitor.worked(2);
254                } finally {
255                        monitor.done();
256                }
257        }
258
259        protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
260                        final ChangeSetDto changeSetDto, final ProgressMonitor monitor) {
261                monitor.beginTask("Synchronising...", 1 + changeSetDto.getModificationDtos().size() + 3 * changeSetDto.getRepoFileDtos().size() + 1);
262                try {
263                        syncParentConfigPropSetDto(fromRepoTransport, toRepoTransport, changeSetDto.getParentConfigPropSetDto(),
264                                        new SubProgressMonitor(monitor, 1));
265
266                        final RepoFileDtoTreeNode repoFileDtoTree = RepoFileDtoTreeNode.createTree(changeSetDto.getRepoFileDtos());
267                        if (repoFileDtoTree != null) {
268                                sync(fromRepoTransport, toRepoTransport, repoFileDtoTree,
269                                                new Class<?>[] { DirectoryDto.class }, new Class<?>[0], false,
270                                                new SubProgressMonitor(monitor, repoFileDtoTree.size()));
271                        }
272
273                        syncModifications(fromRepoTransport, toRepoTransport, changeSetDto.getModificationDtos(),
274                                        new SubProgressMonitor(monitor, changeSetDto.getModificationDtos().size()));
275
276                        if (repoFileDtoTree != null) {
277                                sync(fromRepoTransport, toRepoTransport, repoFileDtoTree,
278                                                new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, true,
279                                                new SubProgressMonitor(monitor, repoFileDtoTree.size()));
280                        }
281
282                        if (repoFileDtoTree != null) {
283                                sync(fromRepoTransport, toRepoTransport, repoFileDtoTree,
284                                                new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, false,
285                                                new SubProgressMonitor(monitor, repoFileDtoTree.size()));
286                        }
287                } finally {
288                        monitor.done();
289                }
290        }
291
292        protected void syncParentConfigPropSetDto(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
293                        final ConfigPropSetDto parentConfigPropSetDto, final ProgressMonitor monitor) {
294                requireNonNull(fromRepoTransport, "fromRepoTransport");
295                requireNonNull(toRepoTransport, "toRepoTransport");
296                // parentConfigPropSetDto may be null!
297                requireNonNull(monitor, "monitor");
298
299                monitor.beginTask("Synchronising parent-config...", 1);
300                try {
301                        if (parentConfigPropSetDto == null)
302                                return;
303
304                        toRepoTransport.putParentConfigPropSetDto(parentConfigPropSetDto);
305                } finally {
306                        monitor.done();
307                }
308        }
309
310        protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
311                        final RepoFileDtoTreeNode repoFileDtoTree,
312                        final Class<?>[] repoFileDtoClassesIncl, final Class<?>[] repoFileDtoClassesExcl, final boolean filesInProgressOnly,
313                        final ProgressMonitor monitor) {
314                requireNonNull(fromRepoTransport, "fromRepoTransport");
315                requireNonNull(toRepoTransport, "toRepoTransport");
316                requireNonNull(repoFileDtoTree, "repoFileDtoTree");
317                requireNonNull(repoFileDtoClassesIncl, "repoFileDtoClassesIncl");
318                requireNonNull(repoFileDtoClassesExcl, "repoFileDtoClassesExcl");
319                requireNonNull(monitor, "monitor");
320
321                final Map<Class<?>, Boolean> repoFileDtoClass2Included = new HashMap<Class<?>, Boolean>();
322                final Map<Class<?>, Boolean> repoFileDtoClass2Excluded = new HashMap<Class<?>, Boolean>();
323
324                final Set<String> fileInProgressPaths = filesInProgressOnly
325                                ? localRepoTransport.getFileInProgressPaths(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId())
326                                                : null;
327
328                monitor.beginTask("Synchronising...", repoFileDtoTree.size());
329                try {
330                        for (final RepoFileDtoTreeNode repoFileDtoTreeNode : repoFileDtoTree) {
331                                if (repoFileDtoTreeNode.getRepoFileDto().isNeededAsParent()) { // not actually modified - serves only to complete the tree structure.
332                                        monitor.worked(1);
333                                        continue;
334                                }
335
336                                if (fileInProgressPaths != null && ! fileInProgressPaths.contains(repoFileDtoTreeNode.getPath())) {
337                                        monitor.worked(1);
338                                        continue;
339                                }
340                                final RepoFileDto repoFileDto = repoFileDtoTreeNode.getRepoFileDto();
341                                final Class<? extends RepoFileDto> repoFileDtoClass = repoFileDto.getClass();
342
343                                Boolean included = repoFileDtoClass2Included.get(repoFileDtoClass);
344                                if (included == null) {
345                                        included = false;
346                                        for (final Class<?> clazz : repoFileDtoClassesIncl) {
347                                                if (clazz.isAssignableFrom(repoFileDtoClass)) {
348                                                        included = true;
349                                                        break;
350                                                }
351                                        }
352                                        repoFileDtoClass2Included.put(repoFileDtoClass, included);
353                                }
354
355                                Boolean excluded = repoFileDtoClass2Excluded.get(repoFileDtoClass);
356                                if (excluded == null) {
357                                        excluded = false;
358                                        for (final Class<?> clazz : repoFileDtoClassesExcl) {
359                                                if (clazz.isAssignableFrom(repoFileDtoClass)) {
360                                                        excluded = true;
361                                                        break;
362                                                }
363                                        }
364                                        repoFileDtoClass2Excluded.put(repoFileDtoClass, excluded);
365                                }
366
367                                if (!included || excluded) {
368                                        monitor.worked(1);
369                                        continue;
370                                }
371
372                                if (isDone(fromRepoTransport, toRepoTransport, repoFileDto)) {
373                                        logger.debug("sync: Skipping file already done in an interrupted transfer before: {}", repoFileDtoTreeNode.getPath());
374                                        monitor.worked(1);
375                                        continue;
376                                }
377
378                                if (repoFileDto instanceof DirectoryDto)
379                                        syncDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (DirectoryDto) repoFileDto, new SubProgressMonitor(monitor, 1));
380                                else if (repoFileDto instanceof NormalFileDto) {
381                                        syncFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, repoFileDto, monitor);
382                                }
383                                else if (repoFileDto instanceof SymlinkDto)
384                                        syncSymlink(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (SymlinkDto) repoFileDto, new SubProgressMonitor(monitor, 1));
385                                else
386                                        throw new IllegalStateException("Unsupported RepoFileDto type: " + repoFileDto);
387
388                                markDone(fromRepoTransport, toRepoTransport, repoFileDto);
389                        }
390                } finally {
391                        monitor.done();
392                }
393        }
394
395        protected DoneMarker getDoneMarker(final String doneDirNamePrefix, UUID fromRepositoryId, UUID toRepositoryId) {
396                requireNonNull(doneDirNamePrefix, "doneDirNamePrefix");
397                final String doneDirName = doneDirNamePrefix + fromRepositoryId + '.' + toRepositoryId + DONE_DIR_NAME_SUFFIX;
398                if (doneMarker != null) {
399                        if (doneDirName.equals(doneMarker.getDoneDir().getName()))
400                                return doneMarker;
401
402                        doneMarker.close();
403                        doneMarker = null;
404                }
405                final File doneDir = getLocalRepoTmpDir().createFile(doneDirName);
406                doneMarker = new DoneMarker(doneDir);
407                return doneMarker;
408        }
409
410        protected void deleteDoneDirs() {
411                if (doneMarker != null) {
412                        doneMarker.close();
413                        doneMarker = null;
414                }
415                final File localRepoTmpDir = getLocalRepoTmpDir();
416                final File[] tmpFiles = localRepoTmpDir.listFiles();
417                if (tmpFiles != null) {
418                        for (final File file : tmpFiles) {
419                                if (file.getName().endsWith(DONE_DIR_NAME_SUFFIX)) {
420                                        file.deleteRecursively();;
421                                        if (file.exists()) {
422                                                logger.error("deleteDoneDirs: Cannot delete directory (permissions?): " + file.getAbsolutePath());
423                                        }
424                                }
425                        }
426                }
427        }
428
429        protected File getLocalRepoTmpDir() {
430                try {
431                        if (localRepoTmpDir == null) {
432                                final File metaDir = getLocalRepoMetaDir();
433                                if (! metaDir.isDirectory()) {
434                                        if (metaDir.isFile())
435                                                throw new IOException(String.format("Path '%s' already exists as ordinary file! It should be a directory!", metaDir.getAbsolutePath()));
436                                        else
437                                                throw new IOException(String.format("Directory '%s' does not exist!", metaDir.getAbsolutePath()));
438                                }
439                                this.localRepoTmpDir = metaDir.createFile(LocalRepoManager.REPO_TEMP_DIR_NAME);
440                        }
441
442                        if (! localRepoTmpDir.isDirectory()) {
443                                localRepoTmpDir.mkdir();
444
445                                if (! localRepoTmpDir.isDirectory()) {
446                                        if (localRepoTmpDir.isFile())
447                                                throw new IOException(String.format("Cannot create directory '%s', because this path already exists as an ordinary file!", localRepoTmpDir.getAbsolutePath()));
448                                        else
449                                                throw new IOException(String.format("Creating directory '%s' failed for an unknown reason (permissions? disk full?)!", localRepoTmpDir.getAbsolutePath()));
450                                }
451                        }
452                        return localRepoTmpDir;
453                } catch (RuntimeException x) {
454                        throw x;
455                } catch (Exception x) {
456                        throw new RuntimeException(x);
457                }
458        }
459
460        protected File getLocalRepoMetaDir() {
461                final File localRoot = localRepoTransport.getLocalRepoManager().getLocalRoot();
462                return createFile(localRoot, LocalRepoManager.META_DIR_NAME);
463        }
464
465        private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) {
466                return getDoneMarker(FILE_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId())
467                                .isDone(repoFileDto.getId(), repoFileDto.getLocalRevision());
468
469//              return localRepoTransport.isTransferDone(
470//                              fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(),
471//                              TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision());
472        }
473
474        private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) {
475                getDoneMarker(FILE_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId())
476                .markDone(repoFileDto.getId(), repoFileDto.getLocalRevision());
477
478//              localRepoTransport.markTransferDone(
479//                              fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(),
480//                              TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision());
481        }
482
483        private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) {
484                return getDoneMarker(MODIFICATION_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId())
485                                .isDone(modificationDto.getId(), modificationDto.getLocalRevision());
486
487//              return localRepoTransport.isTransferDone(
488//                              fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(),
489//                              TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision());
490        }
491
492        private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) {
493                getDoneMarker(MODIFICATION_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId())
494                .markDone(modificationDto.getId(), modificationDto.getLocalRevision());
495
496//              localRepoTransport.markTransferDone(
497//                              fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(),
498//                              TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision());
499        }
500
501        private SortedMap<Long, Collection<ModificationDto>> getLocalRevision2ModificationDtos(final Collection<ModificationDto> modificationDtos) {
502                final SortedMap<Long, Collection<ModificationDto>> map = new TreeMap<Long, Collection<ModificationDto>>();
503                for (final ModificationDto modificationDto : modificationDtos) {
504                        final long localRevision = modificationDto.getLocalRevision();
505                        Collection<ModificationDto> collection = map.get(localRevision);
506                        if (collection == null) {
507                                collection = new ArrayList<ModificationDto>();
508                                map.put(localRevision, collection);
509                        }
510                        collection.add(modificationDto);
511                }
512                return map;
513        }
514
515        private void syncModifications(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final Collection<ModificationDto> modificationDtos, final ProgressMonitor monitor) {
516                monitor.beginTask("Synchronising...", modificationDtos.size());
517                try {
518                        final SortedMap<Long,Collection<ModificationDto>> localRevision2ModificationDtos = getLocalRevision2ModificationDtos(modificationDtos);
519                        for (final Map.Entry<Long,Collection<ModificationDto>> me : localRevision2ModificationDtos.entrySet()) {
520                                final ModificationDtoSet modificationDtoSet = new ModificationDtoSet(me.getValue());
521
522                                for (final List<CopyModificationDto> copyModificationDtos : modificationDtoSet.getFromPath2CopyModificationDtos().values()) {
523                                        for (final Iterator<CopyModificationDto> itCopyMod = copyModificationDtos.iterator(); itCopyMod.hasNext(); ) {
524                                                final CopyModificationDto copyModificationDto = itCopyMod.next();
525
526                                                if (isDone(fromRepoTransport, toRepoTransport, copyModificationDto)) {
527                                                        logger.debug("sync: Skipping CopyModificaton already done in an interrupted transfer before: {} => {}", copyModificationDto.getFromPath(), copyModificationDto.getToPath());
528                                                        monitor.worked(1);
529                                                        continue;
530                                                }
531
532                                                final List<DeleteModificationDto> deleteModificationDtos = modificationDtoSet.getPath2DeleteModificationDtos().get(copyModificationDto.getFromPath());
533                                                boolean moveInstead = false;
534                                                if (!itCopyMod.hasNext() && deleteModificationDtos != null && !deleteModificationDtos.isEmpty())
535                                                        moveInstead = true;
536
537                                                if (moveInstead) {
538                                                        logger.info("syncModifications: Moving from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath());
539                                                        toRepoTransport.move(copyModificationDto.getFromPath(), copyModificationDto.getToPath());
540                                                }
541                                                else {
542                                                        logger.info("syncModifications: Copying from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath());
543                                                        toRepoTransport.copy(copyModificationDto.getFromPath(), copyModificationDto.getToPath());
544                                                }
545
546                                                if (!moveInstead && deleteModificationDtos != null) {
547                                                        for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) {
548                                                                logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath());
549                                                                applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto);
550                                                        }
551                                                }
552
553                                                markDone(fromRepoTransport, toRepoTransport, copyModificationDto);
554                                        }
555                                }
556
557                                for (final List<DeleteModificationDto> deleteModificationDtos : modificationDtoSet.getPath2DeleteModificationDtos().values()) {
558                                        for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) {
559                                                if (isDone(fromRepoTransport, toRepoTransport, deleteModificationDto)) {
560                                                        logger.debug("sync: Skipping DeleteModificaton already done in an interrupted transfer before: {}", deleteModificationDto.getPath());
561                                                        monitor.worked(1);
562                                                        continue;
563                                                }
564
565                                                logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath());
566                                                applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto);
567
568                                                markDone(fromRepoTransport, toRepoTransport, deleteModificationDto);
569                                        }
570                                }
571                        }
572                } finally {
573                        monitor.done();
574                }
575        }
576
577        protected void applyDeleteModification(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) {
578                requireNonNull(fromRepoTransport, "fromRepoTransport");
579                requireNonNull(toRepoTransport, "toRepoTransport");
580                requireNonNull(deleteModificationDto, "deleteModificationDto");
581
582                try {
583                        delete(fromRepoTransport, toRepoTransport, deleteModificationDto);
584                } catch (final CollisionException x) { // Note: This cannot happen in CloudStore! But in can happen in downstream projects with different RepoTransport implementations!
585                        logger.info("CollisionException during delete: {}", deleteModificationDto.getPath());
586                        if (logger.isDebugEnabled())
587                                logger.debug(x.toString(), x);
588
589                        return;
590                }
591        }
592
593        protected void delete(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) {
594                toRepoTransport.delete(deleteModificationDto.getPath());
595        }
596
597        private void syncDirectory(
598                        final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
599                        final RepoFileDtoTreeNode repoFileDtoTreeNode, final DirectoryDto directoryDto, final ProgressMonitor monitor) {
600                monitor.beginTask("Synchronising...", 100);
601                try {
602                        final String path = repoFileDtoTreeNode.getPath();
603                        logger.info("syncDirectory: path='{}'", path);
604                        try {
605                                makeDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, directoryDto);
606                        } catch (final CollisionException x) {
607                                logger.info("CollisionException during makeDirectory: {}", path);
608                                if (logger.isDebugEnabled())
609                                        logger.debug(x.toString(), x);
610
611                                return;
612                        }
613                } finally {
614                        monitor.done();
615                }
616        }
617
618        protected void makeDirectory(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
619                        final RepoFileDtoTreeNode repoFileDtoTreeNode, final String path, final DirectoryDto directoryDto) {
620                toRepoTransport.makeDirectory(path, directoryDto.getLastModified());
621        }
622
623        private void syncSymlink(
624                        final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
625                        final RepoFileDtoTreeNode repoFileDtoTreeNode, final SymlinkDto symlinkDto, final SubProgressMonitor monitor) {
626                monitor.beginTask("Synchronising...", 100);
627                try {
628                        final String path = repoFileDtoTreeNode.getPath();
629                        try {
630                                toRepoTransport.makeSymlink(path, symlinkDto.getTarget(), symlinkDto.getLastModified());
631                        } catch (final CollisionException x) {
632                                logger.info("CollisionException during makeSymlink: {}", path);
633                                if (logger.isDebugEnabled())
634                                        logger.debug(x.toString(), x);
635
636                                return;
637                        }
638                } finally {
639                        monitor.done();
640                }
641        }
642
643        private void syncFile(final RepoTransport fromRepoTransport,
644                        final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode,
645                        final RepoFileDto normalFileDto, final ProgressMonitor monitor) {
646                monitor.beginTask("Synchronising...", 100);
647                try {
648                        final String path = repoFileDtoTreeNode.getPath();
649                        logger.info("syncFile: path='{}'", path);
650
651                        final RepoFileDto fromRepoFileDto = fromRepoTransport.getRepoFileDto(path);
652                        if (fromRepoFileDto == null) {
653                                logger.warn("File was deleted during sync on source side: {}", path);
654                                return;
655                        }
656                        if (!(fromRepoFileDto instanceof NormalFileDto)) {
657                                logger.warn("Normal file was replaced by a directory (or another type) during sync on source side: {}", path);
658                                return;
659                        }
660                        monitor.worked(10);
661
662                        final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto;
663
664                        final RepoFileDto toRepoFileDto = toRepoTransport.getRepoFileDto(path);
665                        if (areFilesExistingAndEqual(fromRepoFileDto, toRepoFileDto)) {
666                                logger.info("File is already equal on destination side (sha1='{}'): {}", fromNormalFileDto.getSha1(), path);
667                                return;
668                        }
669                        monitor.worked(10);
670
671                        logger.info("Beginning to copy file (from.sha1='{}' to.sha1='{}'): {}", fromNormalFileDto.getSha1(),
672                                        toRepoFileDto instanceof NormalFileDto ? ((NormalFileDto)toRepoFileDto).getSha1() : "<NoInstanceOf_NormalFileDto>",
673                                                        path);
674
675                        final NormalFileDto toNormalFileDto;
676                        if (toRepoFileDto instanceof NormalFileDto)
677                                toNormalFileDto = (NormalFileDto) toRepoFileDto;
678                        else
679                                toNormalFileDto = createObject(NormalFileDto.class); // dummy (null-object pattern)
680
681                        try {
682                                beginPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto);
683                        } catch (final CollisionException x) {
684                                logger.info("CollisionException during beginPutFile: {}", path);
685                                if (logger.isDebugEnabled())
686                                        logger.debug(x.toString(), x);
687
688                                return;
689                        }
690                        localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, true);
691                        monitor.worked(1);
692
693                        final Map<Long, FileChunkDto> offset2ToTempFileChunkDto = new HashMap<>(toNormalFileDto.getTempFileChunkDtos().size());
694                        for (final FileChunkDto toTempFileChunkDto : toNormalFileDto.getTempFileChunkDtos())
695                                offset2ToTempFileChunkDto.put(toTempFileChunkDto.getOffset(), toTempFileChunkDto);
696
697                        logger.debug("Comparing {} FileChunkDtos. path='{}'", fromNormalFileDto.getFileChunkDtos().size(), path);
698                        final List<FileChunkDto> fromFileChunkDtosDirty = new ArrayList<FileChunkDto>();
699                        final Iterator<FileChunkDto> toFileChunkDtoIterator = toNormalFileDto.getFileChunkDtos().iterator();
700                        int fileChunkIndex = -1;
701                        for (final FileChunkDto fromFileChunkDto : fromNormalFileDto.getFileChunkDtos()) {
702                                final FileChunkDto toFileChunkDto = toFileChunkDtoIterator.hasNext() ? toFileChunkDtoIterator.next() : null;
703                                ++fileChunkIndex;
704                                final FileChunkDto toTempFileChunkDto = offset2ToTempFileChunkDto.get(fromFileChunkDto.getOffset());
705                                if (toTempFileChunkDto == null) {
706                                        if (toFileChunkDto != null
707                                                        && equal(fromFileChunkDto.getOffset(), toFileChunkDto.getOffset())
708                                                        && equal(fromFileChunkDto.getLength(), toFileChunkDto.getLength())
709                                                        && equal(fromFileChunkDto.getSha1(), toFileChunkDto.getSha1())) {
710                                                if (logger.isTraceEnabled()) {
711                                                        logger.trace("Skipping clean FileChunkDto. index={} offset={} sha1='{}'",
712                                                                        fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1());
713                                                }
714                                                continue;
715                                        }
716                                }
717                                else {
718                                        if (equal(fromFileChunkDto.getOffset(), toTempFileChunkDto.getOffset())
719                                                        && equal(fromFileChunkDto.getLength(), toTempFileChunkDto.getLength())
720                                                        && equal(fromFileChunkDto.getSha1(), toTempFileChunkDto.getSha1())) {
721                                                if (logger.isTraceEnabled()) {
722                                                        logger.trace("Skipping clean temporary FileChunkDto. index={} offset={} sha1='{}'",
723                                                                        fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1());
724                                                }
725                                                continue;
726                                        }
727                                }
728
729                                if (logger.isTraceEnabled()) {
730                                        logger.trace("Enlisting dirty FileChunkDto. index={} fromOffset={} toOffset={} fromSha1='{}' toSha1='{}'",
731                                                        fileChunkIndex, fromFileChunkDto.getOffset(),
732                                                        (toFileChunkDto == null ? "null" : toFileChunkDto.getOffset()),
733                                                        fromFileChunkDto.getSha1(),
734                                                        (toFileChunkDto == null ? "null" : toFileChunkDto.getSha1()));
735                                }
736                                fromFileChunkDtosDirty.add(fromFileChunkDto);
737                        }
738
739                        logger.info("Need to copy {} dirty file-chunks (of {} total). path='{}'",
740                                        fromFileChunkDtosDirty.size(), fromNormalFileDto.getFileChunkDtos().size(), path);
741
742                        final ProgressMonitor subMonitor = new SubProgressMonitor(monitor, 73);
743                        subMonitor.beginTask("Synchronising...", fromFileChunkDtosDirty.size());
744                        fileChunkIndex = -1;
745                        long bytesCopied = 0;
746                        final long copyChunksBeginTimestamp = System.currentTimeMillis();
747                        for (final FileChunkDto fileChunkDto : fromFileChunkDtosDirty) {
748                                ++fileChunkIndex;
749                                if (logger.isTraceEnabled()) {
750                                        logger.trace("Reading data for dirty FileChunkDto (index {} of {}). path='{}' offset={}",
751                                                        fileChunkIndex, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset());
752                                }
753                                final byte[] fileData = getFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto);
754
755                                if (fileData == null) {
756                                        logger.warn("Source file was modified or deleted during sync: {}", path);
757                                        // The file is left in state 'inProgress'. Thus it should definitely not be synced back in the opposite
758                                        // direction. The file should be synced again in the correct direction in the next run (after the source
759                                        // repo did a local sync, too).
760                                        return;
761                                }
762
763                                if (logger.isTraceEnabled()) {
764                                        logger.trace("Writing data for dirty FileChunkDto ({} of {}). path='{}' offset={}",
765                                                        fileChunkIndex + 1, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset());
766                                }
767
768                                try {
769                                        putFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto, fileData);
770                                } catch (final CollisionException x) { // Never happens in CloudStore, but in down-stream-projects. Important: They must handle this properly themselves!
771                                        logger.info("CollisionException during putFileData: {}", path);
772                                        if (logger.isDebugEnabled())
773                                                logger.debug(x.toString(), x);
774
775                                        return;
776                                }
777
778                                bytesCopied += fileData.length;
779                                subMonitor.worked(1);
780                        }
781                        subMonitor.done();
782
783                        logger.info("Copied {} dirty file-chunks with together {} bytes in {} ms. path='{}'",
784                                        fromFileChunkDtosDirty.size(), bytesCopied, System.currentTimeMillis() - copyChunksBeginTimestamp, path);
785
786                        endPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto);
787                        localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, false);
788                        monitor.worked(6);
789                } finally {
790                        monitor.done();
791                }
792        }
793
794        protected byte[] getFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
795                        final RepoFileDtoTreeNode repoFileDtoTreeNode,
796                        final String path, final FileChunkDto fileChunkDto) {
797
798                final byte[] fileData = fromRepoTransport.getFileData(path, fileChunkDto.getOffset(), fileChunkDto.getLength());
799                if (fileData == null)
800                        return null; // file was deleted
801
802                if (fileData.length != fileChunkDto.getLength() || !sha1(fileData).equals(fileChunkDto.getSha1()))
803                        return null; // file was modified
804
805                return fileData;
806        }
807
808        protected void putFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
809                        final RepoFileDtoTreeNode repoFileDtoTreeNode,
810                        final String path, final FileChunkDto fileChunkDto,
811                        final byte[] fileData) {
812
813                toRepoTransport.putFileData(path, fileChunkDto.getOffset(), fileData);
814        }
815
816        protected void beginPutFile(final RepoTransport fromRepoTransport,
817                        final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode,
818                        final String path, final NormalFileDto fromNormalFileDto) throws CollisionException {
819
820                toRepoTransport.beginPutFile(path);
821        }
822
823        protected void endPutFile(final RepoTransport fromRepoTransport,
824                        final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode,
825                        final String path, final NormalFileDto fromNormalFileDto) {
826
827                toRepoTransport.endPutFile(
828                                path, fromNormalFileDto.getLastModified(),
829                                fromNormalFileDto.getLength(), fromNormalFileDto.getSha1());
830        }
831
832        private boolean areFilesExistingAndEqual(final RepoFileDto fromRepoFileDto, final RepoFileDto toRepoFileDto) {
833                if (!(fromRepoFileDto instanceof NormalFileDto))
834                        return false;
835
836                if (!(toRepoFileDto instanceof NormalFileDto))
837                        return false;
838
839                final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto;
840                final NormalFileDto toNormalFileDto = (NormalFileDto) toRepoFileDto;
841
842                return equal(fromNormalFileDto.getLength(), toNormalFileDto.getLength())
843                                && equal(fromNormalFileDto.getLastModified(), toNormalFileDto.getLastModified())
844                                && equal(fromNormalFileDto.getSha1(), toNormalFileDto.getSha1());
845        }
846
847        @Override
848        public void close() {
849                if (doneMarker != null) {
850                        doneMarker.close();
851                        doneMarker = null;
852                }
853                localRepoTransport.close();
854                remoteRepoTransport.close();
855                localRepoManager.close();
856
857                if (localRepoTmpDir != null)
858                        localRepoTmpDir.delete(); // deletes only, if empty.
859        }
860}