/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013-2015 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;

import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.ChangelogBackend;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;

/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
 * Index DB (CNIndexDB for short).
 * <p>
 * Only changes older than the medium consistency point are inserted in the
 * CNIndexDB. As a consequence this class is also responsible for maintaining
 * the medium consistency point (indirectly through an
 * {@link ECLMultiDomainDBCursor}).
 */
public class ChangeNumberIndexer extends DirectoryThread
{
  /** The tracer object for the debug logger. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * If it contains nothing, then the run method executes normally.
   * Otherwise, the {@link #run()} method must clear its state
   * for the supplied domain baseDNs. If a supplied domain is
   * {@link DN#NULL_DN}, then all domains will be cleared.
   */
  private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
  private final ECLEnabledDomainPredicate predicate;

  /*
   * The following MultiDomainServerState fields must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates
   * 2) many updates can happen concurrently.
   */
  /**
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeat notifications, or offline notifications. Data is held for each
   * serverId cross domain.
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
   * <p>
   * Note: This object is updated by both heartbeats and changes/updates.
   */
  private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();

  /** Note: This object is updated by replica offline messages. */
  private final MultiDomainServerState replicasOffline = new MultiDomainServerState();

  /**
   * Cursor across all the replicaDBs for all the replication domains. It is
   * positioned on the next change that needs to be inserted in the CNIndexDB.
   * <p>
   * Note: it is only accessed from the {@link #run()} method. It is assigned
   * by {@link #initialize()} and reset to {@code null} when {@link #run()}
   * exits; it is therefore {@code null} if initialization failed.
   */
  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
  /**
   * Cookie tracking the changes already inserted in the CNIndexDB. It is
   * positioned during initialization and updated for each inserted change.
   */
  private MultiDomainServerState cookie = new MultiDomainServerState();

  /**
   * Builds a ChangeNumberIndexer object.
   *
   * @param changelogDB
   *          the changelogDB
   * @param changelogState
   *          the changelog state used for initialization
   */
  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
  {
    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
  }

