001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Portions Copyright 2013-2015 ForgeRock AS 025 */ 026package org.opends.server.replication.server.changelog.je; 027 028import static org.opends.messages.ReplicationMessages.*; 029import static org.opends.server.util.StaticUtils.*; 030 031import java.io.File; 032import java.util.Collections; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentMap; 039import java.util.concurrent.ConcurrentSkipListMap; 040import java.util.concurrent.CopyOnWriteArrayList; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicReference; 043 044import org.forgerock.i18n.LocalizableMessageBuilder; 045import org.forgerock.i18n.slf4j.LocalizedLogger; 046import org.forgerock.opendj.config.server.ConfigException; 047import org.forgerock.util.Pair; 048import org.opends.server.admin.std.server.ReplicationServerCfg; 049import org.opends.server.api.DirectoryThread; 050import org.opends.server.backends.ChangelogBackend; 051import org.opends.server.replication.common.CSN; 052import org.opends.server.replication.common.MultiDomainServerState; 053import org.opends.server.replication.common.ServerState; 054import org.opends.server.replication.protocol.UpdateMsg; 055import org.opends.server.replication.server.ChangelogState; 056import org.opends.server.replication.server.ReplicationServer; 057import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 058import org.opends.server.replication.server.changelog.api.ChangelogDB; 059import org.opends.server.replication.server.changelog.api.ChangelogException; 060import org.opends.server.replication.server.changelog.api.DBCursor; 061import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 062import org.opends.server.replication.server.changelog.api.ReplicaId; 063import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 064import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer; 065import org.opends.server.replication.server.changelog.file.DomainDBCursor; 066import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; 067import org.opends.server.replication.server.changelog.file.ReplicaCursor; 068import org.opends.server.types.DN; 069import org.opends.server.util.StaticUtils; 070import org.opends.server.util.TimeThread; 071 072/** 073 * JE implementation of the ChangelogDB interface. 074 */ 075public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB 076{ 077 /** The tracer object for the debug logger. */ 078 protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 079 080 /** 081 * This map contains the List of updates received from each LDAP server. 082 * <p> 083 * When removing a domainMap, code: 084 * <ol> 085 * <li>first get the domainMap</li> 086 * <li>synchronized on the domainMap</li> 087 * <li>remove the domainMap</li> 088 * <li>then check it's not null</li> 089 * <li>then close all inside</li> 090 * </ol> 091 * When creating a replicaDB, synchronize on the domainMap to avoid 092 * concurrent shutdown. 093 */ 094 private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs = 095 new ConcurrentHashMap<>(); 096 private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors = 097 new ConcurrentSkipListMap<>(); 098 private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>(); 099 private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors = 100 new ConcurrentSkipListMap<>(); 101 private ReplicationDbEnv replicationEnv; 102 private final ReplicationServerCfg config; 103 private final File dbDirectory; 104 105 /** 106 * The handler of the changelog database, the database stores the relation 107 * between a change number and the associated cookie. 108 * <p> 109 * @GuardedBy("cnIndexDBLock") 110 */ 111 private JEChangeNumberIndexDB cnIndexDB; 112 private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>(); 113 114 /** Used for protecting {@link ChangeNumberIndexDB} related state. */ 115 private final Object cnIndexDBLock = new Object(); 116 117 /** 118 * The purge delay (in milliseconds). Records in the changelog DB that are 119 * older than this delay might be removed. 120 */ 121 private volatile long purgeDelayInMillis; 122 private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>(); 123 124 /** The local replication server. */ 125 private final ReplicationServer replicationServer; 126 private final AtomicBoolean shutdown = new AtomicBoolean(); 127 128 private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = 129 new DBCursor<UpdateMsg>() 130 { 131 132 @Override 133 public boolean next() 134 { 135 return false; 136 } 137 138 @Override 139 public UpdateMsg getRecord() 140 { 141 return null; 142 } 143 144 @Override 145 public void close() 146 { 147 // empty 148 } 149 150 @Override 151 public String toString() 152 { 153 return "EmptyDBCursor<UpdateMsg>"; 154 } 155 }; 156 157 /** 158 * Creates a new changelog DB. 159 * 160 * @param replicationServer 161 * the local replication server. 162 * @param config 163 * the replication server configuration 164 * @throws ConfigException 165 * if a problem occurs opening the supplied directory 166 */ 167 public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config) 168 throws ConfigException 169 { 170 this.config = config; 171 this.replicationServer = replicationServer; 172 this.dbDirectory = makeDir(config.getReplicationDBDirectory()); 173 } 174 175 private File makeDir(final String dbDirName) throws ConfigException 176 { 177 // Check that this path exists or create it. 178 final File dbDirectory = getFileForPath(dbDirName); 179 try 180 { 181 if (!dbDirectory.exists()) 182 { 183 dbDirectory.mkdir(); 184 } 185 return dbDirectory; 186 } 187 catch (Exception e) 188 { 189 final LocalizableMessageBuilder mb = new LocalizableMessageBuilder( 190 e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory)); 191 throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e); 192 } 193 } 194 195 private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN) 196 { 197 final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); 198 if (domainMap != null) 199 { 200 return domainMap; 201 } 202 return Collections.emptyMap(); 203 } 204 205 private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId) 206 { 207 return getDomainMap(baseDN).get(serverId); 208 } 209 210 /** 211 * Returns a {@link JEReplicaDB}, possibly creating it. 212 * 213 * @param baseDN 214 * the baseDN for which to create a ReplicaDB 215 * @param serverId 216 * the serverId for which to create a ReplicaDB 217 * @param server 218 * the ReplicationServer 219 * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created 220 * @throws ChangelogException 221 * if a problem occurred with the database 222 */ 223 Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId, 224 final ReplicationServer server) throws ChangelogException 225 { 226 while (!shutdown.get()) 227 { 228 final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN); 229 final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server); 230 if (result != null) 231 { 232 final Boolean dbWasCreated = result.getSecond(); 233 if (dbWasCreated) 234 { // new replicaDB => update all cursors with it 235 final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); 236 if (cursors != null && !cursors.isEmpty()) 237 { 238 for (DomainDBCursor cursor : cursors) 239 { 240 cursor.addReplicaDB(serverId, null); 241 } 242 } 243 } 244 245 return result; 246 } 247 } 248 throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get()); 249 } 250 251 private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN) 252 { 253 // happy path: the domainMap already exists 254 final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN); 255 if (currentValue != null) 256 { 257 return currentValue; 258 } 259 260 // unlucky, the domainMap does not exist: take the hit and create the 261 // newValue, even though the same could be done concurrently by another thread 262 final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<>(); 263 final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); 264 if (previousValue != null) 265 { 266 // there was already a value associated to the key, let's use it 267 return previousValue; 268 } 269 270 // we just created a new domain => update all cursors 271 for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) 272 { 273 cursor.addDomain(baseDN, null); 274 } 275 return newValue; 276 } 277 278 private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap, 279 final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException 280 { 281 // happy path: the replicaDB already exists 282 JEReplicaDB currentValue = domainMap.get(serverId); 283 if (currentValue != null) 284 { 285 return Pair.of(currentValue, false); 286 } 287 288 // unlucky, the replicaDB does not exist: take the hit and synchronize 289 // on the domainMap to create a new ReplicaDB 290 synchronized (domainMap) 291 { 292 // double-check 293 currentValue = domainMap.get(serverId); 294 if (currentValue != null) 295 { 296 return Pair.of(currentValue, false); 297 } 298 299 if (domainToReplicaDBs.get(baseDN) != domainMap) 300 { 301 // The domainMap could have been concurrently removed because 302 // 1) a shutdown was initiated or 2) an initialize was called. 303 // Return will allow the code to: 304 // 1) shutdown properly or 2) lazily recreate the replicaDB 305 return null; 306 } 307 308 final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv); 309 domainMap.put(serverId, newDB); 310 return Pair.of(newDB, true); 311 } 312 } 313 314 /** {@inheritDoc} */ 315 @Override 316 public void initializeDB() 317 { 318 try 319 { 320 final File dbDir = getFileForPath(config.getReplicationDBDirectory()); 321 replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer); 322 final ChangelogState changelogState = replicationEnv.getChangelogState(); 323 initializeToChangelogState(changelogState); 324 if (config.isComputeChangeNumber()) 325 { 326 startIndexer(changelogState); 327 } 328 setPurgeDelay(replicationServer.getPurgeDelay()); 329 } 330 catch (ChangelogException e) 331 { 332 logger.traceException(e); 333 logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()); 334 } 335 } 336 337 private void initializeToChangelogState(final ChangelogState changelogState) 338 throws ChangelogException 339 { 340 for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet()) 341 { 342 replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue()); 343 } 344 for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) 345 { 346 for (int serverId : entry.getValue()) 347 { 348 getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer); 349 } 350 } 351 } 352 353 private void shutdownChangeNumberIndexDB() throws ChangelogException 354 { 355 synchronized (cnIndexDBLock) 356 { 357 if (cnIndexDB != null) 358 { 359 cnIndexDB.shutdown(); 360 } 361 } 362 } 363 364 /** {@inheritDoc} */ 365 @Override 366 public void shutdownDB() throws ChangelogException 367 { 368 if (!this.shutdown.compareAndSet(false, true)) 369 { // shutdown has already been initiated 370 return; 371 } 372 373 // Remember the first exception because : 374 // - we want to try to remove everything we want to remove 375 // - then throw the first encountered exception 376 ChangelogException firstException = null; 377 378 final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); 379 if (indexer != null) 380 { 381 indexer.initiateShutdown(); 382 } 383 final ChangelogDBPurger purger = cnPurger.getAndSet(null); 384 if (purger != null) 385 { 386 purger.initiateShutdown(); 387 } 388 389 try 390 { 391 shutdownChangeNumberIndexDB(); 392 } 393 catch (ChangelogException e) 394 { 395 firstException = e; 396 } 397 398 for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it = 399 this.domainToReplicaDBs.values().iterator(); it.hasNext();) 400 { 401 final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next(); 402 synchronized (domainMap) 403 { 404 it.remove(); 405 for (JEReplicaDB replicaDB : domainMap.values()) 406 { 407 replicaDB.shutdown(); 408 } 409 } 410 } 411 412 if (replicationEnv != null) 413 { 414 // wait for shutdown of the threads holding cursors 415 try 416 { 417 if (indexer != null) 418 { 419 indexer.join(); 420 } 421 if (purger != null) 422 { 423 purger.join(); 424 } 425 } 426 catch (InterruptedException e) 427 { 428 // do nothing: we are already shutting down 429 } 430 431 replicationEnv.shutdown(); 432 } 433 434 if (firstException != null) 435 { 436 throw firstException; 437 } 438 } 439 440 /** 441 * Clears all records from the changelog (does not remove the changelog itself). 442 * 443 * @throws ChangelogException 444 * If an error occurs when clearing the changelog. 445 */ 446 public void clearDB() throws ChangelogException 447 { 448 if (!dbDirectory.exists()) 449 { 450 return; 451 } 452 453 // Remember the first exception because : 454 // - we want to try to remove everything we want to remove 455 // - then throw the first encountered exception 456 ChangelogException firstException = null; 457 458 for (DN baseDN : this.domainToReplicaDBs.keySet()) 459 { 460 removeDomain(baseDN); 461 } 462 463 synchronized (cnIndexDBLock) 464 { 465 if (cnIndexDB != null) 466 { 467 try 468 { 469 cnIndexDB.clear(); 470 } 471 catch (ChangelogException e) 472 { 473 firstException = e; 474 } 475 476 try 477 { 478 shutdownChangeNumberIndexDB(); 479 } 480 catch (ChangelogException e) 481 { 482 if (firstException == null) 483 { 484 firstException = e; 485 } 486 else 487 { 488 logger.traceException(e); 489 } 490 } 491 492 cnIndexDB = null; 493 } 494 } 495 496 if (firstException != null) 497 { 498 throw firstException; 499 } 500 } 501 502 /** {@inheritDoc} */ 503 @Override 504 public void removeDB() throws ChangelogException 505 { 506 shutdownDB(); 507 StaticUtils.recursiveDelete(dbDirectory); 508 } 509 510 /** {@inheritDoc} */ 511 @Override 512 public ServerState getDomainOldestCSNs(DN baseDN) 513 { 514 final ServerState result = new ServerState(); 515 for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) 516 { 517 result.update(replicaDB.getOldestCSN()); 518 } 519 return result; 520 } 521 522 /** {@inheritDoc} */ 523 @Override 524 public ServerState getDomainNewestCSNs(DN baseDN) 525 { 526 final ServerState result = new ServerState(); 527 for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) 528 { 529 result.update(replicaDB.getNewestCSN()); 530 } 531 return result; 532 } 533 534 /** {@inheritDoc} */ 535 @Override 536 public void removeDomain(DN baseDN) throws ChangelogException 537 { 538 // Remember the first exception because : 539 // - we want to try to remove everything we want to remove 540 // - then throw the first encountered exception 541 ChangelogException firstException = null; 542 543 // 1- clear the replica DBs 544 Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN); 545 if (domainMap != null) 546 { 547 final ChangeNumberIndexer indexer = this.cnIndexer.get(); 548 if (indexer != null) 549 { 550 indexer.clear(baseDN); 551 } 552 synchronized (domainMap) 553 { 554 domainMap = domainToReplicaDBs.remove(baseDN); 555 for (JEReplicaDB replicaDB : domainMap.values()) 556 { 557 try 558 { 559 replicaDB.clear(); 560 } 561 catch (ChangelogException e) 562 { 563 firstException = e; 564 } 565 replicaDB.shutdown(); 566 } 567 } 568 } 569 570 // 2- clear the ChangeNumber index DB 571 synchronized (cnIndexDBLock) 572 { 573 if (cnIndexDB != null) 574 { 575 try 576 { 577 cnIndexDB.removeDomain(baseDN); 578 } 579 catch (ChangelogException e) 580 { 581 if (firstException == null) 582 { 583 firstException = e; 584 } 585 else 586 { 587 logger.traceException(e); 588 } 589 } 590 } 591 } 592 593 // 3- clear the changelogstate DB 594 try 595 { 596 replicationEnv.clearGenerationId(baseDN); 597 } 598 catch (ChangelogException e) 599 { 600 if (firstException == null) 601 { 602 firstException = e; 603 } 604 else 605 { 606 logger.traceException(e); 607 } 608 } 609 610 if (firstException != null) 611 { 612 throw firstException; 613 } 614 } 615 616 /** {@inheritDoc} */ 617 @Override 618 public void setPurgeDelay(final long purgeDelayInMillis) 619 { 620 this.purgeDelayInMillis = purgeDelayInMillis; 621 if (purgeDelayInMillis > 0) 622 { 623 final ChangelogDBPurger newPurger = new ChangelogDBPurger(); 624 if (cnPurger.compareAndSet(null, newPurger)) 625 { // no purger was running, run this new one 626 newPurger.start(); 627 } 628 else 629 { // a purger was already running, just wake that one up 630 // to verify if some entries can be purged with the new purge delay 631 final ChangelogDBPurger currentPurger = cnPurger.get(); 632 synchronized (currentPurger) 633 { 634 currentPurger.notify(); 635 } 636 } 637 } 638 else 639 { 640 final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); 641 if (purgerToStop != null) 642 { // stop this purger 643 purgerToStop.initiateShutdown(); 644 } 645 } 646 } 647 648 /** {@inheritDoc} */ 649 @Override 650 public void setComputeChangeNumber(final boolean computeChangeNumber) 651 throws ChangelogException 652 { 653 if (computeChangeNumber) 654 { 655 startIndexer(replicationEnv.getChangelogState()); 656 } 657 else 658 { 659 final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); 660 if (indexer != null) 661 { 662 indexer.initiateShutdown(); 663 } 664 } 665 } 666 667 private void startIndexer(final ChangelogState changelogState) 668 { 669 final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); 670 if (cnIndexer.compareAndSet(null, indexer)) 671 { 672 indexer.start(); 673 } 674 } 675 676 /** {@inheritDoc} */ 677 @Override 678 public ChangeNumberIndexDB getChangeNumberIndexDB() 679 { 680 synchronized (cnIndexDBLock) 681 { 682 if (cnIndexDB == null) 683 { 684 try 685 { 686 cnIndexDB = new JEChangeNumberIndexDB(replicationEnv); 687 } 688 catch (Exception e) 689 { 690 logger.traceException(e); 691 logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage()); 692 } 693 } 694 return cnIndexDB; 695 } 696 } 697 698 /** {@inheritDoc} */ 699 @Override 700 public ReplicationDomainDB getReplicationDomainDB() 701 { 702 return this; 703 } 704 705 /** {@inheritDoc} */ 706 @Override 707 public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options) 708 throws ChangelogException 709 { 710 final Set<DN> excludedDomainDns = Collections.emptySet(); 711 return getCursorFrom(startState, options, excludedDomainDns); 712 } 713 714 /** {@inheritDoc} */ 715 @Override 716 public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, 717 CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException 718 { 719 final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options); 720 registeredMultiDomainCursors.add(cursor); 721 for (DN baseDN : domainToReplicaDBs.keySet()) 722 { 723 if (!excludedDomainDns.contains(baseDN)) 724 { 725 cursor.addDomain(baseDN, startState.getServerState(baseDN)); 726 } 727 } 728 return cursor; 729 } 730 731 /** {@inheritDoc} */ 732 @Override 733 public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options) 734 throws ChangelogException 735 { 736 final DomainDBCursor cursor = newDomainDBCursor(baseDN, options); 737 for (int serverId : getDomainMap(baseDN).keySet()) 738 { 739 // get the last already sent CSN from that server to get a cursor 740 final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null; 741 cursor.addReplicaDB(serverId, lastCSN); 742 } 743 return cursor; 744 } 745 746 private DomainDBCursor newDomainDBCursor(final DN baseDN, CursorOptions options) 747 { 748 final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options); 749 putCursor(registeredDomainCursors, baseDN, cursor); 750 return cursor; 751 } 752 753 private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN) 754 { 755 final MultiDomainServerState offlineReplicas = 756 replicationEnv.getChangelogState().getOfflineReplicas(); 757 final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId); 758 if (offlineCSN != null 759 && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN))) 760 { 761 return offlineCSN; 762 } 763 return null; 764 } 765 766 @Override 767 public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, 768 CursorOptions options) throws ChangelogException 769 { 770 final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); 771 if (replicaDB != null) 772 { 773 final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN(); 774 final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( 775 actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); 776 final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN); 777 final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); 778 final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); 779 780 putCursor(replicaCursors, replicaId, replicaCursor); 781 782 return replicaCursor; 783 } 784 return EMPTY_CURSOR_REPLICA_DB; 785 } 786 787 private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor) 788 { 789 CopyOnWriteArrayList<V> cursors = map.get(key); 790 if (cursors == null) 791 { 792 cursors = new CopyOnWriteArrayList<>(); 793 CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors); 794 if (previousValue != null) 795 { 796 cursors = previousValue; 797 } 798 } 799 cursors.add(cursor); 800 } 801 802 /** {@inheritDoc} */ 803 @Override 804 public void unregisterCursor(final DBCursor<?> cursor) 805 { 806 if (cursor instanceof MultiDomainDBCursor) 807 { 808 registeredMultiDomainCursors.remove(cursor); 809 } 810 else if (cursor instanceof DomainDBCursor) 811 { 812 final DomainDBCursor domainCursor = (DomainDBCursor) cursor; 813 final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN()); 814 if (cursors != null) 815 { 816 cursors.remove(cursor); 817 } 818 } 819 else if (cursor instanceof ReplicaCursor) 820 { 821 final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; 822 final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId()); 823 if (cursors != null) 824 { 825 cursors.remove(cursor); 826 } 827 } 828 } 829 830 /** {@inheritDoc} */ 831 @Override 832 public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException 833 { 834 final CSN csn = updateMsg.getCSN(); 835 final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN, 836 csn.getServerId(), replicationServer); 837 final JEReplicaDB replicaDB = pair.getFirst(); 838 replicaDB.add(updateMsg); 839 840 ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg); 841 842 final ChangeNumberIndexer indexer = cnIndexer.get(); 843 if (indexer != null) 844 { 845 notifyReplicaOnline(indexer, baseDN, csn.getServerId()); 846 indexer.publishUpdateMsg(baseDN, updateMsg); 847 } 848 return pair.getSecond(); // replica DB was created 849 } 850 851 /** {@inheritDoc} */ 852 @Override 853 public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException 854 { 855 final ChangeNumberIndexer indexer = cnIndexer.get(); 856 if (indexer != null) 857 { 858 notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId()); 859 indexer.publishHeartbeat(baseDN, heartbeatCSN); 860 } 861 } 862 863 private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId) 864 throws ChangelogException 865 { 866 if (indexer.isReplicaOffline(baseDN, serverId)) 867 { 868 replicationEnv.notifyReplicaOnline(baseDN, serverId); 869 } 870 updateCursorsWithOfflineCSN(baseDN, serverId, null); 871 } 872 873 /** {@inheritDoc} */ 874 @Override 875 public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException 876 { 877 replicationEnv.notifyReplicaOffline(baseDN, offlineCSN); 878 final ChangeNumberIndexer indexer = cnIndexer.get(); 879 if (indexer != null) 880 { 881 indexer.replicaOffline(baseDN, offlineCSN); 882 } 883 updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN); 884 } 885 886 private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) 887 { 888 final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId)); 889 if (cursors != null) 890 { 891 for (ReplicaCursor cursor : cursors) 892 { 893 cursor.setOfflineCSN(offlineCSN); 894 } 895 } 896 } 897 898 /** 899 * The thread purging the changelogDB on a regular interval. Records are 900 * purged from the changelogDB if they are older than a delay specified in 901 * seconds. The purge process works in two steps: 902 * <ol> 903 * <li>first purge the changeNumberIndexDB and retrieve information to drive 904 * replicaDBs purging</li> 905 * <li>proceed to purge each replicaDBs based on the information collected 906 * when purging the changeNumberIndexDB</li> 907 * </ol> 908 */ 909 private final class ChangelogDBPurger extends DirectoryThread 910 { 911 private static final int DEFAULT_SLEEP = 500; 912 913 protected ChangelogDBPurger() 914 { 915 super("changelog DB purger"); 916 } 917 918 /** {@inheritDoc} */ 919 @Override 920 public void run() 921 { 922 // initialize CNIndexDB 923 getChangeNumberIndexDB(); 924 while (!isShutdownInitiated()) 925 { 926 try 927 { 928 final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; 929 final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); 930 final CSN oldestNotPurgedCSN; 931 932 // next code assumes that the compute-change-number config 933 // never changes during the life time of an RS 934 if (!config.isComputeChangeNumber()) 935 { 936 oldestNotPurgedCSN = purgeCSN; 937 } 938 else 939 { 940 final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; 941 if (localCNIndexDB == null) 942 { // shutdown has been initiated 943 return; 944 } 945 946 oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); 947 if (oldestNotPurgedCSN == null) 948 { // shutdown may have been initiated... 949 // ... or the change number index DB is empty, 950 // wait for new changes to come in. 951 952 // Note we cannot sleep for as long as the purge delay 953 // (3 days default), because we might receive late updates 954 // that will have to be purged before the purge delay elapses. 955 // This can particularly happen in case of network partitions. 956 if (!isShutdownInitiated()) 957 { 958 synchronized (this) 959 { 960 if (!isShutdownInitiated()) 961 { 962 wait(DEFAULT_SLEEP); 963 } 964 } 965 } 966 continue; 967 } 968 } 969 970 for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) 971 { 972 for (final JEReplicaDB replicaDB : domainMap.values()) 973 { 974 replicaDB.purgeUpTo(oldestNotPurgedCSN); 975 } 976 } 977 978 if (!isShutdownInitiated()) 979 { 980 synchronized (this) 981 { 982 if (!isShutdownInitiated()) 983 { 984 wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); 985 } 986 } 987 } 988 } 989 catch (InterruptedException e) 990 { 991 // shutdown initiated? 992 } 993 catch (Exception e) 994 { 995 logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e)); 996 if (replicationServer != null) 997 { 998 replicationServer.shutdown(); 999 } 1000 } 1001 } 1002 } 1003 1004 private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) 1005 { 1006 final long nextPurgeTime = notPurgedCSN.getTime(); 1007 final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis; 1008 if (currentPurgeTime <= nextPurgeTime) 1009 { 1010 // sleep till the next CSN to purge, 1011 return nextPurgeTime - currentPurgeTime; 1012 } 1013 // wait a bit before purging more 1014 return DEFAULT_SLEEP; 1015 } 1016 1017 /** {@inheritDoc} */ 1018 @Override 1019 public void initiateShutdown() 1020 { 1021 super.initiateShutdown(); 1022 synchronized (this) 1023 { 1024 notify(); // wake up the purger thread for faster shutdown 1025 } 1026 } 1027 } 1028}