001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2014-2015 ForgeRock AS 025 */ 026package org.opends.server.replication.server.changelog.file; 027 028import static org.opends.messages.ReplicationMessages.*; 029import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; 030import static org.opends.server.util.StaticUtils.*; 031 032import java.io.File; 033import java.util.Collections; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.ConcurrentSkipListMap; 041import java.util.concurrent.CopyOnWriteArrayList; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicReference; 044 045import org.forgerock.i18n.LocalizableMessageBuilder; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import org.forgerock.opendj.config.server.ConfigException; 048import org.forgerock.util.Pair; 049import org.forgerock.util.time.TimeService; 050import org.opends.server.admin.std.server.ReplicationServerCfg; 051import org.opends.server.api.DirectoryThread; 052import org.opends.server.backends.ChangelogBackend; 053import org.opends.server.replication.common.CSN; 054import org.opends.server.replication.common.MultiDomainServerState; 055import org.opends.server.replication.common.ServerState; 056import org.opends.server.replication.protocol.UpdateMsg; 057import org.opends.server.replication.server.ChangelogState; 058import org.opends.server.replication.server.ReplicationServer; 059import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 060import org.opends.server.replication.server.changelog.api.ChangelogDB; 061import org.opends.server.replication.server.changelog.api.ChangelogException; 062import org.opends.server.replication.server.changelog.api.DBCursor; 063import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 064import org.opends.server.replication.server.changelog.api.ReplicaId; 065import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 066import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; 067import org.opends.server.types.DN; 068import org.opends.server.util.StaticUtils; 069import org.opends.server.util.TimeThread; 070 071/** 072 * Log file implementation of the ChangelogDB interface. 073 */ 074public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB 075{ 076 077 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 078 079 /** 080 * This map contains the List of updates received from each LDAP server. 081 * <p> 082 * When removing a domainMap, code: 083 * <ol> 084 * <li>first get the domainMap</li> 085 * <li>synchronized on the domainMap</li> 086 * <li>remove the domainMap</li> 087 * <li>then check it's not null</li> 088 * <li>then close all inside</li> 089 * </ol> 090 * When creating a replicaDB, synchronize on the domainMap to avoid 091 * concurrent shutdown. 092 */ 093 private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs = 094 new ConcurrentHashMap<>(); 095 private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors = 096 new ConcurrentSkipListMap<>(); 097 private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>(); 098 private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors = 099 new ConcurrentSkipListMap<>(); 100 private ReplicationEnvironment replicationEnv; 101 private final ReplicationServerCfg config; 102 private final File dbDirectory; 103 104 /** 105 * The handler of the changelog database, the database stores the relation 106 * between a change number and the associated cookie. 107 * <p> 108 * @GuardedBy("cnIndexDBLock") 109 */ 110 private FileChangeNumberIndexDB cnIndexDB; 111 private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>(); 112 113 /** Used for protecting {@link ChangeNumberIndexDB} related state. */ 114 private final Object cnIndexDBLock = new Object(); 115 116 /** 117 * The purge delay (in milliseconds). Records in the changelog DB that are 118 * older than this delay might be removed. 119 */ 120 private volatile long purgeDelayInMillis; 121 private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>(); 122 123 /** The local replication server. */ 124 private final ReplicationServer replicationServer; 125 private final AtomicBoolean shutdown = new AtomicBoolean(); 126 127 private static final RepositionableCursor<CSN, UpdateMsg> EMPTY_CURSOR = Log.getEmptyCursor(); 128 private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = 129 new FileReplicaDBCursor(EMPTY_CURSOR, null, AFTER_MATCHING_KEY); 130 131 /** 132 * Creates a new changelog DB. 133 * 134 * @param replicationServer 135 * the local replication server. 136 * @param config 137 * the replication server configuration 138 * @throws ConfigException 139 * if a problem occurs opening the supplied directory 140 */ 141 public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) 142 throws ConfigException 143 { 144 this.config = config; 145 this.replicationServer = replicationServer; 146 this.dbDirectory = makeDir(config.getReplicationDBDirectory()); 147 } 148 149 private File makeDir(final String dbDirName) throws ConfigException 150 { 151 // Check that this path exists or create it. 152 final File dbDirectory = getFileForPath(dbDirName); 153 try 154 { 155 if (!dbDirectory.exists()) 156 { 157 dbDirectory.mkdir(); 158 } 159 return dbDirectory; 160 } 161 catch (Exception e) 162 { 163 final LocalizableMessageBuilder mb = new LocalizableMessageBuilder( 164 e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory)); 165 throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); 166 } 167 } 168 169 private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN) 170 { 171 final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); 172 if (domainMap != null) 173 { 174 return domainMap; 175 } 176 return Collections.emptyMap(); 177 } 178 179 private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId) 180 { 181 return getDomainMap(baseDN).get(serverId); 182 } 183 184 /** 185 * Returns a {@link FileReplicaDB}, possibly creating it. 186 * 187 * @param baseDN 188 * the baseDN for which to create a ReplicaDB 189 * @param serverId 190 * the serverId for which to create a ReplicaDB 191 * @param server 192 * the ReplicationServer 193 * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created 194 * @throws ChangelogException 195 * if a problem occurred with the database 196 */ 197 Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId, 198 final ReplicationServer server) throws ChangelogException 199 { 200 while (!shutdown.get()) 201 { 202 final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN); 203 final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); 204 if (result != null) 205 { 206 final Boolean dbWasCreated = result.getSecond(); 207 if (dbWasCreated) 208 { // new replicaDB => update all cursors with it 209 final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); 210 if (cursors != null && !cursors.isEmpty()) 211 { 212 for (DomainDBCursor cursor : cursors) 213 { 214 cursor.addReplicaDB(serverId, null); 215 } 216 } 217 } 218 219 return result; 220 } 221 } 222 throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); 223 } 224 225 private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN) 226 { 227 // happy path: the domainMap already exists 228 final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); 229 if (currentValue != null) 230 { 231 return currentValue; 232 } 233 234 // unlucky, the domainMap does not exist: take the hit and create the 235 // newValue, even though the same could be done concurrently by another thread 236 final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<>(); 237 final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); 238 if (previousValue != null) 239 { 240 // there was already a value associated to the key, let's use it 241 return previousValue; 242 } 243 244 // we just created a new domain => update all cursors 245 for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) 246 { 247 cursor.addDomain(baseDN, null); 248 } 249 return newValue; 250 } 251 252 private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap, 253 final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException 254 { 255 // happy path: the replicaDB already exists 256 FileReplicaDB currentValue = domainMap.get(serverId); 257 if (currentValue != null) 258 { 259 return Pair.of(currentValue, false); 260 } 261 262 // unlucky, the replicaDB does not exist: take the hit and synchronize 263 // on the domainMap to create a new ReplicaDB 264 synchronized (domainMap) 265 { 266 // double-check 267 currentValue = domainMap.get(serverId); 268 if (currentValue != null) 269 { 270 return Pair.of(currentValue, false); 271 } 272 273 if (domainToReplicaDBs.get(baseDN) != domainMap) 274 { 275 // The domainMap could have been concurrently removed because 276 // 1) a shutdown was initiated or 2) an initialize was called. 277 // Return will allow the code to: 278 // 1) shutdown properly or 2) lazily recreate the replicaDB 279 return null; 280 } 281 282 final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv); 283 domainMap.put(serverId, newDB); 284 return Pair.of(newDB, true); 285 } 286 } 287 288 /** {@inheritDoc} */ 289 @Override 290 public void initializeDB() 291 { 292 try 293 { 294 final File dbDir = getFileForPath(config.getReplicationDBDirectory()); 295 replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer, TimeService.SYSTEM); 296 final ChangelogState changelogState = replicationEnv.getChangelogState(); 297 initializeToChangelogState(changelogState); 298 if (config.isComputeChangeNumber()) 299 { 300 startIndexer(changelogState); 301 } 302 setPurgeDelay(replicationServer.getPurgeDelay()); 303 } 304 catch (ChangelogException e) 305 { 306 logger.traceException(e); 307 logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()); 308 } 309 } 310 311 private void initializeToChangelogState(final ChangelogState changelogState) 312 throws ChangelogException 313 { 314 for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet()) 315 { 316 replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); 317 } 318 for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) 319 { 320 for (int serverId : entry.getValue()) 321 { 322 getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer); 323 } 324 } 325 } 326 327 private void shutdownChangeNumberIndexDB() throws ChangelogException 328 { 329 synchronized (cnIndexDBLock) 330 { 331 if (cnIndexDB != null) 332 { 333 cnIndexDB.shutdown(); 334 } 335 } 336 } 337 338 /** {@inheritDoc} */ 339 @Override 340 public void shutdownDB() throws ChangelogException 341 { 342 if (!this.shutdown.compareAndSet(false, true)) 343 { // shutdown has already been initiated 344 return; 345 } 346 347 // Remember the first exception because : 348 // - we want to try to remove everything we want to remove 349 // - then throw the first encountered exception 350 ChangelogException firstException = null; 351 352 final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); 353 if (indexer != null) 354 { 355 indexer.initiateShutdown(); 356 } 357 final ChangelogDBPurger purger = cnPurger.getAndSet(null); 358 if (purger != null) 359 { 360 purger.initiateShutdown(); 361 } 362 363 // wait for shutdown of the threads holding cursors 364 try 365 { 366 if (indexer != null) 367 { 368 indexer.join(); 369 } 370 if (purger != null) 371 { 372 purger.join(); 373 } 374 } 375 catch (InterruptedException e) 376 { 377 // do nothing: we are already shutting down 378 } 379 380 // now we can safely shutdown all DBs 381 try 382 { 383 shutdownChangeNumberIndexDB(); 384 } 385 catch (ChangelogException e) 386 { 387 firstException = e; 388 } 389 390 for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it = 391 this.domainToReplicaDBs.values().iterator(); it.hasNext();) 392 { 393 final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next(); 394 synchronized (domainMap) 395 { 396 it.remove(); 397 for (FileReplicaDB replicaDB : domainMap.values()) 398 { 399 replicaDB.shutdown(); 400 } 401 } 402 } 403 if (replicationEnv != null) 404 { 405 replicationEnv.shutdown(); 406 } 407 408 if (firstException != null) 409 { 410 throw firstException; 411 } 412 } 413 414 /** 415 * Clears all records from the changelog (does not remove the changelog itself). 416 * 417 * @throws ChangelogException 418 * If an error occurs when clearing the changelog. 419 */ 420 public void clearDB() throws ChangelogException 421 { 422 if (!dbDirectory.exists()) 423 { 424 return; 425 } 426 427 // Remember the first exception because : 428 // - we want to try to remove everything we want to remove 429 // - then throw the first encountered exception 430 ChangelogException firstException = null; 431 432 for (DN baseDN : this.domainToReplicaDBs.keySet()) 433 { 434 removeDomain(baseDN); 435 } 436 437 synchronized (cnIndexDBLock) 438 { 439 if (cnIndexDB != null) 440 { 441 try 442 { 443 cnIndexDB.clear(); 444 } 445 catch (ChangelogException e) 446 { 447 firstException = e; 448 } 449 450 try 451 { 452 shutdownChangeNumberIndexDB(); 453 } 454 catch (ChangelogException e) 455 { 456 if (firstException == null) 457 { 458 firstException = e; 459 } 460 else 461 { 462 logger.traceException(e); 463 } 464 } 465 466 cnIndexDB = null; 467 } 468 } 469 470 if (firstException != null) 471 { 472 throw firstException; 473 } 474 } 475 476 /** {@inheritDoc} */ 477 @Override 478 public void removeDB() throws ChangelogException 479 { 480 shutdownDB(); 481 StaticUtils.recursiveDelete(dbDirectory); 482 } 483 484 /** {@inheritDoc} */ 485 @Override 486 public ServerState getDomainOldestCSNs(DN baseDN) 487 { 488 final ServerState result = new ServerState(); 489 for (FileReplicaDB replicaDB : getDomainMap(baseDN).values()) 490 { 491 result.update(replicaDB.getOldestCSN()); 492 } 493 return result; 494 } 495 496 /** {@inheritDoc} */ 497 @Override 498 public ServerState getDomainNewestCSNs(DN baseDN) 499 { 500 final ServerState result = new ServerState(); 501 for (FileReplicaDB replicaDB : getDomainMap(baseDN).values()) 502 { 503 result.update(replicaDB.getNewestCSN()); 504 } 505 return result; 506 } 507 508 /** {@inheritDoc} */ 509 @Override 510 public void removeDomain(DN baseDN) throws ChangelogException 511 { 512 // Remember the first exception because : 513 // - we want to try to remove everything we want to remove 514 // - then throw the first encountered exception 515 ChangelogException firstException = null; 516 517 // 1- clear the replica DBs 518 Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); 519 if (domainMap != null) 520 { 521 final ChangeNumberIndexer indexer = this.cnIndexer.get(); 522 if (indexer != null) 523 { 524 indexer.clear(baseDN); 525 } 526 synchronized (domainMap) 527 { 528 domainMap = domainToReplicaDBs.remove(baseDN); 529 for (FileReplicaDB replicaDB : domainMap.values()) 530 { 531 try 532 { 533 replicaDB.clear(); 534 } 535 catch (ChangelogException e) 536 { 537 firstException = e; 538 } 539 replicaDB.shutdown(); 540 } 541 } 542 } 543 544 545 // 2- clear the changelogstate DB 546 try 547 { 548 replicationEnv.clearGenerationId(baseDN); 549 } 550 catch (ChangelogException e) 551 { 552 if (firstException == null) 553 { 554 firstException = e; 555 } 556 else 557 { 558 logger.traceException(e); 559 } 560 } 561 562 if (firstException != null) 563 { 564 throw firstException; 565 } 566 } 567 568 /** {@inheritDoc} */ 569 @Override 570 public void setPurgeDelay(final long purgeDelayInMillis) 571 { 572 this.purgeDelayInMillis = purgeDelayInMillis; 573 574 // Rotation time interval for CN Index DB log file 575 // needs to be a fraction of the purge delay 576 // to ensure there is at least one file to purge 577 replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2); 578 579 if (purgeDelayInMillis > 0) 580 { 581 final ChangelogDBPurger newPurger = new ChangelogDBPurger(); 582 if (cnPurger.compareAndSet(null, newPurger)) 583 { // no purger was running, run this new one 584 newPurger.start(); 585 } 586 else 587 { // a purger was already running, just wake that one up 588 // to verify if some entries can be purged with the new purge delay 589 final ChangelogDBPurger currentPurger = cnPurger.get(); 590 synchronized (currentPurger) 591 { 592 currentPurger.notify(); 593 } 594 } 595 } 596 else 597 { 598 final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); 599 if (purgerToStop != null) 600 { // stop this purger 601 purgerToStop.initiateShutdown(); 602 } 603 } 604 } 605 606 /** {@inheritDoc} */ 607 @Override 608 public void setComputeChangeNumber(final boolean computeChangeNumber) 609 throws ChangelogException 610 { 611 if (computeChangeNumber) 612 { 613 startIndexer(replicationEnv.getChangelogState()); 614 } 615 else 616 { 617 final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); 618 if (indexer != null) 619 { 620 indexer.initiateShutdown(); 621 } 622 } 623 } 624 625 private void startIndexer(final ChangelogState changelogState) 626 { 627 final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); 628 if (cnIndexer.compareAndSet(null, indexer)) 629 { 630 indexer.start(); 631 } 632 } 633 634 /** {@inheritDoc} */ 635 @Override 636 public ChangeNumberIndexDB getChangeNumberIndexDB() 637 { 638 synchronized (cnIndexDBLock) 639 { 640 if (cnIndexDB == null) 641 { 642 try 643 { 644 cnIndexDB = new FileChangeNumberIndexDB(replicationEnv); 645 } 646 catch (Exception e) 647 { 648 logger.traceException(e); 649 logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage()); 650 } 651 } 652 return cnIndexDB; 653 } 654 } 655 656 /** {@inheritDoc} */ 657 @Override 658 public ReplicationDomainDB getReplicationDomainDB() 659 { 660 return this; 661 } 662 663 /** {@inheritDoc} */ 664 @Override 665 public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options) 666 throws ChangelogException 667 { 668 final Set<DN> excludedDomainDns = Collections.emptySet(); 669 return getCursorFrom(startState, options, excludedDomainDns); 670 } 671 672 /** {@inheritDoc} */ 673 @Override 674 public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, 675 CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException 676 { 677 final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options); 678 registeredMultiDomainCursors.add(cursor); 679 for (DN baseDN : domainToReplicaDBs.keySet()) 680 { 681 if (!excludedDomainDns.contains(baseDN)) 682 { 683 cursor.addDomain(baseDN, startState.getServerState(baseDN)); 684 } 685 } 686 return cursor; 687 } 688 689 /** {@inheritDoc} */ 690 @Override 691 public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options) 692 throws ChangelogException 693 { 694 final DomainDBCursor cursor = newDomainDBCursor(baseDN, options); 695 for (int serverId : getDomainMap(baseDN).keySet()) 696 { 697 // get the last already sent CSN from that server to get a cursor 698 final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null; 699 cursor.addReplicaDB(serverId, lastCSN); 700 } 701 return cursor; 702 } 703 704 private DomainDBCursor newDomainDBCursor(final DN baseDN, final CursorOptions options) 705 { 706 final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options); 707 putCursor(registeredDomainCursors, baseDN, cursor); 708 return cursor; 709 } 710 711 private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) 712 { 713 final MultiDomainServerState offlineReplicas = 714 replicationEnv.getChangelogState().getOfflineReplicas(); 715 final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId); 716 if (offlineCSN != null 717 && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN))) 718 { 719 return offlineCSN; 720 } 721 return null; 722 } 723 724 @Override 725 public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, 726 CursorOptions options) throws ChangelogException 727 { 728 final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); 729 if (replicaDB != null) 730 { 731 final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN(); 732 final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( 733 actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); 734 final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN); 735 final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); 736 final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); 737 738 putCursor(replicaCursors, replicaId, replicaCursor); 739 740 return replicaCursor; 741 } 742 return EMPTY_CURSOR_REPLICA_DB; 743 } 744 745 private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor) 746 { 747 CopyOnWriteArrayList<V> cursors = map.get(key); 748 if (cursors == null) 749 { 750 cursors = new CopyOnWriteArrayList<>(); 751 CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors); 752 if (previousValue != null) 753 { 754 cursors = previousValue; 755 } 756 } 757 cursors.add(cursor); 758 } 759 760 /** {@inheritDoc} */ 761 @Override 762 public void unregisterCursor(final DBCursor<?> cursor) 763 { 764 if (cursor instanceof MultiDomainDBCursor) 765 { 766 registeredMultiDomainCursors.remove(cursor); 767 } 768 else if (cursor instanceof DomainDBCursor) 769 { 770 final DomainDBCursor domainCursor = (DomainDBCursor) cursor; 771 final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); 772 if (cursors != null) 773 { 774 cursors.remove(cursor); 775 } 776 } 777 else if (cursor instanceof ReplicaCursor) 778 { 779 final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; 780 final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId()); 781 if (cursors != null) 782 { 783 cursors.remove(cursor); 784 } 785 } 786 } 787 788 /** {@inheritDoc} */ 789 @Override 790 public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException 791 { 792 final CSN csn = updateMsg.getCSN(); 793 final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, 794 csn.getServerId(), replicationServer); 795 final FileReplicaDB replicaDB = pair.getFirst(); 796 replicaDB.add(updateMsg); 797 798 ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg); 799 800 final ChangeNumberIndexer indexer = cnIndexer.get(); 801 if (indexer != null) 802 { 803 notifyReplicaOnline(indexer, baseDN, csn.getServerId()); 804 indexer.publishUpdateMsg(baseDN, updateMsg); 805 } 806 return pair.getSecond(); // replica DB was created 807 } 808 809 /** {@inheritDoc} */ 810 @Override 811 public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException 812 { 813 final ChangeNumberIndexer indexer = cnIndexer.get(); 814 if (indexer != null) 815 { 816 notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); 817 indexer.publishHeartbeat(baseDN, heartbeatCSN); 818 } 819 } 820 821 private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) 822 throws ChangelogException 823 { 824 if (indexer.isReplicaOffline(baseDN, serverId)) 825 { 826 replicationEnv.notifyReplicaOnline(baseDN, serverId); 827 } 828 updateCursorsWithOfflineCSN(baseDN, serverId, null); 829 } 830 831 /** {@inheritDoc} */ 832 @Override 833 public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException 834 { 835 replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); 836 final ChangeNumberIndexer indexer = cnIndexer.get(); 837 if (indexer != null) 838 { 839 indexer.replicaOffline(baseDN, offlineCSN); 840 } 841 updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN); 842 } 843 844 private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) 845 { 846 final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId)); 847 if (cursors != null) 848 { 849 for (ReplicaCursor cursor : cursors) 850 { 851 cursor.setOfflineCSN(offlineCSN); 852 } 853 } 854 } 855 856 /** 857 * The thread purging the changelogDB on a regular interval. Records are 858 * purged from the changelogDB if they are older than a delay specified in 859 * seconds. The purge process works in two steps: 860 * <ol> 861 * <li>first purge the changeNumberIndexDB and retrieve information to drive 862 * replicaDBs purging</li> 863 * <li>proceed to purge each replicaDBs based on the information collected 864 * when purging the changeNumberIndexDB</li> 865 * </ol> 866 */ 867 private final class ChangelogDBPurger extends DirectoryThread 868 { 869 private static final int DEFAULT_SLEEP = 500; 870 871 protected ChangelogDBPurger() 872 { 873 super("changelog DB purger"); 874 } 875 876 /** {@inheritDoc} */ 877 @Override 878 public void run() 879 { 880 // initialize CNIndexDB 881 getChangeNumberIndexDB(); 882 while (!isShutdownInitiated()) 883 { 884 try 885 { 886 final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; 887 final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); 888 final CSN oldestNotPurgedCSN; 889 890 // next code assumes that the compute-change-number config 891 // never changes during the life time of an RS 892 if (!config.isComputeChangeNumber()) 893 { 894 oldestNotPurgedCSN = purgeCSN; 895 } 896 else 897 { 898 final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB; 899 if (localCNIndexDB == null) 900 { // shutdown has been initiated 901 return; 902 } 903 904 oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); 905 if (oldestNotPurgedCSN == null) 906 { // shutdown may have been initiated... 907 // ... or the change number index DB is empty, 908 // wait for new changes to come in. 909 910 // Note we cannot sleep for as long as the purge delay 911 // (3 days default), because we might receive late updates 912 // that will have to be purged before the purge delay elapses. 913 // This can particularly happen in case of network partitions. 914 if (!isShutdownInitiated()) 915 { 916 synchronized (this) 917 { 918 if (!isShutdownInitiated()) 919 { 920 wait(DEFAULT_SLEEP); 921 } 922 } 923 } 924 continue; 925 } 926 } 927 928 for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values()) 929 { 930 for (final FileReplicaDB replicaDB : domainMap.values()) 931 { 932 replicaDB.purgeUpTo(oldestNotPurgedCSN); 933 } 934 } 935 936 if (!isShutdownInitiated()) 937 { 938 synchronized (this) 939 { 940 if (!isShutdownInitiated()) 941 { 942 wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); 943 } 944 } 945 } 946 } 947 catch (InterruptedException e) 948 { 949 // shutdown initiated? 950 } 951 catch (Exception e) 952 { 953 logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e)); 954 if (replicationServer != null) 955 { 956 replicationServer.shutdown(); 957 } 958 } 959 } 960 } 961 962 private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) 963 { 964 final long nextPurgeTime = notPurgedCSN.getTime(); 965 final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis; 966 if (currentPurgeTime <= nextPurgeTime) 967 { 968 // sleep till the next CSN to purge, 969 return nextPurgeTime - currentPurgeTime; 970 } 971 // wait a bit before purging more 972 return DEFAULT_SLEEP; 973 } 974 975 /** {@inheritDoc} */ 976 @Override 977 public void initiateShutdown() 978 { 979 super.initiateShutdown(); 980 synchronized (this) 981 { 982 notify(); // wake up the purger thread for faster shutdown 983 } 984 } 985 } 986}