001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Portions Copyright 2013-2015 ForgeRock AS
025 */
026package org.opends.server.replication.server.changelog.je;
027
028import static org.opends.messages.ReplicationMessages.*;
029import static org.opends.server.util.StaticUtils.*;
030
031import java.io.File;
032import java.util.Collections;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentMap;
039import java.util.concurrent.ConcurrentSkipListMap;
040import java.util.concurrent.CopyOnWriteArrayList;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicReference;
043
044import org.forgerock.i18n.LocalizableMessageBuilder;
045import org.forgerock.i18n.slf4j.LocalizedLogger;
046import org.forgerock.opendj.config.server.ConfigException;
047import org.forgerock.util.Pair;
048import org.opends.server.admin.std.server.ReplicationServerCfg;
049import org.opends.server.api.DirectoryThread;
050import org.opends.server.backends.ChangelogBackend;
051import org.opends.server.replication.common.CSN;
052import org.opends.server.replication.common.MultiDomainServerState;
053import org.opends.server.replication.common.ServerState;
054import org.opends.server.replication.protocol.UpdateMsg;
055import org.opends.server.replication.server.ChangelogState;
056import org.opends.server.replication.server.ReplicationServer;
057import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
058import org.opends.server.replication.server.changelog.api.ChangelogDB;
059import org.opends.server.replication.server.changelog.api.ChangelogException;
060import org.opends.server.replication.server.changelog.api.DBCursor;
061import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
062import org.opends.server.replication.server.changelog.api.ReplicaId;
063import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
064import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer;
065import org.opends.server.replication.server.changelog.file.DomainDBCursor;
066import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
067import org.opends.server.replication.server.changelog.file.ReplicaCursor;
068import org.opends.server.types.DN;
069import org.opends.server.util.StaticUtils;
070import org.opends.server.util.TimeThread;
071
072/**
073 * JE implementation of the ChangelogDB interface.
074 */
075public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
076{
  /** The tracer object for the debug logger. */
  protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Maps each replication domain (base DN) to its replica DBs, themselves
   * keyed by the server id of the replica they store changes for.
   * <p>
   * When removing a domainMap, code must:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronize on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close everything it contains</li>
   * </ol>
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<>();
  // Domain cursors handed out by this changelog DB, per base DN. They are told
  // about replica DBs that get created after the cursor was opened.
  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
      new ConcurrentSkipListMap<>();
  // Multi-domain cursors handed out by this changelog DB. They are told about
  // domains that get created after the cursor was opened.
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>();
  // Replica cursors handed out by this changelog DB, per replica. They are
  // updated whenever the offline CSN of their replica changes.
  private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<>();
  // The database environment; only assigned by initializeDB(), null before that.
  private ReplicationDbEnv replicationEnv;
  // The replication server configuration supplied at construction time.
  private final ReplicationServerCfg config;
  // The directory holding the changelog DB files, resolved at construction time.
  private final File dbDirectory;

  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * @GuardedBy("cnIndexDBLock")
   */
  private JEChangeNumberIndexDB cnIndexDB;
  // The change number indexer currently installed, or null when there is none.
  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>();

  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();

  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private volatile long purgeDelayInMillis;
  // The purger thread currently installed, or null when there is none.
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>();

  /** The local replication server. */
  private final ReplicationServer replicationServer;
  // Flipped to true, once, by the first call to shutdownDB().
  private final AtomicBoolean shutdown = new AtomicBoolean();

  /**
   * Cursor that never returns any record. It is handed out when a cursor is
   * requested for a replica that has no replica DB.
   */
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new DBCursor<UpdateMsg>()
  {

    /** Always exhausted: there is never a next record. */
    @Override
    public boolean next()
    {
      return false;
    }

    /** There is never a current record. */
    @Override
    public UpdateMsg getRecord()
    {
      return null;
    }

    /** Holds no resource, so there is nothing to release. */
    @Override
    public void close()
    {
      // empty
    }

    @Override
    public String toString()
    {
      return "EmptyDBCursor<UpdateMsg>";
    }
  };
156
157  /**
158   * Creates a new changelog DB.
159   *
160   * @param replicationServer
161   *          the local replication server.
162   * @param config
163   *          the replication server configuration
164   * @throws ConfigException
165   *           if a problem occurs opening the supplied directory
166   */
167  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
168      throws ConfigException
169  {
170    this.config = config;
171    this.replicationServer = replicationServer;
172    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
173  }
174
175  private File makeDir(final String dbDirName) throws ConfigException
176  {
177    // Check that this path exists or create it.
178    final File dbDirectory = getFileForPath(dbDirName);
179    try
180    {
181      if (!dbDirectory.exists())
182      {
183        dbDirectory.mkdir();
184      }
185      return dbDirectory;
186    }
187    catch (Exception e)
188    {
189      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
190          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
191      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
192    }
193  }
194
195  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
196  {
197    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
198    if (domainMap != null)
199    {
200      return domainMap;
201    }
202    return Collections.emptyMap();
203  }
204
205  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
206  {
207    return getDomainMap(baseDN).get(serverId);
208  }
209
210  /**
211   * Returns a {@link JEReplicaDB}, possibly creating it.
212   *
213   * @param baseDN
214   *          the baseDN for which to create a ReplicaDB
215   * @param serverId
216   *          the serverId for which to create a ReplicaDB
217   * @param server
218   *          the ReplicationServer
219   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
220   * @throws ChangelogException
221   *           if a problem occurred with the database
222   */
223  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
224      final ReplicationServer server) throws ChangelogException
225  {
226    while (!shutdown.get())
227    {
228      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
229      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
230      if (result != null)
231      {
232        final Boolean dbWasCreated = result.getSecond();
233        if (dbWasCreated)
234        { // new replicaDB => update all cursors with it
235          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
236          if (cursors != null && !cursors.isEmpty())
237          {
238            for (DomainDBCursor cursor : cursors)
239            {
240              cursor.addReplicaDB(serverId, null);
241            }
242          }
243        }
244
245        return result;
246      }
247    }
248    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
249  }
250
251  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
252  {
253    // happy path: the domainMap already exists
254    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
255    if (currentValue != null)
256    {
257      return currentValue;
258    }
259
260    // unlucky, the domainMap does not exist: take the hit and create the
261    // newValue, even though the same could be done concurrently by another thread
262    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<>();
263    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
264    if (previousValue != null)
265    {
266      // there was already a value associated to the key, let's use it
267      return previousValue;
268    }
269
270    // we just created a new domain => update all cursors
271    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
272    {
273      cursor.addDomain(baseDN, null);
274    }
275    return newValue;
276  }
277
  /**
   * Returns the replica DB stored in the supplied domain map for a server id,
   * creating and storing a new one when there is none.
   *
   * @param domainMap
   *          the map of replica DBs of the domain; also used as the lock
   *          protecting the creation of a replica DB
   * @param serverId
   *          the server id of the replica
   * @param baseDN
   *          the domain the map is expected to be registered under
   * @param server
   *          the ReplicationServer, handed to a newly created replica DB
   * @return a Pair with the replica DB and whether it was created by this
   *         call, or null when the supplied map is no longer the one
   *         registered for the domain
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
  {
    // happy path: the replicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }

    // unlucky, the replicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      // double-check
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
        return Pair.of(currentValue, false);
      }

      // identity comparison on purpose: we need to know whether this very map
      // instance is still the registered one
      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the replicaDB
        return null;
      }

      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
      domainMap.put(serverId, newDB);
      return Pair.of(newDB, true);
    }
  }
313
314  /** {@inheritDoc} */
315  @Override
316  public void initializeDB()
317  {
318    try
319    {
320      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
321      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
322      final ChangelogState changelogState = replicationEnv.getChangelogState();
323      initializeToChangelogState(changelogState);
324      if (config.isComputeChangeNumber())
325      {
326        startIndexer(changelogState);
327      }
328      setPurgeDelay(replicationServer.getPurgeDelay());
329    }
330    catch (ChangelogException e)
331    {
332      logger.traceException(e);
333      logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
334    }
335  }
336
337  private void initializeToChangelogState(final ChangelogState changelogState)
338      throws ChangelogException
339  {
340    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
341    {
342      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
343    }
344    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
345    {
346      for (int serverId : entry.getValue())
347      {
348        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
349      }
350    }
351  }
352
353  private void shutdownChangeNumberIndexDB() throws ChangelogException
354  {
355    synchronized (cnIndexDBLock)
356    {
357      if (cnIndexDB != null)
358      {
359        cnIndexDB.shutdown();
360      }
361    }
362  }
363
364  /** {@inheritDoc} */
365  @Override
366  public void shutdownDB() throws ChangelogException
367  {
368    if (!this.shutdown.compareAndSet(false, true))
369    { // shutdown has already been initiated
370      return;
371    }
372
373    // Remember the first exception because :
374    // - we want to try to remove everything we want to remove
375    // - then throw the first encountered exception
376    ChangelogException firstException = null;
377
378    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
379    if (indexer != null)
380    {
381      indexer.initiateShutdown();
382    }
383    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
384    if (purger != null)
385    {
386      purger.initiateShutdown();
387    }
388
389    try
390    {
391      shutdownChangeNumberIndexDB();
392    }
393    catch (ChangelogException e)
394    {
395      firstException = e;
396    }
397
398    for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
399        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
400    {
401      final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
402      synchronized (domainMap)
403      {
404        it.remove();
405        for (JEReplicaDB replicaDB : domainMap.values())
406        {
407          replicaDB.shutdown();
408        }
409      }
410    }
411
412    if (replicationEnv != null)
413    {
414      // wait for shutdown of the threads holding cursors
415      try
416      {
417        if (indexer != null)
418        {
419          indexer.join();
420        }
421        if (purger != null)
422        {
423          purger.join();
424        }
425      }
426      catch (InterruptedException e)
427      {
428        // do nothing: we are already shutting down
429      }
430
431      replicationEnv.shutdown();
432    }
433
434    if (firstException != null)
435    {
436      throw firstException;
437    }
438  }
439
440  /**
441   * Clears all records from the changelog (does not remove the changelog itself).
442   *
443   * @throws ChangelogException
444   *           If an error occurs when clearing the changelog.
445   */
446  public void clearDB() throws ChangelogException
447  {
448    if (!dbDirectory.exists())
449    {
450      return;
451    }
452
453    // Remember the first exception because :
454    // - we want to try to remove everything we want to remove
455    // - then throw the first encountered exception
456    ChangelogException firstException = null;
457
458    for (DN baseDN : this.domainToReplicaDBs.keySet())
459    {
460      removeDomain(baseDN);
461    }
462
463    synchronized (cnIndexDBLock)
464    {
465      if (cnIndexDB != null)
466      {
467        try
468        {
469          cnIndexDB.clear();
470        }
471        catch (ChangelogException e)
472        {
473          firstException = e;
474        }
475
476        try
477        {
478          shutdownChangeNumberIndexDB();
479        }
480        catch (ChangelogException e)
481        {
482          if (firstException == null)
483          {
484            firstException = e;
485          }
486          else
487          {
488            logger.traceException(e);
489          }
490        }
491
492        cnIndexDB = null;
493      }
494    }
495
496    if (firstException != null)
497    {
498      throw firstException;
499    }
500  }
501
502  /** {@inheritDoc} */
503  @Override
504  public void removeDB() throws ChangelogException
505  {
506    shutdownDB();
507    StaticUtils.recursiveDelete(dbDirectory);
508  }
509
510  /** {@inheritDoc} */
511  @Override
512  public ServerState getDomainOldestCSNs(DN baseDN)
513  {
514    final ServerState result = new ServerState();
515    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
516    {
517      result.update(replicaDB.getOldestCSN());
518    }
519    return result;
520  }
521
522  /** {@inheritDoc} */
523  @Override
524  public ServerState getDomainNewestCSNs(DN baseDN)
525  {
526    final ServerState result = new ServerState();
527    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
528    {
529      result.update(replicaDB.getNewestCSN());
530    }
531    return result;
532  }
533
534  /** {@inheritDoc} */
535  @Override
536  public void removeDomain(DN baseDN) throws ChangelogException
537  {
538    // Remember the first exception because :
539    // - we want to try to remove everything we want to remove
540    // - then throw the first encountered exception
541    ChangelogException firstException = null;
542
543    // 1- clear the replica DBs
544    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
545    if (domainMap != null)
546    {
547      final ChangeNumberIndexer indexer = this.cnIndexer.get();
548      if (indexer != null)
549      {
550        indexer.clear(baseDN);
551      }
552      synchronized (domainMap)
553      {
554        domainMap = domainToReplicaDBs.remove(baseDN);
555        for (JEReplicaDB replicaDB : domainMap.values())
556        {
557          try
558          {
559            replicaDB.clear();
560          }
561          catch (ChangelogException e)
562          {
563            firstException = e;
564          }
565          replicaDB.shutdown();
566        }
567      }
568    }
569
570    // 2- clear the ChangeNumber index DB
571    synchronized (cnIndexDBLock)
572    {
573      if (cnIndexDB != null)
574      {
575        try
576        {
577          cnIndexDB.removeDomain(baseDN);
578        }
579        catch (ChangelogException e)
580        {
581          if (firstException == null)
582          {
583            firstException = e;
584          }
585          else
586          {
587            logger.traceException(e);
588          }
589        }
590      }
591    }
592
593    // 3- clear the changelogstate DB
594    try
595    {
596      replicationEnv.clearGenerationId(baseDN);
597    }
598    catch (ChangelogException e)
599    {
600      if (firstException == null)
601      {
602        firstException = e;
603      }
604      else
605      {
606        logger.traceException(e);
607      }
608    }
609
610    if (firstException != null)
611    {
612      throw firstException;
613    }
614  }
615
616  /** {@inheritDoc} */
617  @Override
618  public void setPurgeDelay(final long purgeDelayInMillis)
619  {
620    this.purgeDelayInMillis = purgeDelayInMillis;
621    if (purgeDelayInMillis > 0)
622    {
623      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
624      if (cnPurger.compareAndSet(null, newPurger))
625      { // no purger was running, run this new one
626        newPurger.start();
627      }
628      else
629      { // a purger was already running, just wake that one up
630        // to verify if some entries can be purged with the new purge delay
631        final ChangelogDBPurger currentPurger = cnPurger.get();
632        synchronized (currentPurger)
633        {
634          currentPurger.notify();
635        }
636      }
637    }
638    else
639    {
640      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
641      if (purgerToStop != null)
642      { // stop this purger
643        purgerToStop.initiateShutdown();
644      }
645    }
646  }
647
648  /** {@inheritDoc} */
649  @Override
650  public void setComputeChangeNumber(final boolean computeChangeNumber)
651      throws ChangelogException
652  {
653    if (computeChangeNumber)
654    {
655      startIndexer(replicationEnv.getChangelogState());
656    }
657    else
658    {
659      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
660      if (indexer != null)
661      {
662        indexer.initiateShutdown();
663      }
664    }
665  }
666
667  private void startIndexer(final ChangelogState changelogState)
668  {
669    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
670    if (cnIndexer.compareAndSet(null, indexer))
671    {
672      indexer.start();
673    }
674  }
675
676  /** {@inheritDoc} */
677  @Override
678  public ChangeNumberIndexDB getChangeNumberIndexDB()
679  {
680    synchronized (cnIndexDBLock)
681    {
682      if (cnIndexDB == null)
683      {
684        try
685        {
686          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
687        }
688        catch (Exception e)
689        {
690          logger.traceException(e);
691          logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage());
692        }
693      }
694      return cnIndexDB;
695    }
696  }
697
  /**
   * {@inheritDoc}
   * <p>
   * This class implements {@link ReplicationDomainDB} itself, so the returned
   * object is this very instance.
   */
  @Override
  public ReplicationDomainDB getReplicationDomainDB()
  {
    return this;
  }
