001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2009 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server.changelog.je;
028
029import static com.sleepycat.je.LockMode.*;
030import static com.sleepycat.je.OperationStatus.*;
031
032import static org.opends.messages.ReplicationMessages.*;
033import static org.opends.server.util.StaticUtils.*;
034
035import java.io.Closeable;
036import java.util.concurrent.locks.ReadWriteLock;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import org.forgerock.i18n.slf4j.LocalizedLogger;
040import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
041import org.opends.server.replication.server.changelog.api.ChangelogException;
042
043import com.sleepycat.je.*;
044
045/**
046 * This class implements the interface between the underlying database
047 * and the {@link JEChangeNumberIndexDB} class.
048 * This is the only class that should have code using the BDB interfaces.
049 */
050public class DraftCNDB
051{
052  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
053
054  private Database db;
055  private ReplicationDbEnv dbenv;
056
057  /**
058   * The lock used to provide exclusive access to the thread that close the db
059   * (shutdown or clear).
060   */
061  private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
062
063  /**
064   * Creates a new database or open existing database that will be used
065   * to store and retrieve changes from an LDAP server.
066   * @param dbenv The Db environment to use to create the db.
067   * @throws ChangelogException If a database problem happened
068   */
069  public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException
070  {
071    this.dbenv = dbenv;
072
073    // Get or create the associated ReplicationServerDomain and Db.
074    db = dbenv.getOrCreateCNIndexDB();
075  }
076
077  /**
078   * Add a record to the database.
079   *
080   * @param record
081   *          the provided {@link ChangeNumberIndexRecord} to be stored.
082   * @throws ChangelogException
083   *           If a database problem happened
084   */
085  public void addRecord(ChangeNumberIndexRecord record)
086      throws ChangelogException
087  {
088    try
089    {
090      final long changeNumber = record.getChangeNumber();
091      DatabaseEntry key = new ReplicationDraftCNKey(changeNumber);
092      DatabaseEntry data = new DraftCNData(changeNumber, record.getBaseDN(), record.getCSN());
093
094      // Use a transaction so that we can override durability.
095      Transaction txn = null;
096      dbCloseLock.readLock().lock();
097      try
098      {
099        // If the DB has been closed then return immediately.
100        if (isDBClosed())
101        {
102          return;
103        }
104
105        txn = dbenv.beginTransaction();
106        db.put(txn, key, data);
107        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
108      }
109      finally
110      {
111        JEUtils.abort(txn);
112        dbCloseLock.readLock().unlock();
113      }
114    }
115    catch (DatabaseException e)
116    {
117      throw new ChangelogException(e);
118    }
119  }
120
121  /**
122   * Shutdown the database.
123   */
124  public void shutdown()
125  {
126    dbCloseLock.writeLock().lock();
127    try
128    {
129      db.close();
130      db = null;
131    }
132    catch (DatabaseException e)
133    {
134      logger.info(NOTE_EXCEPTION_CLOSING_DATABASE, this, stackTraceToSingleLineString(e));
135    }
136    finally
137    {
138      dbCloseLock.writeLock().unlock();
139    }
140  }
141
142  /**
143   * Create a cursor that can be used to search or iterate on this DB.
144   *
145   * @param changeNumber The change number from which the cursor must start.
146   * @return The ReplServerDBCursor
147   * @throws ChangelogException If a database error prevented the cursor
148   *                           creation.
149   */
150  public DraftCNDBCursor openReadCursor(long changeNumber)
151      throws ChangelogException
152  {
153    return new DraftCNDBCursor(changeNumber);
154  }
155
156  /**
157   * Create a cursor that can be used to delete some record from this
158   * ReplicationServer database.
159   *
160   * @return The ReplServerDBCursor
161   * @throws ChangelogException If a database error prevented the cursor
162   *                           creation.
163   */
164  public DraftCNDBCursor openDeleteCursor() throws ChangelogException
165  {
166    return new DraftCNDBCursor();
167  }
168
169  private void closeLockedCursor(Cursor cursor)
170  {
171    try
172    {
173      close(cursor);
174    }
175    finally
176    {
177      dbCloseLock.readLock().unlock();
178    }
179  }
180
181  /**
182   * Read the first Change from the database, 0 when none.
183   *
184   * @return the first change number.
185   * @throws ChangelogException
186   *           if a database problem occurred
187   */
188  public ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
189  {
190    try
191    {
192      dbCloseLock.readLock().lock();
193      Cursor cursor = null;
194      try
195      {
196        // If the DB has been closed then return immediately.
197        if (isDBClosed())
198        {
199          return null;
200        }
201
202        cursor = db.openCursor(null, null);
203        ReplicationDraftCNKey key = new ReplicationDraftCNKey();
204        DatabaseEntry entry = new DatabaseEntry();
205        if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS)
206        {
207          return null;
208        }
209
210        return newCNIndexRecord(key, entry);
211      }
212      finally
213      {
214        closeLockedCursor(cursor);
215      }
216    }
217    catch (DatabaseException e)
218    {
219      throw new ChangelogException(e);
220    }
221  }
222
223  private ChangeNumberIndexRecord newCNIndexRecord(ReplicationDraftCNKey key,
224      DatabaseEntry data) throws ChangelogException
225  {
226    return new DraftCNData(key.getChangeNumber(), data.getData()).getRecord();
227  }
228
229  /**
230   * Return the record count.
231   * @return the record count.
232   */
233  public long count()
234  {
235    dbCloseLock.readLock().lock();
236    try
237    {
238      // If the DB has been closed then return immediately.
239      if (isDBClosed())
240      {
241        return 0;
242      }
243
244      return db.count();
245    }
246    catch (DatabaseException e)
247    {
248      logger.traceException(e);
249    }
250    finally
251    {
252      dbCloseLock.readLock().unlock();
253    }
254    return 0;
255  }
256
257  /**
258   * Read the last change number from the database.
259   *
260   * @return the last change number.
261   * @throws ChangelogException
262   *           if a database problem occurred
263   */
264  public ChangeNumberIndexRecord readLastRecord() throws ChangelogException
265  {
266    try
267    {
268      dbCloseLock.readLock().lock();
269      Cursor cursor = null;
270      try
271      {
272        // If the DB has been closed then return immediately.
273        if (isDBClosed())
274        {
275          return null;
276        }
277
278        cursor = db.openCursor(null, null);
279        ReplicationDraftCNKey key = new ReplicationDraftCNKey();
280        DatabaseEntry entry = new DatabaseEntry();
281        if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS)
282        {
283          return null;
284        }
285
286        return newCNIndexRecord(key, entry);
287      }
288      finally
289      {
290        closeLockedCursor(cursor);
291      }
292    }
293    catch (DatabaseException e)
294    {
295      throw new ChangelogException(e);
296    }
297  }
298
299  /** {@inheritDoc} */
300  @Override
301  public String toString()
302  {
303    return getClass().getSimpleName();
304  }
305
306  /**
307   * This Class implements a cursor that can be used to browse the database.
308   */
309  public class DraftCNDBCursor implements Closeable
310  {
311    private final Cursor cursor;
312
313    /**
314     * The transaction that will protect the actions done with the cursor.
315     * Will be let null for a read cursor.
316     * Will be set non null for a write cursor.
317     */
318    private final Transaction txn;
319    private final ReplicationDraftCNKey key;
320    private final DatabaseEntry entry = new DatabaseEntry();
321    private ChangeNumberIndexRecord record;
322    private boolean isClosed;
323
324
325    /**
326     * Creates a cursor that can be used for browsing the db.
327     *
328     * @param startChangeNumber
329     *          the change number from which the cursor must start.
330     * @throws ChangelogException
331     *           If a database problem happened
332     */
333    private DraftCNDBCursor(long startChangeNumber) throws ChangelogException
334    {
335      this.key = new ReplicationDraftCNKey(startChangeNumber);
336
337      // Take the lock. From now on, whatever error that happen in the life
338      // of this cursor should end by unlocking that lock. We must also
339      // unlock it when throwing an exception.
340      dbCloseLock.readLock().lock();
341
342      boolean cursorHeld = false;
343      Cursor localCursor = null;
344      try
345      {
346        // If the DB has been closed then create empty cursor.
347        if (isDBClosed())
348        {
349          isClosed = true;
350          txn = null;
351          cursor = null;
352          return;
353        }
354
355        localCursor = db.openCursor(null, null);
356        if (startChangeNumber >= 0)
357        {
358          if (localCursor.getSearchKey(key, entry, LockMode.DEFAULT) != SUCCESS)
359          {
360            // We could not move the cursor to the expected startChangeNumber
361            if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS)
362            {
363              // We could not even move the cursor close to it
364              // => return an empty cursor
365              isClosed = true;
366              txn = null;
367              cursor = null;
368              return;
369            }
370
371            if (localCursor.getPrev(key, entry, LockMode.DEFAULT) != SUCCESS)
372            {
373              localCursor.close();
374              localCursor = db.openCursor(null, null);
375            }
376            else
377            {
378              record = newCNIndexRecord(this.key, entry);
379            }
380          }
381          else
382          {
383            record = newCNIndexRecord(this.key, entry);
384          }
385        }
386
387        this.txn = null;
388        this.cursor = localCursor;
389        cursorHeld = true;
390      }
391      catch (DatabaseException e)
392      {
393        throw new ChangelogException(e);
394      }
395      finally
396      {
397        if (!cursorHeld)
398        {
399          // Do not keep a readLock on the DB when this class does not hold a DB
400          // cursor. Either an exception was thrown or no cursor could be opened
401          // for some reason.
402          closeLockedCursor(localCursor);
403        }
404      }
405    }
406
407    private DraftCNDBCursor() throws ChangelogException
408    {
409      Transaction localTxn = null;
410      Cursor localCursor = null;
411
412      this.key = new ReplicationDraftCNKey();
413
414      // We'll go on only if no close or no clear is running
415      boolean cursorHeld = false;
416      dbCloseLock.readLock().lock();
417      try
418      {
419        // If the DB has been closed then create empty cursor.
420        if (isDBClosed())
421        {
422          isClosed = true;
423          txn = null;
424          cursor = null;
425          return;
426        }
427
428        // Create the transaction that will protect whatever done with this
429        // write cursor.
430        localTxn = dbenv.beginTransaction();
431        localCursor = db.openCursor(localTxn, null);
432
433        this.txn = localTxn;
434        this.cursor = localCursor;
435        cursorHeld = true;
436      }
437      catch (DatabaseException e)
438      {
439        logger.traceException(e);
440        JEUtils.abort(localTxn);
441        throw new ChangelogException(e);
442      }
443      catch (ChangelogException e)
444      {
445        logger.traceException(e);
446        JEUtils.abort(localTxn);
447        throw e;
448      }
449      finally
450      {
451        if (!cursorHeld)
452        {
453          // Do not keep a readLock on the DB when this class does not hold a DB
454          // cursor. Either an exception was thrown or no cursor could be opened
455          // for some reason.
456          closeLockedCursor(localCursor);
457        }
458      }
459    }
460
461    /**
462     * Close the ReplicationServer Cursor.
463     */
464    @Override
465    public void close()
466    {
467      synchronized (this)
468      {
469        if (isClosed)
470        {
471          return;
472        }
473        isClosed = true;
474      }
475
476      closeLockedCursor(cursor);
477
478      if (txn != null)
479      {
480        try
481        {
482          txn.commit();
483        }
484        catch (DatabaseException e)
485        {
486          dbenv.shutdownOnException(e);
487        }
488      }
489    }
490
491    /**
492     * Abort the Cursor after a DatabaseException.
493     * This method catch and ignore the DatabaseException because
494     * this must be done when aborting a cursor after a DatabaseException
495     * (per the Cursor documentation).
496     * This should not be used in any other case.
497     */
498    public void abort()
499    {
500      synchronized (this)
501      {
502        if (isClosed)
503        {
504          return;
505        }
506        isClosed = true;
507      }
508
509      closeLockedCursor(cursor);
510      JEUtils.abort(txn);
511    }
512
513    /**
514     * Returns the {@link ChangeNumberIndexRecord} at the current position of
515     * the cursor.
516     *
517     * @return The current {@link ChangeNumberIndexRecord}.
518     */
519    public ChangeNumberIndexRecord currentRecord()
520    {
521      if (isClosed)
522      {
523        return null;
524      }
525      return record;
526    }
527
528    /**
529     * Go to the next record on the cursor.
530     *
531     * @return the next record on this cursor.
532     * @throws ChangelogException
533     *           If a database problem happened
534     */
535    public boolean next() throws ChangelogException
536    {
537      // first wipe old entry
538      record = null;
539      if (isClosed)
540      {
541        return false;
542      }
543
544      try {
545        OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
546        if (status == OperationStatus.SUCCESS)
547        {
548          record = newCNIndexRecord(this.key, entry);
549          return true;
550        }
551        return false;
552      }
553      catch (DatabaseException e)
554      {
555        throw new ChangelogException(e);
556      }
557    }
558
559    /**
560     * Delete the record at the current cursor position.
561     *
562     * @throws ChangelogException
563     *           If a database problem happened
564     */
565    public void delete() throws ChangelogException
566    {
567      if (isClosed)
568      {
569        throw new IllegalStateException("DraftCNDB already closed");
570      }
571
572      try
573      {
574        cursor.delete();
575      }
576      catch (DatabaseException e)
577      {
578        throw new ChangelogException(e);
579      }
580    }
581
582    /** {@inheritDoc} */
583    @Override
584    public String toString()
585    {
586      return getClass().getSimpleName() + " currentRecord=" + record;
587    }
588  }
589
590  /**
591   * Clears this change DB from the changes it contains.
592   *
593   * @throws ChangelogException
594   *           If a database problem happened
595   */
596  public void clear() throws ChangelogException
597  {
598    // The coming users will be blocked until the clear is done
599    dbCloseLock.writeLock().lock();
600    try
601    {
602      // If the DB has been closed then return immediately.
603      if (isDBClosed())
604      {
605        return;
606      }
607
608      final Database oldDb = db;
609      db = null; // In case there's a failure between here and recreation.
610      dbenv.clearDb(oldDb);
611
612      // RE-create the db
613      db = dbenv.getOrCreateCNIndexDB();
614    }
615    catch(Exception e)
616    {
617      logger.error(ERR_ERROR_CLEARING_DB, this, e.getMessage() + " " + stackTraceToSingleLineString(e));
618    }
619    finally
620    {
621      // Relax the waiting users
622      dbCloseLock.writeLock().unlock();
623    }
624  }
625
626  /**
627   * Returns {@code true} if the DB is closed. This method assumes that either
628   * the db read/write lock has been taken.
629   *
630   * @return {@code true} if the DB is closed.
631   */
632  private boolean isDBClosed()
633  {
634    return db == null || !db.getEnvironment().isValid();
635  }
636}