001package co.codewizards.cloudstore.core.repo.sync; 002 003import static co.codewizards.cloudstore.core.util.Util.*; 004 005import java.io.ByteArrayInputStream; 006import java.io.File; 007import java.io.IOException; 008import java.net.MalformedURLException; 009import java.net.URL; 010import java.security.NoSuchAlgorithmException; 011import java.util.ArrayList; 012import java.util.Collection; 013import java.util.HashMap; 014import java.util.Iterator; 015import java.util.LinkedList; 016import java.util.List; 017import java.util.Map; 018import java.util.SortedMap; 019import java.util.TreeMap; 020import java.util.UUID; 021import java.util.concurrent.Callable; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.Executors; 024import java.util.concurrent.Future; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import co.codewizards.cloudstore.core.concurrent.CallerBlocksPolicy; 033import co.codewizards.cloudstore.core.dto.ChangeSetDTO; 034import co.codewizards.cloudstore.core.dto.CopyModificationDTO; 035import co.codewizards.cloudstore.core.dto.DeleteModificationDTO; 036import co.codewizards.cloudstore.core.dto.DirectoryDTO; 037import co.codewizards.cloudstore.core.dto.FileChunkDTO; 038import co.codewizards.cloudstore.core.dto.ModificationDTO; 039import co.codewizards.cloudstore.core.dto.NormalFileDTO; 040import co.codewizards.cloudstore.core.dto.RepoFileDTO; 041import co.codewizards.cloudstore.core.dto.RepoFileDTOTreeNode; 042import co.codewizards.cloudstore.core.dto.SymlinkDTO; 043import co.codewizards.cloudstore.core.progress.ProgressMonitor; 044import co.codewizards.cloudstore.core.progress.SubProgressMonitor; 045import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper; 046import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 047import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory; 048import co.codewizards.cloudstore.core.repo.transport.DeleteModificationCollisionException; 049import co.codewizards.cloudstore.core.repo.transport.RepoTransport; 050import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactory; 051import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactoryRegistry; 052import co.codewizards.cloudstore.core.util.HashUtil; 053import co.codewizards.cloudstore.core.util.UrlUtil; 054 055/** 056 * Logic for synchronising a local with a remote repository. 057 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co 058 */ 059public class RepoToRepoSync { 060 private static final Logger logger = LoggerFactory.getLogger(RepoToRepoSync.class); 061 062 /** 063 * Sync in the inverse direction. This is only for testing whether the RepoTransport implementations 064 * are truly symmetric. It is less efficient! Therefore, this must NEVER be true in production!!! 065 */ 066 private static final boolean TEST_INVERSE = false; 067 068 private final File localRoot; 069 private final URL remoteRoot; 070 private final LocalRepoManager localRepoManager; 071 private final RepoTransport localRepoTransport; 072 private final RepoTransport remoteRepoTransport; 073 private final UUID localRepositoryId; 074 private final UUID remoteRepositoryId; 075 076 private ExecutorService localSyncExecutor; 077 private Future<Void> localSyncFuture; 078 079 /** 080 * Create an instance. 081 * @param localRoot the root of the local repository or any file/directory inside it. This is 082 * automatically adjusted to fit the connection-point to the remote repository (the remote 083 * repository might be connected to a sub-directory). 084 * @param remoteRoot the root of the remote repository. This must exactly match the connection point. 085 * If a sub-directory of the remote repository is connected to the local repository, this sub-directory 086 * must be referenced here. 087 */ 088 public RepoToRepoSync(File localRoot, final URL remoteRoot) { 089 File localRootWithoutPathPrefix = LocalRepoHelper.getLocalRootContainingFile(assertNotNull("localRoot", localRoot)); 090 this.remoteRoot = UrlUtil.canonicalizeURL(assertNotNull("remoteRoot", remoteRoot)); 091 localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRootWithoutPathPrefix); 092 this.localRoot = localRoot = new File(localRootWithoutPathPrefix, localRepoManager.getLocalPathPrefixOrFail(remoteRoot)); 093 094 localRepositoryId = localRepoManager.getRepositoryId(); 095 if (localRepositoryId == null) 096 throw new IllegalStateException("localRepoManager.getRepositoryId() returned null!"); 097 098 remoteRepositoryId = localRepoManager.getRemoteRepositoryIdOrFail(remoteRoot); 099 100 remoteRepoTransport = createRepoTransport(remoteRoot, localRepositoryId); 101 localRepoTransport = createRepoTransport(localRoot, remoteRepositoryId); 102 } 103 104 public void sync(final ProgressMonitor monitor) { 105 assertNotNull("monitor", monitor); 106 monitor.beginTask("Synchronising...", 201); 107 try { 108 readRemoteRepositoryIdFromRepoTransport(); 109 monitor.worked(1); 110 111 if (localSyncExecutor != null) 112 throw new IllegalStateException("localSyncExecutor != null"); 113 if (localSyncFuture != null) 114 throw new IllegalStateException("localSyncFuture != null"); 115 116 localSyncExecutor = Executors.newFixedThreadPool(1); 117 localSyncFuture = localSyncExecutor.submit(new Callable<Void>() { 118 @Override 119 public Void call() throws Exception { 120 logger.info("sync: locally syncing {} ('{}')", localRepositoryId, localRoot); 121 localRepoManager.localSync(new SubProgressMonitor(monitor, 50)); 122 return null; 123 } 124 }); 125 126 if (!TEST_INVERSE) { // This is the normal sync (NOT test). 127 logger.info("sync: down: fromID={} from='{}' toID={} to='{}'", remoteRepositoryId, remoteRoot, localRepositoryId, localRoot); 128 sync(remoteRepoTransport, true, localRepoTransport, new SubProgressMonitor(monitor, 50)); 129 130 if (localSyncExecutor != null) 131 throw new IllegalStateException("localSyncExecutor != null"); 132 if (localSyncFuture != null) 133 throw new IllegalStateException("localSyncFuture != null"); 134 135 logger.info("sync: up: fromID={} from='{}' toID={} to='{}'", localRepositoryId, localRoot, remoteRepositoryId, remoteRoot); 136 sync(localRepoTransport, false, remoteRepoTransport, new SubProgressMonitor(monitor, 50)); 137 138 // Immediately sync back to make sure the changes we caused don't cause problems later 139 // (right now there's very likely no collision and this should be very fast). 140 logger.info("sync: down again: fromID={} from='{}' toID={} to='{}'", remoteRepositoryId, remoteRoot, localRepositoryId, localRoot); 141 sync(remoteRepoTransport, false, localRepoTransport, new SubProgressMonitor(monitor, 50)); 142 } 143 else { // THIS IS FOR TESTING ONLY! 144 logger.info("sync: locally syncing on *remote* side {} ('{}')", localRepositoryId, localRoot); 145 remoteRepoTransport.getChangeSetDTO(true); // trigger the local sync on the remote side (we don't need the change set) 146 147 waitForAndCheckLocalSyncFuture(); 148 149 logger.info("sync: up: fromID={} from='{}' toID={} to='{}'", localRepositoryId, localRoot, remoteRepositoryId, remoteRoot); 150 sync(localRepoTransport, false, remoteRepoTransport, new SubProgressMonitor(monitor, 50)); 151 152 logger.info("sync: down: fromID={} from='{}' toID={} to='{}'", remoteRepositoryId, remoteRoot, localRepositoryId, localRoot); 153 sync(remoteRepoTransport, false, localRepoTransport, new SubProgressMonitor(monitor, 50)); 154 155 logger.info("sync: up again: fromID={} from='{}' toID={} to='{}'", localRepositoryId, localRoot, remoteRepositoryId, remoteRoot); 156 sync(localRepoTransport, false, remoteRepoTransport, new SubProgressMonitor(monitor, 50)); 157 } 158 } finally { 159 monitor.done(); 160 } 161 } 162 163 private void waitForAndCheckLocalSyncFutureIfExists() { 164 if (localSyncFuture != null) 165 waitForAndCheckLocalSyncFuture(); 166 } 167 168 private void waitForAndCheckLocalSyncFuture() { 169 try { 170 assertNotNull("localSyncFuture", localSyncFuture).get(); 171 } catch (RuntimeException e) { 172 throw e; 173 } catch (Exception e) { 174 throw new RuntimeException(e); 175 } 176 assertNotNull("localSyncExecutor", localSyncExecutor).shutdown(); 177 localSyncFuture = null; 178 localSyncExecutor = null; 179 } 180 181 private void readRemoteRepositoryIdFromRepoTransport() { 182 UUID repositoryId = remoteRepoTransport.getRepositoryId(); 183 if (repositoryId == null) 184 throw new IllegalStateException("remoteRepoTransport.getRepositoryId() returned null!"); 185 186 if (!repositoryId.equals(remoteRepositoryId)) 187 throw new IllegalStateException( 188 String.format("remoteRepoTransport.getRepositoryId() does not match repositoryId in local DB! %s != %s", repositoryId, remoteRepositoryId)); 189 } 190 191 private RepoTransport createRepoTransport(File rootFile, UUID clientRepositoryId) { 192 URL rootURL; 193 try { 194 rootURL = rootFile.toURI().toURL(); 195 } catch (MalformedURLException e) { 196 throw new RuntimeException(e); 197 } 198 return createRepoTransport(rootURL, clientRepositoryId); 199 } 200 201 private RepoTransport createRepoTransport(URL remoteRoot, UUID clientRepositoryId) { 202 RepoTransportFactory repoTransportFactory = RepoTransportFactoryRegistry.getInstance().getRepoTransportFactoryOrFail(remoteRoot); 203 return repoTransportFactory.createRepoTransport(remoteRoot, clientRepositoryId); 204 } 205 206 private void sync(RepoTransport fromRepoTransport, boolean fromRepoLocalSync, RepoTransport toRepoTransport, ProgressMonitor monitor) { 207 monitor.beginTask("Synchronising...", 100); 208 try { 209 ChangeSetDTO changeSetDTO = fromRepoTransport.getChangeSetDTO(fromRepoLocalSync); 210 monitor.worked(8); 211 212 waitForAndCheckLocalSyncFutureIfExists(); 213 214 sync(fromRepoTransport, toRepoTransport, changeSetDTO, new SubProgressMonitor(monitor, 90)); 215 216 fromRepoTransport.endSyncFromRepository(); 217 toRepoTransport.endSyncToRepository(changeSetDTO.getRepositoryDTO().getRevision()); 218 monitor.worked(2); 219 } finally { 220 monitor.done(); 221 } 222 } 223 224 private void sync(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, ChangeSetDTO changeSetDTO, ProgressMonitor monitor) { 225 monitor.beginTask("Synchronising...", changeSetDTO.getModificationDTOs().size() + 2 * changeSetDTO.getRepoFileDTOs().size()); 226 try { 227 RepoFileDTOTreeNode repoFileDTOTree = RepoFileDTOTreeNode.createTree(changeSetDTO.getRepoFileDTOs()); 228 if (repoFileDTOTree != null) { 229 sync(fromRepoTransport, toRepoTransport, repoFileDTOTree, 230 new Class<?>[] { DirectoryDTO.class }, new Class<?>[0], 231 new SubProgressMonitor(monitor, repoFileDTOTree.size())); 232 } 233 234 syncModifications(fromRepoTransport, toRepoTransport, changeSetDTO.getModificationDTOs(), 235 new SubProgressMonitor(monitor, changeSetDTO.getModificationDTOs().size())); 236 237 if (repoFileDTOTree != null) { 238 sync(fromRepoTransport, toRepoTransport, repoFileDTOTree, 239 new Class<?>[] { RepoFileDTO.class }, new Class<?>[] { DirectoryDTO.class }, 240 new SubProgressMonitor(monitor, repoFileDTOTree.size())); 241 } 242 } finally { 243 monitor.done(); 244 } 245 } 246 247 private void sync(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, 248 RepoFileDTOTreeNode repoFileDTOTree, 249 Class<?>[] repoFileDTOClassesIncl, Class<?>[] repoFileDTOClassesExcl, 250 ProgressMonitor monitor) { 251 assertNotNull("fromRepoTransport", fromRepoTransport); 252 assertNotNull("toRepoTransport", toRepoTransport); 253 assertNotNull("repoFileDTOTree", repoFileDTOTree); 254 assertNotNull("repoFileDTOClassesIncl", repoFileDTOClassesIncl); 255 assertNotNull("repoFileDTOClassesExcl", repoFileDTOClassesExcl); 256 assertNotNull("monitor", monitor); 257 258 Map<Class<?>, Boolean> repoFileDTOClass2Included = new HashMap<Class<?>, Boolean>(); 259 Map<Class<?>, Boolean> repoFileDTOClass2Excluded = new HashMap<Class<?>, Boolean>(); 260 261 monitor.beginTask("Synchronising...", repoFileDTOTree.size()); 262 try { 263 LinkedList<Future<Void>> syncFileAsynchronouslyFutures = new LinkedList<Future<Void>>(); 264 ThreadPoolExecutor syncFileAsynchronouslyExecutor = createSyncFileAsynchronouslyExecutor(); 265 try { 266 for (RepoFileDTOTreeNode repoFileDTOTreeNode : repoFileDTOTree) { 267 RepoFileDTO repoFileDTO = repoFileDTOTreeNode.getRepoFileDTO(); 268 Class<? extends RepoFileDTO> repoFileDTOClass = repoFileDTO.getClass(); 269 270 Boolean included = repoFileDTOClass2Included.get(repoFileDTOClass); 271 if (included == null) { 272 included = false; 273 for (Class<?> clazz : repoFileDTOClassesIncl) { 274 if (clazz.isAssignableFrom(repoFileDTOClass)) { 275 included = true; 276 break; 277 } 278 } 279 repoFileDTOClass2Included.put(repoFileDTOClass, included); 280 } 281 282 Boolean excluded = repoFileDTOClass2Excluded.get(repoFileDTOClass); 283 if (excluded == null) { 284 excluded = false; 285 for (Class<?> clazz : repoFileDTOClassesExcl) { 286 if (clazz.isAssignableFrom(repoFileDTOClass)) { 287 excluded = true; 288 break; 289 } 290 } 291 repoFileDTOClass2Excluded.put(repoFileDTOClass, excluded); 292 } 293 294 if (!included || excluded) { 295 monitor.worked(1); 296 continue; 297 } 298 299 if (repoFileDTO instanceof DirectoryDTO) 300 syncDirectory(fromRepoTransport, toRepoTransport, repoFileDTOTreeNode, (DirectoryDTO) repoFileDTO, new SubProgressMonitor(monitor, 1)); 301 else if (repoFileDTO instanceof NormalFileDTO) { 302 Future<Void> syncFileAsynchronouslyFuture = syncFileAsynchronously(syncFileAsynchronouslyExecutor, 303 fromRepoTransport, toRepoTransport, 304 repoFileDTOTreeNode, repoFileDTO, new SubProgressMonitor(monitor, 1)); 305 syncFileAsynchronouslyFutures.add(syncFileAsynchronouslyFuture); 306 } 307 else if (repoFileDTO instanceof SymlinkDTO) 308 syncSymlink(fromRepoTransport, toRepoTransport, repoFileDTOTreeNode, (SymlinkDTO) repoFileDTO, new SubProgressMonitor(monitor, 1)); 309 else 310 throw new IllegalStateException("Unsupported RepoFileDTO type: " + repoFileDTO); 311 312 checkAndEvictDoneSyncFileAsynchronouslyFutures(syncFileAsynchronouslyFutures); 313 } 314 315 checkAndEvictAllSyncFileAsynchronouslyFutures(syncFileAsynchronouslyFutures); 316 } finally { 317 syncFileAsynchronouslyExecutor.shutdown(); 318 } 319 } finally { 320 monitor.done(); 321 } 322 } 323 324 private SortedMap<Long, Collection<ModificationDTO>> getLocalRevision2ModificationDTOs(Collection<ModificationDTO> modificationDTOs) { 325 SortedMap<Long, Collection<ModificationDTO>> map = new TreeMap<Long, Collection<ModificationDTO>>(); 326 for (ModificationDTO modificationDTO : modificationDTOs) { 327 long localRevision = modificationDTO.getLocalRevision(); 328 Collection<ModificationDTO> collection = map.get(localRevision); 329 if (collection == null) { 330 collection = new ArrayList<ModificationDTO>(); 331 map.put(localRevision, collection); 332 } 333 collection.add(modificationDTO); 334 } 335 return map; 336 } 337 338 private void syncModifications(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, Collection<ModificationDTO> modificationDTOs, ProgressMonitor monitor) { 339 monitor.beginTask("Synchronising...", modificationDTOs.size()); 340 try { 341 SortedMap<Long,Collection<ModificationDTO>> localRevision2ModificationDTOs = getLocalRevision2ModificationDTOs(modificationDTOs); 342 for (Map.Entry<Long,Collection<ModificationDTO>> me : localRevision2ModificationDTOs.entrySet()) { 343 ModificationDTOSet modificationDTOSet = new ModificationDTOSet(me.getValue()); 344 345 for (List<CopyModificationDTO> copyModificationDTOs : modificationDTOSet.getFromPath2CopyModificationDTOs().values()) { 346 347 for (Iterator<CopyModificationDTO> itCopyMod = copyModificationDTOs.iterator(); itCopyMod.hasNext(); ) { 348 CopyModificationDTO copyModificationDTO = itCopyMod.next(); 349 List<DeleteModificationDTO> deleteModificationDTOs = modificationDTOSet.getPath2DeleteModificationDTOs().get(copyModificationDTO.getFromPath()); 350 boolean moveInstead = false; 351 if (!itCopyMod.hasNext() && deleteModificationDTOs != null && !deleteModificationDTOs.isEmpty()) 352 moveInstead = true; 353 354 if (moveInstead) { 355 logger.info("syncModifications: Moving from '{}' to '{}'", copyModificationDTO.getFromPath(), copyModificationDTO.getToPath()); 356 toRepoTransport.move(copyModificationDTO.getFromPath(), copyModificationDTO.getToPath()); 357 } 358 else { 359 logger.info("syncModifications: Copying from '{}' to '{}'", copyModificationDTO.getFromPath(), copyModificationDTO.getToPath()); 360 toRepoTransport.copy(copyModificationDTO.getFromPath(), copyModificationDTO.getToPath()); 361 } 362 363 if (!moveInstead && deleteModificationDTOs != null) { 364 for (DeleteModificationDTO deleteModificationDTO : deleteModificationDTOs) { 365 logger.info("syncModifications: Deleting '{}'", deleteModificationDTO.getPath()); 366 toRepoTransport.delete(deleteModificationDTO.getPath()); 367 } 368 } 369 } 370 } 371 372 for (List<DeleteModificationDTO> deleteModificationDTOs : modificationDTOSet.getPath2DeleteModificationDTOs().values()) { 373 for (DeleteModificationDTO deleteModificationDTO : deleteModificationDTOs) { 374 logger.info("syncModifications: Deleting '{}'", deleteModificationDTO.getPath()); 375 toRepoTransport.delete(deleteModificationDTO.getPath()); 376 } 377 } 378 } 379 } finally { 380 monitor.done(); 381 } 382 } 383 384 private void syncDirectory( 385 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 386 final RepoFileDTOTreeNode repoFileDTOTreeNode, final DirectoryDTO directoryDTO, final ProgressMonitor monitor) { 387 monitor.beginTask("Synchronising...", 100); 388 try { 389 final String path = repoFileDTOTreeNode.getPath(); 390 logger.info("syncDirectory: path='{}'", path); 391 try { 392 toRepoTransport.makeDirectory(path, directoryDTO.getLastModified()); 393 } catch (DeleteModificationCollisionException x) { 394 logger.info("DeleteModificationCollisionException during makeDirectory: {}", path); 395 if (logger.isDebugEnabled()) 396 logger.debug(x.toString(), x); 397 398 return; 399 } 400 } finally { 401 monitor.done(); 402 } 403 } 404 405 private void syncSymlink( 406 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 407 final RepoFileDTOTreeNode repoFileDTOTreeNode, final SymlinkDTO symlinkDTO, final SubProgressMonitor monitor) { 408 monitor.beginTask("Synchronising...", 100); 409 try { 410 final String path = repoFileDTOTreeNode.getPath(); 411 try { 412 toRepoTransport.makeSymlink(path, symlinkDTO.getTarget(), symlinkDTO.getLastModified()); 413 } catch (DeleteModificationCollisionException x) { 414 logger.info("DeleteModificationCollisionException during makeSymlink: {}", path); 415 if (logger.isDebugEnabled()) 416 logger.debug(x.toString(), x); 417 418 return; 419 } 420 } finally { 421 monitor.done(); 422 } 423 } 424 425 private Future<Void> syncFileAsynchronously( 426 final ThreadPoolExecutor syncFileAsynchronouslyExecutor, 427 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 428 final RepoFileDTOTreeNode repoFileDTOTreeNode, final RepoFileDTO normalFileDTO, final ProgressMonitor monitor) { 429 430 Callable<Void> callable = new Callable<Void>() { 431 @Override 432 public Void call() throws Exception { 433 syncFile(fromRepoTransport, toRepoTransport, repoFileDTOTreeNode, normalFileDTO, monitor); 434 return null; 435 } 436 }; 437 Future<Void> future = syncFileAsynchronouslyExecutor.submit(callable); 438 return future; 439 } 440 441 private void syncFile(RepoTransport fromRepoTransport, RepoTransport toRepoTransport, RepoFileDTOTreeNode repoFileDTOTreeNode, RepoFileDTO normalFileDTO, ProgressMonitor monitor) { 442 monitor.beginTask("Synchronising...", 100); 443 try { 444 final String path = repoFileDTOTreeNode.getPath(); 445 logger.info("syncFile: path='{}'", path); 446 447 final RepoFileDTO fromRepoFileDTO = fromRepoTransport.getRepoFileDTO(path); 448 if (fromRepoFileDTO == null) { 449 logger.warn("File was deleted during sync on source side: {}", path); 450 return; 451 } 452 if (!(fromRepoFileDTO instanceof NormalFileDTO)) { 453 logger.warn("Normal file was replaced by a directory (or another type) during sync on source side: {}", path); 454 return; 455 } 456 monitor.worked(10); 457 458 NormalFileDTO fromNormalFileDTO = (NormalFileDTO) fromRepoFileDTO; 459 460 final RepoFileDTO toRepoFileDTO = toRepoTransport.getRepoFileDTO(path); 461 if (areFilesExistingAndEqual(fromRepoFileDTO, toRepoFileDTO)) { 462 logger.info("File is already equal on destination side (sha1='{}'): {}", fromNormalFileDTO.getSha1(), path); 463 return; 464 } 465 monitor.worked(10); 466 467 logger.info("Beginning to copy file (from.sha1='{}' to.sha1='{}'): {}", fromNormalFileDTO.getSha1(), 468 toRepoFileDTO instanceof NormalFileDTO ? ((NormalFileDTO)toRepoFileDTO).getSha1() : "<NoInstanceOf_NormalFileDTO>", 469 path); 470 471 final NormalFileDTO toNormalFileDTO; 472 if (toRepoFileDTO instanceof NormalFileDTO) 473 toNormalFileDTO = (NormalFileDTO) toRepoFileDTO; 474 else 475 toNormalFileDTO = new NormalFileDTO(); // dummy (null-object pattern) 476 477 try { 478 toRepoTransport.beginPutFile(path); 479 } catch (DeleteModificationCollisionException x) { 480 logger.info("DeleteModificationCollisionException during beginPutFile: {}", path); 481 if (logger.isDebugEnabled()) 482 logger.debug(x.toString(), x); 483 484 return; 485 } 486 monitor.worked(1); 487 488 final Map<Long, FileChunkDTO> offset2ToTempFileChunkDTO = new HashMap<>(toNormalFileDTO.getTempFileChunkDTOs().size()); 489 for (FileChunkDTO toTempFileChunkDTO : toNormalFileDTO.getTempFileChunkDTOs()) 490 offset2ToTempFileChunkDTO.put(toTempFileChunkDTO.getOffset(), toTempFileChunkDTO); 491 492 logger.debug("Comparing {} FileChunkDTOs. path='{}'", fromNormalFileDTO.getFileChunkDTOs().size(), path); 493 final List<FileChunkDTO> fromFileChunkDTOsDirty = new ArrayList<FileChunkDTO>(); 494 final Iterator<FileChunkDTO> toFileChunkDTOIterator = toNormalFileDTO.getFileChunkDTOs().iterator(); 495 int fileChunkIndex = -1; 496 for (final FileChunkDTO fromFileChunkDTO : fromNormalFileDTO.getFileChunkDTOs()) { 497 final FileChunkDTO toFileChunkDTO = toFileChunkDTOIterator.hasNext() ? toFileChunkDTOIterator.next() : null; 498 ++fileChunkIndex; 499 final FileChunkDTO toTempFileChunkDTO = offset2ToTempFileChunkDTO.get(fromFileChunkDTO.getOffset()); 500 if (toTempFileChunkDTO == null) { 501 if (toFileChunkDTO != null 502 && equal(fromFileChunkDTO.getOffset(), toFileChunkDTO.getOffset()) 503 && equal(fromFileChunkDTO.getLength(), toFileChunkDTO.getLength()) 504 && equal(fromFileChunkDTO.getSha1(), toFileChunkDTO.getSha1())) { 505 if (logger.isTraceEnabled()) { 506 logger.trace("Skipping clean FileChunkDTO. index={} offset={} sha1='{}'", 507 fileChunkIndex, fromFileChunkDTO.getOffset(), fromFileChunkDTO.getSha1()); 508 } 509 continue; 510 } 511 } 512 else { 513 if (equal(fromFileChunkDTO.getOffset(), toTempFileChunkDTO.getOffset()) 514 && equal(fromFileChunkDTO.getLength(), toTempFileChunkDTO.getLength()) 515 && equal(fromFileChunkDTO.getSha1(), toTempFileChunkDTO.getSha1())) { 516 if (logger.isTraceEnabled()) { 517 logger.trace("Skipping clean temporary FileChunkDTO. index={} offset={} sha1='{}'", 518 fileChunkIndex, fromFileChunkDTO.getOffset(), fromFileChunkDTO.getSha1()); 519 } 520 continue; 521 } 522 } 523 524 if (logger.isTraceEnabled()) { 525 logger.trace("Enlisting dirty FileChunkDTO. index={} fromOffset={} toOffset={} fromSha1='{}' toSha1='{}'", 526 fileChunkIndex, fromFileChunkDTO.getOffset(), 527 (toFileChunkDTO == null ? "null" : toFileChunkDTO.getOffset()), 528 fromFileChunkDTO.getSha1(), 529 (toFileChunkDTO == null ? "null" : toFileChunkDTO.getSha1())); 530 } 531 fromFileChunkDTOsDirty.add(fromFileChunkDTO); 532 } 533 534 logger.info("Need to copy {} dirty file-chunks (of {} total). path='{}'", 535 fromFileChunkDTOsDirty.size(), fromNormalFileDTO.getFileChunkDTOs().size(), path); 536 537 ProgressMonitor subMonitor = new SubProgressMonitor(monitor, 73); 538 subMonitor.beginTask("Synchronising...", fromFileChunkDTOsDirty.size()); 539 fileChunkIndex = -1; 540 long bytesCopied = 0; 541 long copyChunksBeginTimestamp = System.currentTimeMillis(); 542 for (FileChunkDTO fileChunkDTO : fromFileChunkDTOsDirty) { 543 ++fileChunkIndex; 544 if (logger.isTraceEnabled()) { 545 logger.trace("Reading data for dirty FileChunkDTO (index {} of {}). path='{}' offset={}", 546 fileChunkIndex, fromFileChunkDTOsDirty.size(), path, fileChunkDTO.getOffset()); 547 } 548 byte[] fileData = fromRepoTransport.getFileData(path, fileChunkDTO.getOffset(), fileChunkDTO.getLength()); 549 550 if (fileData == null || fileData.length != fileChunkDTO.getLength() || !sha1(fileData).equals(fileChunkDTO.getSha1())) { 551 logger.warn("Source file was modified or deleted during sync: {}", path); 552 // The file is left in state 'inProgress'. Thus it should definitely not be synced back in the opposite 553 // direction. The file should be synced again in the correct direction in the next run (after the source 554 // repo did a local sync, too). 555 return; 556 } 557 558 if (logger.isTraceEnabled()) { 559 logger.trace("Writing data for dirty FileChunkDTO ({} of {}). path='{}' offset={}", 560 fileChunkIndex + 1, fromFileChunkDTOsDirty.size(), path, fileChunkDTO.getOffset()); 561 } 562 toRepoTransport.putFileData(path, fileChunkDTO.getOffset(), fileData); 563 bytesCopied += fileData.length; 564 subMonitor.worked(1); 565 } 566 subMonitor.done(); 567 568 logger.info("Copied {} dirty file-chunks with together {} bytes in {} ms. path='{}'", 569 fromFileChunkDTOsDirty.size(), bytesCopied, System.currentTimeMillis() - copyChunksBeginTimestamp, path); 570 571 toRepoTransport.endPutFile( 572 path, fromNormalFileDTO.getLastModified(), 573 fromNormalFileDTO.getLength(), fromNormalFileDTO.getSha1()); 574 575 monitor.worked(6); 576 } finally { 577 monitor.done(); 578 } 579 } 580 581 private String sha1(byte[] data) { 582 assertNotNull("data", data); 583 try { 584 byte[] hash = HashUtil.hash(HashUtil.HASH_ALGORITHM_SHA, new ByteArrayInputStream(data)); 585 return HashUtil.encodeHexStr(hash); 586 } catch (NoSuchAlgorithmException e) { 587 throw new RuntimeException(e); 588 } catch (IOException e) { 589 throw new RuntimeException(e); 590 } 591 } 592 593 private boolean areFilesExistingAndEqual(RepoFileDTO fromRepoFileDTO, RepoFileDTO toRepoFileDTO) { 594 if (!(fromRepoFileDTO instanceof NormalFileDTO)) 595 return false; 596 597 if (!(toRepoFileDTO instanceof NormalFileDTO)) 598 return false; 599 600 NormalFileDTO fromNormalFileDTO = (NormalFileDTO) fromRepoFileDTO; 601 NormalFileDTO toNormalFileDTO = (NormalFileDTO) toRepoFileDTO; 602 603 return equal(fromNormalFileDTO.getLength(), toNormalFileDTO.getLength()) 604 && equal(fromNormalFileDTO.getLastModified(), toNormalFileDTO.getLastModified()) 605 && equal(fromNormalFileDTO.getSha1(), toNormalFileDTO.getSha1()); 606 } 607 608 public void close() { 609 localRepoManager.close(); 610 localRepoTransport.close(); 611 remoteRepoTransport.close(); 612 } 613 614 private ThreadPoolExecutor createSyncFileAsynchronouslyExecutor() { 615 // TODO make configurable 616 ThreadPoolExecutor syncFileAsynchronouslyExecutor = new ThreadPoolExecutor(3, 3, 617 60, TimeUnit.SECONDS, 618 new LinkedBlockingQueue<Runnable>(2)); 619 syncFileAsynchronouslyExecutor.setRejectedExecutionHandler(new CallerBlocksPolicy()); 620 return syncFileAsynchronouslyExecutor; 621 } 622 623 private void checkAndEvictDoneSyncFileAsynchronouslyFutures(LinkedList<Future<Void>> syncFileAsynchronouslyFutures) { 624 for (Iterator<Future<Void>> it = syncFileAsynchronouslyFutures.iterator(); it.hasNext();) { 625 Future<Void> future = it.next(); 626 if (future.isDone()) { 627 it.remove(); 628 try { 629 future.get(); // We definitely don't need a timeout here, because we invoke it only, if it's done already. It should never wait. 630 } catch (RuntimeException e) { 631 throw e; 632 } catch (Exception e) { 633 throw new RuntimeException(e); 634 } 635 } 636 } 637 } 638 639 private void checkAndEvictAllSyncFileAsynchronouslyFutures(LinkedList<Future<Void>> syncFileAsynchronouslyFutures) { 640 for (Iterator<Future<Void>> it = syncFileAsynchronouslyFutures.iterator(); it.hasNext();) { 641 Future<Void> future = it.next(); 642 it.remove(); 643 try { 644 future.get(); // I don't think we need a timeout, because the operations done in the callable have timeouts already. 645 } catch (RuntimeException e) { 646 throw e; 647 } catch (Exception e) { 648 throw new RuntimeException(e); 649 } 650 } 651 } 652}