001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2009 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server.changelog.je; 028 029import java.io.File; 030import java.io.UnsupportedEncodingException; 031import java.util.AbstractMap.SimpleImmutableEntry; 032import java.util.*; 033import java.util.Map.Entry; 034import java.util.concurrent.CopyOnWriteArrayList; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037 038import org.forgerock.i18n.LocalizableMessage; 039import org.forgerock.i18n.slf4j.LocalizedLogger; 040import org.opends.server.replication.common.CSN; 041import org.opends.server.replication.server.ChangelogState; 042import org.opends.server.replication.server.ReplicationServer; 043import org.opends.server.replication.server.changelog.api.ChangelogException; 044import org.opends.server.types.DN; 045import org.opends.server.types.DirectoryException; 046 047import com.sleepycat.je.*; 048 049import static com.sleepycat.je.EnvironmentConfig.*; 050import static com.sleepycat.je.OperationStatus.*; 051 052import static org.opends.messages.BackendMessages.*; 053import static org.opends.messages.ReplicationMessages.*; 054import static org.opends.server.util.StaticUtils.*; 055 056/** 057 * This class represents a DB environment that acts as a factory for 058 * ReplicationDBs. 059 */ 060public class ReplicationDbEnv 061{ 062 private Environment dbEnvironment; 063 private Database changelogStateDb; 064 /** 065 * The current changelogState. This is in-memory version of what is inside the 066 * on-disk changelogStateDB. It improves performances in case the 067 * changelogState is read often. 068 * 069 * @GuardedBy("stateLock") 070 */ 071 private final ChangelogState changelogState; 072 /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState. */ 073 private final Object stateLock = new Object(); 074 private final List<Database> allDbs = new CopyOnWriteArrayList<>(); 075 private ReplicationServer replicationServer; 076 private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); 077 private static final String GENERATION_ID_TAG = "GENID"; 078 private static final String OFFLINE_TAG = "OFFLINE"; 079 private static final String FIELD_SEPARATOR = " "; 080 /** The tracer object for the debug logger. */ 081 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 082 083 /** 084 * Initialize this class. 085 * Creates Db environment that will be used to create databases. 086 * It also reads the currently known databases from the "changelogstate" 087 * database. 088 * @param path Path where the backing files must be created. 089 * @param replicationServer the ReplicationServer that creates this 090 * ReplicationDbEnv. 091 * @throws ChangelogException If an Exception occurred that prevented 092 * the initialization to happen. 093 */ 094 public ReplicationDbEnv(String path, ReplicationServer replicationServer) 095 throws ChangelogException 096 { 097 this.replicationServer = replicationServer; 098 099 try 100 { 101 dbEnvironment = openJEEnvironment(path); 102 103 /* 104 * One database is created to store the update from each LDAP server in 105 * the topology. The database "changelogstate" is used to store the list 106 * of all the servers that have been seen in the past. 107 */ 108 changelogStateDb = openDatabase("changelogstate"); 109 changelogState = readOnDiskChangelogState(); 110 } 111 catch (RuntimeException e) 112 { 113 throw new ChangelogException(e); 114 } 115 } 116 117 /** 118 * Open a JE environment. 119 * <p> 120 * protected so it can be overridden by tests. 121 * 122 * @param path 123 * the path to the JE environment in the filesystem 124 * @return the opened JE environment 125 */ 126 protected Environment openJEEnvironment(String path) 127 { 128 final EnvironmentConfig envConfig = new EnvironmentConfig(); 129 130 /* 131 * Create the DB Environment that will be used for all the 132 * ReplicationServer activities related to the db 133 */ 134 envConfig.setAllowCreate(true); 135 envConfig.setTransactional(true); 136 envConfig.setConfigParam(STATS_COLLECT, "false"); 137 envConfig.setConfigParam(CLEANER_THREADS, "2"); 138 envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true"); 139 /* 140 * Tests have shown that since the parsing of the Replication log is 141 * always done sequentially, it is not necessary to use a large DB cache. 142 */ 143 if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024) 144 { 145 /* 146 * If the JVM is reasonably large then we can safely default to bigger 147 * read buffers. This will result in more scalable checkpointer and 148 * cleaner performance. 149 */ 150 envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2)); 151 envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2)); 152 envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4)); 153 154 /* 155 * The cache size must be bigger in order to accommodate the larger 156 * buffers - see OPENDJ-943. 157 */ 158 envConfig.setConfigParam(MAX_MEMORY, mb(16)); 159 } 160 else 161 { 162 /* 163 * Use 5M so that the replication can be used with 64M total for the 164 * JVM. 165 */ 166 envConfig.setConfigParam(MAX_MEMORY, mb(5)); 167 } 168 169 // Since records are always added at the end of the Replication log and 170 // deleted at the beginning of the Replication log, this should never 171 // cause any deadlock. 172 envConfig.setTxnTimeout(0, TimeUnit.SECONDS); 173 envConfig.setLockTimeout(0, TimeUnit.SECONDS); 174 175 // Since replication provides durability, we can reduce the DB durability 176 // level so that we are immune to application / JVM crashes. 177 envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC); 178 179 return new Environment(new File(path), envConfig); 180 } 181 182 private String kb(int sizeInKb) 183 { 184 return String.valueOf(sizeInKb * 1024); 185 } 186 187 private String mb(int sizeInMb) 188 { 189 return String.valueOf(sizeInMb * 1024 * 1024); 190 } 191 192 /** 193 * Open a JE database. 194 * <p> 195 * protected so it can be overridden by tests. 196 * 197 * @param databaseName 198 * the databaseName to open 199 * @return the opened JE database 200 * @throws ChangelogException 201 * if a problem happened opening the database 202 * @throws RuntimeException 203 * if a problem happened with the JE database 204 */ 205 protected Database openDatabase(String databaseName) 206 throws ChangelogException, RuntimeException 207 { 208 if (isShuttingDown.get()) 209 { 210 throw new ChangelogException( 211 WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get( 212 databaseName, replicationServer.getServerId())); 213 } 214 final DatabaseConfig dbConfig = new DatabaseConfig(); 215 dbConfig.setAllowCreate(true); 216 dbConfig.setTransactional(true); 217 final Database db = 218 dbEnvironment.openDatabase(null, databaseName, dbConfig); 219 if (isShuttingDown.get()) 220 { 221 closeDB(db); 222 throw new ChangelogException( 223 WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get( 224 databaseName, replicationServer.getServerId())); 225 } 226 allDbs.add(db); 227 return db; 228 } 229 230 /** 231 * Return the current changelog state. 232 * 233 * @return the current {@link ChangelogState} 234 */ 235 public ChangelogState getChangelogState() 236 { 237 return changelogState; 238 } 239 240 /** 241 * Read and return the changelog state from the database. 242 * 243 * @return the {@link ChangelogState} read from the changelogState DB 244 * @throws ChangelogException 245 * if a database problem occurs 246 */ 247 protected ChangelogState readOnDiskChangelogState() throws ChangelogException 248 { 249 return decodeChangelogState(readWholeState()); 250 } 251 252 /** 253 * Decode the whole changelog state DB. 254 * 255 * @param wholeState 256 * the whole changelog state DB as a Map. 257 * The Map is only used as a convenient collection of key => data objects 258 * @return the decoded changelog state 259 * @throws ChangelogException 260 * if a problem occurred while decoding 261 */ 262 ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState) 263 throws ChangelogException 264 { 265 try 266 { 267 final ChangelogState result = new ChangelogState(); 268 for (Entry<byte[], byte[]> entry : wholeState.entrySet()) 269 { 270 final String stringKey = toString(entry.getKey()); 271 final String stringData = toString(entry.getValue()); 272 273 if (logger.isTraceEnabled()) 274 { 275 debug("read (key, data)=(" + stringKey + ", " + stringData + ")"); 276 } 277 278 final String prefix = stringKey.split(FIELD_SEPARATOR)[0]; 279 if (prefix.equals(GENERATION_ID_TAG)) 280 { 281 final String[] str = stringData.split(FIELD_SEPARATOR, 3); 282 final long generationId = toLong(str[1]); 283 final DN baseDN = DN.valueOf(str[2]); 284 285 if (logger.isTraceEnabled()) 286 { 287 debug("has read generationId: baseDN=" + baseDN + " generationId=" 288 + generationId); 289 } 290 result.setDomainGenerationId(baseDN, generationId); 291 } 292 else if (prefix.equals(OFFLINE_TAG)) 293 { 294 final String[] str = stringData.split(FIELD_SEPARATOR, 3); 295 long timestamp = toLong(str[0]); 296 final int serverId = toInt(str[1]); 297 final DN baseDN = DN.valueOf(str[2]); 298 if (logger.isTraceEnabled()) 299 { 300 debug("has read replica offline: baseDN=" + baseDN + " serverId=" 301 + serverId); 302 } 303 result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId)); 304 } 305 else 306 { 307 final String[] str = stringData.split(FIELD_SEPARATOR, 2); 308 final int serverId = toInt(str[0]); 309 final DN baseDN = DN.valueOf(str[1]); 310 311 if (logger.isTraceEnabled()) 312 { 313 debug("has read replica: baseDN=" + baseDN + " serverId=" 314 + serverId); 315 } 316 result.addServerIdToDomain(serverId, baseDN); 317 } 318 } 319 return result; 320 } 321 catch (DirectoryException e) 322 { 323 throw new ChangelogException(e.getMessageObject(), e); 324 } 325 } 326 327 private Map<byte[], byte[]> readWholeState() throws ChangelogException 328 { 329 DatabaseEntry key = new DatabaseEntry(); 330 DatabaseEntry data = new DatabaseEntry(); 331 Cursor cursor = changelogStateDb.openCursor(null, null); 332 333 try 334 { 335 final Map<byte[], byte[]> results = new LinkedHashMap<>(); 336 337 OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); 338 while (status == OperationStatus.SUCCESS) 339 { 340 results.put(key.getData(), data.getData()); 341 status = cursor.getNext(key, data, LockMode.DEFAULT); 342 } 343 344 return results; 345 } 346 catch (RuntimeException e) 347 { 348 throw new ChangelogException(ERR_DATABASE_EXCEPTION.get(e.getMessage()), e); 349 } 350 finally 351 { 352 close(cursor); 353 } 354 } 355 356 private int toInt(String data) throws ChangelogException 357 { 358 try 359 { 360 return Integer.parseInt(data); 361 } 362 catch (NumberFormatException e) 363 { 364 // should never happen 365 // TODO: i18n 366 throw new ChangelogException(LocalizableMessage.raw( 367 "replicationServer state database has a wrong format: " 368 + e.getLocalizedMessage() + "<" + data + ">")); 369 } 370 } 371 372 private long toLong(String data) throws ChangelogException 373 { 374 try 375 { 376 return Long.parseLong(data); 377 } 378 catch (NumberFormatException e) 379 { 380 // should never happen 381 // TODO: i18n 382 throw new ChangelogException(LocalizableMessage.raw( 383 "replicationServer state database has a wrong format: " 384 + e.getLocalizedMessage() + "<" + data + ">")); 385 } 386 } 387 388 private String toString(byte[] data) throws ChangelogException 389 { 390 try 391 { 392 return new String(data, "UTF-8"); 393 } 394 catch (UnsupportedEncodingException e) 395 { 396 // should never happens 397 // TODO: i18n 398 throw new ChangelogException(LocalizableMessage.raw("need UTF-8 support")); 399 } 400 } 401 402 /** 403 * Converts the string to a UTF8-encoded byte array. 404 * 405 * @param s 406 * the string to convert 407 * @return the byte array representation of the UTF8-encoded string 408 */ 409 static byte[] toBytes(String s) 410 { 411 try 412 { 413 return s.getBytes("UTF-8"); 414 } 415 catch (UnsupportedEncodingException e) 416 { 417 // can't happen 418 return null; 419 } 420 } 421 422 /** 423 * Finds or creates the database used to store changes for a replica with the 424 * given baseDN and serverId. 425 * 426 * @param serverId 427 * The server id that identifies the server. 428 * @param baseDN 429 * The baseDN that identifies the domain. 430 * @param generationId 431 * The generationId associated to this domain. 432 * @return the Database. 433 * @throws ChangelogException 434 * in case of underlying Exception. 435 */ 436 public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId) 437 throws ChangelogException 438 { 439 if (logger.isTraceEnabled()) 440 { 441 debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", " 442 + generationId + ")"); 443 } 444 try 445 { 446 // JNR: redundant info is stored between the key and data down below. 447 // It is probably ok since "changelogstate" DB does not receive a high 448 // volume of inserts. 449 Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId); 450 451 // Opens the DB for the changes received from this server on this domain. 452 final Database replicaDB = openDatabase(replicaEntry.getKey()); 453 454 synchronized (stateLock) 455 { 456 putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); 457 changelogState.addServerIdToDomain(serverId, baseDN); 458 putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); 459 changelogState.setDomainGenerationId(baseDN, generationId); 460 } 461 return replicaDB; 462 } 463 catch (RuntimeException e) 464 { 465 throw new ChangelogException(e); 466 } 467 } 468 469 /** 470 * Return an entry to store in the changelog state database representing a 471 * replica in the topology. 472 * 473 * @param baseDN 474 * the replica's baseDN 475 * @param serverId 476 * the replica's serverId 477 * @return a database entry for the replica 478 */ 479 static Entry<String, String> toReplicaEntry(DN baseDN, int serverId) 480 { 481 final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString(); 482 final String value = serverId + FIELD_SEPARATOR + baseDN; 483 return toEntry(key, value); 484 } 485 486 /** 487 * Return an entry to store in the changelog state database representing the 488 * domain generation id. 489 * 490 * @param baseDN 491 * the domain's baseDN 492 * @param generationId 493 * the domain's generationId 494 * @return a database entry for the generationId 495 */ 496 static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId) 497 { 498 final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString(); 499 final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR + baseDN; 500 return toEntry(toBytes(key), toBytes(data)); 501 } 502 503 /** 504 * Converts an Entry<String, String> to an Entry<byte[], byte[]>. 505 * 506 * @param entry 507 * the entry to convert 508 * @return the converted entry 509 */ 510 static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry) 511 { 512 return toEntry(toBytes(entry.getKey()), toBytes(entry.getValue())); 513 } 514 515 /** 516 * Return an entry to store in the changelog state database representing the 517 * time a replica went offline. 518 * 519 * @param baseDN 520 * the replica's baseDN 521 * @param offlineCSN 522 * the replica's serverId and offline timestamp 523 * @return a database entry representing the time a replica went offline 524 */ 525 static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN) 526 { 527 final int serverId = offlineCSN.getServerId(); 528 final byte[] key = toReplicaOfflineKey(baseDN, serverId); 529 final byte[] data = toBytes(offlineCSN.getTime() + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN); 530 return toEntry(key, data); 531 } 532 533 /** 534 * Return the key for a replica offline entry in the changelog state database. 535 * 536 * @param baseDN 537 * the replica's baseDN 538 * @param serverId 539 * the replica's serverId 540 * @return the key used in the database to store offline time of the replica 541 */ 542 private static byte[] toReplicaOfflineKey(DN baseDN, int serverId) 543 { 544 return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString()); 545 } 546 547 /** Returns an entry with the provided key and a null value. */ 548 private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key) 549 { 550 return toEntry(key, null); 551 } 552 553 private static <K, V> SimpleImmutableEntry<K, V> toEntry(final K key, final V value) 554 { 555 return new SimpleImmutableEntry<>(key, value); 556 } 557 558 private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) 559 throws ChangelogException, RuntimeException 560 { 561 DatabaseEntry key = new DatabaseEntry(entry.getKey()); 562 DatabaseEntry data = new DatabaseEntry(); 563 if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND) 564 { 565 Transaction txn = dbEnvironment.beginTransaction(null, null); 566 try 567 { 568 data.setData(entry.getValue()); 569 if (logger.isTraceEnabled()) 570 { 571 debug("putting record in the changelogstate Db key=[" 572 + toString(entry.getKey()) + "] value=[" 573 + toString(entry.getValue()) + "]"); 574 } 575 changelogStateDb.put(txn, key, data); 576 txn.commit(Durability.COMMIT_WRITE_NO_SYNC); 577 } 578 catch (DatabaseException dbe) 579 { 580 // Abort the txn and propagate the Exception to the caller 581 txn.abort(); 582 throw dbe; 583 } 584 } 585 } 586 587 /** 588 * Creates a new transaction. 589 * 590 * @return the transaction. 591 * @throws ChangelogException in case of underlying exception 592 */ 593 public Transaction beginTransaction() throws ChangelogException 594 { 595 try 596 { 597 return dbEnvironment.beginTransaction(null, null); 598 } 599 catch (RuntimeException e) 600 { 601 throw new ChangelogException(e); 602 } 603 } 604 605 /** 606 * Shutdown the Db environment. 607 */ 608 public void shutdown() 609 { 610 isShuttingDown.set(true); 611 // CopyOnWriteArrayList iterator never throw ConcurrentModificationException 612 // This code rely on openDatabase() to close databases opened concurrently 613 // with this code 614 final Database[] allDbsCopy = allDbs.toArray(new Database[0]); 615 allDbs.clear(); 616 for (Database db : allDbsCopy) 617 { 618 closeDB(db); 619 } 620 621 try 622 { 623 dbEnvironment.close(); 624 } 625 catch (DatabaseException e) 626 { 627 logger.error(closeDBErrorMessage(null, e)); 628 } 629 } 630 631 private void closeDB(Database db) 632 { 633 allDbs.remove(db); 634 try 635 { 636 db.close(); 637 } 638 catch (DatabaseException e) 639 { 640 logger.error(closeDBErrorMessage(db.getDatabaseName(), e)); 641 } 642 } 643 644 private LocalizableMessage closeDBErrorMessage(String dbName, DatabaseException e) 645 { 646 if (dbName != null) 647 { 648 return NOTE_EXCEPTION_CLOSING_DATABASE.get(dbName, 649 stackTraceToSingleLineString(e)); 650 } 651 return ERR_ERROR_CLOSING_CHANGELOG_ENV.get(stackTraceToSingleLineString(e)); 652 } 653 654 /** 655 * Clears the provided generationId associated to the provided baseDN from the 656 * state Db. 657 * 658 * @param baseDN 659 * The baseDN for which the generationID must be cleared. 660 * @throws ChangelogException 661 * If a database problem happened 662 */ 663 public void clearGenerationId(DN baseDN) throws ChangelogException 664 { 665 synchronized (stateLock) 666 { 667 final int unusedGenId = 0; 668 deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), 669 "clearGenerationId(baseDN=" + baseDN + ")"); 670 changelogState.setDomainGenerationId(baseDN, unusedGenId); 671 } 672 } 673 674 /** 675 * Clears the provided serverId associated to the provided baseDN from the 676 * state Db. 677 * 678 * @param baseDN 679 * The baseDN for which the serverId must be cleared. 680 * @param serverId 681 * The serverId to remove from the Db. 682 * @throws ChangelogException 683 * If a database problem happened 684 */ 685 public void clearServerId(DN baseDN, int serverId) throws ChangelogException 686 { 687 synchronized (stateLock) 688 { 689 deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), 690 "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); 691 changelogState.setDomainGenerationId(baseDN, -1); 692 } 693 } 694 695 private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, 696 String methodInvocation) throws ChangelogException 697 { 698 if (logger.isTraceEnabled()) 699 { 700 debug(methodInvocation + " starting"); 701 } 702 703 try 704 { 705 final DatabaseEntry key = new DatabaseEntry(entry.getKey()); 706 final DatabaseEntry data = new DatabaseEntry(); 707 if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS) 708 { 709 Transaction txn = dbEnvironment.beginTransaction(null, null); 710 try 711 { 712 changelogStateDb.delete(txn, key); 713 txn.commit(Durability.COMMIT_WRITE_NO_SYNC); 714 if (logger.isTraceEnabled()) 715 { 716 debug(methodInvocation + " succeeded"); 717 } 718 } 719 catch (RuntimeException dbe) 720 { 721 // Abort the txn and propagate the Exception to the caller 722 txn.abort(); 723 throw dbe; 724 } 725 } 726 else if (logger.isTraceEnabled()) 727 { 728 debug(methodInvocation + " failed: key not found"); 729 } 730 } 731 catch (RuntimeException e) 732 { 733 if (logger.isTraceEnabled()) 734 { 735 debug(methodInvocation + " error: " + stackTraceToSingleLineString(e)); 736 } 737 throw new ChangelogException(e); 738 } 739 } 740 741 /** 742 * Notify that replica is offline. 743 * <p> 744 * This information is stored in the changelog state DB. 745 * 746 * @param baseDN 747 * the domain of the offline replica 748 * @param offlineCSN 749 * the offline replica serverId and offline timestamp 750 * @throws ChangelogException 751 * if a database problem occurred 752 */ 753 public void notifyReplicaOffline(DN baseDN, CSN offlineCSN) 754 throws ChangelogException 755 { 756 synchronized (stateLock) 757 { 758 // just overwrite any older entry as it is assumed a newly received offline 759 // CSN is newer than the previous one 760 putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), 761 "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); 762 changelogState.addOfflineReplica(baseDN, offlineCSN); 763 } 764 } 765 766 /** 767 * Notify that replica is online. 768 * <p> 769 * Update the changelog state DB if necessary (ie, replica was known to be 770 * offline). 771 * 772 * @param baseDN 773 * the domain of replica 774 * @param serverId 775 * the serverId of replica 776 * @throws ChangelogException 777 * if a database problem occurred 778 */ 779 public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException 780 { 781 deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)), 782 "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")"); 783 } 784 785 private void putInChangelogStateDB(Entry<byte[], byte[]> entry, 786 String methodInvocation) throws ChangelogException 787 { 788 if (logger.isTraceEnabled()) 789 { 790 debug(methodInvocation + " starting"); 791 } 792 793 try 794 { 795 final DatabaseEntry key = new DatabaseEntry(entry.getKey()); 796 final DatabaseEntry data = new DatabaseEntry(entry.getValue()); 797 changelogStateDb.put(null, key, data); 798 if (logger.isTraceEnabled()) 799 { 800 debug(methodInvocation + " succeeded"); 801 } 802 } 803 catch (RuntimeException e) 804 { 805 if (logger.isTraceEnabled()) 806 { 807 debug(methodInvocation + " error: " + stackTraceToSingleLineString(e)); 808 } 809 throw new ChangelogException(e); 810 } 811 } 812 813 /** 814 * Clears the database. 815 * 816 * @param db 817 * The database to clear. 818 */ 819 public final void clearDb(Database db) 820 { 821 String databaseName = db.getDatabaseName(); 822 823 // Closing is requested by Berkeley JE before truncate 824 db.close(); 825 826 Transaction txn = null; 827 try 828 { 829 txn = dbEnvironment.beginTransaction(null, null); 830 dbEnvironment.truncateDatabase(txn, databaseName, false); 831 txn.commit(Durability.COMMIT_WRITE_NO_SYNC); 832 txn = null; 833 } 834 catch (RuntimeException e) 835 { 836 logger.error(ERR_ERROR_CLEARING_DB, databaseName, 837 e.getMessage() + " " + stackTraceToSingleLineString(e)); 838 } 839 finally 840 { 841 try 842 { 843 if (txn != null) 844 { 845 txn.abort(); 846 } 847 } 848 catch(Exception e) 849 { /* do nothing */ } 850 } 851 } 852 853 /** 854 * Get or create a db to manage integer change number associated 855 * to multidomain server state. 856 * TODO:ECL how to manage compatibility of this db with new domains 857 * added or removed ? 858 * @return the retrieved or created db. 859 * @throws ChangelogException when a problem occurs. 860 */ 861 public Database getOrCreateCNIndexDB() throws ChangelogException 862 { 863 try 864 { 865 // Opens the database for change number associated to this domain. 866 // Create it if it does not already exist. 867 return openDatabase("draftcndb"); 868 } 869 catch (RuntimeException e) 870 { 871 throw new ChangelogException(e); 872 } 873 } 874 875 /** 876 * Shuts down replication when an unexpected database exception occurs. Note 877 * that we do not expect lock timeouts or txn timeouts because the replication 878 * databases are deadlock free, thus all operations should complete 879 * eventually. 880 * 881 * @param e 882 * The unexpected database exception. 883 */ 884 void shutdownOnException(DatabaseException e) 885 { 886 logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e)); 887 replicationServer.shutdown(); 888 } 889 890 private void debug(String message) 891 { 892 // replication server may be null in tests 893 logger.trace("In %s, %s", 894 replicationServer != null ? replicationServer.getMonitorInstanceName() : "[test]", 895 message); 896 } 897 898}