001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.Timer;
038import java.util.TimerTask;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.concurrent.locks.ReentrantLock;
043
044import org.forgerock.i18n.LocalizableMessage;
045import org.forgerock.i18n.LocalizableMessageBuilder;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.ldap.ResultCode;
048import org.opends.server.admin.std.server.MonitorProviderCfg;
049import org.opends.server.api.MonitorProvider;
050import org.opends.server.core.DirectoryServer;
051import org.opends.server.replication.common.CSN;
052import org.opends.server.replication.common.DSInfo;
053import org.opends.server.replication.common.RSInfo;
054import org.opends.server.replication.common.ServerState;
055import org.opends.server.replication.common.ServerStatus;
056import org.opends.server.replication.common.StatusMachineEvent;
057import org.opends.server.replication.protocol.AckMsg;
058import org.opends.server.replication.protocol.ChangeStatusMsg;
059import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
060import org.opends.server.replication.protocol.ErrorMsg;
061import org.opends.server.replication.protocol.MonitorMsg;
062import org.opends.server.replication.protocol.MonitorRequestMsg;
063import org.opends.server.replication.protocol.ReplicaOfflineMsg;
064import org.opends.server.replication.protocol.ResetGenerationIdMsg;
065import org.opends.server.replication.protocol.RoutableMsg;
066import org.opends.server.replication.protocol.TopologyMsg;
067import org.opends.server.replication.protocol.UpdateMsg;
068import org.opends.server.replication.server.changelog.api.ChangelogException;
069import org.opends.server.replication.server.changelog.api.DBCursor;
070import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
071import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
072import org.opends.server.types.Attribute;
073import org.opends.server.types.Attributes;
074import org.opends.server.types.DN;
075import org.opends.server.types.DirectoryException;
076import org.opends.server.types.HostPort;
077
078import static org.opends.messages.ReplicationMessages.*;
079import static org.opends.server.replication.common.ServerStatus.*;
080import static org.opends.server.replication.common.StatusMachineEvent.*;
081import static org.opends.server.replication.protocol.ProtocolVersion.*;
082import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
083import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
084import static org.opends.server.util.CollectionUtils.*;
085import static org.opends.server.util.StaticUtils.*;
086
087/**
 * This class defines an in-memory cache that will be used to store
 * the messages that have been received from an LDAP server or
 * from another replication server and that should be forwarded to
 * other servers.
 * <p>
 * The size of the cache is set by configuration.
 * If the cache becomes bigger than the configured size, the older messages
 * are removed and, should they be needed again, must be read from the backing
 * file.
 * <p>
 * It runs a thread that is responsible for saving the received messages
 * to the disk and for trimming them.
 * The decision to trim can be based on disk space or on the age of the message.
101 */
102public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
103{
104  private final DN baseDN;
105
106  /**
107   * Periodically verifies whether the connected DSs are late and publishes any
108   * pending status messages.
109   */
110  private final StatusAnalyzer statusAnalyzer;
111
112  /**
113   * The monitoring publisher that periodically sends monitoring messages to the
114   * topology. Using an AtomicReference to avoid leaking references to costly
115   * threads.
116   */
117  private final AtomicReference<MonitoringPublisher> monitoringPublisher = new AtomicReference<>();
118  /** Maintains monitor data for the current domain. */
119  private final ReplicationDomainMonitor domainMonitor = new ReplicationDomainMonitor(this);
120
121  /**
   * The following map contains one DataServerHandler (described here as a
   * balanced tree) for each replica ID to which we are currently publishing.
   * The first update in the balanced tree is the next change that we must push
   * to this particular server.
125   */
126  private final Map<Integer, DataServerHandler> connectedDSs = new ConcurrentHashMap<>();
127
128  /**
   * This map contains one ServerHandler for each replication server with which
   * we are connected (so normally all the replication servers). The first
   * update in the balanced tree is the next change that we must push to this
   * particular server.
133   */
134  private final Map<Integer, ReplicationServerHandler> connectedRSs = new ConcurrentHashMap<>();
135
136  private final ReplicationDomainDB domainDB;
137  /** The ReplicationServer that created the current instance. */
138  private final ReplicationServer localReplicationServer;
139
140  /**
141   * The generationId of the current replication domain. The generationId is
142   * computed by hashing the first 1000 entries in the DB.
143   */
144  private volatile long generationId = -1;
145  /**
146   * JNR, this is legacy code, hard to follow logic. I think what this field
147   * tries to say is: "is the generationId in use anywhere?", i.e. is there a
148   * replication topology in place? As soon as an answer to any of these
149   * question comes true, then it is set to true.
150   * <p>
151   * It looks like the only use of this field is to prevent the
152   * {@link #generationId} from being reset by
153   * {@link #resetGenerationIdIfPossible()}.
154   */
155  private volatile boolean generationIdSavedStatus;
156
157  /** The tracer object for the debug logger. */
158  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
159
160  /**
161   * The needed info for each received assured update message we are waiting
162   * acks for.
163   * <p>
164   * Key: a CSN matching a received update message which requested
165   * assured mode usage (either safe read or safe data mode)
166   * <p>
167   * Value: The object holding every info needed about the already received acks
168   * as well as the acks to be received.
169   *
170   * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub
171   *      classes javadoc.
172   */
173  private final Map<CSN, ExpectedAcksInfo> waitingAcks = new ConcurrentHashMap<>();
174
175  /**
176   * The timer used to run the timeout code (timer tasks) for the assured update
177   * messages we are waiting acks for.
178   */
179  private final Timer assuredTimeoutTimer;
180  /**
181   * Counter used to purge the timer tasks references in assuredTimeoutTimer,
182   * every n number of treated assured messages.
183   */
184  private int assuredTimeoutTimerPurgeCounter;
185
186
187
188  /**
189   * Stores pending status messages such as DS change time heartbeats for future
190   * forwarding to the rest of the topology. This class is required in order to
191   * decouple inbound IO processing from outbound IO processing and avoid
192   * potential inter-process deadlocks. In particular, the {@code ServerReader}
193   * thread must not send messages.
194   */
195  private static class PendingStatusMessages
196  {
197    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats = new HashMap<>(1);
198    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs = new HashMap<>(1);
199    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs = new HashMap<>(1);
200    private boolean sendRSTopologyMsg;
201    private boolean sendDSTopologyMsg;
202    private int excludedDSForTopologyMsg = -1;
203
204    /**
205     * Enqueues a TopologyMsg for all the connected directory servers in order
206     * to let them know the topology (every known DSs and RSs).
207     *
208     * @param excludedDS
209     *          If not null, the topology message will not be sent to this DS.
210     */
211    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
212    {
213      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
214      if (sendDSTopologyMsg)
215      {
216        if (excludedServerId != excludedDSForTopologyMsg)
217        {
218          excludedDSForTopologyMsg = -1;
219        }
220      }
221      else
222      {
223        sendDSTopologyMsg = true;
224        excludedDSForTopologyMsg = excludedServerId;
225      }
226    }
227
228    /**
229     * Enqueues a TopologyMsg for all the connected replication servers in order
230     * to let them know our connected LDAP servers.
231     */
232    private void enqueueTopoInfoToAllRSs()
233    {
234      sendRSTopologyMsg = true;
235    }
236
237    /**
238     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
239     * all other RS instances.
240     *
241     * @param msg
242     *          The heartbeat message.
243     */
244    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
245    {
246      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
247    }
248
249    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
250    {
251      pendingDSMonitorMsgs.put(dsServerId, msg);
252    }
253
254    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
255    {
256      pendingRSMonitorMsgs.put(rsServerId, msg);
257    }
258
259    /** {@inheritDoc} */
260    @Override
261    public String toString()
262    {
263      return getClass().getSimpleName()
264          + " pendingHeartbeats=" + pendingHeartbeats
265          + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs
266          + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs
267          + ", sendRSTopologyMsg=" + sendRSTopologyMsg
268          + ", sendDSTopologyMsg=" + sendDSTopologyMsg
269          + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg;
270    }
271  }
272
273  private final Object pendingStatusMessagesLock = new Object();
274
275  /** @GuardedBy("pendingStatusMessagesLock") */
276  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
277
278  /**
279   * Creates a new ReplicationServerDomain associated to the baseDN.
280   *
281   * @param baseDN
282   *          The baseDN associated to the ReplicationServerDomain.
283   * @param localReplicationServer
284   *          the ReplicationServer that created this instance.
285   */
286  public ReplicationServerDomain(DN baseDN,
287      ReplicationServer localReplicationServer)
288  {
289    this.baseDN = baseDN;
290    this.localReplicationServer = localReplicationServer;
291    this.assuredTimeoutTimer = new Timer("Replication server RS("
292        + localReplicationServer.getServerId()
293        + ") assured timer for domain \"" + baseDN + "\"", true);
294    this.domainDB =
295        localReplicationServer.getChangelogDB().getReplicationDomainDB();
296    this.statusAnalyzer = new StatusAnalyzer(this);
297    this.statusAnalyzer.start();
298    DirectoryServer.registerMonitorProvider(this);
299  }
300
301  /**
302   * Add an update that has been received to the list of
303   * updates that must be forwarded to all other servers.
304   *
305   * @param updateMsg  The update that has been received.
306   * @param sourceHandler The ServerHandler for the server from which the
307   *        update was received
308   * @throws IOException When an IO exception happens during the update
309   *         processing.
310   */
311  public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException
312  {
313    sourceHandler.updateServerState(updateMsg);
314    sourceHandler.incrementInCount();
315    setGenerationIdIfUnset(sourceHandler.getGenerationId());
316
317    /**
318     * If this is an assured message (a message requesting ack), we must
319     * construct the ExpectedAcksInfo object with the right number of expected
320     * acks before posting message to the writers. Otherwise some writers may
321     * have time to post, receive the ack and increment received ack counter
322     * (kept in ExpectedAcksInfo object) and we could think the acknowledgment
323     * is fully processed although it may be not (some other acks from other
324     * servers are not yet arrived). So for that purpose we do a pre-loop
325     * to determine to who we will post an assured message.
326     * Whether the assured mode is safe read or safe data, we anyway do not
327     * support the assured replication feature across topologies with different
328     * group ids. The assured feature insures assured replication based on the
329     * same locality (group id). For instance in double data center deployment
330     * (2 group id usage) with assured replication enabled, an assured message
331     * sent from data center 1 (group id = 1) will be sent to servers of both
332     * data centers, but one will request and wait acks only from servers of the
333     * data center 1.
334     */
335    final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler);
336
337    if (!publishUpdateMsg(updateMsg))
338    {
339      return;
340    }
341
342    final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo);
343
344    /**
345     * The update message equivalent to the originally received update message,
346     * but with assured flag disabled. This message is the one that should be
347     * sent to non eligible servers for assured mode.
348     * We need a clone like of the original message with assured flag off, to be
349     * posted to servers we don't want to wait the ack from (not normal status
350     * servers or servers with different group id). This must be done because
351     * the posted message is a reference so each writer queue gets the same
352     * reference, thus, changing the assured flag of an object is done for every
353     * references posted on every writer queues. That is why we need a message
354     * version with assured flag on and another one with assured flag off.
355     */
356    final NotAssuredUpdateMsg notAssuredUpdateMsg =
357        preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null;
358
359    // Push the message to the replication servers
360    if (sourceHandler.isDataServer())
361    {
362      for (ReplicationServerHandler rsHandler : connectedRSs.values())
363      {
364        /**
365         * Ignore updates to RS with bad gen id
366         * (no system managed status for a RS)
367         */
368        if (!isDifferentGenerationId(rsHandler, updateMsg))
369        {
370          addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
371        }
372      }
373    }
374
375    // Push the message to the LDAP servers
376    for (DataServerHandler dsHandler : connectedDSs.values())
377    {
378      // Do not forward the change to the server that just sent it
379      if (dsHandler != sourceHandler
380          && !isUpdateMsgFiltered(updateMsg, dsHandler))
381      {
382        addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
383      }
384    }
385  }
386
387  private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
388      UpdateMsg updateMsg)
389  {
390    final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId());
391    if (isDifferent && logger.isTraceEnabled())
392    {
393      debug("updateMsg " + updateMsg.getCSN()
394          + " will not be sent to replication server "
395          + rsHandler.getServerId() + " with generation id "
396          + rsHandler.getGenerationId() + " different from local "
397          + "generation id " + generationId);
398    }
399    return isDifferent;
400  }
401
402  /**
   * Ignore updates to DS in BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS.
404   * <p>
405   * The RSD lock should not be taken here as it is acceptable to have a delay
406   * between the time the server has a wrong status and the fact we detect it:
407   * the updates that succeed to pass during this time will have no impact on
408   * remote server. But it is interesting to not saturate uselessly the network
409   * if the updates are not necessary so this check to stop sending updates is
410   * interesting anyway. Not taking the RSD lock allows to have better
411   * performances in normal mode (most of the time).
412   */
413  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler)
414  {
415    final ServerStatus dsStatus = dsHandler.getStatus();
416    if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
417    {
418      if (logger.isTraceEnabled())
419      {
420        debug("updateMsg " + updateMsg.getCSN()
421            + " will not be sent to directory server "
422            + dsHandler.getServerId() + " with generation id "
423            + dsHandler.getGenerationId() + " different from local "
424            + "generation id " + generationId);
425      }
426      return true;
427    }
428    else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
429    {
430      if (logger.isTraceEnabled())
431      {
432        debug("updateMsg " + updateMsg.getCSN()
433            + " will not be sent to directory server "
434            + dsHandler.getServerId() + " as it is in full update");
435      }
436      return true;
437    }
438    return false;
439  }
440
441  private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg,
442      ServerHandler sourceHandler) throws IOException
443  {
444    // Assured feature is supported starting from replication protocol V2
445    if (!updateMsg.isAssured()
446        || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2)
447    {
448      return null;
449    }
450
451    // According to assured sub-mode, prepare structures to keep track of
452    // the acks we are interested in.
453    switch (updateMsg.getAssuredMode())
454    {
455    case SAFE_DATA_MODE:
456      sourceHandler.incrementAssuredSdReceivedUpdates();
457      return processSafeDataUpdateMsg(updateMsg, sourceHandler);
458
459    case SAFE_READ_MODE:
460      sourceHandler.incrementAssuredSrReceivedUpdates();
461      return processSafeReadUpdateMsg(updateMsg, sourceHandler);
462
463    default:
464      // Unknown assured mode: should never happen
465      logger.error(ERR_RS_UNKNOWN_ASSURED_MODE,
466          localReplicationServer.getServerId(), updateMsg.getAssuredMode(), baseDN, updateMsg);
467      return null;
468    }
469  }
470
471  private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo)
472  {
473    List<Integer> expectedServers = null;
474    if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null)
475    {
476      expectedServers = preparedAssuredInfo.expectedServers;
477      // Store the expected acks info into the global map.
478      // The code for processing reception of acks for this update will update
479      // info kept in this object and if enough acks received, it will send
480      // back the final ack to the requester and remove the object from this map
481      // OR
482      // The following timer will time out and send an timeout ack to the
483      // requester if the acks are not received in time. The timer will also
484      // remove the object from this map.
485      final CSN csn = updateMsg.getCSN();
486      waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
487
488      // Arm timer for this assured update message (wait for acks until it times out)
489      final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
490      assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout());
491      // Purge timer every 100 treated messages
492      assuredTimeoutTimerPurgeCounter++;
493      if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
494      {
495        assuredTimeoutTimer.purge();
496      }
497    }
498
499    return expectedServers != null ? expectedServers : Collections.<Integer> emptyList();
500  }
501
502  private boolean publishUpdateMsg(UpdateMsg updateMsg)
503  {
504    try
505    {
506      if (updateMsg instanceof ReplicaOfflineMsg)
507      {
508        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
509        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
510        return true;
511      }
512
513      if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
514      {
515        /*
516         * JNR: Matt and I had a hard time figuring out where to put this
517         * synchronized block. We elected to put it here, but without a strong
518         * conviction.
519         */
520        synchronized (generationIDLock)
521        {
522          /*
523           * JNR: I think the generationIdSavedStatus is set to true because
524           * method above created a ReplicaDB which assumes the generationId was
525           * communicated to another server. Hence setting true on this field
526           * prevent the generationId from being reset.
527           */
528          generationIdSavedStatus = true;
529        }
530      }
531      return true;
532    }
533    catch (ChangelogException e)
534    {
535      /*
536       * Because of database problem we can't save any more changes from at
537       * least one LDAP server. This replicationServer therefore can't do it's
538       * job properly anymore and needs to close all its connections and
539       * shutdown itself.
540       */
541      logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e));
542      localReplicationServer.shutdown();
543      return false;
544    }
545  }
546
547  private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg,
548      NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers)
549  {
550    // Assured mode: post an assured or not assured matching update message
551    // according to what has been computed for the destination server
552    if (notAssuredUpdateMsg != null
553        && !assuredServers.contains(sHandler.getServerId()))
554    {
555      sHandler.add(notAssuredUpdateMsg);
556    }
557    else
558    {
559      sHandler.add(updateMsg);
560    }
561  }
562
563  /**
564   * Helper class to be the return type of a method that processes a just
565   * received assured update message:
566   * - processSafeReadUpdateMsg
567   * - processSafeDataUpdateMsg
568   * This is a facility to pack many interesting returned object.
569   */
570  private class PreparedAssuredInfo
571  {
572      /**
573       * The list of servers identified as servers we are interested in
574       * receiving acks from. If this list is not null, then expectedAcksInfo
575       * should be not null.
576       * Servers that are not in this list are servers not eligible for an ack
577       * request.
578       */
579      public List<Integer> expectedServers;
580
581      /**
582       * The constructed ExpectedAcksInfo object to be used when acks will be
583       * received. Null if expectedServers is null.
584       */
585      public ExpectedAcksInfo expectedAcksInfo;
586  }
587
588  /**
589   * Process a just received assured update message in Safe Read mode. If the
590   * ack can be sent immediately, it is done here. This will also determine to
591   * which suitable servers an ack should be requested from, and which ones are
592   * not eligible for an ack request.
   * This method is a helper method for the put method. Have a look at the put
594   * method for a better understanding.
595   * @param update The just received assured update to process.
596   * @param sourceHandler The ServerHandler for the server from which the
597   *        update was received
598   * @return A suitable PreparedAssuredInfo object that contains every needed
599   * info to proceed with post to server writers.
600   * @throws IOException When an IO exception happens during the update
601   *         processing.
602   */
603  private PreparedAssuredInfo processSafeReadUpdateMsg(
604    UpdateMsg update, ServerHandler sourceHandler) throws IOException
605  {
606    CSN csn = update.getCSN();
607    byte groupId = localReplicationServer.getGroupId();
608    byte sourceGroupId = sourceHandler.getGroupId();
609    List<Integer> expectedServers = new ArrayList<>();
610    List<Integer> wrongStatusServers = new ArrayList<>();
611
612    if (sourceGroupId == groupId)
613      // Assured feature does not cross different group ids
614    {
615      if (sourceHandler.isDataServer())
616      {
617        collectRSsEligibleForAssuredReplication(groupId, expectedServers);
618      }
619
620      // Look for DS eligible for assured
621      for (DataServerHandler dsHandler : connectedDSs.values())
622      {
623        // Don't forward the change to the server that just sent it
624        if (dsHandler == sourceHandler)
625        {
626          continue;
627        }
628        if (dsHandler.getGroupId() == groupId)
629          // No ack expected from a DS with different group id
630        {
631          ServerStatus serverStatus = dsHandler.getStatus();
632          if (serverStatus == ServerStatus.NORMAL_STATUS)
633          {
634            expectedServers.add(dsHandler.getServerId());
635          } else if (serverStatus == ServerStatus.DEGRADED_STATUS) {
636            // No ack expected from a DS with wrong status
637            wrongStatusServers.add(dsHandler.getServerId());
638          }
639          /*
640           * else
641           * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
642           * We do not want this to be reported as an error to the update
643           * maker -> no pollution or potential misunderstanding when
644           * reading logs or monitoring and it was just administration (for
645           * instance new server is being configured in topo: it goes in bad
646           * gen then full update).
647           */
648        }
649      }
650    }
651
652    // Return computed structures
653    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
654    if (!expectedServers.isEmpty())
655    {
656      // Some other acks to wait for
657      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn,
658        sourceHandler, expectedServers, wrongStatusServers);
659      preparedAssuredInfo.expectedServers = expectedServers;
660    }
661
662    if (preparedAssuredInfo.expectedServers == null)
663    {
664      // No eligible servers found, send the ack immediately
665      sourceHandler.send(new AckMsg(csn));
666    }
667
668    return preparedAssuredInfo;
669  }
670
671  /**
672   * Process a just received assured update message in Safe Data mode. If the
673   * ack can be sent immediately, it is done here. This will also determine to
674   * which suitable servers an ack should be requested from, and which ones are
675   * not eligible for an ack request.
   * This method is a helper method for the put method. Have a look at the put
677   * method for a better understanding.
678   * @param update The just received assured update to process.
679   * @param sourceHandler The ServerHandler for the server from which the
680   *        update was received
681   * @return A suitable PreparedAssuredInfo object that contains every needed
682   * info to proceed with post to server writers.
683   * @throws IOException When an IO exception happens during the update
684   *         processing.
685   */
686  private PreparedAssuredInfo processSafeDataUpdateMsg(
687    UpdateMsg update, ServerHandler sourceHandler) throws IOException
688  {
689    CSN csn = update.getCSN();
690    boolean interestedInAcks = false;
691    byte safeDataLevel = update.getSafeDataLevel();
692    byte groupId = localReplicationServer.getGroupId();
693    byte sourceGroupId = sourceHandler.getGroupId();
694    if (safeDataLevel < (byte) 1)
695    {
696      // Should never happen
697      logger.error(ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL,
698          localReplicationServer.getServerId(), safeDataLevel, baseDN, update);
699    } else if (sourceGroupId == groupId
700    // Assured feature does not cross different group IDS
701        && isSameGenerationId(sourceHandler.getGenerationId()))
702    // Ignore assured updates from wrong generationId servers
703    {
704        if (sourceHandler.isDataServer())
705        {
706          if (safeDataLevel == (byte) 1)
707          {
708            /**
709             * Immediately return the ack for an assured message in safe data
710             * mode with safe data level 1, coming from a DS. No need to wait
711             * for more acks
712             */
713            sourceHandler.send(new AckMsg(csn));
714          } else
715          {
716            /**
717             * level > 1 : We need further acks
718             * The message will be posted in assured mode to eligible
719             * servers. The embedded safe data level is not changed, and his
720             * value will be used by a remote RS to determine if he must send
721             * an ack (level > 1) or not (level = 1)
722             */
723            interestedInAcks = true;
724          }
725        } else
726        { // A RS sent us the safe data message, for sure no further ack to wait
727          /**
728           * Level 1 has already been reached so no further acks to wait.
729           * Just deal with level > 1
730           */
731          if (safeDataLevel > (byte) 1)
732          {
733            sourceHandler.send(new AckMsg(csn));
734          }
735        }
736    }
737
738    List<Integer> expectedServers = new ArrayList<>();
739    if (interestedInAcks && sourceHandler.isDataServer())
740    {
741      collectRSsEligibleForAssuredReplication(groupId, expectedServers);
742    }
743
744    // Return computed structures
745    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
746    int nExpectedServers = expectedServers.size();
747    if (interestedInAcks) // interestedInAcks so level > 1
748    {
749      if (nExpectedServers > 0)
750      {
751        // Some other acks to wait for
752        int sdl = update.getSafeDataLevel();
753        int neededAdditionalServers = sdl - 1;
        // Change the number of expected acks if not enough eligible servers
        // are available: the level is a best effort thing, we do not want to
        // time out at every assured SD update, for instance if a RS has had
        // its generation id reset
758        byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
759          (byte)sdl : // Keep level as it was
760          (byte)(nExpectedServers+1); // Change level to match what's available
761        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn,
762          sourceHandler, finalSdl, expectedServers);
763        preparedAssuredInfo.expectedServers = expectedServers;
764      } else
765      {
766        // level > 1 and source is a DS but no eligible servers found, send the
767        // ack immediately
768        sourceHandler.send(new AckMsg(csn));
769      }
770    }
771
772    return preparedAssuredInfo;
773  }
774
775  private void collectRSsEligibleForAssuredReplication(byte groupId,
776      List<Integer> expectedServers)
777  {
778    for (ReplicationServerHandler rsHandler : connectedRSs.values())
779    {
780      if (rsHandler.getGroupId() == groupId
781      // No ack expected from a RS with different group id
782            && isSameGenerationId(rsHandler.getGenerationId())
783        // No ack expected from a RS with bad gen id
784        )
785      {
786        expectedServers.add(rsHandler.getServerId());
787      }
788    }
789  }
790
791  private boolean isSameGenerationId(long generationId)
792  {
793    return this.generationId > 0 && this.generationId == generationId;
794  }
795
796  private boolean isDifferentGenerationId(long generationId)
797  {
798    return this.generationId > 0 && this.generationId != generationId;
799  }
800
801  /**
802   * Process an ack received from a given server.
803   *
804   * @param ack The ack message received.
805   * @param ackingServer The server handler of the server that sent the ack.
806   */
807  void processAck(AckMsg ack, ServerHandler ackingServer)
808  {
809    // Retrieve the expected acks info for the update matching the original
810    // sent update.
811    CSN csn = ack.getCSN();
812    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
813
814    if (expectedAcksInfo != null)
815    {
816      // Prevent concurrent access from processAck() or AssuredTimeoutTask.run()
817      synchronized (expectedAcksInfo)
818      {
819        if (expectedAcksInfo.isCompleted())
820        {
821          // Timeout code is sending a timeout ack, do nothing and let him
822          // remove object from the map
823          return;
824        }
825        /**
826         *
827         * If this is the last ack we were waiting from, immediately create and
828         * send the final ack to the original server
829         */
830        if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
831        {
832          // Remove the object from the map as no more needed
833          waitingAcks.remove(csn);
834          AckMsg finalAck = expectedAcksInfo.createAck(false);
835          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
836          try
837          {
838            origServer.send(finalAck);
839          } catch (IOException e)
840          {
841            /**
842             * An error happened trying the send back an ack to the server.
843             * Log an error and close the connection to this server.
844             */
845            LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
846            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
847                localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN));
848            mb.append(" ");
849            mb.append(stackTraceToSingleLineString(e));
850            logger.error(mb.toMessage());
851            stopServer(origServer, false);
852          }
853          // Mark the ack info object as completed to prevent potential timeout
854          // code parallel run
855          expectedAcksInfo.completed();
856        }
857      }
858    }
859    /* Else the timeout occurred for the update matching this CSN
860     * and the ack with timeout error has probably already been sent.
861     */
862  }
863
864  /**
865   * The code run when the timeout occurs while waiting for acks of the
866   * eligible servers. This basically sends a timeout ack (with any additional
867   * error info) to the original server that sent an assured update message.
868   */
869  private class AssuredTimeoutTask extends TimerTask
870  {
871    private CSN csn;
872
873    /**
874     * Constructor for the timer task.
875     * @param csn The CSN of the assured update we are waiting acks for
876     */
877    public AssuredTimeoutTask(CSN csn)
878    {
879      this.csn = csn;
880    }
881
882    /**
883     * Run when the assured timeout for an assured update message we are waiting
884     * acks for occurs.
885     */
886    @Override
887    public void run()
888    {
889      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
890
891      if (expectedAcksInfo != null)
892      {
893        synchronized (expectedAcksInfo)
894        {
895          if (expectedAcksInfo.isCompleted())
896          {
897            // processAck() code is sending the ack, do nothing and let him
898            // remove object from the map
899            return;
900          }
901          // Remove the object from the map as no more needed
902          waitingAcks.remove(csn);
903          // Create the timeout ack and send him to the server the assured
904          // update message came from
905          AckMsg finalAck = expectedAcksInfo.createAck(true);
906          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
907          if (logger.isTraceEnabled())
908          {
909            debug("sending timeout for assured update with CSN " + csn
910                + " to serverId=" + origServer.getServerId());
911          }
912          try
913          {
914            origServer.send(finalAck);
915          } catch (IOException e)
916          {
917            /**
918             * An error happened trying the send back an ack to the server.
919             * Log an error and close the connection to this server.
920             */
921            LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
922            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
923                localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN));
924            mb.append(" ");
925            mb.append(stackTraceToSingleLineString(e));
926            logger.error(mb.toMessage());
927            stopServer(origServer, false);
928          }
929          // Increment assured counters
930          boolean safeRead =
931              expectedAcksInfo instanceof SafeReadExpectedAcksInfo;
932          if (safeRead)
933          {
934            origServer.incrementAssuredSrReceivedUpdatesTimeout();
935          } else
936          {
937            if (origServer.isDataServer())
938            {
939              origServer.incrementAssuredSdReceivedUpdatesTimeout();
940            }
941          }
942          //   retrieve expected servers in timeout to increment their counter
943          List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
944          for (Integer serverId : serversInTimeout)
945          {
946            ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
947            ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
948            if (expectedDSInTimeout != null)
949            {
950              if (safeRead)
951              {
952                expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout();
953              } // else no SD update sent to a DS (meaningless)
954            } else if (expectedRSInTimeout != null)
955            {
956              if (safeRead)
957              {
958                expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout();
959              }
960              else
961              {
962                expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout();
963              }
964            }
965            // else server disappeared ? Let's forget about it.
966          }
967          // Mark the ack info object as completed to prevent potential
968          // processAck() code parallel run
969          expectedAcksInfo.completed();
970        }
971      }
972    }
973  }
974
975
976  /**
977   * Stop operations with a list of replication servers.
978   *
979   * @param serversToDisconnect
980   *          the replication servers addresses for which we want to stop
981   *          operations
982   */
983  public void stopReplicationServers(Collection<HostPort> serversToDisconnect)
984  {
985    for (ReplicationServerHandler rsHandler : connectedRSs.values())
986    {
987      if (serversToDisconnect.contains(
988            HostPort.valueOf(rsHandler.getServerAddressURL())))
989      {
990        stopServer(rsHandler, false);
991      }
992    }
993  }
994
995  /**
996   * Stop operations with all servers this domain is connected with (RS and DS).
997   *
998   * @param shutdown A boolean indicating if the stop is due to a
999   *                 shutdown condition.
1000   */
1001  public void stopAllServers(boolean shutdown)
1002  {
1003    for (ReplicationServerHandler rsHandler : connectedRSs.values())
1004    {
1005      stopServer(rsHandler, shutdown);
1006    }
1007
1008    for (DataServerHandler dsHandler : connectedDSs.values())
1009    {
1010      stopServer(dsHandler, shutdown);
1011    }
1012  }
1013
1014  /**
1015   * Checks whether it is already connected to a DS with same id.
1016   *
1017   * @param dsHandler
1018   *          the DS we want to check
1019   * @return true if this DS is already connected to the current server
1020   */
1021  public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler)
1022  {
1023    if (connectedDSs.containsKey(dsHandler.getServerId()))
1024    {
1025      // looks like two connected LDAP servers have the same serverId
1026      logger.error(ERR_DUPLICATE_SERVER_ID, localReplicationServer.getMonitorInstanceName(),
1027          connectedDSs.get(dsHandler.getServerId()), dsHandler, dsHandler.getServerId());
1028      return true;
1029    }
1030    return false;
1031  }
1032
1033  /**
1034   * Stop operations with a given server.
1035   *
1036   * @param sHandler the server for which we want to stop operations.
1037   * @param shutdown A boolean indicating if the stop is due to a
1038   *                 shutdown condition.
1039   */
1040  public void stopServer(ServerHandler sHandler, boolean shutdown)
1041  {
1042    // TODO JNR merge with stopServer(MessageHandler)
1043    if (logger.isTraceEnabled())
1044    {
1045      debug("stopServer() on the server handler " + sHandler);
1046    }
1047    /*
1048     * We must prevent deadlock on replication server domain lock, when for
1049     * instance this code is called from dying ServerReader but also dying
1050     * ServerWriter at the same time, or from a thread that wants to shut down
1051     * the handler. So use a thread safe flag to know if the job must be done
1052     * or not (is already being processed or not).
1053     */
1054    if (!sHandler.engageShutdown())
1055      // Only do this once (prevent other thread to enter here again)
1056    {
1057      if (!shutdown)
1058      {
1059        try
1060        {
1061          // Acquire lock on domain (see more details in comment of start()
1062          // method of ServerHandler)
1063          lock();
1064        }
1065        catch (InterruptedException ex)
1066        {
1067          // We can't deal with this here, so re-interrupt thread so that it is
1068          // caught during subsequent IO.
1069          Thread.currentThread().interrupt();
1070          return;
1071        }
1072      }
1073
1074      try
1075      {
1076        // Stop useless monitoring publisher if no more RS or DS in domain
1077        if ( (connectedDSs.size() + connectedRSs.size() )== 1)
1078        {
1079          if (logger.isTraceEnabled())
1080          {
1081            debug("remote server " + sHandler
1082                + " is the last RS/DS to be stopped:"
1083                + " stopping monitoring publisher");
1084          }
1085          stopMonitoringPublisher();
1086        }
1087
1088        if (connectedRSs.containsKey(sHandler.getServerId()))
1089        {
1090          unregisterServerHandler(sHandler, shutdown, false);
1091        }
1092        else if (connectedDSs.containsKey(sHandler.getServerId()))
1093        {
1094          unregisterServerHandler(sHandler, shutdown, true);
1095        }
1096      }
1097      catch(Exception e)
1098      {
1099        logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
1100      }
1101      finally
1102      {
1103        if (!shutdown)
1104        {
1105          release();
1106        }
1107      }
1108    }
1109  }
1110
1111  private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
1112      boolean isDirectoryServer)
1113  {
1114    unregisterServerHandler(sHandler);
1115    sHandler.shutdown();
1116
1117    resetGenerationIdIfPossible();
1118    if (!shutdown)
1119    {
1120      synchronized (pendingStatusMessagesLock)
1121      {
1122        if (isDirectoryServer)
1123        {
1124          // Update the remote replication servers with our list
1125          // of connected LDAP servers
1126          pendingStatusMessages.enqueueTopoInfoToAllRSs();
1127        }
1128        // Warn our DSs that a RS or DS has quit (does not use this
1129        // handler as already removed from list)
1130        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
1131      }
1132      statusAnalyzer.notifyPendingStatusMessage();
1133    }
1134  }
1135
1136  /**
1137   * Unregister this handler from the list of handlers registered to this
1138   * domain.
1139   * @param sHandler the provided handler to unregister.
1140   */
1141  private void unregisterServerHandler(ServerHandler sHandler)
1142  {
1143    if (sHandler.isReplicationServer())
1144    {
1145      connectedRSs.remove(sHandler.getServerId());
1146    }
1147    else
1148    {
1149      connectedDSs.remove(sHandler.getServerId());
1150    }
1151  }
1152
1153  /**
1154   * This method resets the generationId for this domain if there is no LDAP
1155   * server currently connected in the whole topology on this domain and if the
1156   * generationId has never been saved.
1157   * <ul>
1158   * <li>test emptiness of {@link #connectedDSs} list</li>
1159   * <li>traverse {@link #connectedRSs} list and test for each if DS are
1160   * connected</li>
1161   * </ul>
1162   * So it strongly relies on the {@link #connectedDSs} list
1163   */
1164  private void resetGenerationIdIfPossible()
1165  {
1166    if (logger.isTraceEnabled())
1167    {
1168      debug("mayResetGenerationId generationIdSavedStatus="
1169          + generationIdSavedStatus);
1170    }
1171
1172    // If there is no more any LDAP server connected to this domain in the
1173    // topology and the generationId has never been saved, then we can reset
1174    // it and the next LDAP server to connect will become the new reference.
1175    boolean ldapServersConnectedInTheTopology = false;
1176    if (connectedDSs.isEmpty())
1177    {
1178      for (ReplicationServerHandler rsHandler : connectedRSs.values())
1179      {
1180        if (generationId != rsHandler.getGenerationId())
1181        {
1182          if (logger.isTraceEnabled())
1183          {
1184            debug("mayResetGenerationId skip RS " + rsHandler
1185                + " that has different genId");
1186          }
1187        }
1188        else if (rsHandler.hasRemoteLDAPServers())
1189        {
1190          ldapServersConnectedInTheTopology = true;
1191
1192          if (logger.isTraceEnabled())
1193          {
1194            debug("mayResetGenerationId RS " + rsHandler
1195                + " has ldap servers connected to it"
1196                + " - will not reset generationId");
1197          }
1198          break;
1199        }
1200      }
1201    }
1202    else
1203    {
1204      ldapServersConnectedInTheTopology = true;
1205
1206      if (logger.isTraceEnabled())
1207      {
1208        debug("has ldap servers connected to it - will not reset generationId");
1209      }
1210    }
1211
1212    if (!ldapServersConnectedInTheTopology
1213        && !generationIdSavedStatus
1214        && generationId != -1)
1215    {
1216      changeGenerationId(-1);
1217    }
1218  }
1219
1220  /**
1221   * Checks whether a remote RS is already connected to this hosting RS.
1222   *
1223   * @param rsHandler
1224   *          The handler for the remote RS.
1225   * @return flag specifying whether the remote RS is already connected.
1226   * @throws DirectoryException
1227   *           when a problem occurs.
1228   */
1229  public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler)
1230      throws DirectoryException
1231  {
1232    ReplicationServerHandler oldRsHandler =
1233        connectedRSs.get(rsHandler.getServerId());
1234    if (oldRsHandler == null)
1235    {
1236      return false;
1237    }
1238
1239    if (oldRsHandler.getServerAddressURL().equals(
1240        rsHandler.getServerAddressURL()))
1241    {
1242      // this is the same server, this means that our ServerStart messages
1243      // have been sent at about the same time and 2 connections
1244      // have been established.
1245      // Silently drop this connection.
1246      return true;
1247    }
1248
1249    // looks like two replication servers have the same serverId
1250    // log an error message and drop this connection.
1251    LocalizableMessage message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
1252        localReplicationServer.getMonitorInstanceName(),
1253        oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(),
1254        rsHandler.getServerId());
1255    throw new DirectoryException(ResultCode.OTHER, message);
1256  }
1257
1258  /**
1259   * Creates and returns a cursor across this replication domain.
1260   * <p>
1261   * Client code must call {@link DBCursor#next()} to advance the cursor to the
1262   * next available record.
1263   * <p>
1264   * When the cursor is not used anymore, client code MUST call the
1265   * {@link DBCursor#close()} method to free the resources and locks used by the
1266   * cursor.
1267   *
1268   * @param startAfterServerState
1269   *          Starting point for the replicaDB cursors. If null, start from the
1270   *          oldest CSN
1271   * @return a non null {@link DBCursor} going from oldest to newest CSN
1272   * @throws ChangelogException
1273   *           If a database problem happened
1274   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, CursorOptions)
1275   */
1276  public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
1277      throws ChangelogException
1278  {
1279    CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
1280    return domainDB.getCursorFrom(baseDN, startAfterServerState, options);
1281  }
1282
1283  /**
1284   * Get the baseDN.
1285   *
1286   * @return Returns the baseDN.
1287   */
1288  public DN getBaseDN()
1289  {
1290    return baseDN;
1291  }
1292
1293  /**
1294   * Retrieves the destination handlers for a routable message.
1295   *
1296   * @param msg The message to route.
1297   * @param senderHandler The handler of the server that published this message.
1298   * @return The list of destination handlers.
1299   */
1300  private List<ServerHandler> getDestinationServers(RoutableMsg msg,
1301    ServerHandler senderHandler)
1302  {
1303    List<ServerHandler> servers = new ArrayList<>();
1304
1305    if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER)
1306    {
1307      // TODO Import from the "closest server" to be implemented
1308    } else if (msg.getDestination() == RoutableMsg.ALL_SERVERS)
1309    {
1310      if (!senderHandler.isReplicationServer())
1311      {
1312        // Send to all replication servers with a least one remote
1313        // server connected
1314        for (ReplicationServerHandler rsh : connectedRSs.values())
1315        {
1316          if (rsh.hasRemoteLDAPServers())
1317          {
1318            servers.add(rsh);
1319          }
1320        }
1321      }
1322
1323      // Sends to all connected LDAP servers
1324      for (DataServerHandler destinationHandler : connectedDSs.values())
1325      {
1326        // Don't loop on the sender
1327        if (destinationHandler == senderHandler)
1328        {
1329          continue;
1330        }
1331        servers.add(destinationHandler);
1332      }
1333    } else
1334    {
1335      // Destination is one server
1336      DataServerHandler destinationHandler =
1337        connectedDSs.get(msg.getDestination());
1338      if (destinationHandler != null)
1339      {
1340        servers.add(destinationHandler);
1341      } else
1342      {
1343        // the targeted server is NOT connected
1344        // Let's search for the replication server that MAY
1345        // have the targeted server connected.
1346        if (senderHandler.isDataServer())
1347        {
1348          for (ReplicationServerHandler rsHandler : connectedRSs.values())
1349          {
1350            // Send to all replication servers with a least one remote
1351            // server connected
1352            if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
1353            {
1354              servers.add(rsHandler);
1355            }
1356          }
1357        }
1358      }
1359    }
1360    return servers;
1361  }
1362
1363
1364
1365  /**
1366   * Processes a message coming from one server in the topology and potentially
1367   * forwards it to one or all other servers.
1368   *
1369   * @param msg
1370   *          The message received and to be processed.
1371   * @param sender
1372   *          The server handler of the server that sent the message.
1373   */
1374  void process(RoutableMsg msg, ServerHandler sender)
1375  {
1376    if (msg.getDestination() == localReplicationServer.getServerId())
1377    {
1378      // Handle routable messages targeted at this RS.
1379      if (msg instanceof ErrorMsg)
1380      {
1381        ErrorMsg errorMsg = (ErrorMsg) msg;
1382        logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
1383      }
1384      else
1385      {
1386        replyWithUnroutableMsgType(sender, msg);
1387      }
1388    }
1389    else
1390    {
1391      // Forward message not destined for this RS.
1392      List<ServerHandler> servers = getDestinationServers(msg, sender);
1393      if (!servers.isEmpty())
1394      {
1395        forwardMsgToAllServers(msg, servers, sender);
1396      }
1397      else
1398      {
1399        replyWithUnreachablePeerMsg(sender, msg);
1400      }
1401    }
1402  }
1403
1404  /**
1405   * Responds to a monitor request message.
1406   *
1407   * @param msg
1408   *          The monitor request message.
1409   * @param sender
1410   *          The DS/RS which sent the monitor request.
1411   */
1412  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
1413  {
1414    enqueueMonitorMsg(msg, sender);
1415  }
1416
1417  /**
1418   * Responds to a monitor message.
1419   *
1420   * @param msg
1421   *          The monitor message
1422   * @param sender
1423   *          The DS/RS which sent the monitor.
1424   */
1425  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
1426  {
1427    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
1428  }
1429
1430  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
1431      RoutableMsg msg)
1432  {
1433    String msgClassname = msg.getClass().getCanonicalName();
1434    logger.info(NOTE_ERR_ROUTING_TO_SERVER, msgClassname);
1435
1436    LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
1437    mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
1438    mb.append("serverID:").append(msg.getDestination());
1439    ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage());
1440    try
1441    {
1442      msgEmitter.send(errMsg);
1443    }
1444    catch (IOException ignored)
1445    {
1446      // an error happened on the sender session trying to recover
1447      // from an error on the receiver session.
1448      // Not much more we can do at this point.
1449    }
1450  }
1451
1452  private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter,
1453      RoutableMsg msg)
1454  {
1455    LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
1456    mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination()));
1457    mb.append(" In Replication Server=").append(
1458      this.localReplicationServer.getMonitorInstanceName());
1459    mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
1460    mb.append(" Details:routing table is empty");
1461    final LocalizableMessage message = mb.toMessage();
1462    logger.error(message);
1463
1464    ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(),
1465        msg.getSenderID(), message);
1466    try
1467    {
1468      msgEmitter.send(errMsg);
1469    }
1470    catch (IOException ignored)
1471    {
1472      // TODO Handle error properly (sender timeout in addition)
1473      /*
1474       * An error happened trying to send an error msg to this server.
1475       * Log an error and close the connection to this server.
1476       */
1477      logger.error(ERR_CHANGELOG_ERROR_SENDING_ERROR, this, ignored);
1478      stopServer(msgEmitter, false);
1479    }
1480  }
1481
1482  private void forwardMsgToAllServers(RoutableMsg msg,
1483      List<ServerHandler> servers, ServerHandler sender)
1484  {
1485    for (ServerHandler targetHandler : servers)
1486    {
1487      try
1488      {
1489        targetHandler.send(msg);
1490      } catch (IOException ioe)
1491      {
1492        /*
1493         * An error happened trying to send a routable message to its
1494         * destination server.
1495         * Send back an error to the originator of the message.
1496         */
1497        LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
1498        mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination()));
1499        mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
1500        mb.append(" Details: ").append(ioe.getLocalizedMessage());
1501        final LocalizableMessage message = mb.toMessage();
1502        logger.error(message);
1503
1504        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
1505        try
1506        {
1507          sender.send(errMsg);
1508        } catch (IOException ioe1)
1509        {
1510          // an error happened on the sender session trying to recover
1511          // from an error on the receiver session.
1512          // We don't have much solution left beside closing the sessions.
1513          stopServer(sender, false);
1514          stopServer(targetHandler, false);
1515        }
1516      // TODO Handle error properly (sender timeout in addition)
1517      }
1518    }
1519  }
1520
1521  /**
1522   * Creates a new monitor message including monitoring information for the
1523   * whole topology.
1524   *
1525   * @param sender
1526   *          The sender of this message.
1527   * @param destination
1528   *          The destination of this message.
1529   * @return The newly created and filled MonitorMsg. Null if a problem occurred
1530   *         during message creation.
1531   * @throws InterruptedException
1532   *           if this thread is interrupted while waiting for a response
1533   */
1534  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
1535      throws InterruptedException
1536  {
1537    return createGlobalTopologyMonitorMsg(sender, destination,
1538        domainMonitor.recomputeMonitorData());
1539  }
1540
1541  private MonitorMsg createGlobalTopologyMonitorMsg(int sender,
1542      int destination, ReplicationDomainMonitorData monitorData)
1543  {
1544    final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
1545    returnMsg.setReplServerDbState(getLatestServerState());
1546
1547    // Add the server state for each DS and RS currently in the topology.
1548    for (int replicaId : toIterable(monitorData.ldapIterator()))
1549    {
1550      returnMsg.setServerState(replicaId,
1551          monitorData.getLDAPServerState(replicaId),
1552          monitorData.getApproxFirstMissingDate(replicaId), true);
1553    }
1554
1555    for (int replicaId : toIterable(monitorData.rsIterator()))
1556    {
1557      returnMsg.setServerState(replicaId,
1558          monitorData.getRSStates(replicaId),
1559          monitorData.getRSApproxFirstMissingDate(replicaId), false);
1560    }
1561
1562    return returnMsg;
1563  }
1564
1565
1566
1567  /**
1568   * Creates a new monitor message including monitoring information for the
1569   * topology directly connected to this RS. This includes information for: -
1570   * local RS - all direct DSs - all direct RSs
1571   *
1572   * @param sender
1573   *          The sender of this message.
1574   * @param destination
1575   *          The destination of this message.
1576   * @return The newly created and filled MonitorMsg. Null if the current thread
1577   *         was interrupted while attempting to get the domain lock.
1578   */
1579  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
1580  {
1581    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
1582    monitorMsg.setReplServerDbState(getLatestServerState());
1583
1584    // Add the server state for each connected DS and RS.
1585    for (DataServerHandler dsHandler : this.connectedDSs.values())
1586    {
1587      monitorMsg.setServerState(dsHandler.getServerId(),
1588          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
1589          true);
1590    }
1591
1592    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
1593    {
1594      monitorMsg.setServerState(rsHandler.getServerId(),
1595          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
1596          false);
1597    }
1598    return monitorMsg;
1599  }
1600
1601  /**
1602   * Shutdown this ReplicationServerDomain.
1603   */
1604  public void shutdown()
1605  {
1606    DirectoryServer.deregisterMonitorProvider(this);
1607
1608    // Terminate the assured timer
1609    assuredTimeoutTimer.cancel();
1610
1611    stopAllServers(true);
1612    statusAnalyzer.shutdown();
1613  }
1614
1615  /**
1616   * Returns the latest most current ServerState describing the newest CSNs for
1617   * each server in this domain.
1618   *
1619   * @return The ServerState describing the newest CSNs for each server in in
1620   *         this domain.
1621   */
1622  public ServerState getLatestServerState()
1623  {
1624    return domainDB.getDomainNewestCSNs(baseDN);
1625  }
1626
1627  /** {@inheritDoc} */
1628  @Override
1629  public String toString()
1630  {
1631    return "ReplicationServerDomain " + baseDN;
1632  }
1633
1634
1635
1636  /**
1637   * Creates a TopologyMsg filled with information to be sent to a remote RS.
1638   * We send remote RS the info of every DS that are directly connected to us
1639   * plus our own info as RS.
1640   * @return A suitable TopologyMsg PDU to be sent to a peer RS
1641   */
1642  public TopologyMsg createTopologyMsgForRS()
1643  {
1644    List<DSInfo> dsInfos = new ArrayList<>();
1645    for (DataServerHandler dsHandler : connectedDSs.values())
1646    {
1647      dsInfos.add(dsHandler.toDSInfo());
1648    }
1649
1650    // Create info for the local RS
1651    List<RSInfo> rsInfos = newArrayList(toRSInfo(localReplicationServer, generationId));
1652
1653    return new TopologyMsg(dsInfos, rsInfos);
1654  }
1655
1656  /**
1657   * Creates a TopologyMsg filled with information to be sent to a DS.
1658   * We send remote DS the info of every known DS and RS in the topology (our
1659   * directly connected DSs plus the DSs connected to other RSs) except himself.
1660   * Also put info related to local RS.
1661   *
1662   * @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and
1663   * that we must not include in the DS list.
1664   * @return A suitable TopologyMsg PDU to be sent to a peer DS
1665   */
1666  public TopologyMsg createTopologyMsgForDS(int destDsId)
1667  {
1668    // Go through every DSs (except recipient of msg)
1669    List<DSInfo> dsInfos = new ArrayList<>();
1670    for (DataServerHandler dsHandler : connectedDSs.values())
1671    {
1672      if (dsHandler.getServerId() == destDsId)
1673      {
1674        continue;
1675      }
1676      dsInfos.add(dsHandler.toDSInfo());
1677    }
1678
1679
1680    List<RSInfo> rsInfos = new ArrayList<>();
1681    // Add our own info (local RS)
1682    rsInfos.add(toRSInfo(localReplicationServer, generationId));
1683
1684    // Go through every peer RSs (and get their connected DSs), also add info
1685    // for RSs
1686    for (ReplicationServerHandler rsHandler : connectedRSs.values())
1687    {
1688      rsInfos.add(rsHandler.toRSInfo());
1689
1690      rsHandler.addDSInfos(dsInfos);
1691    }
1692
1693    return new TopologyMsg(dsInfos, rsInfos);
1694  }
1695
1696  private RSInfo toRSInfo(ReplicationServer rs, long generationId)
1697  {
1698    return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId,
1699        rs.getGroupId(), rs.getWeight());
1700  }
1701
1702  /**
1703   * Get the generationId associated to this domain.
1704   *
1705   * @return The generationId
1706   */
1707  public long getGenerationId()
1708  {
1709    return generationId;
1710  }
1711
1712  /**
1713   * Initialize the value of the generationID for this ReplicationServerDomain.
1714   * This method is intended to be used for initialization at startup and
1715   * simply stores the new value without any additional processing.
1716   * For example it does not clear the change-log DBs
1717   *
1718   * @param generationId The new value of generationId.
1719   */
1720  public void initGenerationID(long generationId)
1721  {
1722    synchronized (generationIDLock)
1723    {
1724      this.generationId = generationId;
1725      this.generationIdSavedStatus = true;
1726    }
1727  }
1728
1729  /**
1730   * Sets the provided value as the new in memory generationId.
1731   * Also clear the changelog databases.
1732   *
1733   * @param generationId The new value of generationId.
1734   * @return The old generation id
1735   */
1736  public long changeGenerationId(long generationId)
1737  {
1738    synchronized (generationIDLock)
1739    {
1740      long oldGenerationId = this.generationId;
1741
1742      if (this.generationId != generationId)
1743      {
1744        clearDbs();
1745
1746        this.generationId = generationId;
1747        this.generationIdSavedStatus = false;
1748      }
1749      return oldGenerationId;
1750    }
1751  }
1752
  /**
   * Resets the generationID of this domain as requested by a received
   * ResetGenerationIdMsg.
   * <p>
   * The whole processing runs while holding the domain lock. If the calling
   * thread is interrupted while waiting for that lock, the interrupt flag is
   * restored and the message is not processed at all.
   * <p>
   * Once the lock is held, this method:
   * <ol>
   * <li>changes the local generation id when the requested one differs (see
   * {@link #changeGenerationId(long)}, which also clears the changelog
   * DBs);</li>
   * <li>records the new generation id on every connected RS handler and, only
   * when the sender is a DS, forwards the message to that RS;</li>
   * <li>asks every connected DS handler to change its status according to the
   * new generation id;</li>
   * <li>enqueues topology information for all peers.</li>
   * </ol>
   * Any exception raised during these steps is logged and not propagated.
   *
   * @param senderHandler The handler associated to the server
   *        that requested to reset the generationId.
   * @param genIdMsg The reset generation ID msg received.
   */
  public void resetGenerationId(ServerHandler senderHandler,
    ResetGenerationIdMsg genIdMsg)
  {
    if (logger.isTraceEnabled())
    {
      debug("Receiving ResetGenerationIdMsg from "
          + senderHandler.getServerId() + ":\n" + genIdMsg);
    }

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }

    // From here on the domain lock is held: it is released in the finally
    // block below, whatever happens.
    try
    {
      final long newGenId = genIdMsg.getGenerationId();
      if (newGenId != this.generationId)
      {
        // Also clears the changelog DBs (see changeGenerationId())
        changeGenerationId(newGenId);
      }
      else
      {
        // Order to take a gen id we already have, just ignore
        if (logger.isTraceEnabled())
        {
          debug("Reset generation id requested but generationId was already "
              + this.generationId + ":\n" + genIdMsg);
        }
      }

      // If we are the first replication server warned,
      // then forwards the reset message to the remote replication servers.
      // Note: the generation id stored on each RS handler is updated in every
      // case; only the forwarding depends on the sender being a DS.
      for (ServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          // After we'll have sent the message , the remote RS will adopt
          // the new genId
          rsHandler.setGenerationId(newGenId);
          if (senderHandler.isDataServer())
          {
            rsHandler.send(genIdMsg);
          }
        } catch (IOException e)
        {
          // A failure with one RS must not prevent handling the other ones
          logger.error(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID, baseDN, e.getMessage());
        }
      }

      // Change status of the connected DSs according to the requested new
      // reference generation id
      for (DataServerHandler dsHandler : connectedDSs.values())
      {
        try
        {
          dsHandler.changeStatusForResetGenId(newGenId);
        } catch (IOException e)
        {
          // A failure with one DS must not prevent handling the other ones
          logger.error(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID, baseDN,
              dsHandler.getServerId(), e.getMessage());
        }
      }

      // Update every peers (RS/DS) with potential topology changes (status
      // change). Rather than doing that each time a DS has a status change
      // (consecutive to reset gen id message), we prefer advertising once for
      // all after changes (less packet sent), here at the end of the reset msg
      // treatment.
      sendTopoInfoToAll();

      logger.info(NOTE_RESET_GENERATION_ID, baseDN, newGenId);
    }
    catch(Exception e)
    {
      // Catch-all: log the stack trace rather than propagating to the caller
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }
1851
1852  /**
1853   * Process message of a remote server changing his status.
1854   * @param senderHandler The handler associated to the server
1855   *        that changed his status.
1856   * @param csMsg The message containing the new status
1857   */
1858  public void processNewStatus(DataServerHandler senderHandler,
1859    ChangeStatusMsg csMsg)
1860  {
1861    if (logger.isTraceEnabled())
1862    {
1863      debug("receiving ChangeStatusMsg from " + senderHandler.getServerId()
1864          + ":\n" + csMsg);
1865    }
1866
1867    try
1868    {
1869      // Acquire lock on domain (see more details in comment of start() method
1870      // of ServerHandler)
1871      lock();
1872    }
1873    catch (InterruptedException ex)
1874    {
1875      // We can't deal with this here, so re-interrupt thread so that it is
1876      // caught during subsequent IO.
1877      Thread.currentThread().interrupt();
1878      return;
1879    }
1880
1881    try
1882    {
1883      ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
1884      if (newStatus == ServerStatus.INVALID_STATUS)
1885      {
1886        // Already logged an error in processNewStatus()
1887        // just return not to forward a bad status to topology
1888        return;
1889      }
1890
1891      enqueueTopoInfoToAllExcept(senderHandler);
1892
1893      logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
1894          senderHandler.getServerId(), baseDN, newStatus);
1895    }
1896    catch(Exception e)
1897    {
1898      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
1899    }
1900    finally
1901    {
1902      release();
1903    }
1904  }
1905
  /**
   * Change the status of a directory server according to the event generated
   * from the status analyzer.
   * <p>
   * The status change is performed while holding the domain lock. When the
   * status actually changed, topology information is enqueued for every peer
   * except the DS whose status changed. Exceptions raised while changing the
   * status are logged and not propagated.
   * <p>
   * Note: when the thread is interrupted while waiting for the domain lock,
   * the interrupt flag is not restored by this method; the interruption is
   * reported to the caller through the returned value only.
   *
   * @param dsHandler The handler of the directory server to update
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
    {
      // Acquire lock on domain (see ServerHandler#start() for more details)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We have been interrupted for dying, from stopStatusAnalyzer
      // to prevent deadlock in this situation:
      // RS is being shutdown, and stopServer will call stopStatusAnalyzer.
      // Domain lock is taken by shutdown thread while status analyzer thread
      // is willing to change the status of a server at the same time so is
      // waiting for the domain lock at the same time. As shutdown thread is
      // waiting for analyzer thread death, a deadlock occurs. So we force
      // interruption of the status analyzer thread death after 2 seconds if
      // it has not finished (see StatusAnalyzer.waitForShutdown). This allows
      // to have the analyzer thread taking the domain lock only when the
      // status of a DS has to be changed. See more comments in run method of
      // StatusAnalyzer.
      if (logger.isTraceEnabled())
      {
        logger.trace("Status analyzer for domain " + baseDN
            + " has been interrupted when"
            + " trying to acquire domain lock for changing the status of DS "
            + dsHandler.getServerId());
      }
      return true;
    }

    // The domain lock is held from here on and released in the finally block
    try
    {
      // INVALID_STATUS is kept when dsHandler.changeStatus() fails with an
      // IOException, so that the failure is handled like an impossible change
      ServerStatus newStatus = ServerStatus.INVALID_STATUS;
      ServerStatus oldStatus = dsHandler.getStatus();
      try
      {
        newStatus = dsHandler.changeStatus(event);
      }
      catch (IOException e)
      {
        logger.error(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER,
            baseDN, dsHandler.getServerId(), e.getMessage());
      }

      if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus)
      {
        // Change was impossible or already occurred (see StatusAnalyzer
        // comments)
        return false;
      }

      // The status really changed: advertise it to everybody but this DS
      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }

    return false;
  }
1979
1980  /**
1981   * Update every peers (RS/DS) with topology changes.
1982   */
1983  public void sendTopoInfoToAll()
1984  {
1985    enqueueTopoInfoToAllExcept(null);
1986  }
1987
1988  /**
1989   * Update every peers (RS/DS) with topology changes but one DS.
1990   *
1991   * @param dsHandler
1992   *          if not null, the topology message will not be sent to this DS
1993   */
1994  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
1995  {
1996    synchronized (pendingStatusMessagesLock)
1997    {
1998      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
1999      pendingStatusMessages.enqueueTopoInfoToAllRSs();
2000    }
2001    statusAnalyzer.notifyPendingStatusMessage();
2002  }
2003
2004  /**
2005   * Clears the Db associated with that domain.
2006   */
2007  private void clearDbs()
2008  {
2009    try
2010    {
2011      domainDB.removeDomain(baseDN);
2012    }
2013    catch (ChangelogException e)
2014    {
2015      logger.error(ERR_ERROR_CLEARING_DB, baseDN, e.getMessage(), e);
2016    }
2017  }
2018
2019  /**
2020   * Returns whether the provided server is in degraded
2021   * state due to the fact that the peer server has an invalid
2022   * generationId for this domain.
2023   *
2024   * @param serverId The serverId for which we want to know the
2025   *                 the state.
2026   * @return Whether it is degraded or not.
2027   */
2028  public boolean isDegradedDueToGenerationId(int serverId)
2029  {
2030    if (logger.isTraceEnabled())
2031    {
2032      debug("isDegraded serverId=" + serverId + " given local generation Id="
2033          + this.generationId);
2034    }
2035
2036    ServerHandler sHandler = connectedRSs.get(serverId);
2037    if (sHandler == null)
2038    {
2039      sHandler = connectedDSs.get(serverId);
2040      if (sHandler == null)
2041      {
2042        return false;
2043      }
2044    }
2045
2046    if (logger.isTraceEnabled())
2047    {
2048      debug("Compute degradation of serverId=" + serverId
2049          + " LS server generation Id=" + sHandler.getGenerationId());
2050    }
2051    return sHandler.getGenerationId() != this.generationId;
2052  }
2053
  /**
   * Process topology information received from a peer RS.
   * <p>
   * The processing runs while holding the domain lock. If the calling thread
   * is interrupted while waiting for that lock, the interrupt flag is restored
   * and the message is not processed.
   * <p>
   * Once the lock is held, the RS handler processes the topology message, the
   * generation id is handled, an ErrorMsg is sent back to the peer RS when its
   * generation id is found different, and finally topology information is
   * enqueued for every connected DS.
   * <p>
   * Note: every exception raised while the lock is held is caught and logged
   * by this method, so it is not propagated to the caller.
   *
   * @param topoMsg The just received topo message from remote RS
   * @param rsHandler The handler that received the message.
   * @param allowResetGenId True for allowing to reset the generation id (
   * when called after initial handshake)
   * @throws IOException If an error occurred.
   * @throws DirectoryException If an error occurred.
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
      ReplicationServerHandler rsHandler, boolean allowResetGenId)
      throws IOException, DirectoryException
  {
    if (logger.isTraceEnabled())
    {
      debug("receiving TopologyMsg from serverId=" + rsHandler.getServerId()
          + ":\n" + topoMsg);
    }

    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }

    // The domain lock is held from here on and released in the finally block
    try
    {
      // Store DS connected to remote RS & update information about the peer RS
      rsHandler.processTopoInfoFromRS(topoMsg);

      // Handle generation id
      if (allowResetGenId)
      {
        resetGenerationIdIfPossible();
        // Adopt the generation id of the peer RS only if ours is still unset
        setGenerationIdIfUnset(rsHandler.getGenerationId());
      }

      if (isDifferentGenerationId(rsHandler.getGenerationId()))
      {
        // The same message is logged locally and sent back to the peer RS
        LocalizableMessage message = WARN_BAD_GENERATION_ID_FROM_RS.get(rsHandler.getServerId(),
            rsHandler.session.getReadableRemoteAddress(), rsHandler.getGenerationId(),
            baseDN, getLocalRSServerId(), generationId);
        logger.warn(message);

        ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
            rsHandler.getServerId(), message);
        rsHandler.send(errorMsg);
      }

      /*
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch(Exception e)
    {
      // Catch-all: log the stack trace rather than propagating to the caller
      logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
    }
    finally
    {
      release();
    }
  }
2130
2131  private void setGenerationIdIfUnset(long generationId)
2132  {
2133    if (this.generationId < 0)
2134    {
2135      this.generationId = generationId;
2136    }
2137  }
2138
2139  /**
2140   * Returns the latest monitor data available for this replication server
2141   * domain.
2142   *
2143   * @return The latest monitor data available for this replication server
2144   *         domain, which is never {@code null}.
2145   */
2146  ReplicationDomainMonitorData getDomainMonitorData()
2147  {
2148    return domainMonitor.getMonitorData();
2149  }
2150
2151  /**
2152   * Get the map of connected DSs.
2153   * @return The map of connected DSs
2154   */
2155  public Map<Integer, DataServerHandler> getConnectedDSs()
2156  {
2157    return Collections.unmodifiableMap(connectedDSs);
2158  }
2159
2160  /**
2161   * Get the map of connected RSs.
2162   * @return The map of connected RSs
2163   */
2164  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
2165  {
2166    return Collections.unmodifiableMap(connectedRSs);
2167  }
2168
2169
  /**
   * A synchronization mechanism is created to ensure exclusive access to the
   * domain. The goal is to have a consistent view of the topology by locking
   * the structures holding the topology view of the domain:
   * {@link #connectedDSs} and {@link #connectedRSs}. When a connection is
   * established with a peer DS or RS, the lock should be taken before updating
   * these structures, then released. The same mechanism should be used when
   * updating any data related to the view of the topology: for instance if the
   * status of a DS is changed, the lock should be taken before updating the
   * matching server handler and sending the topology messages to peers and
   * released after. This allows every member of the topology to have a
   * consistent view of the topology and to be sure it will not miss some
   * information.
   * <p>
   * So the locking system must be called (not exhaustive list):
   * <ul>
   * <li>when connection established with a DS or RS</li>
   * <li>when connection ended with a DS or RS</li>
   * <li>when receiving a TopologyMsg and updating structures</li>
   * <li>when creating and sending a TopologyMsg</li>
   * <li>when a DS status is changing (ChangeStatusMsg received or sent)...</li>
   * </ul>
   * <p>
   * This lock is exposed through {@link #lock()}, {@link #tryLock(long)},
   * {@link #release()} and {@link #hasLock()}.
   */
  private final ReentrantLock lock = new ReentrantLock();

  /**
   * This lock is used to protect the generationId variable. It is the monitor
   * synchronized on by {@link #initGenerationID(long)} and
   * {@link #changeGenerationId(long)}.
   */
  private final Object generationIDLock = new Object();
