001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2014-2015 ForgeRock AS
025 */
026package org.opends.server.replication.server.changelog.file;
027
028import static org.opends.messages.ReplicationMessages.*;
029import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
030import static org.opends.server.util.StaticUtils.*;
031
032import java.io.File;
033import java.util.Collections;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.ConcurrentSkipListMap;
041import java.util.concurrent.CopyOnWriteArrayList;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.forgerock.i18n.LocalizableMessageBuilder;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigException;
048import org.forgerock.util.Pair;
049import org.forgerock.util.time.TimeService;
050import org.opends.server.admin.std.server.ReplicationServerCfg;
051import org.opends.server.api.DirectoryThread;
052import org.opends.server.backends.ChangelogBackend;
053import org.opends.server.replication.common.CSN;
054import org.opends.server.replication.common.MultiDomainServerState;
055import org.opends.server.replication.common.ServerState;
056import org.opends.server.replication.protocol.UpdateMsg;
057import org.opends.server.replication.server.ChangelogState;
058import org.opends.server.replication.server.ReplicationServer;
059import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
060import org.opends.server.replication.server.changelog.api.ChangelogDB;
061import org.opends.server.replication.server.changelog.api.ChangelogException;
062import org.opends.server.replication.server.changelog.api.DBCursor;
063import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
064import org.opends.server.replication.server.changelog.api.ReplicaId;
065import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
066import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
067import org.opends.server.types.DN;
068import org.opends.server.util.StaticUtils;
069import org.opends.server.util.TimeThread;
070
071/**
072 * Log file implementation of the ChangelogDB interface.
073 */
074public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
075{
076
  /** Logger used by this class for tracing and error reporting. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Maps each replication domain baseDN to the replica DBs of that domain,
   * themselves keyed by the serverId of the replica.
   * <p>
   * When removing a domainMap, code:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronized on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<>();
  /**
   * Cursors opened on a whole domain, per baseDN. They are told about each
   * replica DB newly created in their domain.
   */
  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
      new ConcurrentSkipListMap<>();
  /** Cursors opened across domains. They are told about each newly created domain. */
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>();
  /**
   * Cursors opened on a single replica, per replica id. Their offline CSN is
   * updated when the replica is notified offline or seen online again.
   */
  private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<>();
  /** The replication environment; assigned by {@link #initializeDB()}, null before that. */
  private ReplicationEnvironment replicationEnv;
  /** The replication server configuration supplied to the constructor. */
  private final ReplicationServerCfg config;
  /** The directory holding the changelog, resolved by the constructor from the configuration. */
  private final File dbDirectory;

  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * @GuardedBy("cnIndexDBLock")
   */
  private FileChangeNumberIndexDB cnIndexDB;
  /** The change number indexer currently installed, or null when there is none. */
  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>();

  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();

  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private volatile long purgeDelayInMillis;
  /** The purger currently installed, or null when there is none. */
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>();

  /** The local replication server. */
  private final ReplicationServer replicationServer;
  /** Set to true once {@link #shutdownDB()} has been initiated. */
  private final AtomicBoolean shutdown = new AtomicBoolean();

  /** Empty log cursor backing {@link #EMPTY_CURSOR_REPLICA_DB}. */
  private static final RepositionableCursor<CSN, UpdateMsg> EMPTY_CURSOR = Log.getEmptyCursor();
  /** Shared empty cursor returned when a cursor is requested on a replica that has no replica DB. */
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(EMPTY_CURSOR, null, AFTER_MATCHING_KEY);
130
131  /**
132   * Creates a new changelog DB.
133   *
134   * @param replicationServer
135   *          the local replication server.
136   * @param config
137   *          the replication server configuration
138   * @throws ConfigException
139   *           if a problem occurs opening the supplied directory
140   */
141  public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
142      throws ConfigException
143  {
144    this.config = config;
145    this.replicationServer = replicationServer;
146    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
147  }
148
149  private File makeDir(final String dbDirName) throws ConfigException
150  {
151    // Check that this path exists or create it.
152    final File dbDirectory = getFileForPath(dbDirName);
153    try
154    {
155      if (!dbDirectory.exists())
156      {
157        dbDirectory.mkdir();
158      }
159      return dbDirectory;
160    }
161    catch (Exception e)
162    {
163      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
164          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
165      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
166    }
167  }
168
169  private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
170  {
171    final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
172    if (domainMap != null)
173    {
174      return domainMap;
175    }
176    return Collections.emptyMap();
177  }
178
179  private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
180  {
181    return getDomainMap(baseDN).get(serverId);
182  }
183
184  /**
185   * Returns a {@link FileReplicaDB}, possibly creating it.
186   *
187   * @param baseDN
188   *          the baseDN for which to create a ReplicaDB
189   * @param serverId
190   *          the serverId for which to create a ReplicaDB
191   * @param server
192   *          the ReplicationServer
193   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
194   * @throws ChangelogException
195   *           if a problem occurred with the database
196   */
197  Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
198      final ReplicationServer server) throws ChangelogException
199  {
200    while (!shutdown.get())
201    {
202      final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
203      final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
204      if (result != null)
205      {
206        final Boolean dbWasCreated = result.getSecond();
207        if (dbWasCreated)
208        { // new replicaDB => update all cursors with it
209          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
210          if (cursors != null && !cursors.isEmpty())
211          {
212            for (DomainDBCursor cursor : cursors)
213            {
214              cursor.addReplicaDB(serverId, null);
215            }
216          }
217        }
218
219        return result;
220      }
221    }
222    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
223  }
224
225  private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
226  {
227    // happy path: the domainMap already exists
228    final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
229    if (currentValue != null)
230    {
231      return currentValue;
232    }
233
234    // unlucky, the domainMap does not exist: take the hit and create the
235    // newValue, even though the same could be done concurrently by another thread
236    final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<>();
237    final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
238    if (previousValue != null)
239    {
240      // there was already a value associated to the key, let's use it
241      return previousValue;
242    }
243
244    // we just created a new domain => update all cursors
245    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
246    {
247      cursor.addDomain(baseDN, null);
248    }
249    return newValue;
250  }
251
252  private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
253      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
254  {
255    // happy path: the replicaDB already exists
256    FileReplicaDB currentValue = domainMap.get(serverId);
257    if (currentValue != null)
258    {
259      return Pair.of(currentValue, false);
260    }
261
262    // unlucky, the replicaDB does not exist: take the hit and synchronize
263    // on the domainMap to create a new ReplicaDB
264    synchronized (domainMap)
265    {
266      // double-check
267      currentValue = domainMap.get(serverId);
268      if (currentValue != null)
269      {
270        return Pair.of(currentValue, false);
271      }
272
273      if (domainToReplicaDBs.get(baseDN) != domainMap)
274      {
275        // The domainMap could have been concurrently removed because
276        // 1) a shutdown was initiated or 2) an initialize was called.
277        // Return will allow the code to:
278        // 1) shutdown properly or 2) lazily recreate the replicaDB
279        return null;
280      }
281
282      final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv);
283      domainMap.put(serverId, newDB);
284      return Pair.of(newDB, true);
285    }
286  }
287
288  /** {@inheritDoc} */
289  @Override
290  public void initializeDB()
291  {
292    try
293    {
294      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
295      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer, TimeService.SYSTEM);
296      final ChangelogState changelogState = replicationEnv.getChangelogState();
297      initializeToChangelogState(changelogState);
298      if (config.isComputeChangeNumber())
299      {
300        startIndexer(changelogState);
301      }
302      setPurgeDelay(replicationServer.getPurgeDelay());
303    }
304    catch (ChangelogException e)
305    {
306      logger.traceException(e);
307      logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
308    }
309  }
310
311  private void initializeToChangelogState(final ChangelogState changelogState)
312      throws ChangelogException
313  {
314    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
315    {
316      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
317    }
318    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
319    {
320      for (int serverId : entry.getValue())
321      {
322        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
323      }
324    }
325  }
326
327  private void shutdownChangeNumberIndexDB() throws ChangelogException
328  {
329    synchronized (cnIndexDBLock)
330    {
331      if (cnIndexDB != null)
332      {
333        cnIndexDB.shutdown();
334      }
335    }
336  }
337
338  /** {@inheritDoc} */
339  @Override
340  public void shutdownDB() throws ChangelogException
341  {
342    if (!this.shutdown.compareAndSet(false, true))
343    { // shutdown has already been initiated
344      return;
345    }
346
347    // Remember the first exception because :
348    // - we want to try to remove everything we want to remove
349    // - then throw the first encountered exception
350    ChangelogException firstException = null;
351
352    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
353    if (indexer != null)
354    {
355      indexer.initiateShutdown();
356    }
357    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
358    if (purger != null)
359    {
360      purger.initiateShutdown();
361    }
362
363    // wait for shutdown of the threads holding cursors
364    try
365    {
366      if (indexer != null)
367      {
368        indexer.join();
369      }
370      if (purger != null)
371      {
372        purger.join();
373      }
374    }
375    catch (InterruptedException e)
376    {
377      // do nothing: we are already shutting down
378    }
379
380    // now we can safely shutdown all DBs
381    try
382    {
383      shutdownChangeNumberIndexDB();
384    }
385    catch (ChangelogException e)
386    {
387      firstException = e;
388    }
389
390    for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
391        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
392    {
393      final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
394      synchronized (domainMap)
395      {
396        it.remove();
397        for (FileReplicaDB replicaDB : domainMap.values())
398        {
399          replicaDB.shutdown();
400        }
401      }
402    }
403    if (replicationEnv != null)
404    {
405      replicationEnv.shutdown();
406    }
407
408    if (firstException != null)
409    {
410      throw firstException;
411    }
412  }
413
414  /**
415   * Clears all records from the changelog (does not remove the changelog itself).
416   *
417   * @throws ChangelogException
418   *           If an error occurs when clearing the changelog.
419   */
420  public void clearDB() throws ChangelogException
421  {
422    if (!dbDirectory.exists())
423    {
424      return;
425    }
426
427    // Remember the first exception because :
428    // - we want to try to remove everything we want to remove
429    // - then throw the first encountered exception
430    ChangelogException firstException = null;
431
432    for (DN baseDN : this.domainToReplicaDBs.keySet())
433    {
434      removeDomain(baseDN);
435    }
436
437    synchronized (cnIndexDBLock)
438    {
439      if (cnIndexDB != null)
440      {
441        try
442        {
443          cnIndexDB.clear();
444        }
445        catch (ChangelogException e)
446        {
447          firstException = e;
448        }
449
450        try
451        {
452          shutdownChangeNumberIndexDB();
453        }
454        catch (ChangelogException e)
455        {
456          if (firstException == null)
457          {
458            firstException = e;
459          }
460          else
461          {
462            logger.traceException(e);
463          }
464        }
465
466        cnIndexDB = null;
467      }
468    }
469
470    if (firstException != null)
471    {
472      throw firstException;
473    }
474  }
475
476  /** {@inheritDoc} */
477  @Override
478  public void removeDB() throws ChangelogException
479  {
480    shutdownDB();
481    StaticUtils.recursiveDelete(dbDirectory);
482  }
483
484  /** {@inheritDoc} */
485  @Override
486  public ServerState getDomainOldestCSNs(DN baseDN)
487  {
488    final ServerState result = new ServerState();
489    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
490    {
491      result.update(replicaDB.getOldestCSN());
492    }
493    return result;
494  }
495
496  /** {@inheritDoc} */
497  @Override
498  public ServerState getDomainNewestCSNs(DN baseDN)
499  {
500    final ServerState result = new ServerState();
501    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
502    {
503      result.update(replicaDB.getNewestCSN());
504    }
505    return result;
506  }
507
508  /** {@inheritDoc} */
509  @Override
510  public void removeDomain(DN baseDN) throws ChangelogException
511  {
512    // Remember the first exception because :
513    // - we want to try to remove everything we want to remove
514    // - then throw the first encountered exception
515    ChangelogException firstException = null;
516
517    // 1- clear the replica DBs
518    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
519    if (domainMap != null)
520    {
521      final ChangeNumberIndexer indexer = this.cnIndexer.get();
522      if (indexer != null)
523      {
524        indexer.clear(baseDN);
525      }
526      synchronized (domainMap)
527      {
528        domainMap = domainToReplicaDBs.remove(baseDN);
529        for (FileReplicaDB replicaDB : domainMap.values())
530        {
531          try
532          {
533            replicaDB.clear();
534          }
535          catch (ChangelogException e)
536          {
537            firstException = e;
538          }
539          replicaDB.shutdown();
540        }
541      }
542    }
543
544
545    // 2- clear the changelogstate DB
546    try
547    {
548      replicationEnv.clearGenerationId(baseDN);
549    }
550    catch (ChangelogException e)
551    {
552      if (firstException == null)
553      {
554        firstException = e;
555      }
556      else
557      {
558        logger.traceException(e);
559      }
560    }
561
562    if (firstException != null)
563    {
564      throw firstException;
565    }
566  }
567
568  /** {@inheritDoc} */
569  @Override
570  public void setPurgeDelay(final long purgeDelayInMillis)
571  {
572    this.purgeDelayInMillis = purgeDelayInMillis;
573
574    // Rotation time interval for CN Index DB log file
575    // needs to be a fraction of the purge delay
576    // to ensure there is at least one file to purge
577    replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2);
578
579    if (purgeDelayInMillis > 0)
580    {
581      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
582      if (cnPurger.compareAndSet(null, newPurger))
583      { // no purger was running, run this new one
584        newPurger.start();
585      }
586      else
587      { // a purger was already running, just wake that one up
588        // to verify if some entries can be purged with the new purge delay
589        final ChangelogDBPurger currentPurger = cnPurger.get();
590        synchronized (currentPurger)
591        {
592          currentPurger.notify();
593        }
594      }
595    }
596    else
597    {
598      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
599      if (purgerToStop != null)
600      { // stop this purger
601        purgerToStop.initiateShutdown();
602      }
603    }
604  }
605
606  /** {@inheritDoc} */
607  @Override
608  public void setComputeChangeNumber(final boolean computeChangeNumber)
609      throws ChangelogException
610  {
611    if (computeChangeNumber)
612    {
613      startIndexer(replicationEnv.getChangelogState());
614    }
615    else
616    {
617      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
618      if (indexer != null)
619      {
620        indexer.initiateShutdown();
621      }
622    }
623  }
624
625  private void startIndexer(final ChangelogState changelogState)
626  {
627    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
628    if (cnIndexer.compareAndSet(null, indexer))
629    {
630      indexer.start();
631    }
632  }
633
634  /** {@inheritDoc} */
635  @Override
636  public ChangeNumberIndexDB getChangeNumberIndexDB()
637  {
638    synchronized (cnIndexDBLock)
639    {
640      if (cnIndexDB == null)
641      {
642        try
643        {
644          cnIndexDB = new FileChangeNumberIndexDB(replicationEnv);
645        }
646        catch (Exception e)
647        {
648          logger.traceException(e);
649          logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage());
650        }
651      }
652      return cnIndexDB;
653    }
654  }
655
  /**
   * {@inheritDoc}
   * <p>
   * This class implements both {@link ChangelogDB} and
   * {@link ReplicationDomainDB}, so the instance returns itself.
   */
  @Override
  public ReplicationDomainDB getReplicationDomainDB()
  {
    return this;
  }
