001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2009 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server.changelog.je;
028
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;

import com.sleepycat.je.*;

import static com.sleepycat.je.EnvironmentConfig.*;
import static com.sleepycat.je.OperationStatus.*;

import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
055
056/**
057 * This class represents a DB environment that acts as a factory for
058 * ReplicationDBs.
059 */
060public class ReplicationDbEnv
061{
  /** The JE environment through which every database of this class is opened. */
  private Environment dbEnvironment;
  /** The JE database named "changelogstate" that persists the changelog state. */
  private Database changelogStateDb;
  /**
   * The current changelogState. This is the in-memory version of what is inside
   * the on-disk changelogStateDB. It improves performance in case the
   * changelogState is read often.
   *
   * @GuardedBy("stateLock")
   */
  private final ChangelogState changelogState;
  /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState. */
  private final Object stateLock = new Object();
  /** Every database handle opened by {@code openDatabase()} and not yet closed by {@code closeDB()}. */
  private final List<Database> allDbs = new CopyOnWriteArrayList<>();
  /** The replication server owning this environment; may be null in tests. */
  private ReplicationServer replicationServer;
  /** Set to true once {@code shutdown()} has been called; no database may be opened afterwards. */
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  /** Key and data prefix of the records storing a domain generation id. */
  private static final String GENERATION_ID_TAG = "GENID";
  /** Key prefix of the records storing the time a replica went offline. */
  private static final String OFFLINE_TAG = "OFFLINE";
  /** Separator between the fields of the keys and data of the changelog state records. */
  private static final String FIELD_SEPARATOR = " ";
  /** The tracer object for the debug logger. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
082
083  /**
084   * Initialize this class.
085   * Creates Db environment that will be used to create databases.
086   * It also reads the currently known databases from the "changelogstate"
087   * database.
088   * @param path Path where the backing files must be created.
089   * @param replicationServer the ReplicationServer that creates this
090   *                          ReplicationDbEnv.
091   * @throws ChangelogException If an Exception occurred that prevented
092   *                           the initialization to happen.
093   */
094  public ReplicationDbEnv(String path, ReplicationServer replicationServer)
095      throws ChangelogException
096  {
097    this.replicationServer = replicationServer;
098
099    try
100    {
101      dbEnvironment = openJEEnvironment(path);
102
103      /*
104       * One database is created to store the update from each LDAP server in
105       * the topology. The database "changelogstate" is used to store the list
106       * of all the servers that have been seen in the past.
107       */
108      changelogStateDb = openDatabase("changelogstate");
109      changelogState = readOnDiskChangelogState();
110    }
111    catch (RuntimeException e)
112    {
113      throw new ChangelogException(e);
114    }
115  }
116
117  /**
118   * Open a JE environment.
119   * <p>
120   * protected so it can be overridden by tests.
121   *
122   * @param path
123   *          the path to the JE environment in the filesystem
124   * @return the opened JE environment
125   */
126  protected Environment openJEEnvironment(String path)
127  {
128    final EnvironmentConfig envConfig = new EnvironmentConfig();
129
130    /*
131     * Create the DB Environment that will be used for all the
132     * ReplicationServer activities related to the db
133     */
134    envConfig.setAllowCreate(true);
135    envConfig.setTransactional(true);
136    envConfig.setConfigParam(STATS_COLLECT, "false");
137    envConfig.setConfigParam(CLEANER_THREADS, "2");
138    envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
139    /*
140     * Tests have shown that since the parsing of the Replication log is
141     * always done sequentially, it is not necessary to use a large DB cache.
142     */
143    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
144    {
145      /*
146       * If the JVM is reasonably large then we can safely default to bigger
147       * read buffers. This will result in more scalable checkpointer and
148       * cleaner performance.
149       */
150      envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
151      envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
152      envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
153
154      /*
155       * The cache size must be bigger in order to accommodate the larger
156       * buffers - see OPENDJ-943.
157       */
158      envConfig.setConfigParam(MAX_MEMORY, mb(16));
159    }
160    else
161    {
162      /*
163       * Use 5M so that the replication can be used with 64M total for the
164       * JVM.
165       */
166      envConfig.setConfigParam(MAX_MEMORY, mb(5));
167    }
168
169    // Since records are always added at the end of the Replication log and
170    // deleted at the beginning of the Replication log, this should never
171    // cause any deadlock.
172    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
173    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
174
175    // Since replication provides durability, we can reduce the DB durability
176    // level so that we are immune to application / JVM crashes.
177    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
178
179    return new Environment(new File(path), envConfig);
180  }
181
182  private String kb(int sizeInKb)
183  {
184    return String.valueOf(sizeInKb * 1024);
185  }
186
187  private String mb(int sizeInMb)
188  {
189    return String.valueOf(sizeInMb * 1024 * 1024);
190  }
191
192  /**
193   * Open a JE database.
194   * <p>
195   * protected so it can be overridden by tests.
196   *
197   * @param databaseName
198   *          the databaseName to open
199   * @return the opened JE database
200   * @throws ChangelogException
201   *           if a problem happened opening the database
202   * @throws RuntimeException
203   *           if a problem happened with the JE database
204   */
205  protected Database openDatabase(String databaseName)
206      throws ChangelogException, RuntimeException
207  {
208    if (isShuttingDown.get())
209    {
210      throw new ChangelogException(
211          WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
212              databaseName, replicationServer.getServerId()));
213    }
214    final DatabaseConfig dbConfig = new DatabaseConfig();
215    dbConfig.setAllowCreate(true);
216    dbConfig.setTransactional(true);
217    final Database db =
218        dbEnvironment.openDatabase(null, databaseName, dbConfig);
219    if (isShuttingDown.get())
220    {
221      closeDB(db);
222      throw new ChangelogException(
223          WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
224              databaseName, replicationServer.getServerId()));
225    }
226    allDbs.add(db);
227    return db;
228  }
229
230  /**
231   * Return the current changelog state.
232   *
233   * @return the current {@link ChangelogState}
234   */
235  public ChangelogState getChangelogState()
236  {
237    return changelogState;
238  }
239
240  /**
241   * Read and return the changelog state from the database.
242   *
243   * @return the {@link ChangelogState} read from the changelogState DB
244   * @throws ChangelogException
245   *           if a database problem occurs
246   */
247  protected ChangelogState readOnDiskChangelogState() throws ChangelogException
248  {
249    return decodeChangelogState(readWholeState());
250  }
251
252  /**
253   * Decode the whole changelog state DB.
254   *
255   * @param wholeState
256   *          the whole changelog state DB as a Map.
257   *          The Map is only used as a convenient collection of key => data objects
258   * @return the decoded changelog state
259   * @throws ChangelogException
260   *           if a problem occurred while decoding
261   */
262  ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState)
263      throws ChangelogException
264  {
265    try
266    {
267      final ChangelogState result = new ChangelogState();
268      for (Entry<byte[], byte[]> entry : wholeState.entrySet())
269      {
270        final String stringKey = toString(entry.getKey());
271        final String stringData = toString(entry.getValue());
272
273        if (logger.isTraceEnabled())
274        {
275          debug("read (key, data)=(" + stringKey + ", " + stringData + ")");
276        }
277
278        final String prefix = stringKey.split(FIELD_SEPARATOR)[0];
279        if (prefix.equals(GENERATION_ID_TAG))
280        {
281          final String[] str = stringData.split(FIELD_SEPARATOR, 3);
282          final long generationId = toLong(str[1]);
283          final DN baseDN = DN.valueOf(str[2]);
284
285          if (logger.isTraceEnabled())
286          {
287            debug("has read generationId: baseDN=" + baseDN + " generationId="
288                + generationId);
289          }
290          result.setDomainGenerationId(baseDN, generationId);
291        }
292        else if (prefix.equals(OFFLINE_TAG))
293        {
294          final String[] str = stringData.split(FIELD_SEPARATOR, 3);
295          long timestamp = toLong(str[0]);
296          final int serverId = toInt(str[1]);
297          final DN baseDN = DN.valueOf(str[2]);
298          if (logger.isTraceEnabled())
299          {
300            debug("has read replica offline: baseDN=" + baseDN + " serverId="
301                + serverId);
302          }
303          result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
304        }
305        else
306        {
307          final String[] str = stringData.split(FIELD_SEPARATOR, 2);
308          final int serverId = toInt(str[0]);
309          final DN baseDN = DN.valueOf(str[1]);
310
311          if (logger.isTraceEnabled())
312          {
313            debug("has read replica: baseDN=" + baseDN + " serverId="
314                + serverId);
315          }
316          result.addServerIdToDomain(serverId, baseDN);
317        }
318      }
319      return result;
320    }
321    catch (DirectoryException e)
322    {
323      throw new ChangelogException(e.getMessageObject(), e);
324    }
325  }
326
327  private Map<byte[], byte[]> readWholeState() throws ChangelogException
328  {
329    DatabaseEntry key = new DatabaseEntry();
330    DatabaseEntry data = new DatabaseEntry();
331    Cursor cursor = changelogStateDb.openCursor(null, null);
332
333    try
334    {
335      final Map<byte[], byte[]> results = new LinkedHashMap<>();
336
337      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
338      while (status == OperationStatus.SUCCESS)
339      {
340        results.put(key.getData(), data.getData());
341        status = cursor.getNext(key, data, LockMode.DEFAULT);
342      }
343
344      return results;
345    }
346    catch (RuntimeException e)
347    {
348      throw new ChangelogException(ERR_DATABASE_EXCEPTION.get(e.getMessage()), e);
349    }
350    finally
351    {
352      close(cursor);
353    }
354  }
355
356  private int toInt(String data) throws ChangelogException
357  {
358    try
359    {
360      return Integer.parseInt(data);
361    }
362    catch (NumberFormatException e)
363    {
364      // should never happen
365      // TODO: i18n
366      throw new ChangelogException(LocalizableMessage.raw(
367          "replicationServer state database has a wrong format: "
368          + e.getLocalizedMessage() + "<" + data + ">"));
369    }
370  }
371
372  private long toLong(String data) throws ChangelogException
373  {
374    try
375    {
376      return Long.parseLong(data);
377    }
378    catch (NumberFormatException e)
379    {
380      // should never happen
381      // TODO: i18n
382      throw new ChangelogException(LocalizableMessage.raw(
383          "replicationServer state database has a wrong format: "
384          + e.getLocalizedMessage() + "<" + data + ">"));
385    }
386  }
387
388  private String toString(byte[] data) throws ChangelogException
389  {
390    try
391    {
392      return new String(data, "UTF-8");
393    }
394    catch (UnsupportedEncodingException e)
395    {
396      // should never happens
397      // TODO: i18n
398      throw new ChangelogException(LocalizableMessage.raw("need UTF-8 support"));
399    }
400  }
401
402  /**
403   * Converts the string to a UTF8-encoded byte array.
404   *
405   * @param s
406   *          the string to convert
407   * @return the byte array representation of the UTF8-encoded string
408   */
409  static byte[] toBytes(String s)
410  {
411    try
412    {
413      return s.getBytes("UTF-8");
414    }
415    catch (UnsupportedEncodingException e)
416    {
417      // can't happen
418      return null;
419    }
420  }
421
422  /**
423   * Finds or creates the database used to store changes for a replica with the
424   * given baseDN and serverId.
425   *
426   * @param serverId
427   *          The server id that identifies the server.
428   * @param baseDN
429   *          The baseDN that identifies the domain.
430   * @param generationId
431   *          The generationId associated to this domain.
432   * @return the Database.
433   * @throws ChangelogException
434   *           in case of underlying Exception.
435   */
436  public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId)
437      throws ChangelogException
438  {
439    if (logger.isTraceEnabled())
440    {
441      debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", "
442          + generationId + ")");
443    }
444    try
445    {
446      // JNR: redundant info is stored between the key and data down below.
447      // It is probably ok since "changelogstate" DB does not receive a high
448      // volume of inserts.
449      Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId);
450
451      // Opens the DB for the changes received from this server on this domain.
452      final Database replicaDB = openDatabase(replicaEntry.getKey());
453
454      synchronized (stateLock)
455      {
456        putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
457        changelogState.addServerIdToDomain(serverId, baseDN);
458        putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
459        changelogState.setDomainGenerationId(baseDN, generationId);
460      }
461      return replicaDB;
462    }
463    catch (RuntimeException e)
464    {
465      throw new ChangelogException(e);
466    }
467  }
468
469  /**
470   * Return an entry to store in the changelog state database representing a
471   * replica in the topology.
472   *
473   * @param baseDN
474   *          the replica's baseDN
475   * @param serverId
476   *          the replica's serverId
477   * @return a database entry for the replica
478   */
479  static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
480  {
481    final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString();
482    final String value = serverId + FIELD_SEPARATOR + baseDN;
483    return toEntry(key, value);
484  }
485
486  /**
487   * Return an entry to store in the changelog state database representing the
488   * domain generation id.
489   *
490   * @param baseDN
491   *          the domain's baseDN
492   * @param generationId
493   *          the domain's generationId
494   * @return a database entry for the generationId
495   */
496  static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
497  {
498    final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString();
499    final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR + baseDN;
500    return toEntry(toBytes(key), toBytes(data));
501  }
502
503  /**
504   * Converts an Entry&lt;String, String&gt; to an Entry&lt;byte[], byte[]&gt;.
505   *
506   * @param entry
507   *          the entry to convert
508   * @return the converted entry
509   */
510  static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
511  {
512    return toEntry(toBytes(entry.getKey()), toBytes(entry.getValue()));
513  }
514
515  /**
516   * Return an entry to store in the changelog state database representing the
517   * time a replica went offline.
518   *
519   * @param baseDN
520   *          the replica's baseDN
521   * @param offlineCSN
522   *          the replica's serverId and offline timestamp
523   * @return a database entry representing the time a replica went offline
524   */
525  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
526  {
527    final int serverId = offlineCSN.getServerId();
528    final byte[] key = toReplicaOfflineKey(baseDN, serverId);
529    final byte[] data = toBytes(offlineCSN.getTime() + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN);
530    return toEntry(key, data);
531  }
532
533  /**
534   * Return the key for a replica offline entry in the changelog state database.
535   *
536   * @param baseDN
537   *          the replica's baseDN
538   * @param serverId
539   *          the replica's serverId
540   * @return the key used in the database to store offline time of the replica
541   */
542  private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
543  {
544    return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString());
545  }
546
547  /** Returns an entry with the provided key and a null value. */
548  private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
549  {
550    return toEntry(key, null);
551  }
552
553  private static <K, V> SimpleImmutableEntry<K, V> toEntry(final K key, final V value)
554  {
555    return new SimpleImmutableEntry<>(key, value);
556  }
557
558  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
559      throws ChangelogException, RuntimeException
560  {
561    DatabaseEntry key = new DatabaseEntry(entry.getKey());
562    DatabaseEntry data = new DatabaseEntry();
563    if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
564    {
565      Transaction txn = dbEnvironment.beginTransaction(null, null);
566      try
567      {
568        data.setData(entry.getValue());
569        if (logger.isTraceEnabled())
570        {
571          debug("putting record in the changelogstate Db key=["
572              + toString(entry.getKey()) + "] value=["
573              + toString(entry.getValue()) + "]");
574        }
575        changelogStateDb.put(txn, key, data);
576        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
577      }
578      catch (DatabaseException dbe)
579      {
580        // Abort the txn and propagate the Exception to the caller
581        txn.abort();
582        throw dbe;
583      }
584    }
585  }
586
587    /**
588     * Creates a new transaction.
589     *
590     * @return the transaction.
591     * @throws ChangelogException in case of underlying exception
592     */
593    public Transaction beginTransaction() throws ChangelogException
594    {
595      try
596      {
597        return dbEnvironment.beginTransaction(null, null);
598      }
599      catch (RuntimeException e)
600      {
601        throw new ChangelogException(e);
602      }
603    }
604
605  /**
606   * Shutdown the Db environment.
607   */
608  public void shutdown()
609  {
610    isShuttingDown.set(true);
611    // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
612    // This code rely on openDatabase() to close databases opened concurrently
613    // with this code
614    final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
615    allDbs.clear();
616    for (Database db : allDbsCopy)
617    {
618      closeDB(db);
619    }
620
621    try
622    {
623      dbEnvironment.close();
624    }
625    catch (DatabaseException e)
626    {
627      logger.error(closeDBErrorMessage(null, e));
628    }
629  }
630
631  private void closeDB(Database db)
632  {
633    allDbs.remove(db);
634    try
635    {
636      db.close();
637    }
638    catch (DatabaseException e)
639    {
640      logger.error(closeDBErrorMessage(db.getDatabaseName(), e));
641    }
642  }
643
644  private LocalizableMessage closeDBErrorMessage(String dbName, DatabaseException e)
645  {
646    if (dbName != null)
647    {
648      return NOTE_EXCEPTION_CLOSING_DATABASE.get(dbName,
649          stackTraceToSingleLineString(e));
650    }
651    return ERR_ERROR_CLOSING_CHANGELOG_ENV.get(stackTraceToSingleLineString(e));
652  }
653
654  /**
655   * Clears the provided generationId associated to the provided baseDN from the
656   * state Db.
657   *
658   * @param baseDN
659   *          The baseDN for which the generationID must be cleared.
660   * @throws ChangelogException
661   *           If a database problem happened
662   */
663  public void clearGenerationId(DN baseDN) throws ChangelogException
664  {
665    synchronized (stateLock)
666    {
667      final int unusedGenId = 0;
668      deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
669          "clearGenerationId(baseDN=" + baseDN + ")");
670      changelogState.setDomainGenerationId(baseDN, unusedGenId);
671    }
672  }
673
674  /**
675   * Clears the provided serverId associated to the provided baseDN from the
676   * state Db.
677   *
678   * @param baseDN
679   *          The baseDN for which the serverId must be cleared.
680   * @param serverId
681   *          The serverId to remove from the Db.
682   * @throws ChangelogException
683   *           If a database problem happened
684   */
685  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
686  {
687    synchronized (stateLock)
688    {
689      deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
690          "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
691      changelogState.setDomainGenerationId(baseDN, -1);
692    }
693  }
694
695  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
696      String methodInvocation) throws ChangelogException
697  {
698    if (logger.isTraceEnabled())
699    {
700      debug(methodInvocation + " starting");
701    }
702
703    try
704    {
705      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
706      final DatabaseEntry data = new DatabaseEntry();
707      if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
708      {
709        Transaction txn = dbEnvironment.beginTransaction(null, null);
710        try
711        {
712          changelogStateDb.delete(txn, key);
713          txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
714          if (logger.isTraceEnabled())
715          {
716            debug(methodInvocation + " succeeded");
717          }
718        }
719        catch (RuntimeException dbe)
720        {
721          // Abort the txn and propagate the Exception to the caller
722          txn.abort();
723          throw dbe;
724        }
725      }
726      else if (logger.isTraceEnabled())
727      {
728        debug(methodInvocation + " failed: key not found");
729      }
730    }
731    catch (RuntimeException e)
732    {
733      if (logger.isTraceEnabled())
734      {
735        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
736      }
737      throw new ChangelogException(e);
738    }
739  }
740
741  /**
742   * Notify that replica is offline.
743   * <p>
744   * This information is stored in the changelog state DB.
745   *
746   * @param baseDN
747   *          the domain of the offline replica
748   * @param offlineCSN
749   *          the offline replica serverId and offline timestamp
750   * @throws ChangelogException
751   *           if a database problem occurred
752   */
753  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
754      throws ChangelogException
755  {
756    synchronized (stateLock)
757    {
758      // just overwrite any older entry as it is assumed a newly received offline
759      // CSN is newer than the previous one
760      putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
761          "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
762      changelogState.addOfflineReplica(baseDN, offlineCSN);
763    }
764  }
765
766  /**
767   * Notify that replica is online.
768   * <p>
769   * Update the changelog state DB if necessary (ie, replica was known to be
770   * offline).
771   *
772   * @param baseDN
773   *          the domain of replica
774   * @param serverId
775   *          the serverId of replica
776   * @throws ChangelogException
777   *           if a database problem occurred
778   */
779  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
780  {
781    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
782        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
783  }
784
785  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
786      String methodInvocation) throws ChangelogException
787  {
788    if (logger.isTraceEnabled())
789    {
790      debug(methodInvocation + " starting");
791    }
792
793    try
794    {
795      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
796      final DatabaseEntry data = new DatabaseEntry(entry.getValue());
797      changelogStateDb.put(null, key, data);
798      if (logger.isTraceEnabled())
799      {
800        debug(methodInvocation + " succeeded");
801      }
802    }
803    catch (RuntimeException e)
804    {
805      if (logger.isTraceEnabled())
806      {
807        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
808      }
809      throw new ChangelogException(e);
810    }
811  }
812
813    /**
814     * Clears the database.
815     *
816     * @param db
817     *          The database to clear.
818     */
819    public final void clearDb(Database db)
820    {
821      String databaseName = db.getDatabaseName();
822
823      // Closing is requested by Berkeley JE before truncate
824      db.close();
825
826      Transaction txn = null;
827      try
828      {
829        txn = dbEnvironment.beginTransaction(null, null);
830        dbEnvironment.truncateDatabase(txn, databaseName, false);
831        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
832        txn = null;
833      }
834      catch (RuntimeException e)
835      {
836        logger.error(ERR_ERROR_CLEARING_DB, databaseName,
837            e.getMessage() + " " + stackTraceToSingleLineString(e));
838      }
839      finally
840      {
841        try
842        {
843          if (txn != null)
844          {
845            txn.abort();
846          }
847        }
848        catch(Exception e)
849        { /* do nothing */ }
850      }
851    }
852
853    /**
854     * Get or create a db to manage integer change  number associated
855     * to multidomain server state.
856     * TODO:ECL how to manage compatibility of this db with  new domains
857     * added or removed ?
858     * @return the retrieved or created db.
859     * @throws ChangelogException when a problem occurs.
860     */
861    public Database getOrCreateCNIndexDB() throws ChangelogException
862    {
863      try
864      {
865        // Opens the database for change number associated to this domain.
866        // Create it if it does not already exist.
867        return openDatabase("draftcndb");
868      }
869      catch (RuntimeException e)
870      {
871        throw new ChangelogException(e);
872      }
873    }
874
875  /**
876   * Shuts down replication when an unexpected database exception occurs. Note
877   * that we do not expect lock timeouts or txn timeouts because the replication
878   * databases are deadlock free, thus all operations should complete
879   * eventually.
880   *
881   * @param e
882   *          The unexpected database exception.
883   */
884  void shutdownOnException(DatabaseException e)
885  {
886    logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e));
887    replicationServer.shutdown();
888  }
889
890  private void debug(String message)
891  {
892    // replication server may be null in tests
893    logger.trace("In %s, %s",
894        replicationServer != null ? replicationServer.getMonitorInstanceName() : "[test]",
895        message);
896  }
897
898}