  /**
   * Builds a ChangeNumberIndexer object.
   *
   * @param changelogDB
   *          the changelogDB
   * @param changelogState
   *          the changelog state used for initialization
   * @param predicate
   *          tells whether a domain is enabled for the external changelog
   */
  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
      ECLEnabledDomainPredicate predicate)
  {
    super("Change number indexer");
    this.changelogDB = changelogDB;
    this.changelogState = changelogState;
    this.predicate = predicate;
  }

  /**
   * Ensures the medium consistency point is updated by heartbeats.
   *
   * @param baseDN
   *          the baseDN of the domain for which the heartbeat is published
   * @param heartbeatCSN
   *          the CSN coming from the heartbeat
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }

    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify(oldestCSNBefore);
  }

  /**
   * Indicates if the replica corresponding to provided domain DN and server id
   * is offline.
   *
   * @param domainDN
   *          base DN of the replica
   * @param serverId
   *          server id of the replica
   * @return {@code true} if replica is offline, {@code false} otherwise
   */
  public boolean isReplicaOffline(DN domainDN, int serverId)
  {
    return replicasOffline.getCSN(domainDN, serverId) != null;
  }

  /**
   * Ensures the medium consistency point is updated by UpdateMsg.
   *
   * @param baseDN
   *          the baseDN of the domain for which the update message is
   *          published
   * @param updateMsg
   *          the updateMsg that will update the medium consistency point
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  {
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }

    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
    tryNotify(oldestCSNBefore);
  }

  /**
   * Signals a replica went offline.
   *
   * @param baseDN
   *          the replica's replication domain
   * @param offlineCSN
   *          the serverId and time of the replica that went offline
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }

    // Record the offline CSN first so that the "oldest alive CSN" computed
    // just below already excludes this replica.
    replicasOffline.update(baseDN, offlineCSN);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, offlineCSN);
    tryNotify(oldestCSNBefore);
  }

  /**
   * Returns the oldest of the last alive CSNs, excluding the replicas recorded
   * as offline.
   */
  private CSN getOldestLastAliveCSN()
  {
    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
  }

  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
  private void tryNotify(final CSN oldestCSNBefore)
  {
    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
    {
      synchronized (this)
      {
        notify();
      }
    }
  }

  /**
   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
   * have some work to do.
   */
  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
  {
    final CSN oldestCSNAfter = getOldestLastAliveCSN();
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasAreOfflineOrAlive()
        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
        // has the oldest CSN changed?
        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
  }

  /**
   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
   * must be persisted to the change number index DB.
   */
  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
  {
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasAreOfflineOrAlive()
        // can we persist the next CSN?
        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
  }

  /**
   * Returns true only if the initial replicas known from the changelog state DB
   * are either:
   * <ul>
   * <li>offline, so do not wait for them in order to compute medium consistency
   * </li>
   * <li>alive, because we received heartbeats or changes (so their last alive
   * CSN has been updated to something past the oldest possible CSN), we have
   * enough info to compute medium consistency</li>
   * </ul>
   * In both cases, we have enough information to compute medium consistency
   * without waiting any further.
   */
  private boolean allInitialReplicasAreOfflineOrAlive()
  {
    for (DN baseDN : lastAliveCSNs)
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (csn.getTime() == 0
            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
        {
          // this is the oldest possible CSN, but the replica is not offline
          // we must wait for more up to date information from this replica
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Restores in memory data needed to build the CNIndexDB. In particular,
   * initializes the changes cursor to the medium consistency point.
   */
  private void initialize() throws ChangelogException
  {
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();

    initializeLastAliveCSNs(domainDB);
    initializeNextChangeCursor(domainDB);
    initializeOfflineReplicas();

    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }

  /**
   * Creates the multi domain cursor from the newest change number index record
   * and positions the cookie accordingly.
   */
  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
  {
    // Initialize the multi domain cursor only from the change number index record.
    // The cookie is always empty at this stage.
    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
    final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
    final MultiDomainServerState unused = new MultiDomainServerState();
    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);

    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
    ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
  }

  /**
   * Seeds the last alive CSNs for every ECL enabled domain known from the
   * changelog state.
   */
  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
  {
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (predicate.isECLEnabledDomain(baseDN))
      {
        for (Integer serverId : entry.getValue())
        {
          /*
           * initialize with the oldest possible CSN in order for medium
           * consistency to wait for all replicas to be alive before moving forward
           */
          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        }

        final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
        lastAliveCSNs.update(baseDN, latestKnownState);
      }
    }
  }

  /**
   * Restores the offline replicas known from the changelog state, for the ECL
   * enabled domains only.
   */
  private void initializeOfflineReplicas()
  {
    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
    for (DN baseDN : offlineReplicas)
    {
      // The predicate only depends on the domain: test it once per domain
      // rather than once per offline CSN.
      if (!predicate.isECLEnabledDomain(baseDN))
      {
        continue;
      }

      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
      {
        replicasOffline.update(baseDN, offlineCSN);
        // a replica offline message could also be the very last time
        // we heard from this replica :)
        lastAliveCSNs.update(baseDN, offlineCSN);
      }
    }
  }

  /** Builds the CSN used as "never heard from this replica" marker (time and sequence number are zero). */
  private CSN oldestPossibleCSN(int serverId)
  {
    return new CSN(0, 0, serverId);
  }

  /** {@inheritDoc} */
  @Override
  public void initiateShutdown()
  {
    super.initiateShutdown();
    // wake up the indexer thread in case it is blocked in wait()
    synchronized (this)
    {
      notify();
    }
  }

  /** {@inheritDoc} */
  @Override
  public void run()
  {
    try
    {
      /*
       * initialize here to allow fast application start up and avoid errors due
       * to cursors being created in a different thread to the one where they
       * are used.
       */
      initialize();

      while (!isShutdownInitiated())
      {
        try
        {
          while (!domainsToClear.isEmpty())
          {
            final DN baseDNToClear = domainsToClear.first();
            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
            // Only release the waiting thread
            // once this domain's state has been cleared.
            domainsToClear.remove(baseDNToClear);
          }

          // Do not call DBCursor.next() here
          // because we might not have consumed the last record,
          // for example if we could not move the MCP forward
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
          if (msg == null)
          {
            synchronized (this)
            {
              if (isShutdownInitiated())
              {
                continue;
              }
              wait();
            }
            // check whether new changes have been added to the ReplicaDBs
            moveToNextChange();
            continue;
          }
          else if (msg instanceof ReplicaOfflineMsg)
          {
            // offline messages are never inserted in the CNIndexDB: skip them
            moveToNextChange();
            continue;
          }

          final CSN csn = msg.getCSN();
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, change number will be blocked
          if (!canMoveForwardMediumConsistencyPoint(csn))
          {
            // the oldest record to insert is newer than the medium consistency
            // point. Let's wait for a change that can be published.
            synchronized (this)
            {
              // double check to protect against a missed call to notify()
              if (!canMoveForwardMediumConsistencyPoint(csn))
              {
                if (isShutdownInitiated())
                {
                  return;
                }
                wait();
                // loop to check if changes older than the medium consistency
                // point have been added to the ReplicaDBs
                continue;
              }
            }
          }

          // OK, the oldest change is older than the medium consistency point
          // let's publish it to the CNIndexDB.
          final long changeNumber = changelogDB.getChangeNumberIndexDB()
              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
          if (!cookie.update(baseDN, csn))
          {
            throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn
                + ") would have updated the cookie=" + cookie + ", but it did not");
          }
          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
          moveForwardMediumConsistencyPoint(csn, baseDN);
        }
        catch (InterruptedException ignored)
        {
          // was shutdown called? loop to figure it out.
          // NOTE(review): with the interrupt flag restored, the next wait()
          // throws immediately, so this loop spins until shutdown is
          // initiated - confirm this is the intended behaviour.
          Thread.currentThread().interrupt();
        }
      }
    }
    catch (RuntimeException e)
    {
      logUnexpectedException(e);
      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
      throw e;
    }
    catch (Exception e)
    {
      logUnexpectedException(e);
      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
      throw new RuntimeException(e);
    }
    finally
    {
      // The cursor is still null when initialize() failed before creating it:
      // guard against a NullPointerException that would otherwise replace
      // the exception being propagated.
      if (nextChangeForInsertDBCursor != null)
      {
        nextChangeForInsertDBCursor.close();
        nextChangeForInsertDBCursor = null;
      }
    }
  }

  /**
   * Advances the cursor to the next change, tolerating an aborted cursor only
   * when at least one domain is pending removal.
   *
   * @throws ChangelogException
   *           If a database problem happened, or if the cursor was aborted
   *           while no domain is waiting to be cleared
   */
  private void moveToNextChange() throws ChangelogException
  {
    try
    {
      nextChangeForInsertDBCursor.next();
    }
    catch (AbortedChangelogCursorException e)
    {
      if (domainsToClear.isEmpty())
      {
        // There is no domain to clear, thus it is
        // not expected that a cursor is aborted
        throw e;
      }
      // else assumes the aborted cursor is part of a domain
      // that will be removed on the next iteration
      logger.trace("Cursor was aborted: %s, but continuing because domainsToClear has size %s",
          e, domainsToClear.size());
    }
  }

  /**
   * Notifies the {@link ChangelogBackend} that a new entry has been added.
   *
   * @param baseDN
   *          the baseDN of the newly added entry.
   * @param changeNumber
   *          the change number of the newly added entry. It will be greater
   *          than zero for entries added to the change number index and less
   *          than or equal to zero for entries added to any replica DB
   * @param cookie
   *          the cookie of the newly added entry. This is only meaningful for
   *          entries added to the change number index
   * @param msg
   *          the update message of the newly added entry
   * @throws ChangelogException
   *           If a problem occurs while notifying of the newly added entry.
   */
  protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
      MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
  {
    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg);
  }

  /**
   * Nothing can be done about it.
   * <p>
   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
   * alert.
   * <p>
   * Message logged here gives corrective information to the administrator.
   */
  private void logUnexpectedException(Exception e)
  {
    logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
        getClass().getSimpleName(), stackTraceToSingleLineString(e));
  }

  /**
   * Updates the offline/alive bookkeeping of the replica that produced the
   * change just inserted in the CNIndexDB, then advances the cursor.
   *
   * @param mcCSN
   *          the CSN of the change just inserted
   * @param mcBaseDN
   *          the baseDN of the change just inserted
   * @throws ChangelogException
   *           If a database problem happened
   */
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
  {
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
    if (offlineCSN != null)
    {
      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
      {
        // replica is back online, we can forget the last time it was offline
        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        /*
         * replica is not back online, Medium consistency point has gone past
         * its last offline time, and there are no more changes after the
         * offline CSN in the cursor: remove everything known about it
         * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV).
         */
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
      }
    }

    // advance the cursor we just read from,
    // success/failure will be checked later
    nextChangeForInsertDBCursor.next();
  }

  /**
   * Asks the current thread to clear its state for the specified domain.
   * <p>
   * Note: This method blocks the current thread until state is cleared.
   *
   * @param baseDN the baseDN to be cleared from this thread's state.
   *               {@code null} and {@link DN#NULL_DN} mean "clear all domains".
   */
  public void clear(DN baseDN)
  {
    // Use DN.NULL_DN to say "clear all domains"
    final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN;
    domainsToClear.add(baseDNToClear);
    while (domainsToClear.contains(baseDNToClear)
        && !State.TERMINATED.equals(getState()))
    {
      // wait until clear() has been done by thread, always waking it up
      synchronized (this)
      {
        notify();
      }
      // give the indexer thread a chance to run and clean up its state
      Thread.yield();
    }
  }

}