001package co.codewizards.cloudstore.core.repo.sync;
002
003import static co.codewizards.cloudstore.core.util.Util.*;
004
005import java.io.ByteArrayInputStream;
006import java.io.File;
007import java.io.IOException;
008import java.net.MalformedURLException;
009import java.net.URL;
010import java.security.NoSuchAlgorithmException;
011import java.util.ArrayList;
012import java.util.Collection;
013import java.util.HashMap;
014import java.util.Iterator;
015import java.util.LinkedList;
016import java.util.List;
017import java.util.Map;
018import java.util.SortedMap;
019import java.util.TreeMap;
020import java.util.UUID;
021import java.util.concurrent.Callable;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Executors;
024import java.util.concurrent.Future;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import co.codewizards.cloudstore.core.concurrent.CallerBlocksPolicy;
033import co.codewizards.cloudstore.core.dto.ChangeSetDTO;
034import co.codewizards.cloudstore.core.dto.CopyModificationDTO;
035import co.codewizards.cloudstore.core.dto.DeleteModificationDTO;
036import co.codewizards.cloudstore.core.dto.DirectoryDTO;
037import co.codewizards.cloudstore.core.dto.FileChunkDTO;
038import co.codewizards.cloudstore.core.dto.ModificationDTO;
039import co.codewizards.cloudstore.core.dto.NormalFileDTO;
040import co.codewizards.cloudstore.core.dto.RepoFileDTO;
041import co.codewizards.cloudstore.core.dto.RepoFileDTOTreeNode;
042import co.codewizards.cloudstore.core.dto.SymlinkDTO;
043import co.codewizards.cloudstore.core.progress.ProgressMonitor;
044import co.codewizards.cloudstore.core.progress.SubProgressMonitor;
045import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper;
046import co.codewizards.cloudstore.core.repo.local.LocalRepoManager;
047import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory;
048import co.codewizards.cloudstore.core.repo.transport.DeleteModificationCollisionException;
049import co.codewizards.cloudstore.core.repo.transport.RepoTransport;
050import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactory;
051import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactoryRegistry;
052import co.codewizards.cloudstore.core.util.HashUtil;
053import co.codewizards.cloudstore.core.util.UrlUtil;
054
055/**
056 * Logic for synchronising a local with a remote repository.
057 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co
058 */
059public class RepoToRepoSync {
060        private static final Logger logger = LoggerFactory.getLogger(RepoToRepoSync.class);
061
062        /**
063         * Sync in the inverse direction. This is only for testing whether the RepoTransport implementations
064         * are truly symmetric. It is less efficient! Therefore, this must NEVER be true in production!!!
065         */
066        private static final boolean TEST_INVERSE = false;
067
068        private final File localRoot;
069        private final URL remoteRoot;
070        private final LocalRepoManager localRepoManager;
071        private final RepoTransport localRepoTransport;
072        private final RepoTransport remoteRepoTransport;
073        private final UUID localRepositoryId;
074        private final UUID remoteRepositoryId;
075
076        private ExecutorService localSyncExecutor;
077        private Future<Void> localSyncFuture;
078
079        /**
080         * Create an instance.
081         * @param localRoot the root of the local repository or any file/directory inside it. This is
082         * automatically adjusted to fit the connection-point to the remote repository (the remote
083         * repository might be connected to a sub-directory).
084         * @param remoteRoot the root of the remote repository. This must exactly match the connection point.
085         * If a sub-directory of the remote repository is connected to the local repository, this sub-directory
086         * must be referenced here.
087         */
088        public RepoToRepoSync(File localRoot, final URL remoteRoot) {
089                File localRootWithoutPathPrefix = LocalRepoHelper.getLocalRootContainingFile(assertNotNull("localRoot", localRoot));
090                this.remoteRoot = UrlUtil.canonicalizeURL(assertNotNull("remoteRoot", remoteRoot));
091                localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRootWithoutPathPrefix);
092                this.localRoot = localRoot = new File(localRootWithoutPathPrefix, localRepoManager.getLocalPathPrefixOrFail(remoteRoot));
093
094                localRepositoryId = localRepoManager.getRepositoryId();
095                if (localRepositoryId == null)
096                        throw new IllegalStateException("localRepoManager.getRepositoryId() returned null!");
097
098                remoteRepositoryId = localRepoManager.getRemoteRepositoryIdOrFail(remoteRoot);
099
100                remoteRepoTransport = createRepoTransport(remoteRoot, localRepositoryId);
101                localRepoTransport = createRepoTransport(localRoot, remoteRepositoryId);
102        }
103
104        public void sync(final ProgressMonitor monitor) {
105                assertNotNull("monitor", monitor);
106                monitor.beginTask("Synchronising...", 201);
107                try {
108                        readRemoteRepositoryIdFromRepoTransport();
109                        monitor.worked(1);
110
111                        if (localSyncExecutor != null)
112                                throw new IllegalStateException("localSyncExecutor != null");
113                        if (localSyncFuture != null)
114                                throw new IllegalStateException("localSyncFuture != null");
115
116                        localSyncExecutor = Executors.newFixedThreadPool(1);
117                        localSyncFuture = localSyncExecutor.submit(new Callable<Void>() {
118                                @Override
119                                public Void call() throws Exception {
120                                        logger.info("sync: locally syncing {} ('{}')", localRepositoryId, localRoot);
121                                        localRepoManager.localSync(new SubProgressMonitor(monitor, 50));
122                                        return null;
123                                }
124                        });
125
126                        if (!TEST_INVERSE) { // This is the normal sync (NOT test).
127                                logger.info("sync: down: fromID={} from='{}' toID={} to='{}'", remoteRepositoryId, remoteRoot, localRepositoryId, localRoot);
128                                sync(remoteRepoTransport, true, localRepoTransport, new SubProgressMonitor(monitor, 50));
129
130                                if (localSyncExecutor != null)
131                                        throw new IllegalStateException("localSyncExecutor != null");
132                                if (localSyncFuture != null)
133                                        throw new IllegalStateException("localSyncFuture != null");
134
135                                logger.info("sync: up: fromID={} from='{}' toID={} to='{}'", localRepositoryId, localRoot, remoteRepositoryId, remoteRoot);
136                                sync(localRepoTransport, false, remoteRepoTransport, new SubProgressMonitor(monitor, 50));
137
138                                // Immediately sync back to make sure the changes we caused don't cause problems later
139                                // (right now there's very likely no collision and this should be very fast).
140                                logger.info("sync: down again: fromID={} from='{}' toID={} to='{}'", remoteRepositoryId, remoteRoot, localRepositoryId, localRoot);
141                                sync(remoteRepoTransport, false, localRepoTransport, new SubProgressMonitor(monitor, 50));
142                        }
143                        else { // THIS IS FOR TESTING ONLY!
144                                logger.info("sync: locally syncing on *remote* side {} ('{}')", localRepositoryId, localRoot);
145                                remoteRepoTransport.getChangeSetDTO(true); // trigger the local sync on the remote side (we don't need the change set)
146
147                                waitForAndCheckLocalSyncFuture();
148
149                                logger.info("sync: up: fromID={} from='{}' toID={} to='{}'", localRepositoryId, localRoot, remoteRepositoryId, remoteRoot);
150                                sync(localRepoTransport, false, remoteRepoTransport, new SubProgressMonitor(monitor, 50));
151
152                                logger.info("sync: down: fromID={} from='{}' toID={} to='{}'", remoteRepositoryId, remoteRoot, localRepositoryId, localRoot);
153                                sync(remoteRepoTransport, false, localRepoTransport, new SubProgressMonitor(monitor, 50));
154
155                                logger.info("sync: up again: fromID={} from='{}' toID={} to='{}'", localRepositoryId, localRoot, remoteRepositoryId, remoteRoot);
156                                sync(localRepoTransport, false, remoteRepoTransport, new SubProgressMonitor(monitor, 50));
157                        }
158                } finally {
159                        monitor.done();
160                }
161        }
162
163        private void waitForAndCheckLocalSyncFutureIfExists() {
164                if (localSyncFuture != null)
165                        waitForAndCheckLocalSyncFuture();
166        }
167
168        private void waitForAndCheckLocalSyncFuture() {
169                try {
170                        assertNotNull("localSyncFuture", localSyncFuture).get();
171                } catch (RuntimeException e) {
172                        throw e;
173                } catch (Exception e) {
174                        throw new RuntimeException(e);
175                }
176                assertNotNull("localSyncExecutor", localSyncExecutor).shutdown();
177                localSyncFuture = null;
178                localSyncExecutor = null;
179        }
180
181        private void readRemoteRepositoryIdFromRepoTransport() {
182                UUID repositoryId = remoteRepoTransport.getRepositoryId();
183                if (repositoryId == null)
184                        throw new IllegalStateException("remoteRepoTransport.getRepositoryId() returned null!");
185
186                if (!repositoryId.equals(remoteRepositoryId))
187                        throw new IllegalStateException(
188                                        String.format("remoteRepoTransport.getRepositoryId() does not match repositoryId in local DB! %s != %s", repositoryId, remoteRepositoryId));
189        }
190
191        private RepoTransport createRepoTransport(File rootFile, UUID clientRepositoryId) {
192                URL rootURL;
193                try {
194                        rootURL = rootFile.toURI().toURL();
195                } catch (MalformedURLException e) {
196                        throw new RuntimeException(e);
197                }
198                return createRepoTransport(rootURL, clientRepositoryId);
199        }
200
201        private RepoTransport createRepoTransport(URL remoteRoot, UUID clientRepositoryId) {
202                RepoTransportFactory repoTransportFactory = RepoTransportFactoryRegistry.getInstance().getRepoTransportFactoryOrFail(remoteRoot);
203                return repoTransportFactory.createRepoTransport(remoteRoot, clientRepositoryId);
204        }
205
206        private void sync(RepoTransport fromRepoTransport, boolean fromRepoLocalSync, RepoTransport toRepoTransport, ProgressMonitor monitor) {
207                monitor.beginTask("Synchronising...", 100);
208                try {
209                        ChangeSetDTO changeSetDTO = fromRepoTransport.getChangeSetDTO(fromRepoLocalSync);
210                        monitor.worked(8);
211
212                        waitForAndCheckLocalSyncFutureIfExists();
213
214                        sync(fromRepoTransport, toRepoTransport, changeSetDTO, new SubProgressMonitor(monitor, 90));
215
216                        fromRepoTransport.endSyncFromRepository();
217                        toRepoTransport.endSyncToRepository(changeSetDTO.getRepositoryDTO().getRevision());
218                        monitor.worked(2);
219                } finally {
220                        monitor.done();
221                }
222        }
223
224        private void sync(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, ChangeSetDTO changeSetDTO, ProgressMonitor monitor) {
225                monitor.beginTask("Synchronising...", changeSetDTO.getModificationDTOs().size() + 2 * changeSetDTO.getRepoFileDTOs().size());
226                try {
227                        RepoFileDTOTreeNode repoFileDTOTree = RepoFileDTOTreeNode.createTree(changeSetDTO.getRepoFileDTOs());
228                        if (repoFileDTOTree != null) {
229                                sync(fromRepoTransport, toRepoTransport, repoFileDTOTree,
230                                                new Class<?>[] { DirectoryDTO.class }, new Class<?>[0],
231                                                new SubProgressMonitor(monitor, repoFileDTOTree.size()));
232                        }
233
234                        syncModifications(fromRepoTransport, toRepoTransport, changeSetDTO.getModificationDTOs(),
235                                        new SubProgressMonitor(monitor, changeSetDTO.getModificationDTOs().size()));
236
237                        if (repoFileDTOTree != null) {
238                                sync(fromRepoTransport, toRepoTransport, repoFileDTOTree,
239                                                new Class<?>[] { RepoFileDTO.class }, new Class<?>[] { DirectoryDTO.class },
240                                                new SubProgressMonitor(monitor, repoFileDTOTree.size()));
241                        }
242                } finally {
243                        monitor.done();
244                }
245        }
246
247        private void sync(RepoTransport fromRepoTransport, RepoTransport toRepoTransport,
248                        RepoFileDTOTreeNode repoFileDTOTree,
249                        Class<?>[] repoFileDTOClassesIncl, Class<?>[] repoFileDTOClassesExcl,
250                        ProgressMonitor monitor) {
251                assertNotNull("fromRepoTransport", fromRepoTransport);
252                assertNotNull("toRepoTransport", toRepoTransport);
253                assertNotNull("repoFileDTOTree", repoFileDTOTree);
254                assertNotNull("repoFileDTOClassesIncl", repoFileDTOClassesIncl);
255                assertNotNull("repoFileDTOClassesExcl", repoFileDTOClassesExcl);
256                assertNotNull("monitor", monitor);
257
258                Map<Class<?>, Boolean> repoFileDTOClass2Included = new HashMap<Class<?>, Boolean>();
259                Map<Class<?>, Boolean> repoFileDTOClass2Excluded = new HashMap<Class<?>, Boolean>();
260
261                monitor.beginTask("Synchronising...", repoFileDTOTree.size());
262                try {
263                        LinkedList<Future<Void>> syncFileAsynchronouslyFutures = new LinkedList<Future<Void>>();
264                        ThreadPoolExecutor syncFileAsynchronouslyExecutor = createSyncFileAsynchronouslyExecutor();
265                        try {
266                                for (RepoFileDTOTreeNode repoFileDTOTreeNode : repoFileDTOTree) {
267                                        RepoFileDTO repoFileDTO = repoFileDTOTreeNode.getRepoFileDTO();
268                                        Class<? extends RepoFileDTO> repoFileDTOClass = repoFileDTO.getClass();
269
270                                        Boolean included = repoFileDTOClass2Included.get(repoFileDTOClass);
271                                        if (included == null) {
272                                                included = false;
273                                                for (Class<?> clazz : repoFileDTOClassesIncl) {
274                                                        if (clazz.isAssignableFrom(repoFileDTOClass)) {
275                                                                included = true;
276                                                                break;
277                                                        }
278                                                }
279                                                repoFileDTOClass2Included.put(repoFileDTOClass, included);
280                                        }
281
282                                        Boolean excluded = repoFileDTOClass2Excluded.get(repoFileDTOClass);
283                                        if (excluded == null) {
284                                                excluded = false;
285                                                for (Class<?> clazz : repoFileDTOClassesExcl) {
286                                                        if (clazz.isAssignableFrom(repoFileDTOClass)) {
287                                                                excluded = true;
288                                                                break;
289                                                        }
290                                                }
291                                                repoFileDTOClass2Excluded.put(repoFileDTOClass, excluded);
292                                        }
293
294                                        if (!included || excluded) {
295                                                monitor.worked(1);
296                                                continue;
297                                        }
298
299                                        if (repoFileDTO instanceof DirectoryDTO)
300                                                syncDirectory(fromRepoTransport, toRepoTransport, repoFileDTOTreeNode, (DirectoryDTO) repoFileDTO, new SubProgressMonitor(monitor, 1));
301                                        else if (repoFileDTO instanceof NormalFileDTO) {
302                                                Future<Void> syncFileAsynchronouslyFuture = syncFileAsynchronously(syncFileAsynchronouslyExecutor,
303                                                                fromRepoTransport, toRepoTransport,
304                                                                repoFileDTOTreeNode, repoFileDTO, new SubProgressMonitor(monitor, 1));
305                                                syncFileAsynchronouslyFutures.add(syncFileAsynchronouslyFuture);
306                                        }
307                                        else if (repoFileDTO instanceof SymlinkDTO)
308                                                syncSymlink(fromRepoTransport, toRepoTransport, repoFileDTOTreeNode, (SymlinkDTO) repoFileDTO, new SubProgressMonitor(monitor, 1));
309                                        else
310                                                throw new IllegalStateException("Unsupported RepoFileDTO type: " + repoFileDTO);
311
312                                        checkAndEvictDoneSyncFileAsynchronouslyFutures(syncFileAsynchronouslyFutures);
313                                }
314
315                                checkAndEvictAllSyncFileAsynchronouslyFutures(syncFileAsynchronouslyFutures);
316                        } finally {
317                                syncFileAsynchronouslyExecutor.shutdown();
318                        }
319                } finally {
320                        monitor.done();
321                }
322        }
323
324        private SortedMap<Long, Collection<ModificationDTO>> getLocalRevision2ModificationDTOs(Collection<ModificationDTO> modificationDTOs) {
325                SortedMap<Long, Collection<ModificationDTO>> map = new TreeMap<Long, Collection<ModificationDTO>>();
326                for (ModificationDTO modificationDTO : modificationDTOs) {
327                        long localRevision = modificationDTO.getLocalRevision();
328                        Collection<ModificationDTO> collection = map.get(localRevision);
329                        if (collection == null) {
330                                collection = new ArrayList<ModificationDTO>();
331                                map.put(localRevision, collection);
332                        }
333                        collection.add(modificationDTO);
334                }
335                return map;
336        }
337
338        private void syncModifications(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, Collection<ModificationDTO> modificationDTOs, ProgressMonitor monitor) {
339                monitor.beginTask("Synchronising...", modificationDTOs.size());
340                try {
341                        SortedMap<Long,Collection<ModificationDTO>> localRevision2ModificationDTOs = getLocalRevision2ModificationDTOs(modificationDTOs);
342                        for (Map.Entry<Long,Collection<ModificationDTO>> me : localRevision2ModificationDTOs.entrySet()) {
343                                ModificationDTOSet modificationDTOSet = new ModificationDTOSet(me.getValue());
344
345                                for (List<CopyModificationDTO> copyModificationDTOs : modificationDTOSet.getFromPath2CopyModificationDTOs().values()) {
346
347                                        for (Iterator<CopyModificationDTO> itCopyMod = copyModificationDTOs.iterator(); itCopyMod.hasNext(); ) {
348                                                CopyModificationDTO copyModificationDTO = itCopyMod.next();
349                                                List<DeleteModificationDTO> deleteModificationDTOs = modificationDTOSet.getPath2DeleteModificationDTOs().get(copyModificationDTO.getFromPath());
350                                                boolean moveInstead = false;
351                                                if (!itCopyMod.hasNext() && deleteModificationDTOs != null && !deleteModificationDTOs.isEmpty())
352                                                        moveInstead = true;
353
354                                                if (moveInstead) {
355                                                        logger.info("syncModifications: Moving from '{}' to '{}'", copyModificationDTO.getFromPath(), copyModificationDTO.getToPath());
356                                                        toRepoTransport.move(copyModificationDTO.getFromPath(), copyModificationDTO.getToPath());
357                                                }
358                                                else {
359                                                        logger.info("syncModifications: Copying from '{}' to '{}'", copyModificationDTO.getFromPath(), copyModificationDTO.getToPath());
360                                                        toRepoTransport.copy(copyModificationDTO.getFromPath(), copyModificationDTO.getToPath());
361                                                }
362
363                                                if (!moveInstead && deleteModificationDTOs != null) {
364                                                        for (DeleteModificationDTO deleteModificationDTO : deleteModificationDTOs) {
365                                                                logger.info("syncModifications: Deleting '{}'", deleteModificationDTO.getPath());
366                                                                toRepoTransport.delete(deleteModificationDTO.getPath());
367                                                        }
368                                                }
369                                        }
370                                }
371
372                                for (List<DeleteModificationDTO> deleteModificationDTOs : modificationDTOSet.getPath2DeleteModificationDTOs().values()) {
373                                        for (DeleteModificationDTO deleteModificationDTO : deleteModificationDTOs) {
374                                                logger.info("syncModifications: Deleting '{}'", deleteModificationDTO.getPath());
375                                                toRepoTransport.delete(deleteModificationDTO.getPath());
376                                        }
377                                }
378                        }
379                } finally {
380                        monitor.done();
381                }
382        }
383
384        private void syncDirectory(
385                        final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
386                        final RepoFileDTOTreeNode repoFileDTOTreeNode, final DirectoryDTO directoryDTO, final ProgressMonitor monitor) {
387                monitor.beginTask("Synchronising...", 100);
388                try {
389                        final String path = repoFileDTOTreeNode.getPath();
390                        logger.info("syncDirectory: path='{}'", path);
391                        try {
392                                toRepoTransport.makeDirectory(path, directoryDTO.getLastModified());
393                        } catch (DeleteModificationCollisionException x) {
394                                logger.info("DeleteModificationCollisionException during makeDirectory: {}", path);
395                                if (logger.isDebugEnabled())
396                                        logger.debug(x.toString(), x);
397
398                                return;
399                        }
400                } finally {
401                        monitor.done();
402                }
403        }
404
405        private void syncSymlink(
406                        final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
407                        final RepoFileDTOTreeNode repoFileDTOTreeNode, final SymlinkDTO symlinkDTO, final SubProgressMonitor monitor) {
408                monitor.beginTask("Synchronising...", 100);
409                try {
410                        final String path = repoFileDTOTreeNode.getPath();
411                        try {
412                                toRepoTransport.makeSymlink(path, symlinkDTO.getTarget(), symlinkDTO.getLastModified());
413                        } catch (DeleteModificationCollisionException x) {
414                                logger.info("DeleteModificationCollisionException during makeSymlink: {}", path);
415                                if (logger.isDebugEnabled())
416                                        logger.debug(x.toString(), x);
417
418                                return;
419                        }
420                } finally {
421                        monitor.done();
422                }
423        }
424
425        private Future<Void> syncFileAsynchronously(
426                        final ThreadPoolExecutor syncFileAsynchronouslyExecutor,
427                        final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport,
428                        final RepoFileDTOTreeNode repoFileDTOTreeNode, final RepoFileDTO normalFileDTO, final ProgressMonitor monitor) {
429
430                Callable<Void> callable = new Callable<Void>() {
431                        @Override
432                        public Void call() throws Exception {
433                                syncFile(fromRepoTransport, toRepoTransport, repoFileDTOTreeNode, normalFileDTO, monitor);
434                                return null;
435                        }
436                };
437                Future<Void> future = syncFileAsynchronouslyExecutor.submit(callable);
438                return future;
439        }
440
441        private void syncFile(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, RepoFileDTOTreeNode repoFileDTOTreeNode, RepoFileDTO normalFileDTO, ProgressMonitor monitor) {
442                monitor.beginTask("Synchronising...", 100);
443                try {
444                        final String path = repoFileDTOTreeNode.getPath();
445                        logger.info("syncFile: path='{}'", path);
446
447                        final RepoFileDTO fromRepoFileDTO = fromRepoTransport.getRepoFileDTO(path);
448                        if (fromRepoFileDTO == null) {
449                                logger.warn("File was deleted during sync on source side: {}", path);
450                                return;
451                        }
452                        if (!(fromRepoFileDTO instanceof NormalFileDTO)) {
453                                logger.warn("Normal file was replaced by a directory (or another type) during sync on source side: {}", path);
454                                return;
455                        }
456                        monitor.worked(10);
457
458                        NormalFileDTO fromNormalFileDTO = (NormalFileDTO) fromRepoFileDTO;
459
460                        final RepoFileDTO toRepoFileDTO = toRepoTransport.getRepoFileDTO(path);
461                        if (areFilesExistingAndEqual(fromRepoFileDTO, toRepoFileDTO)) {
462                                logger.info("File is already equal on destination side (sha1='{}'): {}", fromNormalFileDTO.getSha1(), path);
463                                return;
464                        }
465                        monitor.worked(10);
466
467                        logger.info("Beginning to copy file (from.sha1='{}' to.sha1='{}'): {}", fromNormalFileDTO.getSha1(),
468                                        toRepoFileDTO instanceof NormalFileDTO ? ((NormalFileDTO)toRepoFileDTO).getSha1() : "<NoInstanceOf_NormalFileDTO>",
469                                                        path);
470
471                        final NormalFileDTO toNormalFileDTO;
472                        if (toRepoFileDTO instanceof NormalFileDTO)
473                                toNormalFileDTO = (NormalFileDTO) toRepoFileDTO;
474                        else
475                                toNormalFileDTO = new NormalFileDTO(); // dummy (null-object pattern)
476
477                        try {
478                                toRepoTransport.beginPutFile(path);
479                        } catch (DeleteModificationCollisionException x) {
480                                logger.info("DeleteModificationCollisionException during beginPutFile: {}", path);
481                                if (logger.isDebugEnabled())
482                                        logger.debug(x.toString(), x);
483
484                                return;
485                        }
486                        monitor.worked(1);
487
488                        final Map<Long, FileChunkDTO> offset2ToTempFileChunkDTO = new HashMap<>(toNormalFileDTO.getTempFileChunkDTOs().size());
489                        for (FileChunkDTO toTempFileChunkDTO : toNormalFileDTO.getTempFileChunkDTOs())
490                                offset2ToTempFileChunkDTO.put(toTempFileChunkDTO.getOffset(), toTempFileChunkDTO);
491
492                        logger.debug("Comparing {} FileChunkDTOs. path='{}'", fromNormalFileDTO.getFileChunkDTOs().size(), path);
493                        final List<FileChunkDTO> fromFileChunkDTOsDirty = new ArrayList<FileChunkDTO>();
494                        final Iterator<FileChunkDTO> toFileChunkDTOIterator = toNormalFileDTO.getFileChunkDTOs().iterator();
495                        int fileChunkIndex = -1;
496                        for (final FileChunkDTO fromFileChunkDTO : fromNormalFileDTO.getFileChunkDTOs()) {
497                                final FileChunkDTO toFileChunkDTO = toFileChunkDTOIterator.hasNext() ? toFileChunkDTOIterator.next() : null;
498                                ++fileChunkIndex;
499                                final FileChunkDTO toTempFileChunkDTO = offset2ToTempFileChunkDTO.get(fromFileChunkDTO.getOffset());
500                                if (toTempFileChunkDTO == null) {
501                                        if (toFileChunkDTO != null
502                                                        && equal(fromFileChunkDTO.getOffset(), toFileChunkDTO.getOffset())
503                                                        && equal(fromFileChunkDTO.getLength(), toFileChunkDTO.getLength())
504                                                        && equal(fromFileChunkDTO.getSha1(), toFileChunkDTO.getSha1())) {
505                                                if (logger.isTraceEnabled()) {
506                                                        logger.trace("Skipping clean FileChunkDTO. index={} offset={} sha1='{}'",
507                                                                        fileChunkIndex, fromFileChunkDTO.getOffset(), fromFileChunkDTO.getSha1());
508                                                }
509                                                continue;
510                                        }
511                                }
512                                else {
513                                        if (equal(fromFileChunkDTO.getOffset(), toTempFileChunkDTO.getOffset())
514                                                        && equal(fromFileChunkDTO.getLength(), toTempFileChunkDTO.getLength())
515                                                        && equal(fromFileChunkDTO.getSha1(), toTempFileChunkDTO.getSha1())) {
516                                                if (logger.isTraceEnabled()) {
517                                                        logger.trace("Skipping clean temporary FileChunkDTO. index={} offset={} sha1='{}'",
518                                                                        fileChunkIndex, fromFileChunkDTO.getOffset(), fromFileChunkDTO.getSha1());
519                                                }
520                                                continue;
521                                        }
522                                }
523
524                                if (logger.isTraceEnabled()) {
525                                        logger.trace("Enlisting dirty FileChunkDTO. index={} fromOffset={} toOffset={} fromSha1='{}' toSha1='{}'",
526                                                        fileChunkIndex, fromFileChunkDTO.getOffset(),
527                                                        (toFileChunkDTO == null ? "null" : toFileChunkDTO.getOffset()),
528                                                        fromFileChunkDTO.getSha1(),
529                                                        (toFileChunkDTO == null ? "null" : toFileChunkDTO.getSha1()));
530                                }
531                                fromFileChunkDTOsDirty.add(fromFileChunkDTO);
532                        }
533
534                        logger.info("Need to copy {} dirty file-chunks (of {} total). path='{}'",
535                                        fromFileChunkDTOsDirty.size(), fromNormalFileDTO.getFileChunkDTOs().size(), path);
536
537                        ProgressMonitor subMonitor = new SubProgressMonitor(monitor, 73);
538                        subMonitor.beginTask("Synchronising...", fromFileChunkDTOsDirty.size());
539                        fileChunkIndex = -1;
540                        long bytesCopied = 0;
541                        long copyChunksBeginTimestamp = System.currentTimeMillis();
542                        for (FileChunkDTO fileChunkDTO : fromFileChunkDTOsDirty) {
543                                ++fileChunkIndex;
544                                if (logger.isTraceEnabled()) {
545                                        logger.trace("Reading data for dirty FileChunkDTO (index {} of {}). path='{}' offset={}",
546                                                        fileChunkIndex, fromFileChunkDTOsDirty.size(), path, fileChunkDTO.getOffset());
547                                }
548                                byte[] fileData = fromRepoTransport.getFileData(path, fileChunkDTO.getOffset(), fileChunkDTO.getLength());
549
550                                if (fileData == null || fileData.length != fileChunkDTO.getLength() || !sha1(fileData).equals(fileChunkDTO.getSha1())) {
551                                        logger.warn("Source file was modified or deleted during sync: {}", path);
552                                        // The file is left in state 'inProgress'. Thus it should definitely not be synced back in the opposite
553                                        // direction. The file should be synced again in the correct direction in the next run (after the source
554                                        // repo did a local sync, too).
555                                        return;
556                                }
557
558                                if (logger.isTraceEnabled()) {
559                                        logger.trace("Writing data for dirty FileChunkDTO ({} of {}). path='{}' offset={}",
560                                                        fileChunkIndex + 1, fromFileChunkDTOsDirty.size(), path, fileChunkDTO.getOffset());
561                                }
562                                toRepoTransport.putFileData(path, fileChunkDTO.getOffset(), fileData);
563                                bytesCopied += fileData.length;
564                                subMonitor.worked(1);
565                        }
566                        subMonitor.done();
567
568                        logger.info("Copied {} dirty file-chunks with together {} bytes in {} ms. path='{}'",
569                                        fromFileChunkDTOsDirty.size(), bytesCopied, System.currentTimeMillis() - copyChunksBeginTimestamp, path);
570
571                        toRepoTransport.endPutFile(
572                                        path, fromNormalFileDTO.getLastModified(),
573                                        fromNormalFileDTO.getLength(), fromNormalFileDTO.getSha1());
574
575                        monitor.worked(6);
576                } finally {
577                        monitor.done();
578                }
579        }
580
581        private String sha1(byte[] data) {
582                assertNotNull("data", data);
583                try {
584                        byte[] hash = HashUtil.hash(HashUtil.HASH_ALGORITHM_SHA, new ByteArrayInputStream(data));
585                        return HashUtil.encodeHexStr(hash);
586                } catch (NoSuchAlgorithmException e) {
587                        throw new RuntimeException(e);
588                } catch (IOException e) {
589                        throw new RuntimeException(e);
590                }
591        }
592
593        private boolean areFilesExistingAndEqual(RepoFileDTO fromRepoFileDTO, RepoFileDTO toRepoFileDTO) {
594                if (!(fromRepoFileDTO instanceof NormalFileDTO))
595                        return false;
596
597                if (!(toRepoFileDTO instanceof NormalFileDTO))
598                        return false;
599
600                NormalFileDTO fromNormalFileDTO = (NormalFileDTO) fromRepoFileDTO;
601                NormalFileDTO toNormalFileDTO = (NormalFileDTO) toRepoFileDTO;
602
603                return equal(fromNormalFileDTO.getLength(), toNormalFileDTO.getLength())
604                                && equal(fromNormalFileDTO.getLastModified(), toNormalFileDTO.getLastModified())
605                                && equal(fromNormalFileDTO.getSha1(), toNormalFileDTO.getSha1());
606        }
607
608        public void close() {
609                localRepoManager.close();
610                localRepoTransport.close();
611                remoteRepoTransport.close();
612        }
613
614        private ThreadPoolExecutor createSyncFileAsynchronouslyExecutor() {
615                // TODO make configurable
616                ThreadPoolExecutor syncFileAsynchronouslyExecutor = new ThreadPoolExecutor(3, 3,
617                                60, TimeUnit.SECONDS,
618                                new LinkedBlockingQueue<Runnable>(2));
619                syncFileAsynchronouslyExecutor.setRejectedExecutionHandler(new CallerBlocksPolicy());
620                return syncFileAsynchronouslyExecutor;
621        }
622
623        private void checkAndEvictDoneSyncFileAsynchronouslyFutures(LinkedList<Future<Void>> syncFileAsynchronouslyFutures) {
624                for (Iterator<Future<Void>> it = syncFileAsynchronouslyFutures.iterator(); it.hasNext();) {
625                        Future<Void> future = it.next();
626                        if (future.isDone()) {
627                                it.remove();
628                                try {
629                                        future.get(); // We definitely don't need a timeout here, because we invoke it only, if it's done already. It should never wait.
630                                } catch (RuntimeException e) {
631                                        throw e;
632                                } catch (Exception e) {
633                                        throw new RuntimeException(e);
634                                }
635                        }
636                }
637        }
638
639        private void checkAndEvictAllSyncFileAsynchronouslyFutures(LinkedList<Future<Void>> syncFileAsynchronouslyFutures) {
640                for (Iterator<Future<Void>> it = syncFileAsynchronouslyFutures.iterator(); it.hasNext();) {
641                        Future<Void> future = it.next();
642                        it.remove();
643                        try {
644                                future.get(); // I don't think we need a timeout, because the operations done in the callable have timeouts already.
645                        } catch (RuntimeException e) {
646                                throw e;
647                        } catch (Exception e) {
648                                throw new RuntimeException(e);
649                        }
650                }
651        }
652}