2199
2200  /**
2201   * Tests if the current thread has the lock on this domain.
2202   * @return True if the current thread has the lock.
2203   */
2204  public boolean hasLock()
2205  {
2206    return lock.getHoldCount() > 0;
2207  }
2208
2209  /**
2210   * Takes the lock on this domain (blocking until lock can be acquired) or
2211   * calling thread is interrupted.
2212   * @throws java.lang.InterruptedException If interrupted.
2213   */
2214  public void lock() throws InterruptedException
2215  {
2216    lock.lockInterruptibly();
2217  }
2218
2219  /**
2220   * Releases the lock on this domain.
2221   */
2222  public void release()
2223  {
2224    lock.unlock();
2225  }
2226
2227  /**
2228   * Tries to acquire the lock on the domain within a given amount of time.
2229   * @param timeout The amount of milliseconds to wait for acquiring the lock.
2230   * @return True if the lock was acquired, false if timeout occurred.
2231   * @throws java.lang.InterruptedException When call was interrupted.
2232   */
2233  public boolean tryLock(long timeout) throws InterruptedException
2234  {
2235    return lock.tryLock(timeout, TimeUnit.MILLISECONDS);
2236  }
2237
2238  /**
2239   * Starts the monitoring publisher for the domain if not already started.
2240   */
2241  private void startMonitoringPublisher()
2242  {
2243    long period = localReplicationServer.getMonitoringPublisherPeriod();
2244    if (period > 0) // 0 means no monitoring publisher
2245    {
2246      final MonitoringPublisher thread = new MonitoringPublisher(this, period);
2247      if (monitoringPublisher.compareAndSet(null, thread))
2248      {
2249        thread.start();
2250      }
2251    }
2252  }
2253
2254  /**
2255   * Stops the monitoring publisher for the domain.
2256   */
2257  private void stopMonitoringPublisher()
2258  {
2259    final MonitoringPublisher thread = monitoringPublisher.get();
2260    if (thread != null && monitoringPublisher.compareAndSet(thread, null))
2261    {
2262      thread.shutdown();
2263      thread.waitForShutdown();
2264    }
2265  }
2266
  /**
   * {@inheritDoc}
   * <p>
   * This implementation does nothing: the provided configuration is ignored.
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuraiton)
  {
    // Nothing to do for now
  }
2273
2274  /** {@inheritDoc} */
2275  @Override
2276  public String getMonitorInstanceName()
2277  {
2278    return "Replication server RS(" + localReplicationServer.getServerId()
2279        + ") " + localReplicationServer.getServerURL() + ",cn="
2280        + baseDN.toString().replace(',', '_').replace('=', '_')
2281        + ",cn=Replication";
2282  }
2283
2284  /** {@inheritDoc} */
2285  @Override
2286  public List<Attribute> getMonitorData()
2287  {
2288    // publish the server id and the port number.
2289    List<Attribute> attributes = new ArrayList<>();
2290    attributes.add(Attributes.create("replication-server-id",
2291        String.valueOf(localReplicationServer.getServerId())));
2292    attributes.add(Attributes.create("replication-server-port",
2293        String.valueOf(localReplicationServer.getReplicationPort())));
2294    attributes.add(Attributes.create("domain-name",
2295        baseDN.toString()));
2296    attributes.add(Attributes.create("generation-id",
2297        baseDN + " " + generationId));
2298
2299    // Missing changes
2300    long missingChanges = getDomainMonitorData().getMissingChangesRS(
2301        localReplicationServer.getServerId());
2302    attributes.add(Attributes.create("missing-changes",
2303        String.valueOf(missingChanges)));
2304
2305    return attributes;
2306  }
2307
2308  /**
2309   * Returns the oldest known state for the domain, made of the oldest CSN
2310   * stored for each serverId.
2311   * <p>
2312   * Note: Because the replication changelogDB trimming always keep one change
2313   * whatever its date, the CSN contained in the returned state can be very old.
2314   *
2315   * @return the start state of the domain.
2316   */
2317  public ServerState getOldestState()
2318  {
2319    return domainDB.getDomainOldestCSNs(baseDN);
2320  }
2321
2322  private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg)
2323  {
2324    for (int i = 1; i <= 2; i++)
2325    {
2326      if (!handler.shuttingDown()
2327          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
2328      {
2329        try
2330        {
2331          handler.sendTopoInfo(msg);
2332          break;
2333        }
2334        catch (IOException e)
2335        {
2336          if (i == 2)
2337          {
2338            logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
2339                baseDN, type, handler.getServerId(), e.getMessage());
2340          }
2341        }
2342      }
2343      sleep(100);
2344    }
2345  }
2346
2347
2348
2349  /**
2350   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
2351   * value received, and forwarding the message to the other RSes.
2352   * @param senderHandler The handler for the server that sent the heartbeat.
2353   * @param msg The message to process.
2354   * @throws DirectoryException
2355   *           if a problem occurs
2356   */
2357  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
2358      ChangeTimeHeartbeatMsg msg) throws DirectoryException
2359  {
2360    try
2361    {
2362      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
2363    }
2364    catch (ChangelogException e)
2365    {
2366      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e
2367          .getMessageObject(), e);
2368    }
2369
2370    if (senderHandler.isDataServer())
2371    {
2372      /*
2373       * If we are the first replication server warned, then forward the message
2374       * to the remote replication servers.
2375       */
2376      synchronized (pendingStatusMessagesLock)
2377      {
2378        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
2379      }
2380      statusAnalyzer.notifyPendingStatusMessage();
2381    }
2382  }
2383
2384  /**
2385   * Return the monitor instance name of the ReplicationServer that created the
2386   * current instance.
2387   *
2388   * @return the monitor instance name of the ReplicationServer that created the
2389   *         current instance.
2390   */
2391  String getLocalRSMonitorInstanceName()
2392  {
2393    return this.localReplicationServer.getMonitorInstanceName();
2394  }
2395
2396  /**
2397   * Return the serverId of the ReplicationServer that created the current
2398   * instance.
2399   *
2400   * @return the serverId of the ReplicationServer that created the current
2401   *         instance.
2402   */
2403  int getLocalRSServerId()
2404  {
2405    return this.localReplicationServer.getServerId();
2406  }
2407
2408  /**
2409   * Update the monitoring publisher with the new period value.
2410   *
2411   * @param period
2412   *          The new period value.
2413   */
2414  void updateMonitoringPeriod(long period)
2415  {
2416    if (period == 0)
2417    {
2418      // Requested to stop monitoring publishers
2419      stopMonitoringPublisher();
2420      return;
2421    }
2422
2423    final MonitoringPublisher mpThread = monitoringPublisher.get();
2424    if (mpThread != null) // it is running
2425    {
2426      mpThread.setPeriod(period);
2427    }
2428    else if (!connectedDSs.isEmpty() || !connectedRSs.isEmpty())
2429    {
2430      // Requested to start monitoring publishers with provided period value
2431      startMonitoringPublisher();
2432    }
2433  }
2434
2435  /**
2436   * Registers a DS handler into this domain and notifies the domain about the
2437   * new DS.
2438   *
2439   * @param dsHandler
2440   *          The Directory Server Handler to register
2441   */
2442  public void register(DataServerHandler dsHandler)
2443  {
2444    startMonitoringPublisher();
2445
2446    // connected with new DS: store handler.
2447    connectedDSs.put(dsHandler.getServerId(), dsHandler);
2448
2449    // Tell peer RSs and DSs a new DS just connected to us
2450    // No need to re-send TopologyMsg to this just new DS
2451    enqueueTopoInfoToAllExcept(dsHandler);
2452  }
2453
2454  /**
2455   * Registers the RS handler into this domain and notifies the domain.
2456   *
2457   * @param rsHandler
2458   *          The Replication Server Handler to register
2459   */
2460  public void register(ReplicationServerHandler rsHandler)
2461  {
2462    startMonitoringPublisher();
2463
2464    // connected with new RS (either outgoing or incoming
2465    // connection): store handler.
2466    connectedRSs.put(rsHandler.getServerId(), rsHandler);
2467  }
2468
2469  private void debug(String message)
2470  {
2471    logger.trace("In ReplicationServerDomain serverId="
2472        + localReplicationServer.getServerId() + " for baseDN=" + baseDN
2473        + " and port=" + localReplicationServer.getReplicationPort()
2474        + ": " + message);
2475  }
2476
2477
2478
2479  /**
2480   * Go through each connected DS, get the number of pending changes we have for
2481   * it and change status accordingly if threshold value is crossed/uncrossed.
2482   */
2483  void checkDSDegradedStatus()
2484  {
2485    final int degradedStatusThreshold = localReplicationServer
2486        .getDegradedStatusThreshold();
2487    // Threshold value = 0 means no status analyzer (no degrading system)
2488    // we should not have that as the status analyzer thread should not be
2489    // created if this is the case, but for sanity purpose, we add this
2490    // test
2491    if (degradedStatusThreshold > 0)
2492    {
2493      for (DataServerHandler serverHandler : connectedDSs.values())
2494      {
2495        // Get number of pending changes for this server
2496        final int nChanges = serverHandler.getRcvMsgQueueSize();
2497        if (logger.isTraceEnabled())
2498        {
2499          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
2500              + getBaseDN() + ": " + "Status analyzer: DS "
2501              + serverHandler.getServerId() + " has " + nChanges
2502              + " message(s) in writer queue.");
2503        }
2504
2505        // Check status to know if it is relevant to change the status. Do not
2506        // take RSD lock to test. If we attempt to change the status whereas
2507        // the current status does allow it, this will be noticed by
2508        // the changeStatusFromStatusAnalyzer() method. This allows to take the
2509        // lock roughly only when needed versus every sleep time timeout.
2510        if (nChanges >= degradedStatusThreshold)
2511        {
2512          if (serverHandler.getStatus() == NORMAL_STATUS
2513              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
2514          {
2515            break; // Interrupted.
2516          }
2517        }
2518        else
2519        {
2520          if (serverHandler.getStatus() == DEGRADED_STATUS
2521              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
2522          {
2523            break; // Interrupted.
2524          }
2525        }
2526      }
2527    }
2528  }
2529
2530
2531
2532  /**
2533   * Sends any enqueued status messages to the rest of the topology.
2534   */
2535  void sendPendingStatusMessages()
2536  {
2537    /*
2538     * Take a snapshot of pending status notifications in order to avoid holding
2539     * the broadcast lock for too long. In addition, clear the notifications so
2540     * that they are not resent the next time.
2541     */
2542    final PendingStatusMessages savedState;
2543    synchronized (pendingStatusMessagesLock)
2544    {
2545      savedState = pendingStatusMessages;
2546      pendingStatusMessages = new PendingStatusMessages();
2547    }
2548    sendPendingChangeTimeHeartbeatMsgs(savedState);
2549    sendPendingTopologyMsgs(savedState);
2550    sendPendingMonitorMsgs(savedState);
2551  }
2552
2553
2554
2555  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
2556  {
2557    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
2558        .entrySet())
2559    {
2560      ServerHandler ds = connectedDSs.get(msg.getKey());
2561      if (ds != null)
2562      {
2563        try
2564        {
2565          ds.send(msg.getValue());
2566        }
2567        catch (IOException e)
2568        {
2569          // Ignore: connection closed.
2570        }
2571      }
2572    }
2573    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
2574        .entrySet())
2575    {
2576      ServerHandler rs = connectedRSs.get(msg.getKey());
2577      if (rs != null)
2578      {
2579        try
2580        {
2581          rs.send(msg.getValue());
2582        }
2583        catch (IOException e)
2584        {
2585          // We log the error. The requestor will detect a timeout or
2586          // any other failure on the connection.
2587
2588          // FIXME: why do we log for RSs but not DSs?
2589          logger.traceException(e);
2590          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue().getDestination());
2591        }
2592      }
2593    }
2594  }
2595
2596
2597
2598  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
2599  {
2600    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats.values())
2601    {
2602      for (ReplicationServerHandler rsHandler : connectedRSs.values())
2603      {
2604        try
2605        {
2606          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
2607          {
2608            rsHandler.send(pendingHeartbeat);
2609          }
2610        }
2611        catch (IOException e)
2612        {
2613          logger.traceException(e);
2614          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
2615              + localReplicationServer.getReplicationPort() + " " + baseDN
2616              + " " + localReplicationServer.getServerId());
2617          stopServer(rsHandler, false);
2618        }
2619      }
2620    }
2621  }
2622
2623
2624
2625  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
2626  {
2627    if (pendingMsgs.sendDSTopologyMsg)
2628    {
2629      for (ServerHandler handler : connectedDSs.values())
2630      {
2631        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
2632        {
2633          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
2634              .getServerId());
2635          sendTopologyMsg("directory", handler, topoMsg);
2636        }
2637      }
2638    }
2639
2640    if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty())
2641    {
2642      final TopologyMsg topoMsg = createTopologyMsgForRS();
2643      for (ServerHandler handler : connectedRSs.values())
2644      {
2645        sendTopologyMsg("replication", handler, topoMsg);
2646      }
2647    }
2648  }
2649
2650
2651
2652  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
2653  {
2654    /*
2655     * If the request comes from a Directory Server we need to build the full
2656     * list of all servers in the topology and send back a MonitorMsg with the
2657     * full list of all the servers in the topology.
2658     */
2659    if (sender.isDataServer())
2660    {
2661      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
2662          msg.getDestination(), msg.getSenderID(),
2663          domainMonitor.getMonitorData());
2664      synchronized (pendingStatusMessagesLock)
2665      {
2666        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
2667            monitorMsg);
2668      }
2669    }
2670    else
2671    {
2672      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
2673          msg.getDestination(), msg.getSenderID());
2674      synchronized (pendingStatusMessagesLock)
2675      {
2676        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
2677            monitorMsg);
2678      }
2679    }
2680    statusAnalyzer.notifyPendingStatusMessage();
2681  }
2682}