704
705  /** {@inheritDoc} */
706  @Override
707  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options)
708      throws ChangelogException
709  {
710    final Set<DN> excludedDomainDns = Collections.emptySet();
711    return getCursorFrom(startState, options, excludedDomainDns);
712  }
713
714  /** {@inheritDoc} */
715  @Override
716  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
717      CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException
718  {
719    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options);
720    registeredMultiDomainCursors.add(cursor);
721    for (DN baseDN : domainToReplicaDBs.keySet())
722    {
723      if (!excludedDomainDns.contains(baseDN))
724      {
725        cursor.addDomain(baseDN, startState.getServerState(baseDN));
726      }
727    }
728    return cursor;
729  }
730
731  /** {@inheritDoc} */
732  @Override
733  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options)
734      throws ChangelogException
735  {
736    final DomainDBCursor cursor = newDomainDBCursor(baseDN, options);
737    for (int serverId : getDomainMap(baseDN).keySet())
738    {
739      // get the last already sent CSN from that server to get a cursor
740      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
741      cursor.addReplicaDB(serverId, lastCSN);
742    }
743    return cursor;
744  }
745
746  private DomainDBCursor newDomainDBCursor(final DN baseDN, CursorOptions options)
747  {
748    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options);
749    putCursor(registeredDomainCursors, baseDN, cursor);
750    return cursor;
751  }
752
753  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
754  {
755    final MultiDomainServerState offlineReplicas =
756        replicationEnv.getChangelogState().getOfflineReplicas();
757    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
758    if (offlineCSN != null
759        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
760    {
761      return offlineCSN;
762    }
763    return null;
764  }
765
766  @Override
767  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
768      CursorOptions options) throws ChangelogException
769  {
770    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
771    if (replicaDB != null)
772    {
773      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
774      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
775          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
776      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
777      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
778      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
779
780      putCursor(replicaCursors, replicaId, replicaCursor);
781
782      return replicaCursor;
783    }
784    return EMPTY_CURSOR_REPLICA_DB;
785  }
786
787  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
788  {
789    CopyOnWriteArrayList<V> cursors = map.get(key);
790    if (cursors == null)
791    {
792      cursors = new CopyOnWriteArrayList<>();
793      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
794      if (previousValue != null)
795      {
796        cursors = previousValue;
797      }
798    }
799    cursors.add(cursor);
800  }
801
802  /** {@inheritDoc} */
803  @Override
804  public void unregisterCursor(final DBCursor<?> cursor)
805  {
806    if (cursor instanceof MultiDomainDBCursor)
807    {
808      registeredMultiDomainCursors.remove(cursor);
809    }
810    else if (cursor instanceof DomainDBCursor)
811    {
812      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
813      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
814      if (cursors != null)
815      {
816        cursors.remove(cursor);
817      }
818    }
819    else if (cursor instanceof ReplicaCursor)
820    {
821      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
822      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
823      if (cursors != null)
824      {
825        cursors.remove(cursor);
826      }
827    }
828  }
829
830  /** {@inheritDoc} */
831  @Override
832  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
833  {
834    final CSN csn = updateMsg.getCSN();
835    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
836        csn.getServerId(), replicationServer);
837    final JEReplicaDB replicaDB = pair.getFirst();
838    replicaDB.add(updateMsg);
839
840    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
841
842    final ChangeNumberIndexer indexer = cnIndexer.get();
843    if (indexer != null)
844    {
845      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
846      indexer.publishUpdateMsg(baseDN, updateMsg);
847    }
848    return pair.getSecond(); // replica DB was created
849  }
850
851  /** {@inheritDoc} */
852  @Override
853  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
854  {
855    final ChangeNumberIndexer indexer = cnIndexer.get();
856    if (indexer != null)
857    {
858      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
859      indexer.publishHeartbeat(baseDN, heartbeatCSN);
860    }
861  }
862
863  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
864      throws ChangelogException
865  {
866    if (indexer.isReplicaOffline(baseDN, serverId))
867    {
868      replicationEnv.notifyReplicaOnline(baseDN, serverId);
869    }
870    updateCursorsWithOfflineCSN(baseDN, serverId, null);
871  }
872
873  /** {@inheritDoc} */
874  @Override
875  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
876  {
877    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
878    final ChangeNumberIndexer indexer = cnIndexer.get();
879    if (indexer != null)
880    {
881      indexer.replicaOffline(baseDN, offlineCSN);
882    }
883    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
884  }
885
886  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
887  {
888    final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
889    if (cursors != null)
890    {
891      for (ReplicaCursor cursor : cursors)
892      {
893        cursor.setOfflineCSN(offlineCSN);
894      }
895    }
896  }
897
898  /**
899   * The thread purging the changelogDB on a regular interval. Records are
900   * purged from the changelogDB if they are older than a delay specified in
901   * seconds. The purge process works in two steps:
902   * <ol>
903   * <li>first purge the changeNumberIndexDB and retrieve information to drive
904   * replicaDBs purging</li>
905   * <li>proceed to purge each replicaDBs based on the information collected
906   * when purging the changeNumberIndexDB</li>
907   * </ol>
908   */
909  private final class ChangelogDBPurger extends DirectoryThread
910  {
911    private static final int DEFAULT_SLEEP = 500;
912
913    protected ChangelogDBPurger()
914    {
915      super("changelog DB purger");
916    }
917
918    /** {@inheritDoc} */
919    @Override
920    public void run()
921    {
922      // initialize CNIndexDB
923      getChangeNumberIndexDB();
924      while (!isShutdownInitiated())
925      {
926        try
927        {
928          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
929          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
930          final CSN oldestNotPurgedCSN;
931
932          // next code assumes that the compute-change-number config
933          // never changes during the life time of an RS
934          if (!config.isComputeChangeNumber())
935          {
936            oldestNotPurgedCSN = purgeCSN;
937          }
938          else
939          {
940            final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
941            if (localCNIndexDB == null)
942            { // shutdown has been initiated
943              return;
944            }
945
946            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
947            if (oldestNotPurgedCSN == null)
948            { // shutdown may have been initiated...
949              // ... or the change number index DB is empty,
950              // wait for new changes to come in.
951
952              // Note we cannot sleep for as long as the purge delay
953              // (3 days default), because we might receive late updates
954              // that will have to be purged before the purge delay elapses.
955              // This can particularly happen in case of network partitions.
956              if (!isShutdownInitiated())
957              {
958                synchronized (this)
959                {
960                  if (!isShutdownInitiated())
961                  {
962                    wait(DEFAULT_SLEEP);
963                  }
964                }
965              }
966              continue;
967            }
968          }
969
970          for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
971          {
972            for (final JEReplicaDB replicaDB : domainMap.values())
973            {
974              replicaDB.purgeUpTo(oldestNotPurgedCSN);
975            }
976          }
977
978          if (!isShutdownInitiated())
979          {
980            synchronized (this)
981            {
982              if (!isShutdownInitiated())
983              {
984                wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
985              }
986            }
987          }
988        }
989        catch (InterruptedException e)
990        {
991          // shutdown initiated?
992        }
993        catch (Exception e)
994        {
995          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
996          if (replicationServer != null)
997          {
998            replicationServer.shutdown();
999          }
1000        }
1001      }
1002    }
1003
1004    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
1005    {
1006      final long nextPurgeTime = notPurgedCSN.getTime();
1007      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
1008      if (currentPurgeTime <= nextPurgeTime)
1009      {
1010        // sleep till the next CSN to purge,
1011        return nextPurgeTime - currentPurgeTime;
1012      }
1013      // wait a bit before purging more
1014      return DEFAULT_SLEEP;
1015    }
1016
1017    /** {@inheritDoc} */
1018    @Override
1019    public void initiateShutdown()
1020    {
1021      super.initiateShutdown();
1022      synchronized (this)
1023      {
1024        notify(); // wake up the purger thread for faster shutdown
1025      }
1026    }
1027  }
1028}