001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2013-2015 ForgeRock AS
025 */
026package org.opends.server.replication.server.changelog.file;
027
028import java.util.Map.Entry;
029import java.util.Set;
030import java.util.concurrent.ConcurrentSkipListSet;
031
032import org.forgerock.i18n.slf4j.LocalizedLogger;
033import org.opends.server.api.DirectoryThread;
034import org.opends.server.backends.ChangelogBackend;
035import org.opends.server.replication.common.CSN;
036import org.opends.server.replication.common.MultiDomainServerState;
037import org.opends.server.replication.common.ServerState;
038import org.opends.server.replication.protocol.ReplicaOfflineMsg;
039import org.opends.server.replication.protocol.UpdateMsg;
040import org.opends.server.replication.server.ChangelogState;
041import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
042import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
043import org.opends.server.replication.server.changelog.api.ChangelogDB;
044import org.opends.server.replication.server.changelog.api.ChangelogException;
045import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
046import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
047import org.opends.server.types.DN;
048
049import static org.opends.messages.ReplicationMessages.*;
050import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
051import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
052import static org.opends.server.util.StaticUtils.*;
053
054/**
055 * Thread responsible for inserting replicated changes into the ChangeNumber
056 * Index DB (CNIndexDB for short).
057 * <p>
058 * Only changes older than the medium consistency point are inserted in the
059 * CNIndexDB. As a consequence this class is also responsible for maintaining
060 * the medium consistency point (indirectly through an
061 * {@link ECLMultiDomainDBCursor}).
062 */
063public class ChangeNumberIndexer extends DirectoryThread
064{
065  /** The tracer object for the debug logger. */
066  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
067
068  /**
069   * If it contains nothing, then the run method executes normally.
070   * Otherwise, the {@link #run()} method must clear its state
071   * for the supplied domain baseDNs. If a supplied domain is
072   * {@link DN#NULL_DN}, then all domains will be cleared.
073   */
074  private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
075  private final ChangelogDB changelogDB;
076  /** Only used for initialization, and then discarded. */
077  private ChangelogState changelogState;
078  private final ECLEnabledDomainPredicate predicate;
079
080  /*
081   * The following MultiDomainServerState fields must be thread safe, because
082   * 1) initialization can happen while the replication server starts receiving
083   * updates
084   * 2) many updates can happen concurrently.
085   */
086  /**
087   * Holds the last time each replica was seen alive, whether via updates or
088   * heartbeat notifications, or offline notifications. Data is held for each
089   * serverId cross domain.
090   * <p>
091   * Updates are persistent and stored in the replicaDBs, heartbeats are
092   * transient and are easily constructed on normal operations.
093   * <p>
094   * Note: This object is updated by both heartbeats and changes/updates.
095   */
096  private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
097
098  /** Note: This object is updated by replica offline messages. */
099  private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
100
101  /**
102   * Cursor across all the replicaDBs for all the replication domains. It is
103   * positioned on the next change that needs to be inserted in the CNIndexDB.
104   * <p>
105   * Note: it is only accessed from the {@link #run()} method.
106   *
107   * @NonNull
108   */
109  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
110  private MultiDomainServerState cookie = new MultiDomainServerState();
111
112  /**
113   * Builds a ChangeNumberIndexer object.
114   *
115   * @param changelogDB
116   *          the changelogDB
117   * @param changelogState
118   *          the changelog state used for initialization
119   */
120  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
121  {
122    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
123  }
124
125  /**
126   * Builds a ChangeNumberIndexer object.
127   *
128   * @param changelogDB
129   *          the changelogDB
130   * @param changelogState
131   *          the changelog state used for initialization
132   * @param predicate
133   *          tells whether a domain is enabled for the external changelog
134   */
135  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
136      ECLEnabledDomainPredicate predicate)
137  {
138    super("Change number indexer");
139    this.changelogDB = changelogDB;
140    this.changelogState = changelogState;
141    this.predicate = predicate;
142  }
143
144  /**
145   * Ensures the medium consistency point is updated by heartbeats.
146   *
147   * @param baseDN
148   *          the baseDN of the domain for which the heartbeat is published
149   * @param heartbeatCSN
150   *          the CSN coming from the heartbeat
151   */
152  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
153  {
154    if (!predicate.isECLEnabledDomain(baseDN))
155    {
156      return;
157    }
158
159    final CSN oldestCSNBefore = getOldestLastAliveCSN();
160    lastAliveCSNs.update(baseDN, heartbeatCSN);
161    tryNotify(oldestCSNBefore);
162  }
163
164  /**
165   * Indicates if the replica corresponding to provided domain DN and server id
166   * is offline.
167   *
168   * @param domainDN
169   *          base DN of the replica
170   * @param serverId
171   *          server id of the replica
172   * @return {@code true} if replica is offline, {@code false} otherwise
173   */
174  public boolean isReplicaOffline(DN domainDN, int serverId)
175  {
176    return replicasOffline.getCSN(domainDN, serverId) != null;
177  }
178
179  /**
180   * Ensures the medium consistency point is updated by UpdateMsg.
181   *
182   * @param baseDN
183   *          the baseDN of the domain for which the heartbeat is published
184   * @param updateMsg
185   *          the updateMsg that will update the medium consistency point
186   * @throws ChangelogException
187   *           If a database problem happened
188   */
189  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
190      throws ChangelogException
191  {
192    if (!predicate.isECLEnabledDomain(baseDN))
193    {
194      return;
195    }
196
197    final CSN oldestCSNBefore = getOldestLastAliveCSN();
198    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
199    tryNotify(oldestCSNBefore);
200  }
201
202  /**
203   * Signals a replica went offline.
204   *
205   * @param baseDN
206   *          the replica's replication domain
207   * @param offlineCSN
208   *          the serverId and time of the replica that went offline
209   */
210  public void replicaOffline(DN baseDN, CSN offlineCSN)
211  {
212    if (!predicate.isECLEnabledDomain(baseDN))
213    {
214      return;
215    }
216
217    replicasOffline.update(baseDN, offlineCSN);
218    final CSN oldestCSNBefore = getOldestLastAliveCSN();
219    lastAliveCSNs.update(baseDN, offlineCSN);
220    tryNotify(oldestCSNBefore);
221  }
222
223  private CSN getOldestLastAliveCSN()
224  {
225    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
226  }
227
228  /**
229   * Notifies the Change number indexer thread if it will be able to do some
230   * work.
231   */
232  private void tryNotify(final CSN oldestCSNBefore)
233  {
234    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
235    {
236      synchronized (this)
237      {
238        notify();
239      }
240    }
241  }
242
243  /**
244   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
245   * have some work to do.
246   */
247  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
248  {
249    final CSN oldestCSNAfter = getOldestLastAliveCSN();
250    // ensure that all initial replicas alive information have been updated
251    // with CSNs that are acceptable for moving the medium consistency forward
252    return allInitialReplicasAreOfflineOrAlive()
253        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
254        // has the oldest CSN changed?
255        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
256  }
257
258  /**
259   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
260   * must be persisted to the change number index DB.
261   */
262  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
263  {
264    // ensure that all initial replicas alive information have been updated
265    // with CSNs that are acceptable for moving the medium consistency forward
266    return allInitialReplicasAreOfflineOrAlive()
267        // can we persist the next CSN?
268        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
269  }
270
271  /**
272   * Returns true only if the initial replicas known from the changelog state DB
273   * are either:
274   * <ul>
275   * <li>offline, so do not wait for them in order to compute medium consistency
276   * </li>
277   * <li>alive, because we received heartbeats or changes (so their last alive
278   * CSN has been updated to something past the oldest possible CSN), we have
279   * enough info to compute medium consistency</li>
280   * </ul>
281   * In both cases, we have enough information to compute medium consistency
282   * without waiting any further.
283   */
284  private boolean allInitialReplicasAreOfflineOrAlive()
285  {
286    for (DN baseDN : lastAliveCSNs)
287    {
288      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
289      {
290        if (csn.getTime() == 0
291            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
292        {
293          // this is the oldest possible CSN, but the replica is not offline
294          // we must wait for more up to date information from this replica
295          return false;
296        }
297      }
298    }
299    return true;
300  }
301
302  /**
303   * Restores in memory data needed to build the CNIndexDB. In particular,
304   * initializes the changes cursor to the medium consistency point.
305   */
306  private void initialize() throws ChangelogException
307  {
308    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
309
310    initializeLastAliveCSNs(domainDB);
311    initializeNextChangeCursor(domainDB);
312    initializeOfflineReplicas();
313
314    // this will not be used any more. Discard for garbage collection.
315    this.changelogState = null;
316  }
317
318  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
319  {
320    // Initialize the multi domain cursor only from the change number index record.
321    // The cookie is always empty at this stage.
322    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
323    final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
324    final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
325    final MultiDomainServerState unused = new MultiDomainServerState();
326    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);
327
328    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
329    ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
330  }
331
332  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
333  {
334    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
335    {
336      final DN baseDN = entry.getKey();
337      if (predicate.isECLEnabledDomain(baseDN))
338      {
339        for (Integer serverId : entry.getValue())
340        {
341          /*
342           * initialize with the oldest possible CSN in order for medium
343           * consistency to wait for all replicas to be alive before moving forward
344           */
345          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
346        }
347
348        final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
349        lastAliveCSNs.update(baseDN, latestKnownState);
350      }
351    }
352  }
353
354  private void initializeOfflineReplicas()
355  {
356    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
357    for (DN baseDN : offlineReplicas)
358    {
359      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
360      {
361        if (predicate.isECLEnabledDomain(baseDN))
362        {
363          replicasOffline.update(baseDN, offlineCSN);
364          // a replica offline message could also be the very last time
365          // we heard from this replica :)
366          lastAliveCSNs.update(baseDN, offlineCSN);
367        }
368      }
369    }
370  }
371
372  private CSN oldestPossibleCSN(int serverId)
373  {
374    return new CSN(0, 0, serverId);
375  }
376
377  /** {@inheritDoc} */
378  @Override
379  public void initiateShutdown()
380  {
381    super.initiateShutdown();
382    synchronized (this)
383    {
384      notify();
385    }
386  }
387
388  /** {@inheritDoc} */
389  @Override
390  public void run()
391  {
392    try
393    {
394      /*
395       * initialize here to allow fast application start up and avoid errors due
396       * cursors being created in a different thread to the one where they are used.
397       */
398      initialize();
399
400      while (!isShutdownInitiated())
401      {
402        try
403        {
404          while (!domainsToClear.isEmpty())
405          {
406            final DN baseDNToClear = domainsToClear.first();
407            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
408            // Only release the waiting thread
409            // once this domain's state has been cleared.
410            domainsToClear.remove(baseDNToClear);
411          }
412
413          // Do not call DBCursor.next() here
414          // because we might not have consumed the last record,
415          // for example if we could not move the MCP forward
416          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
417          if (msg == null)
418          {
419            synchronized (this)
420            {
421              if (isShutdownInitiated())
422              {
423                continue;
424              }
425              wait();
426            }
427            // check whether new changes have been added to the ReplicaDBs
428            moveToNextChange();
429            continue;
430          }
431          else if (msg instanceof ReplicaOfflineMsg)
432          {
433            moveToNextChange();
434            continue;
435          }
436
437          final CSN csn = msg.getCSN();
438          final DN baseDN = nextChangeForInsertDBCursor.getData();
439          // FIXME problem: what if the serverId is not part of the ServerState?
440          // right now, change number will be blocked
441          if (!canMoveForwardMediumConsistencyPoint(csn))
442          {
443            // the oldest record to insert is newer than the medium consistency
444            // point. Let's wait for a change that can be published.
445            synchronized (this)
446            {
447              // double check to protect against a missed call to notify()
448              if (!canMoveForwardMediumConsistencyPoint(csn))
449              {
450                if (isShutdownInitiated())
451                {
452                  return;
453                }
454                wait();
455                // loop to check if changes older than the medium consistency
456                // point have been added to the ReplicaDBs
457                continue;
458              }
459            }
460          }
461
462          // OK, the oldest change is older than the medium consistency point
463          // let's publish it to the CNIndexDB.
464          final long changeNumber = changelogDB.getChangeNumberIndexDB()
465              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
466          if (!cookie.update(baseDN, csn))
467          {
468            throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn
469                + ") would have updated the cookie=" + cookie + ", but it did not");
470          }
471          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
472          moveForwardMediumConsistencyPoint(csn, baseDN);
473        }
474        catch (InterruptedException ignored)
475        {
476          // was shutdown called? loop to figure it out.
477          Thread.currentThread().interrupt();
478        }
479      }
480    }
481    catch (RuntimeException e)
482    {
483      logUnexpectedException(e);
484      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
485      throw e;
486    }
487    catch (Exception e)
488    {
489      logUnexpectedException(e);
490      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
491      throw new RuntimeException(e);
492    }
493    finally
494    {
495      nextChangeForInsertDBCursor.close();
496      nextChangeForInsertDBCursor = null;
497    }
498  }
499
500  private void moveToNextChange() throws ChangelogException
501  {
502    try
503    {
504      nextChangeForInsertDBCursor.next();
505    }
506    catch (AbortedChangelogCursorException e) {
507      if (domainsToClear.isEmpty())
508      {
509        // There is no domain to clear, thus it is
510        // not expected that a cursor is aborted
511        throw e;
512      }
513      // else assumes the aborted cursor is part of a domain
514      // that will be removed on the next iteration
515      logger.trace("Cursor was aborted: %s, but continuing because domainsToClear has size %s",
516          e, domainsToClear.size());
517    }
518  }
519
520  /**
521   * Notifies the {@link ChangelogBackend} that a new entry has been added.
522   *
523   * @param baseDN
524   *          the baseDN of the newly added entry.
525   * @param changeNumber
526   *          the change number of the newly added entry. It will be greater
527   *          than zero for entries added to the change number index and less
528   *          than or equal to zero for entries added to any replica DB
529   * @param cookie
530   *          the cookie of the newly added entry. This is only meaningful for
531   *          entries added to the change number index
532   * @param msg
533   *          the update message of the newly added entry
534   * @throws ChangelogException
535   *           If a problem occurs while notifying of the newly added entry.
536   */
537  protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
538      MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
539  {
540    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg);
541  }
542
543  /**
544   * Nothing can be done about it.
545   * <p>
546   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
547   * alert.
548   * <p>
549   * Message logged here gives corrective information to the administrator.
550   */
551  private void logUnexpectedException(Exception e)
552  {
553    logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
554        getClass().getSimpleName(), stackTraceToSingleLineString(e));
555  }
556
557  private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
558  {
559    final int mcServerId = mcCSN.getServerId();
560    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
561    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
562    if (offlineCSN != null)
563    {
564      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
565      {
566        // replica is back online, we can forget the last time it was offline
567        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
568      }
569      else if (offlineCSN.isOlderThan(mcCSN))
570      {
571        /*
572         * replica is not back online, Medium consistency point has gone past
573         * its last offline time, and there are no more changes after the
574         * offline CSN in the cursor: remove everything known about it
575         * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
576         * from the medium consistency RUV).
577         */
578        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
579      }
580    }
581
582    // advance the cursor we just read from,
583    // success/failure will be checked later
584    nextChangeForInsertDBCursor.next();
585  }
586
587  /**
588   * Asks the current thread to clear its state for the specified domain.
589   * <p>
590   * Note: This method blocks the current thread until state is cleared.
591   *
592   * @param baseDN the baseDN to be cleared from this thread's state.
593   *               {@code null} and {@link DN#NULL_DN} mean "clear all domains".
594   */
595  public void clear(DN baseDN)
596  {
597    // Use DN.NULL_DN to say "clear all domains"
598    final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN;
599    domainsToClear.add(baseDNToClear);
600    while (domainsToClear.contains(baseDNToClear)
601        && !State.TERMINATED.equals(getState()))
602    {
603      // wait until clear() has been done by thread, always waking it up
604      synchronized (this)
605      {
606        notify();
607      }
608      // ensures thread wait that this thread's state is cleaned up
609      Thread.yield();
610    }
611  }
612
613}