662
663  /** {@inheritDoc} */
664  @Override
665  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options)
666      throws ChangelogException
667  {
668    final Set<DN> excludedDomainDns = Collections.emptySet();
669    return getCursorFrom(startState, options, excludedDomainDns);
670  }
671
672  /** {@inheritDoc} */
673  @Override
674  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
675      CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException
676  {
677    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options);
678    registeredMultiDomainCursors.add(cursor);
679    for (DN baseDN : domainToReplicaDBs.keySet())
680    {
681      if (!excludedDomainDns.contains(baseDN))
682      {
683        cursor.addDomain(baseDN, startState.getServerState(baseDN));
684      }
685    }
686    return cursor;
687  }
688
689  /** {@inheritDoc} */
690  @Override
691  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options)
692      throws ChangelogException
693  {
694    final DomainDBCursor cursor = newDomainDBCursor(baseDN, options);
695    for (int serverId : getDomainMap(baseDN).keySet())
696    {
697      // get the last already sent CSN from that server to get a cursor
698      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
699      cursor.addReplicaDB(serverId, lastCSN);
700    }
701    return cursor;
702  }
703
704  private DomainDBCursor newDomainDBCursor(final DN baseDN, final CursorOptions options)
705  {
706    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options);
707    putCursor(registeredDomainCursors, baseDN, cursor);
708    return cursor;
709  }
710
711  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
712  {
713    final MultiDomainServerState offlineReplicas =
714        replicationEnv.getChangelogState().getOfflineReplicas();
715    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
716    if (offlineCSN != null
717        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
718    {
719      return offlineCSN;
720    }
721    return null;
722  }
723
724  @Override
725  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
726      CursorOptions options) throws ChangelogException
727  {
728    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
729    if (replicaDB != null)
730    {
731      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
732      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
733          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
734      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
735      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
736      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
737
738      putCursor(replicaCursors, replicaId, replicaCursor);
739
740      return replicaCursor;
741    }
742    return EMPTY_CURSOR_REPLICA_DB;
743  }
744
745  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
746  {
747    CopyOnWriteArrayList<V> cursors = map.get(key);
748    if (cursors == null)
749    {
750      cursors = new CopyOnWriteArrayList<>();
751      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
752      if (previousValue != null)
753      {
754        cursors = previousValue;
755      }
756    }
757    cursors.add(cursor);
758  }
759
760  /** {@inheritDoc} */
761  @Override
762  public void unregisterCursor(final DBCursor<?> cursor)
763  {
764    if (cursor instanceof MultiDomainDBCursor)
765    {
766      registeredMultiDomainCursors.remove(cursor);
767    }
768    else if (cursor instanceof DomainDBCursor)
769    {
770      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
771      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
772      if (cursors != null)
773      {
774        cursors.remove(cursor);
775      }
776    }
777    else if (cursor instanceof ReplicaCursor)
778    {
779      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
780      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
781      if (cursors != null)
782      {
783        cursors.remove(cursor);
784      }
785    }
786  }
787
788  /** {@inheritDoc} */
789  @Override
790  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
791  {
792    final CSN csn = updateMsg.getCSN();
793    final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
794        csn.getServerId(), replicationServer);
795    final FileReplicaDB replicaDB = pair.getFirst();
796    replicaDB.add(updateMsg);
797
798    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
799
800    final ChangeNumberIndexer indexer = cnIndexer.get();
801    if (indexer != null)
802    {
803      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
804      indexer.publishUpdateMsg(baseDN, updateMsg);
805    }
806    return pair.getSecond(); // replica DB was created
807  }
808
809  /** {@inheritDoc} */
810  @Override
811  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
812  {
813    final ChangeNumberIndexer indexer = cnIndexer.get();
814    if (indexer != null)
815    {
816      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
817      indexer.publishHeartbeat(baseDN, heartbeatCSN);
818    }
819  }
820
821  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
822      throws ChangelogException
823  {
824    if (indexer.isReplicaOffline(baseDN, serverId))
825    {
826      replicationEnv.notifyReplicaOnline(baseDN, serverId);
827    }
828    updateCursorsWithOfflineCSN(baseDN, serverId, null);
829  }
830
831  /** {@inheritDoc} */
832  @Override
833  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
834  {
835    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
836    final ChangeNumberIndexer indexer = cnIndexer.get();
837    if (indexer != null)
838    {
839      indexer.replicaOffline(baseDN, offlineCSN);
840    }
841    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
842  }
843
844  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
845  {
846    final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
847    if (cursors != null)
848    {
849      for (ReplicaCursor cursor : cursors)
850      {
851        cursor.setOfflineCSN(offlineCSN);
852      }
853    }
854  }
855
856  /**
857   * The thread purging the changelogDB on a regular interval. Records are
858   * purged from the changelogDB if they are older than a delay specified in
859   * seconds. The purge process works in two steps:
860   * <ol>
861   * <li>first purge the changeNumberIndexDB and retrieve information to drive
862   * replicaDBs purging</li>
863   * <li>proceed to purge each replicaDBs based on the information collected
864   * when purging the changeNumberIndexDB</li>
865   * </ol>
866   */
867  private final class ChangelogDBPurger extends DirectoryThread
868  {
869    private static final int DEFAULT_SLEEP = 500;
870
871    protected ChangelogDBPurger()
872    {
873      super("changelog DB purger");
874    }
875
876    /** {@inheritDoc} */
877    @Override
878    public void run()
879    {
880      // initialize CNIndexDB
881      getChangeNumberIndexDB();
882      while (!isShutdownInitiated())
883      {
884        try
885        {
886          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
887          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
888          final CSN oldestNotPurgedCSN;
889
890          // next code assumes that the compute-change-number config
891          // never changes during the life time of an RS
892          if (!config.isComputeChangeNumber())
893          {
894            oldestNotPurgedCSN = purgeCSN;
895          }
896          else
897          {
898            final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
899            if (localCNIndexDB == null)
900            { // shutdown has been initiated
901              return;
902            }
903
904            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
905            if (oldestNotPurgedCSN == null)
906            { // shutdown may have been initiated...
907              // ... or the change number index DB is empty,
908              // wait for new changes to come in.
909
910              // Note we cannot sleep for as long as the purge delay
911              // (3 days default), because we might receive late updates
912              // that will have to be purged before the purge delay elapses.
913              // This can particularly happen in case of network partitions.
914              if (!isShutdownInitiated())
915              {
916                synchronized (this)
917                {
918                  if (!isShutdownInitiated())
919                  {
920                    wait(DEFAULT_SLEEP);
921                  }
922                }
923              }
924              continue;
925            }
926          }
927
928          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
929          {
930            for (final FileReplicaDB replicaDB : domainMap.values())
931            {
932              replicaDB.purgeUpTo(oldestNotPurgedCSN);
933            }
934          }
935
936          if (!isShutdownInitiated())
937          {
938            synchronized (this)
939            {
940              if (!isShutdownInitiated())
941              {
942                wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
943              }
944            }
945          }
946        }
947        catch (InterruptedException e)
948        {
949          // shutdown initiated?
950        }
951        catch (Exception e)
952        {
953          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
954          if (replicationServer != null)
955          {
956            replicationServer.shutdown();
957          }
958        }
959      }
960    }
961
962    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
963    {
964      final long nextPurgeTime = notPurgedCSN.getTime();
965      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
966      if (currentPurgeTime <= nextPurgeTime)
967      {
968        // sleep till the next CSN to purge,
969        return nextPurgeTime - currentPurgeTime;
970      }
971      // wait a bit before purging more
972      return DEFAULT_SLEEP;
973    }
974
975    /** {@inheritDoc} */
976    @Override
977    public void initiateShutdown()
978    {
979      super.initiateShutdown();
980      synchronized (this)
981      {
982        notify(); // wake up the purger thread for faster shutdown
983      }
984    }
985  }
986}