001package co.codewizards.cloudstore.core.repo.sync; 002 003import static co.codewizards.cloudstore.core.objectfactory.ObjectFactoryUtil.*; 004import static co.codewizards.cloudstore.core.oio.OioFileFactory.*; 005import static co.codewizards.cloudstore.core.util.HashUtil.*; 006import static co.codewizards.cloudstore.core.util.Util.*; 007import static java.util.Objects.*; 008 009import java.io.IOException; 010import java.net.MalformedURLException; 011import java.net.URL; 012import java.util.ArrayList; 013import java.util.Collection; 014import java.util.HashMap; 015import java.util.HashSet; 016import java.util.Iterator; 017import java.util.List; 018import java.util.Map; 019import java.util.Set; 020import java.util.SortedMap; 021import java.util.TreeMap; 022import java.util.UUID; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Executors; 026import java.util.concurrent.Future; 027 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import co.codewizards.cloudstore.core.dto.ChangeSetDto; 032import co.codewizards.cloudstore.core.dto.ConfigPropSetDto; 033import co.codewizards.cloudstore.core.dto.CopyModificationDto; 034import co.codewizards.cloudstore.core.dto.DeleteModificationDto; 035import co.codewizards.cloudstore.core.dto.DirectoryDto; 036import co.codewizards.cloudstore.core.dto.FileChunkDto; 037import co.codewizards.cloudstore.core.dto.ModificationDto; 038import co.codewizards.cloudstore.core.dto.NormalFileDto; 039import co.codewizards.cloudstore.core.dto.RepoFileDto; 040import co.codewizards.cloudstore.core.dto.RepoFileDtoTreeNode; 041import co.codewizards.cloudstore.core.dto.RepositoryDto; 042import co.codewizards.cloudstore.core.dto.SymlinkDto; 043import co.codewizards.cloudstore.core.dto.VersionInfoDto; 044import co.codewizards.cloudstore.core.oio.File; 045import co.codewizards.cloudstore.core.progress.ProgressMonitor; 046import co.codewizards.cloudstore.core.progress.SubProgressMonitor; 047import co.codewizards.cloudstore.core.repo.local.LocalRepoHelper; 048import co.codewizards.cloudstore.core.repo.local.LocalRepoManager; 049import co.codewizards.cloudstore.core.repo.local.LocalRepoManagerFactory; 050import co.codewizards.cloudstore.core.repo.transport.CollisionException; 051import co.codewizards.cloudstore.core.repo.transport.LocalRepoTransport; 052import co.codewizards.cloudstore.core.repo.transport.RepoTransport; 053import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactory; 054import co.codewizards.cloudstore.core.repo.transport.RepoTransportFactoryRegistry; 055import co.codewizards.cloudstore.core.util.UrlUtil; 056import co.codewizards.cloudstore.core.version.VersionCompatibilityValidator; 057 058/** 059 * Logic for synchronising a local with a remote repository. 060 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co 061 */ 062public class RepoToRepoSync implements AutoCloseable { 063 private static final Logger logger = LoggerFactory.getLogger(RepoToRepoSync.class); 064 065 /** 066 * Sync in the inverse direction. This is only for testing whether the RepoTransport implementations 067 * are truly symmetric. It is less efficient! Therefore, this must NEVER be true in production!!! 068 */ 069 private static final boolean TEST_INVERSE = false; 070 071 protected File localRoot; 072 protected URL remoteRoot; 073 protected final LocalRepoManager localRepoManager; 074 protected final LocalRepoTransport localRepoTransport; 075 protected final RepoTransport remoteRepoTransport; 076 protected UUID localRepositoryId; 077 protected UUID remoteRepositoryId; 078 079 private ExecutorService localSyncExecutor; 080 private Future<Void> localSyncFuture; 081 private final Set<UUID> lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds = new HashSet<>(); 082 private File localRepoTmpDir; 083 084 public static final String FILE_DONE_DIR_NAME_PREFIX = "File."; 085 public static final String MODIFICATION_DONE_DIR_NAME_PREFIX = "Modification."; 086 public static final String DONE_DIR_NAME_SUFFIX = ".done"; 087 088 private DoneMarker doneMarker; 089 090 /** 091 * Create an instance. 092 * @param localRoot the root of the local repository or any file/directory inside it. This is 093 * automatically adjusted to fit the connection-point to the remote repository (the remote 094 * repository might be connected to a sub-directory). 095 * @param remoteRoot the root of the remote repository. This must exactly match the connection point. 096 * If a sub-directory of the remote repository is connected to the local repository, this sub-directory 097 * must be referenced here. 098 */ 099 protected RepoToRepoSync(File localRoot, final URL remoteRoot) { 100 this.localRoot = requireNonNull(localRoot, "localRoot"); 101 this.remoteRoot = UrlUtil.canonicalizeURL(requireNonNull(remoteRoot, "remoteRoot")); 102 103 final File localRootWithoutPathPrefix = LocalRepoHelper.getLocalRootContainingFile(requireNonNull(localRoot, "localRoot")); 104 localRepoManager = LocalRepoManagerFactory.Helper.getInstance().createLocalRepoManagerForExistingRepository(localRootWithoutPathPrefix); 105 localRoot = createFile(localRootWithoutPathPrefix, localRepoManager.getLocalPathPrefixOrFail(remoteRoot)); 106 107 localRepositoryId = localRepoManager.getRepositoryId(); 108 if (localRepositoryId == null) 109 throw new IllegalStateException("localRepoManager.getRepositoryId() returned null!"); 110 111 remoteRepositoryId = localRepoManager.getRemoteRepositoryIdOrFail(remoteRoot); 112 113 remoteRepoTransport = createRepoTransport(remoteRoot, localRepositoryId); 114 localRepoTransport = (LocalRepoTransport) createRepoTransport(localRoot, remoteRepositoryId); 115 } 116 117 public static RepoToRepoSync create(final File localRoot, final URL remoteRoot) { 118 return createObject(RepoToRepoSync.class, localRoot, remoteRoot); 119 } 120 121 public void sync(final ProgressMonitor monitor) { 122 requireNonNull(monitor, "monitor"); 123 monitor.beginTask("Synchronising...", 201); 124 try { 125 lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds.clear(); 126 final VersionInfoDto clientVersionInfoDto = localRepoTransport.getVersionInfoDto(); 127 final VersionInfoDto serverVersionInfoDto = remoteRepoTransport.getVersionInfoDto(); 128 VersionCompatibilityValidator.getInstance().validate(clientVersionInfoDto, serverVersionInfoDto); 129 130 readRemoteRepositoryIdFromRepoTransport(); 131 monitor.worked(1); 132 133 if (localSyncExecutor != null) 134 throw new IllegalStateException("localSyncExecutor != null"); 135 136 if (localSyncFuture != null) 137 throw new IllegalStateException("localSyncFuture != null"); 138 139 localSyncExecutor = Executors.newFixedThreadPool(1); 140 localSyncFuture = localSyncExecutor.submit(new Callable<Void>() { 141 @Override 142 public Void call() throws Exception { 143 logger.info("sync: locally syncing {} ('{}')", localRepositoryId, localRoot); 144 localRepoManager.localSync(new SubProgressMonitor(monitor, 50)); 145 return null; 146 } 147 }); 148 149 if (!TEST_INVERSE) { // This is the normal sync (NOT test). 150 syncDown(true, new SubProgressMonitor(monitor, 50)); 151 152 if (localSyncExecutor != null) 153 throw new IllegalStateException("localSyncExecutor != null"); 154 155 if (localSyncFuture != null) 156 throw new IllegalStateException("localSyncFuture != null"); 157 158 syncUp(new SubProgressMonitor(monitor, 50)); 159 // Immediately sync back to make sure the changes we caused don't cause problems later 160 // (right now there's very likely no collision and this should be very fast). 161 syncDown(false, new SubProgressMonitor(monitor, 50)); 162 } 163 else { // THIS IS FOR TESTING ONLY! 164 logger.info("sync: locally syncing on *remote* side {} ('{}')", localRepositoryId, localRoot); 165 remoteRepoTransport.getChangeSetDto(true, null); // trigger the local sync on the remote side (we don't need the change set) 166 167 waitForAndCheckLocalSyncFuture(); 168 169 syncUp(new SubProgressMonitor(monitor, 50)); 170 syncDown(false, new SubProgressMonitor(monitor, 50)); 171 syncUp(new SubProgressMonitor(monitor, 50)); 172 } 173 } finally { 174 monitor.done(); 175 } 176 } 177 178 protected void syncUp(final ProgressMonitor monitor) { 179 logger.info("syncUp: fromID={} from='{}' toID={} to='{}'", 180 localRepositoryId, localRoot, remoteRepositoryId, remoteRoot); 181 sync(localRepoTransport, false, remoteRepoTransport, monitor); 182 } 183 184 protected void syncDown(final boolean fromRepoLocalSync, final ProgressMonitor monitor) { 185 logger.info("syncDown: fromID={} from='{}' toID={} to='{}', fromRepoLocalSync={}", 186 remoteRepositoryId, remoteRoot, localRepositoryId, localRoot, fromRepoLocalSync); 187 sync(remoteRepoTransport, fromRepoLocalSync, localRepoTransport, monitor); 188 } 189 190 private void waitForAndCheckLocalSyncFutureIfExists() { 191 if (localSyncFuture != null) 192 waitForAndCheckLocalSyncFuture(); 193 } 194 195 private void waitForAndCheckLocalSyncFuture() { 196 try { 197 requireNonNull(localSyncFuture, "localSyncFuture").get(); 198 } catch (final RuntimeException e) { 199 throw e; 200 } catch (final Exception e) { 201 throw new RuntimeException(e); 202 } 203 requireNonNull(localSyncExecutor, "localSyncExecutor").shutdown(); 204 localSyncFuture = null; 205 localSyncExecutor = null; 206 } 207 208 private void readRemoteRepositoryIdFromRepoTransport() { 209 final UUID repositoryId = remoteRepoTransport.getRepositoryId(); 210 if (repositoryId == null) 211 throw new IllegalStateException("remoteRepoTransport.getRepositoryId() returned null!"); 212 213 if (!repositoryId.equals(remoteRepositoryId)) 214 throw new IllegalStateException( 215 String.format("remoteRepoTransport.getRepositoryId() does not match repositoryId in local DB! %s != %s", repositoryId, remoteRepositoryId)); 216 } 217 218 private RepoTransport createRepoTransport(final File rootFile, final UUID clientRepositoryId) { 219 URL rootURL; 220 try { 221 rootURL = rootFile.toURI().toURL(); 222 } catch (final MalformedURLException e) { 223 throw new RuntimeException(e); 224 } 225 return createRepoTransport(rootURL, clientRepositoryId); 226 } 227 228 private RepoTransport createRepoTransport(final URL remoteRoot, final UUID clientRepositoryId) { 229 final RepoTransportFactory repoTransportFactory = RepoTransportFactoryRegistry.getInstance().getRepoTransportFactoryOrFail(remoteRoot); 230 return repoTransportFactory.createRepoTransport(remoteRoot, clientRepositoryId); 231 } 232 233 protected void sync(final RepoTransport fromRepoTransport, final boolean fromRepoLocalSync, final RepoTransport toRepoTransport, final ProgressMonitor monitor) { 234 monitor.beginTask("Synchronising...", 100); 235 try { 236 Long lastSyncToRemoteRepoLocalRepositoryRevisionSynced = null; 237 if (lastSyncToRemoteRepoLocalRepositoryRevisionSyncedUpdatedInFromRepositoryIds.add(fromRepoTransport.getRepositoryId())) { 238 RepositoryDto clientRepositoryDto = toRepoTransport.getClientRepositoryDto(); 239 requireNonNull(clientRepositoryDto, "clientRepositoryDto"); 240 lastSyncToRemoteRepoLocalRepositoryRevisionSynced = clientRepositoryDto.getRevision() == Long.MIN_VALUE ? null : clientRepositoryDto.getRevision(); 241 } 242 243 final ChangeSetDto changeSetDto = fromRepoTransport.getChangeSetDto(fromRepoLocalSync, lastSyncToRemoteRepoLocalRepositoryRevisionSynced); 244 monitor.worked(8); 245 246 waitForAndCheckLocalSyncFutureIfExists(); 247 toRepoTransport.prepareForChangeSetDto(changeSetDto); 248 sync(fromRepoTransport, toRepoTransport, changeSetDto, new SubProgressMonitor(monitor, 90)); 249 250 fromRepoTransport.endSyncFromRepository(); 251 toRepoTransport.endSyncToRepository(changeSetDto.getRepositoryDto().getRevision()); 252 deleteDoneDirs(); 253 monitor.worked(2); 254 } finally { 255 monitor.done(); 256 } 257 } 258 259 protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 260 final ChangeSetDto changeSetDto, final ProgressMonitor monitor) { 261 monitor.beginTask("Synchronising...", 1 + changeSetDto.getModificationDtos().size() + 3 * changeSetDto.getRepoFileDtos().size() + 1); 262 try { 263 syncParentConfigPropSetDto(fromRepoTransport, toRepoTransport, changeSetDto.getParentConfigPropSetDto(), 264 new SubProgressMonitor(monitor, 1)); 265 266 final RepoFileDtoTreeNode repoFileDtoTree = RepoFileDtoTreeNode.createTree(changeSetDto.getRepoFileDtos()); 267 if (repoFileDtoTree != null) { 268 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 269 new Class<?>[] { DirectoryDto.class }, new Class<?>[0], false, 270 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 271 } 272 273 syncModifications(fromRepoTransport, toRepoTransport, changeSetDto.getModificationDtos(), 274 new SubProgressMonitor(monitor, changeSetDto.getModificationDtos().size())); 275 276 if (repoFileDtoTree != null) { 277 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 278 new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, true, 279 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 280 } 281 282 if (repoFileDtoTree != null) { 283 sync(fromRepoTransport, toRepoTransport, repoFileDtoTree, 284 new Class<?>[] { RepoFileDto.class }, new Class<?>[] { DirectoryDto.class }, false, 285 new SubProgressMonitor(monitor, repoFileDtoTree.size())); 286 } 287 } finally { 288 monitor.done(); 289 } 290 } 291 292 protected void syncParentConfigPropSetDto(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 293 final ConfigPropSetDto parentConfigPropSetDto, final ProgressMonitor monitor) { 294 requireNonNull(fromRepoTransport, "fromRepoTransport"); 295 requireNonNull(toRepoTransport, "toRepoTransport"); 296 // parentConfigPropSetDto may be null! 297 requireNonNull(monitor, "monitor"); 298 299 monitor.beginTask("Synchronising parent-config...", 1); 300 try { 301 if (parentConfigPropSetDto == null) 302 return; 303 304 toRepoTransport.putParentConfigPropSetDto(parentConfigPropSetDto); 305 } finally { 306 monitor.done(); 307 } 308 } 309 310 protected void sync(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 311 final RepoFileDtoTreeNode repoFileDtoTree, 312 final Class<?>[] repoFileDtoClassesIncl, final Class<?>[] repoFileDtoClassesExcl, final boolean filesInProgressOnly, 313 final ProgressMonitor monitor) { 314 requireNonNull(fromRepoTransport, "fromRepoTransport"); 315 requireNonNull(toRepoTransport, "toRepoTransport"); 316 requireNonNull(repoFileDtoTree, "repoFileDtoTree"); 317 requireNonNull(repoFileDtoClassesIncl, "repoFileDtoClassesIncl"); 318 requireNonNull(repoFileDtoClassesExcl, "repoFileDtoClassesExcl"); 319 requireNonNull(monitor, "monitor"); 320 321 final Map<Class<?>, Boolean> repoFileDtoClass2Included = new HashMap<Class<?>, Boolean>(); 322 final Map<Class<?>, Boolean> repoFileDtoClass2Excluded = new HashMap<Class<?>, Boolean>(); 323 324 final Set<String> fileInProgressPaths = filesInProgressOnly 325 ? localRepoTransport.getFileInProgressPaths(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 326 : null; 327 328 monitor.beginTask("Synchronising...", repoFileDtoTree.size()); 329 try { 330 for (final RepoFileDtoTreeNode repoFileDtoTreeNode : repoFileDtoTree) { 331 if (repoFileDtoTreeNode.getRepoFileDto().isNeededAsParent()) { // not actually modified - serves only to complete the tree structure. 332 monitor.worked(1); 333 continue; 334 } 335 336 if (fileInProgressPaths != null && ! fileInProgressPaths.contains(repoFileDtoTreeNode.getPath())) { 337 monitor.worked(1); 338 continue; 339 } 340 final RepoFileDto repoFileDto = repoFileDtoTreeNode.getRepoFileDto(); 341 final Class<? extends RepoFileDto> repoFileDtoClass = repoFileDto.getClass(); 342 343 Boolean included = repoFileDtoClass2Included.get(repoFileDtoClass); 344 if (included == null) { 345 included = false; 346 for (final Class<?> clazz : repoFileDtoClassesIncl) { 347 if (clazz.isAssignableFrom(repoFileDtoClass)) { 348 included = true; 349 break; 350 } 351 } 352 repoFileDtoClass2Included.put(repoFileDtoClass, included); 353 } 354 355 Boolean excluded = repoFileDtoClass2Excluded.get(repoFileDtoClass); 356 if (excluded == null) { 357 excluded = false; 358 for (final Class<?> clazz : repoFileDtoClassesExcl) { 359 if (clazz.isAssignableFrom(repoFileDtoClass)) { 360 excluded = true; 361 break; 362 } 363 } 364 repoFileDtoClass2Excluded.put(repoFileDtoClass, excluded); 365 } 366 367 if (!included || excluded) { 368 monitor.worked(1); 369 continue; 370 } 371 372 if (isDone(fromRepoTransport, toRepoTransport, repoFileDto)) { 373 logger.debug("sync: Skipping file already done in an interrupted transfer before: {}", repoFileDtoTreeNode.getPath()); 374 monitor.worked(1); 375 continue; 376 } 377 378 if (repoFileDto instanceof DirectoryDto) 379 syncDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (DirectoryDto) repoFileDto, new SubProgressMonitor(monitor, 1)); 380 else if (repoFileDto instanceof NormalFileDto) { 381 syncFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, repoFileDto, monitor); 382 } 383 else if (repoFileDto instanceof SymlinkDto) 384 syncSymlink(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, (SymlinkDto) repoFileDto, new SubProgressMonitor(monitor, 1)); 385 else 386 throw new IllegalStateException("Unsupported RepoFileDto type: " + repoFileDto); 387 388 markDone(fromRepoTransport, toRepoTransport, repoFileDto); 389 } 390 } finally { 391 monitor.done(); 392 } 393 } 394 395 protected DoneMarker getDoneMarker(final String doneDirNamePrefix, UUID fromRepositoryId, UUID toRepositoryId) { 396 requireNonNull(doneDirNamePrefix, "doneDirNamePrefix"); 397 final String doneDirName = doneDirNamePrefix + fromRepositoryId + '.' + toRepositoryId + DONE_DIR_NAME_SUFFIX; 398 if (doneMarker != null) { 399 if (doneDirName.equals(doneMarker.getDoneDir().getName())) 400 return doneMarker; 401 402 doneMarker.close(); 403 doneMarker = null; 404 } 405 final File doneDir = getLocalRepoTmpDir().createFile(doneDirName); 406 doneMarker = new DoneMarker(doneDir); 407 return doneMarker; 408 } 409 410 protected void deleteDoneDirs() { 411 if (doneMarker != null) { 412 doneMarker.close(); 413 doneMarker = null; 414 } 415 final File localRepoTmpDir = getLocalRepoTmpDir(); 416 final File[] tmpFiles = localRepoTmpDir.listFiles(); 417 if (tmpFiles != null) { 418 for (final File file : tmpFiles) { 419 if (file.getName().endsWith(DONE_DIR_NAME_SUFFIX)) { 420 file.deleteRecursively();; 421 if (file.exists()) { 422 logger.error("deleteDoneDirs: Cannot delete directory (permissions?): " + file.getAbsolutePath()); 423 } 424 } 425 } 426 } 427 } 428 429 protected File getLocalRepoTmpDir() { 430 try { 431 if (localRepoTmpDir == null) { 432 final File metaDir = getLocalRepoMetaDir(); 433 if (! metaDir.isDirectory()) { 434 if (metaDir.isFile()) 435 throw new IOException(String.format("Path '%s' already exists as ordinary file! It should be a directory!", metaDir.getAbsolutePath())); 436 else 437 throw new IOException(String.format("Directory '%s' does not exist!", metaDir.getAbsolutePath())); 438 } 439 this.localRepoTmpDir = metaDir.createFile(LocalRepoManager.REPO_TEMP_DIR_NAME); 440 } 441 442 if (! localRepoTmpDir.isDirectory()) { 443 localRepoTmpDir.mkdir(); 444 445 if (! localRepoTmpDir.isDirectory()) { 446 if (localRepoTmpDir.isFile()) 447 throw new IOException(String.format("Cannot create directory '%s', because this path already exists as an ordinary file!", localRepoTmpDir.getAbsolutePath())); 448 else 449 throw new IOException(String.format("Creating directory '%s' failed for an unknown reason (permissions? disk full?)!", localRepoTmpDir.getAbsolutePath())); 450 } 451 } 452 return localRepoTmpDir; 453 } catch (RuntimeException x) { 454 throw x; 455 } catch (Exception x) { 456 throw new RuntimeException(x); 457 } 458 } 459 460 protected File getLocalRepoMetaDir() { 461 final File localRoot = localRepoTransport.getLocalRepoManager().getLocalRoot(); 462 return createFile(localRoot, LocalRepoManager.META_DIR_NAME); 463 } 464 465 private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) { 466 return getDoneMarker(FILE_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 467 .isDone(repoFileDto.getId(), repoFileDto.getLocalRevision()); 468 469// return localRepoTransport.isTransferDone( 470// fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 471// TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision()); 472 } 473 474 private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final RepoFileDto repoFileDto) { 475 getDoneMarker(FILE_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 476 .markDone(repoFileDto.getId(), repoFileDto.getLocalRevision()); 477 478// localRepoTransport.markTransferDone( 479// fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 480// TransferDoneMarkerType.REPO_FILE, repoFileDto.getId(), repoFileDto.getLocalRevision()); 481 } 482 483 private boolean isDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) { 484 return getDoneMarker(MODIFICATION_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 485 .isDone(modificationDto.getId(), modificationDto.getLocalRevision()); 486 487// return localRepoTransport.isTransferDone( 488// fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 489// TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision()); 490 } 491 492 private void markDone(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final ModificationDto modificationDto) { 493 getDoneMarker(MODIFICATION_DONE_DIR_NAME_PREFIX, fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId()) 494 .markDone(modificationDto.getId(), modificationDto.getLocalRevision()); 495 496// localRepoTransport.markTransferDone( 497// fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), 498// TransferDoneMarkerType.MODIFICATION, modificationDto.getId(), modificationDto.getLocalRevision()); 499 } 500 501 private SortedMap<Long, Collection<ModificationDto>> getLocalRevision2ModificationDtos(final Collection<ModificationDto> modificationDtos) { 502 final SortedMap<Long, Collection<ModificationDto>> map = new TreeMap<Long, Collection<ModificationDto>>(); 503 for (final ModificationDto modificationDto : modificationDtos) { 504 final long localRevision = modificationDto.getLocalRevision(); 505 Collection<ModificationDto> collection = map.get(localRevision); 506 if (collection == null) { 507 collection = new ArrayList<ModificationDto>(); 508 map.put(localRevision, collection); 509 } 510 collection.add(modificationDto); 511 } 512 return map; 513 } 514 515 private void syncModifications(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final Collection<ModificationDto> modificationDtos, final ProgressMonitor monitor) { 516 monitor.beginTask("Synchronising...", modificationDtos.size()); 517 try { 518 final SortedMap<Long,Collection<ModificationDto>> localRevision2ModificationDtos = getLocalRevision2ModificationDtos(modificationDtos); 519 for (final Map.Entry<Long,Collection<ModificationDto>> me : localRevision2ModificationDtos.entrySet()) { 520 final ModificationDtoSet modificationDtoSet = new ModificationDtoSet(me.getValue()); 521 522 for (final List<CopyModificationDto> copyModificationDtos : modificationDtoSet.getFromPath2CopyModificationDtos().values()) { 523 for (final Iterator<CopyModificationDto> itCopyMod = copyModificationDtos.iterator(); itCopyMod.hasNext(); ) { 524 final CopyModificationDto copyModificationDto = itCopyMod.next(); 525 526 if (isDone(fromRepoTransport, toRepoTransport, copyModificationDto)) { 527 logger.debug("sync: Skipping CopyModificaton already done in an interrupted transfer before: {} => {}", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 528 monitor.worked(1); 529 continue; 530 } 531 532 final List<DeleteModificationDto> deleteModificationDtos = modificationDtoSet.getPath2DeleteModificationDtos().get(copyModificationDto.getFromPath()); 533 boolean moveInstead = false; 534 if (!itCopyMod.hasNext() && deleteModificationDtos != null && !deleteModificationDtos.isEmpty()) 535 moveInstead = true; 536 537 if (moveInstead) { 538 logger.info("syncModifications: Moving from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 539 toRepoTransport.move(copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 540 } 541 else { 542 logger.info("syncModifications: Copying from '{}' to '{}'", copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 543 toRepoTransport.copy(copyModificationDto.getFromPath(), copyModificationDto.getToPath()); 544 } 545 546 if (!moveInstead && deleteModificationDtos != null) { 547 for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) { 548 logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath()); 549 applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto); 550 } 551 } 552 553 markDone(fromRepoTransport, toRepoTransport, copyModificationDto); 554 } 555 } 556 557 for (final List<DeleteModificationDto> deleteModificationDtos : modificationDtoSet.getPath2DeleteModificationDtos().values()) { 558 for (final DeleteModificationDto deleteModificationDto : deleteModificationDtos) { 559 if (isDone(fromRepoTransport, toRepoTransport, deleteModificationDto)) { 560 logger.debug("sync: Skipping DeleteModificaton already done in an interrupted transfer before: {}", deleteModificationDto.getPath()); 561 monitor.worked(1); 562 continue; 563 } 564 565 logger.info("syncModifications: Deleting '{}'", deleteModificationDto.getPath()); 566 applyDeleteModification(fromRepoTransport, toRepoTransport, deleteModificationDto); 567 568 markDone(fromRepoTransport, toRepoTransport, deleteModificationDto); 569 } 570 } 571 } 572 } finally { 573 monitor.done(); 574 } 575 } 576 577 protected void applyDeleteModification(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) { 578 requireNonNull(fromRepoTransport, "fromRepoTransport"); 579 requireNonNull(toRepoTransport, "toRepoTransport"); 580 requireNonNull(deleteModificationDto, "deleteModificationDto"); 581 582 try { 583 delete(fromRepoTransport, toRepoTransport, deleteModificationDto); 584 } catch (final CollisionException x) { // Note: This cannot happen in CloudStore! But in can happen in downstream projects with different RepoTransport implementations! 585 logger.info("CollisionException during delete: {}", deleteModificationDto.getPath()); 586 if (logger.isDebugEnabled()) 587 logger.debug(x.toString(), x); 588 589 return; 590 } 591 } 592 593 protected void delete(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, final DeleteModificationDto deleteModificationDto) { 594 toRepoTransport.delete(deleteModificationDto.getPath()); 595 } 596 597 private void syncDirectory( 598 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 599 final RepoFileDtoTreeNode repoFileDtoTreeNode, final DirectoryDto directoryDto, final ProgressMonitor monitor) { 600 monitor.beginTask("Synchronising...", 100); 601 try { 602 final String path = repoFileDtoTreeNode.getPath(); 603 logger.info("syncDirectory: path='{}'", path); 604 try { 605 makeDirectory(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, directoryDto); 606 } catch (final CollisionException x) { 607 logger.info("CollisionException during makeDirectory: {}", path); 608 if (logger.isDebugEnabled()) 609 logger.debug(x.toString(), x); 610 611 return; 612 } 613 } finally { 614 monitor.done(); 615 } 616 } 617 618 protected void makeDirectory(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 619 final RepoFileDtoTreeNode repoFileDtoTreeNode, final String path, final DirectoryDto directoryDto) { 620 toRepoTransport.makeDirectory(path, directoryDto.getLastModified()); 621 } 622 623 private void syncSymlink( 624 final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 625 final RepoFileDtoTreeNode repoFileDtoTreeNode, final SymlinkDto symlinkDto, final SubProgressMonitor monitor) { 626 monitor.beginTask("Synchronising...", 100); 627 try { 628 final String path = repoFileDtoTreeNode.getPath(); 629 try { 630 toRepoTransport.makeSymlink(path, symlinkDto.getTarget(), symlinkDto.getLastModified()); 631 } catch (final CollisionException x) { 632 logger.info("CollisionException during makeSymlink: {}", path); 633 if (logger.isDebugEnabled()) 634 logger.debug(x.toString(), x); 635 636 return; 637 } 638 } finally { 639 monitor.done(); 640 } 641 } 642 643 private void syncFile(final RepoTransport fromRepoTransport, 644 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 645 final RepoFileDto normalFileDto, final ProgressMonitor monitor) { 646 monitor.beginTask("Synchronising...", 100); 647 try { 648 final String path = repoFileDtoTreeNode.getPath(); 649 logger.info("syncFile: path='{}'", path); 650 651 final RepoFileDto fromRepoFileDto = fromRepoTransport.getRepoFileDto(path); 652 if (fromRepoFileDto == null) { 653 logger.warn("File was deleted during sync on source side: {}", path); 654 return; 655 } 656 if (!(fromRepoFileDto instanceof NormalFileDto)) { 657 logger.warn("Normal file was replaced by a directory (or another type) during sync on source side: {}", path); 658 return; 659 } 660 monitor.worked(10); 661 662 final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto; 663 664 final RepoFileDto toRepoFileDto = toRepoTransport.getRepoFileDto(path); 665 if (areFilesExistingAndEqual(fromRepoFileDto, toRepoFileDto)) { 666 logger.info("File is already equal on destination side (sha1='{}'): {}", fromNormalFileDto.getSha1(), path); 667 return; 668 } 669 monitor.worked(10); 670 671 logger.info("Beginning to copy file (from.sha1='{}' to.sha1='{}'): {}", fromNormalFileDto.getSha1(), 672 toRepoFileDto instanceof NormalFileDto ? ((NormalFileDto)toRepoFileDto).getSha1() : "<NoInstanceOf_NormalFileDto>", 673 path); 674 675 final NormalFileDto toNormalFileDto; 676 if (toRepoFileDto instanceof NormalFileDto) 677 toNormalFileDto = (NormalFileDto) toRepoFileDto; 678 else 679 toNormalFileDto = createObject(NormalFileDto.class); // dummy (null-object pattern) 680 681 try { 682 beginPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto); 683 } catch (final CollisionException x) { 684 logger.info("CollisionException during beginPutFile: {}", path); 685 if (logger.isDebugEnabled()) 686 logger.debug(x.toString(), x); 687 688 return; 689 } 690 localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, true); 691 monitor.worked(1); 692 693 final Map<Long, FileChunkDto> offset2ToTempFileChunkDto = new HashMap<>(toNormalFileDto.getTempFileChunkDtos().size()); 694 for (final FileChunkDto toTempFileChunkDto : toNormalFileDto.getTempFileChunkDtos()) 695 offset2ToTempFileChunkDto.put(toTempFileChunkDto.getOffset(), toTempFileChunkDto); 696 697 logger.debug("Comparing {} FileChunkDtos. path='{}'", fromNormalFileDto.getFileChunkDtos().size(), path); 698 final List<FileChunkDto> fromFileChunkDtosDirty = new ArrayList<FileChunkDto>(); 699 final Iterator<FileChunkDto> toFileChunkDtoIterator = toNormalFileDto.getFileChunkDtos().iterator(); 700 int fileChunkIndex = -1; 701 for (final FileChunkDto fromFileChunkDto : fromNormalFileDto.getFileChunkDtos()) { 702 final FileChunkDto toFileChunkDto = toFileChunkDtoIterator.hasNext() ? toFileChunkDtoIterator.next() : null; 703 ++fileChunkIndex; 704 final FileChunkDto toTempFileChunkDto = offset2ToTempFileChunkDto.get(fromFileChunkDto.getOffset()); 705 if (toTempFileChunkDto == null) { 706 if (toFileChunkDto != null 707 && equal(fromFileChunkDto.getOffset(), toFileChunkDto.getOffset()) 708 && equal(fromFileChunkDto.getLength(), toFileChunkDto.getLength()) 709 && equal(fromFileChunkDto.getSha1(), toFileChunkDto.getSha1())) { 710 if (logger.isTraceEnabled()) { 711 logger.trace("Skipping clean FileChunkDto. index={} offset={} sha1='{}'", 712 fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1()); 713 } 714 continue; 715 } 716 } 717 else { 718 if (equal(fromFileChunkDto.getOffset(), toTempFileChunkDto.getOffset()) 719 && equal(fromFileChunkDto.getLength(), toTempFileChunkDto.getLength()) 720 && equal(fromFileChunkDto.getSha1(), toTempFileChunkDto.getSha1())) { 721 if (logger.isTraceEnabled()) { 722 logger.trace("Skipping clean temporary FileChunkDto. index={} offset={} sha1='{}'", 723 fileChunkIndex, fromFileChunkDto.getOffset(), fromFileChunkDto.getSha1()); 724 } 725 continue; 726 } 727 } 728 729 if (logger.isTraceEnabled()) { 730 logger.trace("Enlisting dirty FileChunkDto. index={} fromOffset={} toOffset={} fromSha1='{}' toSha1='{}'", 731 fileChunkIndex, fromFileChunkDto.getOffset(), 732 (toFileChunkDto == null ? "null" : toFileChunkDto.getOffset()), 733 fromFileChunkDto.getSha1(), 734 (toFileChunkDto == null ? "null" : toFileChunkDto.getSha1())); 735 } 736 fromFileChunkDtosDirty.add(fromFileChunkDto); 737 } 738 739 logger.info("Need to copy {} dirty file-chunks (of {} total). path='{}'", 740 fromFileChunkDtosDirty.size(), fromNormalFileDto.getFileChunkDtos().size(), path); 741 742 final ProgressMonitor subMonitor = new SubProgressMonitor(monitor, 73); 743 subMonitor.beginTask("Synchronising...", fromFileChunkDtosDirty.size()); 744 fileChunkIndex = -1; 745 long bytesCopied = 0; 746 final long copyChunksBeginTimestamp = System.currentTimeMillis(); 747 for (final FileChunkDto fileChunkDto : fromFileChunkDtosDirty) { 748 ++fileChunkIndex; 749 if (logger.isTraceEnabled()) { 750 logger.trace("Reading data for dirty FileChunkDto (index {} of {}). path='{}' offset={}", 751 fileChunkIndex, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset()); 752 } 753 final byte[] fileData = getFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto); 754 755 if (fileData == null) { 756 logger.warn("Source file was modified or deleted during sync: {}", path); 757 // The file is left in state 'inProgress'. Thus it should definitely not be synced back in the opposite 758 // direction. The file should be synced again in the correct direction in the next run (after the source 759 // repo did a local sync, too). 760 return; 761 } 762 763 if (logger.isTraceEnabled()) { 764 logger.trace("Writing data for dirty FileChunkDto ({} of {}). path='{}' offset={}", 765 fileChunkIndex + 1, fromFileChunkDtosDirty.size(), path, fileChunkDto.getOffset()); 766 } 767 768 try { 769 putFileData(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fileChunkDto, fileData); 770 } catch (final CollisionException x) { // Never happens in CloudStore, but in down-stream-projects. Important: They must handle this properly themselves! 771 logger.info("CollisionException during putFileData: {}", path); 772 if (logger.isDebugEnabled()) 773 logger.debug(x.toString(), x); 774 775 return; 776 } 777 778 bytesCopied += fileData.length; 779 subMonitor.worked(1); 780 } 781 subMonitor.done(); 782 783 logger.info("Copied {} dirty file-chunks with together {} bytes in {} ms. path='{}'", 784 fromFileChunkDtosDirty.size(), bytesCopied, System.currentTimeMillis() - copyChunksBeginTimestamp, path); 785 786 endPutFile(fromRepoTransport, toRepoTransport, repoFileDtoTreeNode, path, fromNormalFileDto); 787 localRepoTransport.markFileInProgress(fromRepoTransport.getRepositoryId(), toRepoTransport.getRepositoryId(), path, false); 788 monitor.worked(6); 789 } finally { 790 monitor.done(); 791 } 792 } 793 794 protected byte[] getFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 795 final RepoFileDtoTreeNode repoFileDtoTreeNode, 796 final String path, final FileChunkDto fileChunkDto) { 797 798 final byte[] fileData = fromRepoTransport.getFileData(path, fileChunkDto.getOffset(), fileChunkDto.getLength()); 799 if (fileData == null) 800 return null; // file was deleted 801 802 if (fileData.length != fileChunkDto.getLength() || !sha1(fileData).equals(fileChunkDto.getSha1())) 803 return null; // file was modified 804 805 return fileData; 806 } 807 808 protected void putFileData(final RepoTransport fromRepoTransport, final RepoTransport toRepoTransport, 809 final RepoFileDtoTreeNode repoFileDtoTreeNode, 810 final String path, final FileChunkDto fileChunkDto, 811 final byte[] fileData) { 812 813 toRepoTransport.putFileData(path, fileChunkDto.getOffset(), fileData); 814 } 815 816 protected void beginPutFile(final RepoTransport fromRepoTransport, 817 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 818 final String path, final NormalFileDto fromNormalFileDto) throws CollisionException { 819 820 toRepoTransport.beginPutFile(path); 821 } 822 823 protected void endPutFile(final RepoTransport fromRepoTransport, 824 final RepoTransport toRepoTransport, final RepoFileDtoTreeNode repoFileDtoTreeNode, 825 final String path, final NormalFileDto fromNormalFileDto) { 826 827 toRepoTransport.endPutFile( 828 path, fromNormalFileDto.getLastModified(), 829 fromNormalFileDto.getLength(), fromNormalFileDto.getSha1()); 830 } 831 832 private boolean areFilesExistingAndEqual(final RepoFileDto fromRepoFileDto, final RepoFileDto toRepoFileDto) { 833 if (!(fromRepoFileDto instanceof NormalFileDto)) 834 return false; 835 836 if (!(toRepoFileDto instanceof NormalFileDto)) 837 return false; 838 839 final NormalFileDto fromNormalFileDto = (NormalFileDto) fromRepoFileDto; 840 final NormalFileDto toNormalFileDto = (NormalFileDto) toRepoFileDto; 841 842 return equal(fromNormalFileDto.getLength(), toNormalFileDto.getLength()) 843 && equal(fromNormalFileDto.getLastModified(), toNormalFileDto.getLastModified()) 844 && equal(fromNormalFileDto.getSha1(), toNormalFileDto.getSha1()); 845 } 846 847 @Override 848 public void close() { 849 if (doneMarker != null) { 850 doneMarker.close(); 851 doneMarker = null; 852 } 853 localRepoTransport.close(); 854 remoteRepoTransport.close(); 855 localRepoManager.close(); 856 857 if (localRepoTmpDir != null) 858 localRepoTmpDir.delete(); // deletes only, if empty. 859 } 860}