001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2009 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server.changelog.je; 028 029import static com.sleepycat.je.LockMode.*; 030import static com.sleepycat.je.OperationStatus.*; 031 032import static org.opends.messages.ReplicationMessages.*; 033import static org.opends.server.util.StaticUtils.*; 034 035import java.io.Closeable; 036import java.util.concurrent.locks.ReadWriteLock; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038 039import org.forgerock.i18n.slf4j.LocalizedLogger; 040import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 041import org.opends.server.replication.server.changelog.api.ChangelogException; 042 043import com.sleepycat.je.*; 044 045/** 046 * This class implements the interface between the underlying database 047 * and the {@link JEChangeNumberIndexDB} class. 048 * This is the only class that should have code using the BDB interfaces. 049 */ 050public class DraftCNDB 051{ 052 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 053 054 private Database db; 055 private ReplicationDbEnv dbenv; 056 057 /** 058 * The lock used to provide exclusive access to the thread that close the db 059 * (shutdown or clear). 060 */ 061 private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true); 062 063 /** 064 * Creates a new database or open existing database that will be used 065 * to store and retrieve changes from an LDAP server. 066 * @param dbenv The Db environment to use to create the db. 067 * @throws ChangelogException If a database problem happened 068 */ 069 public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException 070 { 071 this.dbenv = dbenv; 072 073 // Get or create the associated ReplicationServerDomain and Db. 074 db = dbenv.getOrCreateCNIndexDB(); 075 } 076 077 /** 078 * Add a record to the database. 079 * 080 * @param record 081 * the provided {@link ChangeNumberIndexRecord} to be stored. 082 * @throws ChangelogException 083 * If a database problem happened 084 */ 085 public void addRecord(ChangeNumberIndexRecord record) 086 throws ChangelogException 087 { 088 try 089 { 090 final long changeNumber = record.getChangeNumber(); 091 DatabaseEntry key = new ReplicationDraftCNKey(changeNumber); 092 DatabaseEntry data = new DraftCNData(changeNumber, record.getBaseDN(), record.getCSN()); 093 094 // Use a transaction so that we can override durability. 095 Transaction txn = null; 096 dbCloseLock.readLock().lock(); 097 try 098 { 099 // If the DB has been closed then return immediately. 100 if (isDBClosed()) 101 { 102 return; 103 } 104 105 txn = dbenv.beginTransaction(); 106 db.put(txn, key, data); 107 txn.commit(Durability.COMMIT_WRITE_NO_SYNC); 108 } 109 finally 110 { 111 JEUtils.abort(txn); 112 dbCloseLock.readLock().unlock(); 113 } 114 } 115 catch (DatabaseException e) 116 { 117 throw new ChangelogException(e); 118 } 119 } 120 121 /** 122 * Shutdown the database. 123 */ 124 public void shutdown() 125 { 126 dbCloseLock.writeLock().lock(); 127 try 128 { 129 db.close(); 130 db = null; 131 } 132 catch (DatabaseException e) 133 { 134 logger.info(NOTE_EXCEPTION_CLOSING_DATABASE, this, stackTraceToSingleLineString(e)); 135 } 136 finally 137 { 138 dbCloseLock.writeLock().unlock(); 139 } 140 } 141 142 /** 143 * Create a cursor that can be used to search or iterate on this DB. 144 * 145 * @param changeNumber The change number from which the cursor must start. 146 * @return The ReplServerDBCursor 147 * @throws ChangelogException If a database error prevented the cursor 148 * creation. 149 */ 150 public DraftCNDBCursor openReadCursor(long changeNumber) 151 throws ChangelogException 152 { 153 return new DraftCNDBCursor(changeNumber); 154 } 155 156 /** 157 * Create a cursor that can be used to delete some record from this 158 * ReplicationServer database. 159 * 160 * @return The ReplServerDBCursor 161 * @throws ChangelogException If a database error prevented the cursor 162 * creation. 163 */ 164 public DraftCNDBCursor openDeleteCursor() throws ChangelogException 165 { 166 return new DraftCNDBCursor(); 167 } 168 169 private void closeLockedCursor(Cursor cursor) 170 { 171 try 172 { 173 close(cursor); 174 } 175 finally 176 { 177 dbCloseLock.readLock().unlock(); 178 } 179 } 180 181 /** 182 * Read the first Change from the database, 0 when none. 183 * 184 * @return the first change number. 185 * @throws ChangelogException 186 * if a database problem occurred 187 */ 188 public ChangeNumberIndexRecord readFirstRecord() throws ChangelogException 189 { 190 try 191 { 192 dbCloseLock.readLock().lock(); 193 Cursor cursor = null; 194 try 195 { 196 // If the DB has been closed then return immediately. 197 if (isDBClosed()) 198 { 199 return null; 200 } 201 202 cursor = db.openCursor(null, null); 203 ReplicationDraftCNKey key = new ReplicationDraftCNKey(); 204 DatabaseEntry entry = new DatabaseEntry(); 205 if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS) 206 { 207 return null; 208 } 209 210 return newCNIndexRecord(key, entry); 211 } 212 finally 213 { 214 closeLockedCursor(cursor); 215 } 216 } 217 catch (DatabaseException e) 218 { 219 throw new ChangelogException(e); 220 } 221 } 222 223 private ChangeNumberIndexRecord newCNIndexRecord(ReplicationDraftCNKey key, 224 DatabaseEntry data) throws ChangelogException 225 { 226 return new DraftCNData(key.getChangeNumber(), data.getData()).getRecord(); 227 } 228 229 /** 230 * Return the record count. 231 * @return the record count. 232 */ 233 public long count() 234 { 235 dbCloseLock.readLock().lock(); 236 try 237 { 238 // If the DB has been closed then return immediately. 239 if (isDBClosed()) 240 { 241 return 0; 242 } 243 244 return db.count(); 245 } 246 catch (DatabaseException e) 247 { 248 logger.traceException(e); 249 } 250 finally 251 { 252 dbCloseLock.readLock().unlock(); 253 } 254 return 0; 255 } 256 257 /** 258 * Read the last change number from the database. 259 * 260 * @return the last change number. 261 * @throws ChangelogException 262 * if a database problem occurred 263 */ 264 public ChangeNumberIndexRecord readLastRecord() throws ChangelogException 265 { 266 try 267 { 268 dbCloseLock.readLock().lock(); 269 Cursor cursor = null; 270 try 271 { 272 // If the DB has been closed then return immediately. 273 if (isDBClosed()) 274 { 275 return null; 276 } 277 278 cursor = db.openCursor(null, null); 279 ReplicationDraftCNKey key = new ReplicationDraftCNKey(); 280 DatabaseEntry entry = new DatabaseEntry(); 281 if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS) 282 { 283 return null; 284 } 285 286 return newCNIndexRecord(key, entry); 287 } 288 finally 289 { 290 closeLockedCursor(cursor); 291 } 292 } 293 catch (DatabaseException e) 294 { 295 throw new ChangelogException(e); 296 } 297 } 298 299 /** {@inheritDoc} */ 300 @Override 301 public String toString() 302 { 303 return getClass().getSimpleName(); 304 } 305 306 /** 307 * This Class implements a cursor that can be used to browse the database. 308 */ 309 public class DraftCNDBCursor implements Closeable 310 { 311 private final Cursor cursor; 312 313 /** 314 * The transaction that will protect the actions done with the cursor. 315 * Will be let null for a read cursor. 316 * Will be set non null for a write cursor. 317 */ 318 private final Transaction txn; 319 private final ReplicationDraftCNKey key; 320 private final DatabaseEntry entry = new DatabaseEntry(); 321 private ChangeNumberIndexRecord record; 322 private boolean isClosed; 323 324 325 /** 326 * Creates a cursor that can be used for browsing the db. 327 * 328 * @param startChangeNumber 329 * the change number from which the cursor must start. 330 * @throws ChangelogException 331 * If a database problem happened 332 */ 333 private DraftCNDBCursor(long startChangeNumber) throws ChangelogException 334 { 335 this.key = new ReplicationDraftCNKey(startChangeNumber); 336 337 // Take the lock. From now on, whatever error that happen in the life 338 // of this cursor should end by unlocking that lock. We must also 339 // unlock it when throwing an exception. 340 dbCloseLock.readLock().lock(); 341 342 boolean cursorHeld = false; 343 Cursor localCursor = null; 344 try 345 { 346 // If the DB has been closed then create empty cursor. 347 if (isDBClosed()) 348 { 349 isClosed = true; 350 txn = null; 351 cursor = null; 352 return; 353 } 354 355 localCursor = db.openCursor(null, null); 356 if (startChangeNumber >= 0) 357 { 358 if (localCursor.getSearchKey(key, entry, LockMode.DEFAULT) != SUCCESS) 359 { 360 // We could not move the cursor to the expected startChangeNumber 361 if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS) 362 { 363 // We could not even move the cursor close to it 364 // => return an empty cursor 365 isClosed = true; 366 txn = null; 367 cursor = null; 368 return; 369 } 370 371 if (localCursor.getPrev(key, entry, LockMode.DEFAULT) != SUCCESS) 372 { 373 localCursor.close(); 374 localCursor = db.openCursor(null, null); 375 } 376 else 377 { 378 record = newCNIndexRecord(this.key, entry); 379 } 380 } 381 else 382 { 383 record = newCNIndexRecord(this.key, entry); 384 } 385 } 386 387 this.txn = null; 388 this.cursor = localCursor; 389 cursorHeld = true; 390 } 391 catch (DatabaseException e) 392 { 393 throw new ChangelogException(e); 394 } 395 finally 396 { 397 if (!cursorHeld) 398 { 399 // Do not keep a readLock on the DB when this class does not hold a DB 400 // cursor. Either an exception was thrown or no cursor could be opened 401 // for some reason. 402 closeLockedCursor(localCursor); 403 } 404 } 405 } 406 407 private DraftCNDBCursor() throws ChangelogException 408 { 409 Transaction localTxn = null; 410 Cursor localCursor = null; 411 412 this.key = new ReplicationDraftCNKey(); 413 414 // We'll go on only if no close or no clear is running 415 boolean cursorHeld = false; 416 dbCloseLock.readLock().lock(); 417 try 418 { 419 // If the DB has been closed then create empty cursor. 420 if (isDBClosed()) 421 { 422 isClosed = true; 423 txn = null; 424 cursor = null; 425 return; 426 } 427 428 // Create the transaction that will protect whatever done with this 429 // write cursor. 430 localTxn = dbenv.beginTransaction(); 431 localCursor = db.openCursor(localTxn, null); 432 433 this.txn = localTxn; 434 this.cursor = localCursor; 435 cursorHeld = true; 436 } 437 catch (DatabaseException e) 438 { 439 logger.traceException(e); 440 JEUtils.abort(localTxn); 441 throw new ChangelogException(e); 442 } 443 catch (ChangelogException e) 444 { 445 logger.traceException(e); 446 JEUtils.abort(localTxn); 447 throw e; 448 } 449 finally 450 { 451 if (!cursorHeld) 452 { 453 // Do not keep a readLock on the DB when this class does not hold a DB 454 // cursor. Either an exception was thrown or no cursor could be opened 455 // for some reason. 456 closeLockedCursor(localCursor); 457 } 458 } 459 } 460 461 /** 462 * Close the ReplicationServer Cursor. 463 */ 464 @Override 465 public void close() 466 { 467 synchronized (this) 468 { 469 if (isClosed) 470 { 471 return; 472 } 473 isClosed = true; 474 } 475 476 closeLockedCursor(cursor); 477 478 if (txn != null) 479 { 480 try 481 { 482 txn.commit(); 483 } 484 catch (DatabaseException e) 485 { 486 dbenv.shutdownOnException(e); 487 } 488 } 489 } 490 491 /** 492 * Abort the Cursor after a DatabaseException. 493 * This method catch and ignore the DatabaseException because 494 * this must be done when aborting a cursor after a DatabaseException 495 * (per the Cursor documentation). 496 * This should not be used in any other case. 497 */ 498 public void abort() 499 { 500 synchronized (this) 501 { 502 if (isClosed) 503 { 504 return; 505 } 506 isClosed = true; 507 } 508 509 closeLockedCursor(cursor); 510 JEUtils.abort(txn); 511 } 512 513 /** 514 * Returns the {@link ChangeNumberIndexRecord} at the current position of 515 * the cursor. 516 * 517 * @return The current {@link ChangeNumberIndexRecord}. 518 */ 519 public ChangeNumberIndexRecord currentRecord() 520 { 521 if (isClosed) 522 { 523 return null; 524 } 525 return record; 526 } 527 528 /** 529 * Go to the next record on the cursor. 530 * 531 * @return the next record on this cursor. 532 * @throws ChangelogException 533 * If a database problem happened 534 */ 535 public boolean next() throws ChangelogException 536 { 537 // first wipe old entry 538 record = null; 539 if (isClosed) 540 { 541 return false; 542 } 543 544 try { 545 OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT); 546 if (status == OperationStatus.SUCCESS) 547 { 548 record = newCNIndexRecord(this.key, entry); 549 return true; 550 } 551 return false; 552 } 553 catch (DatabaseException e) 554 { 555 throw new ChangelogException(e); 556 } 557 } 558 559 /** 560 * Delete the record at the current cursor position. 561 * 562 * @throws ChangelogException 563 * If a database problem happened 564 */ 565 public void delete() throws ChangelogException 566 { 567 if (isClosed) 568 { 569 throw new IllegalStateException("DraftCNDB already closed"); 570 } 571 572 try 573 { 574 cursor.delete(); 575 } 576 catch (DatabaseException e) 577 { 578 throw new ChangelogException(e); 579 } 580 } 581 582 /** {@inheritDoc} */ 583 @Override 584 public String toString() 585 { 586 return getClass().getSimpleName() + " currentRecord=" + record; 587 } 588 } 589 590 /** 591 * Clears this change DB from the changes it contains. 592 * 593 * @throws ChangelogException 594 * If a database problem happened 595 */ 596 public void clear() throws ChangelogException 597 { 598 // The coming users will be blocked until the clear is done 599 dbCloseLock.writeLock().lock(); 600 try 601 { 602 // If the DB has been closed then return immediately. 603 if (isDBClosed()) 604 { 605 return; 606 } 607 608 final Database oldDb = db; 609 db = null; // In case there's a failure between here and recreation. 610 dbenv.clearDb(oldDb); 611 612 // RE-create the db 613 db = dbenv.getOrCreateCNIndexDB(); 614 } 615 catch(Exception e) 616 { 617 logger.error(ERR_ERROR_CLEARING_DB, this, e.getMessage() + " " + stackTraceToSingleLineString(e)); 618 } 619 finally 620 { 621 // Relax the waiting users 622 dbCloseLock.writeLock().unlock(); 623 } 624 } 625 626 /** 627 * Returns {@code true} if the DB is closed. This method assumes that either 628 * the db read/write lock has been taken. 629 * 630 * @return {@code true} if the DB is closed. 631 */ 632 private boolean isDBClosed() 633 { 634 return db == null || !db.getEnvironment().isValid(); 635 } 636}