001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.service;
028
029import java.io.IOException;
030import java.math.BigDecimal;
031import java.math.MathContext;
032import java.math.RoundingMode;
033import java.net.*;
034import java.util.*;
035import java.util.Map.Entry;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.Semaphore;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicReference;
041
042import org.forgerock.i18n.LocalizableMessage;
043import org.forgerock.i18n.slf4j.LocalizedLogger;
044import org.forgerock.util.Utils;
045import org.opends.server.admin.std.server.ReplicationDomainCfg;
046import org.opends.server.core.DirectoryServer;
047import org.opends.server.replication.common.*;
048import org.opends.server.replication.plugin.MultimasterReplication;
049import org.opends.server.replication.protocol.*;
050import org.opends.server.types.DN;
051import org.opends.server.types.HostPort;
052
053import static org.opends.messages.ReplicationMessages.*;
054import static org.opends.server.replication.protocol.ProtocolVersion.*;
055import static org.opends.server.replication.server.ReplicationServer.*;
056import static org.opends.server.util.StaticUtils.*;
057
058/**
059 * The broker for Multi-master Replication.
060 */
061public class ReplicationBroker
062{
063
064  /**
065   * Immutable class containing information about whether the broker is
066   * connected to an RS and data associated to this connected RS.
067   */
068  // @Immutable
069  private static final class ConnectedRS
070  {
071
072    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
073        NO_CONNECTED_SERVER);
074
075    /** The info of the RS we are connected to. */
076    private final ReplicationServerInfo rsInfo;
077    /** Contains a connected session to the RS if any exist, null otherwise. */
078    private final Session session;
079    private final String replicationServer;
080
081    private ConnectedRS(String replicationServer)
082    {
083      this.rsInfo = null;
084      this.session = null;
085      this.replicationServer = replicationServer;
086    }
087
088    private ConnectedRS(ReplicationServerInfo rsInfo, Session session)
089    {
090      this.rsInfo = rsInfo;
091      this.session = session;
092      this.replicationServer = session != null ?
093          session.getReadableRemoteAddress()
094          : NO_CONNECTED_SERVER;
095    }
096
097    private static ConnectedRS stopped()
098    {
099      return new ConnectedRS("stopped");
100    }
101
102    private static ConnectedRS noConnectedRS()
103    {
104      return NO_CONNECTED_RS;
105    }
106
107    public int getServerId()
108    {
109      return rsInfo != null ? rsInfo.getServerId() : -1;
110    }
111
112    private byte getGroupId()
113    {
114      return rsInfo != null ? rsInfo.getGroupId() : -1;
115    }
116
117    private boolean isConnected()
118    {
119      return session != null;
120    }
121
122    /** {@inheritDoc} */
123    @Override
124    public String toString()
125    {
126      final StringBuilder sb = new StringBuilder();
127      toString(sb);
128      return sb.toString();
129    }
130
131    public void toString(StringBuilder sb)
132    {
133      sb.append("connected=").append(isConnected()).append(", ");
134      if (!isConnected())
135      {
136        sb.append("no connectedRS");
137      }
138      else
139      {
140        sb.append("connectedRS(serverId=").append(rsInfo.getServerId())
141          .append(", serverUrl=").append(rsInfo.getServerURL())
142          .append(", groupId=").append(rsInfo.getGroupId())
143          .append(")");
144      }
145    }
146
147  }
  /** Logger for this class. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * Shutdown flag; reset to false by {@link #start()}.
   * NOTE(review): presumably set to true by the stop logic, which is not
   * visible in this excerpt.
   */
  private volatile boolean shutdown;
  /** Lock held by {@link #start()} while (re)starting the broker. */
  private final Object startStopLock = new Object();
  /**
   * The replication domain configuration, read by getServerId(), getBaseDN(),
   * getReplicationServerUrls() and getGroupId().
   */
  private volatile ReplicationDomainCfg config;
  /**
   * String reported in monitoring data when there is no connected RS.
   */
  static final String NO_CONNECTED_SERVER = "Not connected";
  /** The ServerState used when negotiating the session with the RS. */
  private final ServerState state;
  /**
   * Send window semaphore; recreated with {@link #maxSendWindow} permits each
   * time a connection to an RS is established.
   */
  private Semaphore sendWindow;
  /** Window size advertised by the RS we last connected to. */
  private int maxSendWindow;
  /** Current receive window; reset from getMaxRcvWindow() on (re)start. */
  private int rcvWindow = 100;
  /** Half of the receive window, derived from {@link #rcvWindow}. */
  private int halfRcvWindow = rcvWindow / 2;
  /** NOTE(review): not used in the visible part of this class. */
  private int timeout;
  /** The session security configuration passed to the constructor. */
  private final ReplSessionSecurity replSessionSecurity;
  /**
   * The RS this DS is currently connected to.
   * <p>
   * Always use {@link #setConnectedRS(ConnectedRS)} to set a new
   * connected RS.
   */
  // @NotNull // for the reference
  private final AtomicReference<ConnectedRS> connectedRS = new AtomicReference<>(ConnectedRS.noConnectedRS());
  /** Our replication domain. */
  private final ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final AtomicBoolean monitorResponse = new AtomicBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
   * time it published monitoring information.
   */
  private Map<Integer, ServerState> replicaStates = new HashMap<>();
  /** A thread to monitor heartbeats on the session. */
  private HeartbeatMonitor heartbeatMonitor;
  /** The number of times the connection was lost. */
  private int numLostConnections;
  /**
   * When the broker cannot connect to any replication server,
   * it logs an error and keeps retrying every second.
   * This boolean is set when the first failure happens. It is used
   * to avoid repeating the error message on further connection failures,
   * and it is cleared once the broker finally succeeds in connecting.
   */
  private volatile boolean connectionError;
  /**
   * Lock guarding the connection phase in connectAsDataServer(); notify() is
   * called on it once a connection is established or the first failure occurs.
   */
  private final Object connectPhaseLock = new Object();
  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /*
   * Properties for the last topology info received from the network.
   */
  /**
   * Contains the last known state of the replication topology; replaced
   * atomically (compare-and-set) by computeNewTopology().
   */
  private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology());
  /** <pre>@GuardedBy("this")</pre>. */
  private volatile int updateDoneCount;
  /** NOTE(review): not used in the visible part of this class. */
  private volatile boolean connectRequiresRecovery;

  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
   * received, it is incremented. When it reaches 2, we run the checking
   * algorithm to see if we must reconnect to another best replication server.
   * Then we reset the value to 0. But when a topology message is received, the
   * integer is reset to 0. This ensures that we wait at least one monitoring
   * publisher period before running the algorithm, but also that we wait at
   * least for a monitoring period after the last received topology message
   * (topology stabilization). It is also reset to 0 on every connection
   * attempt.
   */
  private int mustRunBestServerCheckingAlgorithm;

  /**
   * The monitor provider for this replication domain.
   * <p>
   * The name of the monitor includes the local address and must therefore be
   * re-registered every time the session is re-established or destroyed. The
   * monitor provider can only be created (i.e. non-null) if there is a
   * replication domain, which is not the case in unit tests.
   */
  private final ReplicationMonitor monitor;
235
236  /**
237   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
238   *
239   * @param replicationDomain The replication domain that is creating us.
240   * @param state The ServerState that should be used by this broker
241   *        when negotiating the session with the replicationServer.
242   * @param config The configuration to use.
243   * @param replSessionSecurity The session security configuration.
244   */
245  public ReplicationBroker(ReplicationDomain replicationDomain,
246      ServerState state, ReplicationDomainCfg config,
247      ReplSessionSecurity replSessionSecurity)
248  {
249    this.domain = replicationDomain;
250    this.state = state;
251    this.config = config;
252    this.replSessionSecurity = replSessionSecurity;
253    this.rcvWindow = getMaxRcvWindow();
254    this.halfRcvWindow = rcvWindow / 2;
255
256    /*
257     * Only create a monitor if there is a replication domain (this is not the
258     * case in some unit tests).
259     */
260    this.monitor = replicationDomain != null ? new ReplicationMonitor(
261        replicationDomain) : null;
262    registerReplicationMonitor();
263  }
264
265  /**
266   * Start the ReplicationBroker.
267   */
268  public void start()
269  {
270    synchronized (startStopLock)
271    {
272      shutdown = false;
273      this.rcvWindow = getMaxRcvWindow();
274      connectAsDataServer();
275    }
276  }
277
278  /**
279   * Gets the group id of the RS we are connected to.
280   * @return The group id of the RS we are connected to
281   */
282  public byte getRsGroupId()
283  {
284    return connectedRS.get().getGroupId();
285  }
286
287  /**
288   * Gets the server id of the RS we are connected to.
289   * @return The server id of the RS we are connected to
290   */
291  public int getRsServerId()
292  {
293    return connectedRS.get().getServerId();
294  }
295
296  /**
297   * Gets the server id.
298   * @return The server id
299   */
300  public int getServerId()
301  {
302    return config.getServerId();
303  }
304
305  private DN getBaseDN()
306  {
307    return config.getBaseDN();
308  }
309
310  private Set<String> getReplicationServerUrls()
311  {
312    return config.getReplicationServer();
313  }
314
315  private byte getGroupId()
316  {
317    return (byte) config.getGroupId();
318  }
319
320  /**
321   * Gets the server id.
322   * @return The server id
323   */
324  private long getGenerationID()
325  {
326    return domain.getGenerationID();
327  }
328
329  /**
330   * Set the generation id - for test purpose.
331   * @param generationID The generation id
332   */
333  public void setGenerationID(long generationID)
334  {
335    domain.setGenerationID(generationID);
336  }
337
338  /**
339   * Compares 2 replication servers addresses and returns true if they both
340   * represent the same replication server instance.
341   * @param rs1Url Replication server 1 address
342   * @param rs2Url Replication server 2 address
343   * @return True if both replication server addresses represent the same
344   * replication server instance, false otherwise.
345   */
346  private static boolean isSameReplicationServerUrl(String rs1Url,
347      String rs2Url)
348  {
349    try
350    {
351      final HostPort hp1 = HostPort.valueOf(rs1Url);
352      final HostPort hp2 = HostPort.valueOf(rs2Url);
353      return hp1.isEquivalentTo(hp2);
354    }
355    catch (RuntimeException ex)
356    {
357      // Not a RS url or not a valid port number: should not happen
358      return false;
359    }
360  }
361
362  /**
363   * Bag class for keeping info we get from a replication server in order to
364   * compute the best one to connect to. This is in fact a wrapper to a
365   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
366   * updated with a info coming from received topology messages or monitoring
367   * messages.
368   */
369  static class ReplicationServerInfo
370  {
371    private RSInfo rsInfo;
372    private final short protocolVersion;
373    private final DN baseDN;
374    private final int windowSize;
375    // @NotNull
376    private final ServerState serverState;
377    private final boolean sslEncryption;
378    private final int degradedStatusThreshold;
379    /** Keeps the 0 value if created with a ReplServerStartMsg. */
380    private int connectedDSNumber;
381    // @NotNull
382    private Set<Integer> connectedDSs;
383    /**
384     * Is this RS locally configured? (the RS is recognized as a usable server).
385     */
386    private boolean locallyConfigured = true;
387
388    /**
389     * Create a new instance of ReplicationServerInfo wrapping the passed
390     * message.
391     * @param msg LocalizableMessage to wrap.
392     * @param newServerURL Override serverURL.
393     * @return The new instance wrapping the passed message.
394     * @throws IllegalArgumentException If the passed message has an unexpected
395     *                                  type.
396     */
397    private static ReplicationServerInfo newInstance(
398      ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
399    {
400      final ReplicationServerInfo rsInfo = newInstance(msg);
401      rsInfo.setServerURL(newServerURL);
402      return rsInfo;
403    }
404
405    /**
406     * Create a new instance of ReplicationServerInfo wrapping the passed
407     * message.
408     * @param msg LocalizableMessage to wrap.
409     * @return The new instance wrapping the passed message.
410     * @throws IllegalArgumentException If the passed message has an unexpected
411     *                                  type.
412     */
413    static ReplicationServerInfo newInstance(ReplicationMsg msg)
414        throws IllegalArgumentException
415    {
416      if (msg instanceof ReplServerStartMsg)
417      {
418        // RS uses protocol V3 or lower
419        return new ReplicationServerInfo((ReplServerStartMsg) msg);
420      }
421      else if (msg instanceof ReplServerStartDSMsg)
422      {
423        // RS uses protocol V4 or higher
424        return new ReplicationServerInfo((ReplServerStartDSMsg) msg);
425      }
426
427      // Unsupported message type: should not happen
428      throw new IllegalArgumentException("Unexpected PDU type: "
429          + msg.getClass().getName() + ":\n" + msg);
430    }
431
432    /**
433     * Constructs a ReplicationServerInfo object wrapping a
434     * {@link ReplServerStartMsg}.
435     *
436     * @param msg
437     *          The {@link ReplServerStartMsg} this object will wrap.
438     */
439    private ReplicationServerInfo(ReplServerStartMsg msg)
440    {
441      this.protocolVersion = msg.getVersion();
442      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
443          msg.getGenerationId(), msg.getGroupId(), 1);
444      this.baseDN = msg.getBaseDN();
445      this.windowSize = msg.getWindowSize();
446      final ServerState ss = msg.getServerState();
447      this.serverState = ss != null ? ss : new ServerState();
448      this.sslEncryption = msg.getSSLEncryption();
449      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
450    }
451
452    /**
453     * Constructs a ReplicationServerInfo object wrapping a
454     * {@link ReplServerStartDSMsg}.
455     *
456     * @param msg
457     *          The {@link ReplServerStartDSMsg} this object will wrap.
458     */
459    private ReplicationServerInfo(ReplServerStartDSMsg msg)
460    {
461      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
462          msg.getGenerationId(), msg.getGroupId(), msg.getWeight());
463      this.protocolVersion = msg.getVersion();
464      this.baseDN = msg.getBaseDN();
465      this.windowSize = msg.getWindowSize();
466      final ServerState ss = msg.getServerState();
467      this.serverState = ss != null ? ss : new ServerState();
468      this.sslEncryption = msg.getSSLEncryption();
469      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
470      this.connectedDSNumber = msg.getConnectedDSNumber();
471    }
472
473    /**
474     * Constructs a new replication server info with the passed RSInfo internal
475     * values and the passed connected DSs.
476     *
477     * @param rsInfo
478     *          The RSinfo to use for the update
479     * @param connectedDSs
480     *          The new connected DSs
481     */
482    ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
483    {
484      this.rsInfo =
485          new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo
486              .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
487      this.protocolVersion = 0;
488      this.baseDN = null;
489      this.windowSize = 0;
490      this.connectedDSs = connectedDSs;
491      this.connectedDSNumber = connectedDSs.size();
492      this.sslEncryption = false;
493      this.degradedStatusThreshold = -1;
494      this.serverState = new ServerState();
495    }
496
497    /**
498     * Get the server state.
499     * @return The server state
500     */
501    public ServerState getServerState()
502    {
503      return serverState;
504    }
505
506    /**
507     * Get the group id.
508     * @return The group id
509     */
510    public byte getGroupId()
511    {
512      return rsInfo.getGroupId();
513    }
514
515    /**
516     * Get the server protocol version.
517     * @return the protocolVersion
518     */
519    public short getProtocolVersion()
520    {
521      return protocolVersion;
522    }
523
524    /**
525     * Get the generation id.
526     * @return the generationId
527     */
528    public long getGenerationId()
529    {
530      return rsInfo.getGenerationId();
531    }
532
533    /**
534     * Get the server id.
535     * @return the serverId
536     */
537    public int getServerId()
538    {
539      return rsInfo.getId();
540    }
541
542    /**
543     * Get the server URL.
544     * @return the serverURL
545     */
546    public String getServerURL()
547    {
548      return rsInfo.getServerUrl();
549    }
550
551    /**
552     * Get the base DN.
553     *
554     * @return the base DN
555     */
556    public DN getBaseDN()
557    {
558      return baseDN;
559    }
560
561    /**
562     * Get the window size.
563     * @return the windowSize
564     */
565    public int getWindowSize()
566    {
567      return windowSize;
568    }
569
570    /**
571     * Get the ssl encryption.
572     * @return the sslEncryption
573     */
574    public boolean isSslEncryption()
575    {
576      return sslEncryption;
577    }
578
579    /**
580     * Get the degraded status threshold.
581     * @return the degradedStatusThreshold
582     */
583    public int getDegradedStatusThreshold()
584    {
585      return degradedStatusThreshold;
586    }
587
588    /**
589     * Get the weight.
590     * @return the weight. Null if this object is a wrapper for
591     * a ReplServerStartMsg.
592     */
593    public int getWeight()
594    {
595      return rsInfo.getWeight();
596    }
597
598    /**
599     * Get the connected DS number.
600     * @return the connectedDSNumber. Null if this object is a wrapper for
601     * a ReplServerStartMsg.
602     */
603    public int getConnectedDSNumber()
604    {
605      return connectedDSNumber;
606    }
607
608    /**
609     * Converts the object to a RSInfo object.
610     * @return The RSInfo object matching this object.
611     */
612    RSInfo toRSInfo()
613    {
614      return rsInfo;
615    }
616
617    /**
618     * Updates replication server info with the passed RSInfo internal values
619     * and the passed connected DSs.
620     * @param rsInfo The RSinfo to use for the update
621     * @param connectedDSs The new connected DSs
622     */
623    private void update(RSInfo rsInfo, Set<Integer> connectedDSs)
624    {
625      this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
626          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
627      this.connectedDSs = connectedDSs;
628      this.connectedDSNumber = connectedDSs.size();
629    }
630
631    private void setServerURL(String newServerURL)
632    {
633      rsInfo = new RSInfo(rsInfo.getId(), newServerURL,
634          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
635    }
636
637    /**
638     * Updates replication server info with the passed server state.
639     * @param serverState The ServerState to use for the update
640     */
641    private void update(ServerState serverState)
642    {
643      this.serverState.update(serverState);
644    }
645
646    /**
647     * Get the getConnectedDSs.
648     * @return the getConnectedDSs
649     */
650    public Set<Integer> getConnectedDSs()
651    {
652      return connectedDSs;
653    }
654
655    /**
656     * Gets the locally configured status for this RS.
657     * @return the locallyConfigured
658     */
659    public boolean isLocallyConfigured()
660    {
661      return locallyConfigured;
662    }
663
664    /**
665     * Sets the locally configured status for this RS.
666     * @param locallyConfigured the locallyConfigured to set
667     */
668    public void setLocallyConfigured(boolean locallyConfigured)
669    {
670      this.locallyConfigured = locallyConfigured;
671    }
672
673    /**
674     * Returns a string representation of this object.
675     * @return A string representation of this object.
676     */
677    @Override
678    public String toString()
679    {
680      return "ReplServerInfo Url:" + getServerURL()
681          + " ServerId:" + getServerId()
682          + " GroupId:" + getGroupId()
683          + " connectedDSs:" + connectedDSs;
684    }
685  }
686
687  /**
688   * Contacts all replication servers to get information from them and being
689   * able to choose the more suitable.
690   * @return the collected information.
691   */
692  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
693  {
694    final Map<Integer, ReplicationServerInfo> rsInfos = new ConcurrentSkipListMap<>();
695
696    for (String serverUrl : getReplicationServerUrls())
697    {
698      // Connect to server + get and store info about it
699      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
700      final ReplicationServerInfo rsInfo = rs.rsInfo;
701      if (rsInfo != null)
702      {
703        rsInfos.put(rsInfo.getServerId(), rsInfo);
704      }
705    }
706
707    return rsInfos;
708  }
709
710  /**
711   * Connect to a ReplicationServer.
712   *
713   * Handshake sequences between a DS and a RS is divided into 2 logical
714   * consecutive phases (phase 1 and phase 2). DS always initiates connection
715   * and always sends first message:
716   *
717   * DS<->RS:
718   * -------
719   *
720   * phase 1:
721   * DS --- ServerStartMsg ---> RS
722   * DS <--- ReplServerStartDSMsg --- RS
723   * phase 2:
724   * DS --- StartSessionMsg ---> RS
725   * DS <--- TopologyMsg --- RS
726   *
727   * Before performing a full handshake sequence, DS searches for best suitable
728   * RS by making only phase 1 handshake to every RS he knows then closing
729   * connection. This allows to gather information on available RSs and then
730   * decide with which RS the full handshake (phase 1 then phase 2) will be
731   * finally performed.
732   *
733   * @throws NumberFormatException address was invalid
734   */
735  private void connectAsDataServer()
736  {
737    /*
738     * If a first connect or a connection failure occur, we go through here.
739     * force status machine to NOT_CONNECTED_STATUS so that monitoring can see
740     * that we are not connected.
741     */
742    domain.toNotConnectedStatus();
743
744    /*
745    Stop any existing heartbeat monitor and changeTime publisher
746    from a previous session.
747    */
748    stopRSHeartBeatMonitoring();
749    stopChangeTimeHeartBeatPublishing();
750    mustRunBestServerCheckingAlgorithm = 0;
751
752    synchronized (connectPhaseLock)
753    {
754      final int serverId = getServerId();
755      final DN baseDN = getBaseDN();
756
757      /*
758       * Connect to each replication server and get their ServerState then find
759       * out which one is the best to connect to.
760       */
761      if (logger.isTraceEnabled())
762      {
763        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to elect the preferred one");
764      }
765
766      // Get info from every available replication servers
767      Map<Integer, ReplicationServerInfo> rsInfos =
768          collectReplicationServersInfo();
769      computeNewTopology(toRSInfos(rsInfos));
770
771      if (rsInfos.isEmpty())
772      {
773        setConnectedRS(ConnectedRS.noConnectedRS());
774      }
775      else
776      {
777        // At least one server answered, find the best one.
778        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
779            rsInfos, serverId, getGroupId(), getGenerationID());
780
781        // Best found, now initialize connection to this one (handshake phase 1)
782        if (logger.isTraceEnabled())
783        {
784          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" + evals.getBestRS());
785        }
786
787        final ConnectedRS electedRS = performPhaseOneHandshake(
788            evals.getBestRS().getServerURL(), true);
789        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
790        if (electedRsInfo != null)
791        {
792          /*
793          Update replication server info with potentially more up to date
794          data (server state for instance may have changed)
795          */
796          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);
797
798          // Handshake phase 1 exchange went well
799
800          // Compute in which status we are starting the session to tell the RS
801          final ServerStatus initStatus = computeInitialServerStatus(
802              electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
803              electedRsInfo.getDegradedStatusThreshold(), getGenerationID());
804
805          // Perform session start (handshake phase 2)
806          final TopologyMsg topologyMsg =
807              performPhaseTwoHandshake(electedRS, initStatus);
808
809          if (topologyMsg != null) // Handshake phase 2 exchange went well
810          {
811            connectToReplicationServer(electedRS, initStatus, topologyMsg);
812          } // Could perform handshake phase 2 with best
813        } // Could perform handshake phase 1 with best
814      }
815
816      // connectedRS has been updated by calls above, reload it
817      final ConnectedRS rs = connectedRS.get();
818      if (rs.isConnected())
819      {
820        connectPhaseLock.notify();
821
822        final long rsGenId = rs.rsInfo.getGenerationId();
823        final int rsServerId = rs.rsInfo.getServerId();
824        if (rsGenId == getGenerationID() || rsGenId == -1)
825        {
826          logger.info(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
827              rs.replicationServer, getGenerationID());
828        }
829        else
830        {
831          logger.warn(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
832              rs.replicationServer, getGenerationID(), rsGenId);
833        }
834      }
835      else
836      {
837         // This server could not find any replicationServer.
838         // It's going to start in degraded mode. Log a message.
839        if (!connectionError)
840        {
841          connectionError = true;
842          connectPhaseLock.notify();
843
844          if (!rsInfos.isEmpty())
845          {
846            logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN,
847                Utils.joinAsString(", ", rsInfos.keySet()));
848          }
849          else
850          {
851            logger.warn(WARN_NO_AVAILABLE_CHANGELOGS, serverId, baseDN);
852          }
853        }
854      }
855    }
856  }
857
858  private void computeNewTopology(List<RSInfo> newRSInfos)
859  {
860    final int rsServerId = getRsServerId();
861
862    Topology oldTopo;
863    Topology newTopo;
864    do
865    {
866      oldTopo = topology.get();
867      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
868          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
869    }
870    while (!topology.compareAndSet(oldTopo, newTopo));
871
872    if (logger.isTraceEnabled())
873    {
874      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
875    }
876  }
877
878  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
879      Topology newTopo)
880  {
881    final StringBuilder sb = new StringBuilder();
882    sb.append("rsServerId=").append(rsServerId);
883    if (newTopo.equals(oldTopo))
884    {
885      sb.append(", unchangedTopology=").append(newTopo);
886    }
887    else
888    {
889      sb.append(", oldTopology=").append(oldTopo);
890      sb.append(", newTopology=").append(newTopo);
891    }
892    return sb;
893  }
894
895  /**
896   * Connects to a replication server.
897   *
898   * @param rs
899   *          the Replication Server to connect to
900   * @param initStatus
901   *          The status to enter the state machine with
902   * @param topologyMsg
903   *          the message containing the topology information
904   */
905  private void connectToReplicationServer(ConnectedRS rs,
906      ServerStatus initStatus, TopologyMsg topologyMsg)
907  {
908    final DN baseDN = getBaseDN();
909    final ReplicationServerInfo rsInfo = rs.rsInfo;
910
911    boolean connectCompleted = false;
912    try
913    {
914      maxSendWindow = rsInfo.getWindowSize();
915
916      receiveTopo(topologyMsg, rs.getServerId());
917
918      /*
919      Log a message to let the administrator know that the failure was resolved.
920      Wake up all the thread that were waiting on the window
921      on the previous connection.
922      */
923      connectionError = false;
924      if (sendWindow != null)
925      {
926        /*
927         * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
928         * this semaphore will get blocked when they acquire it. However, we
929         * also need to make sure that we don't overflow the semaphore by
930         * releasing too many permits.
931         */
932        final int MAX_PERMITS = Integer.MAX_VALUE >>> 2;
933        if (sendWindow.availablePermits() < MAX_PERMITS)
934        {
935          /*
936           * At least 2^29 acquisitions would need to occur for this to be
937           * insufficient. In addition, at least 2^30 releases would need to
938           * occur for this to potentially overflow. Hopefully this is unlikely
939           * to happen.
940           */
941          sendWindow.release(MAX_PERMITS);
942        }
943      }
944      sendWindow = new Semaphore(maxSendWindow);
945      rcvWindow = getMaxRcvWindow();
946
947      domain.sessionInitiated(initStatus, rsInfo.getServerState());
948
949      final byte groupId = getGroupId();
950      if (rs.getGroupId() != groupId)
951      {
952        /*
953        Connected to replication server with wrong group id:
954        warn user and start heartbeat monitor to recover when a server
955        with the right group id shows up.
956        */
957        logger.warn(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID,
958            groupId, rs.getServerId(), rsInfo.getServerURL(), rs.getGroupId(), baseDN, getServerId());
959      }
960      startRSHeartBeatMonitoring(rs);
961      if (rsInfo.getProtocolVersion() >=
962        ProtocolVersion.REPLICATION_PROTOCOL_V3)
963      {
964        startChangeTimeHeartBeatPublishing(rs);
965      }
966      connectCompleted = true;
967    }
968    catch (Exception e)
969    {
970      logger.error(ERR_COMPUTING_FAKE_OPS, baseDN, rsInfo.getServerURL(),
971          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
972    }
973    finally
974    {
975      if (!connectCompleted)
976      {
977        setConnectedRS(ConnectedRS.noConnectedRS());
978      }
979    }
980  }
981
982  /**
983   * Determines the status we are starting with according to our state and the
984   * RS state.
985   *
986   * @param rsGenId The generation id of the RS
987   * @param rsState The server state of the RS
988   * @param degradedStatusThreshold The degraded status threshold of the RS
989   * @param dsGenId The local generation id
990   * @return The initial status
991   */
992  private ServerStatus computeInitialServerStatus(long rsGenId,
993    ServerState rsState, int degradedStatusThreshold, long dsGenId)
994  {
995    if (rsGenId == -1)
996    {
997      // RS has no generation id
998      return ServerStatus.NORMAL_STATUS;
999    }
1000    else if (rsGenId != dsGenId)
1001    {
1002      // DS and RS do not have same generation id
1003      return ServerStatus.BAD_GEN_ID_STATUS;
1004    }
1005    else
1006    {
1007      /*
1008      DS and RS have same generation id
1009
1010      Determine if we are late or not to replay changes. RS uses a
1011      threshold value for pending changes to be replayed by a DS to
1012      determine if the DS is in normal status or in degraded status.
1013      Let's compare the local and remote server state using  this threshold
1014      value to determine if we are late or not
1015      */
1016
1017      int nChanges = ServerState.diffChanges(rsState, state);
1018      if (logger.isTraceEnabled())
1019      {
1020        debugInfo("computed " + nChanges + " changes late.");
1021      }
1022
1023      /*
1024      Check status to know if it is relevant to change the status. Do not
1025      take RSD lock to test. If we attempt to change the status whereas
1026      we are in a status that do not allows that, this will be noticed by
1027      the changeStatusFromStatusAnalyzer method. This allows to take the
1028      lock roughly only when needed versus every sleep time timeout.
1029      */
1030      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
1031      {
1032        return ServerStatus.DEGRADED_STATUS;
1033      }
1034      // degradedStatusThreshold value of '0' means no degrading system used
1035      // (no threshold): force normal status
1036      return ServerStatus.NORMAL_STATUS;
1037    }
1038  }
1039
1040
1041
1042  /**
1043   * Connect to the provided server performing the first phase handshake (start
1044   * messages exchange) and return the reply message from the replication
1045   * server, wrapped in a ReplicationServerInfo object.
1046   *
1047   * @param serverURL
1048   *          Server to connect to.
1049   * @param keepSession
1050   *          Do we keep session opened or not after handshake. Use true if want
1051   *          to perform handshake phase 2 with the same session and keep the
1052   *          session to create as the current one.
1053   * @return The answer from the server . Null if could not get an answer.
1054   */
1055  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
1056  {
1057    Session newSession = null;
1058    Socket socket = null;
1059    boolean hasConnected = false;
1060    LocalizableMessage errorMessage = null;
1061
1062    try
1063    {
1064      // Open a socket connection to the next candidate.
1065      socket = new Socket();
1066      socket.setReceiveBufferSize(1000000);
1067      socket.setTcpNoDelay(true);
1068      if (config.getSourceAddress() != null)
1069      {
1070        InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0);
1071        socket.bind(local);
1072      }
1073      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
1074      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
1075      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
1076      boolean isSslEncryption = replSessionSecurity.isSslEncryption();
1077
1078      // Send our ServerStartMsg.
1079      final HostPort hp = new HostPort(
1080          socket.getLocalAddress().getHostName(), socket.getLocalPort());
1081      final String url = hp.toString();
1082      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
1083          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
1084          getGenerationID(), isSslEncryption, getGroupId());
1085      newSession.publish(serverStartMsg);
1086
1087      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
1088      // come back.
1089      ReplicationMsg msg = newSession.receive();
1090      if (logger.isTraceEnabled())
1091      {
1092        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
1093            + msg);
1094      }
1095
1096      // Wrap received message in a server info object
1097      final ReplicationServerInfo replServerInfo =
1098          ReplicationServerInfo.newInstance(msg, serverURL);
1099
1100      // Sanity check
1101      final DN repDN = replServerInfo.getBaseDN();
1102      if (!getBaseDN().equals(repDN))
1103      {
1104        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDN, getBaseDN());
1105        return setConnectedRS(ConnectedRS.noConnectedRS());
1106      }
1107
1108      /*
1109       * We have sent our own protocol version to the replication server. The
1110       * replication server will use the same one (or an older one if it is an
1111       * old replication server).
1112       */
1113      newSession.setProtocolVersion(
1114          getCompatibleVersion(replServerInfo.getProtocolVersion()));
1115
1116      if (!isSslEncryption)
1117      {
1118        newSession.stopEncryption();
1119      }
1120
1121      hasConnected = true;
1122
1123      if (keepSession)
1124      {
1125        // cannot store it yet,
1126        // only store after a successful phase two handshake
1127        return new ConnectedRS(replServerInfo, newSession);
1128      }
1129      return new ConnectedRS(replServerInfo, null);
1130    }
1131    catch (ConnectException e)
1132    {
1133      logger.traceException(e);
1134      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), serverURL, getBaseDN());
1135    }
1136    catch (SocketTimeoutException e)
1137    {
1138      logger.traceException(e);
1139      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), serverURL, getBaseDN());
1140    }
1141    catch (Exception e)
1142    {
1143      logger.traceException(e);
1144      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
1145          getServerId(), serverURL, getBaseDN(), stackTraceToSingleLineString(e));
1146    }
1147    finally
1148    {
1149      if (!hasConnected || !keepSession)
1150      {
1151        close(newSession);
1152        close(socket);
1153      }
1154
1155      if (!hasConnected && errorMessage != null && !connectionError)
1156      {
1157        // There was no server waiting on this host:port
1158        // Log a notice and will try the next replicationServer in the list
1159        if (keepSession) // Log error message only for final connection
1160        {
1161          // log the error message only once to avoid overflowing the error log
1162          logger.error(errorMessage);
1163        }
1164
1165        logger.trace(errorMessage);
1166      }
1167    }
1168    return setConnectedRS(ConnectedRS.noConnectedRS());
1169  }
1170
1171  /**
1172   * Performs the second phase handshake (send StartSessionMsg and receive
1173   * TopologyMsg messages exchange) and return the reply message from the
1174   * replication server.
1175   *
1176   * @param electedRS Server we are connecting with.
1177   * @param initStatus The status we are starting with
1178   * @return The ReplServerStartMsg the server replied. Null if could not
1179   *         get an answer.
1180   */
1181  private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS,
1182    ServerStatus initStatus)
1183  {
1184    try
1185    {
1186      // Send our StartSessionMsg.
1187      final StartSessionMsg startSessionMsg;
1188      startSessionMsg = new StartSessionMsg(
1189          initStatus,
1190          domain.getRefUrls(),
1191          domain.isAssured(),
1192          domain.getAssuredMode(),
1193          domain.getAssuredSdLevel());
1194      startSessionMsg.setEclIncludes(
1195          domain.getEclIncludes(domain.getServerId()),
1196          domain.getEclIncludesForDeletes(domain.getServerId()));
1197      final Session session = electedRS.session;
1198      session.publish(startSessionMsg);
1199
1200      // Read the TopologyMsg that should come back.
1201      final TopologyMsg topologyMsg = (TopologyMsg) session.receive();
1202
1203      if (logger.isTraceEnabled())
1204      {
1205        debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
1206            + "\nAND RECEIVED:\n" + topologyMsg);
1207      }
1208
1209      // Alright set the timeout to the desired value
1210      session.setSoTimeout(timeout);
1211      setConnectedRS(electedRS);
1212      return topologyMsg;
1213    }
1214    catch (Exception e)
1215    {
1216      logger.error(WARN_EXCEPTION_STARTING_SESSION_PHASE,
1217          getServerId(), electedRS.rsInfo.getServerURL(), getBaseDN(), stackTraceToSingleLineString(e));
1218
1219      setConnectedRS(ConnectedRS.noConnectedRS());
1220      return null;
1221    }
1222  }
1223
1224  /**
1225   * Class holding evaluation results for electing the best replication server
1226   * for the local directory server.
1227   */
1228  static class RSEvaluations
1229  {
1230    private final int localServerId;
1231    private Map<Integer, ReplicationServerInfo> bestRSs;
1232    private final Map<Integer, LocalizableMessage> rsEvals = new HashMap<>();
1233
1234    /**
1235     * Ctor.
1236     *
1237     * @param localServerId
1238     *          the serverId for the local directory server
1239     * @param rsInfos
1240     *          a Map of serverId => {@link ReplicationServerInfo} with all the
1241     *          candidate replication servers
1242     */
1243    RSEvaluations(int localServerId,
1244        Map<Integer, ReplicationServerInfo> rsInfos)
1245    {
1246      this.localServerId = localServerId;
1247      this.bestRSs = rsInfos;
1248    }
1249
1250    private boolean keepBest(LocalEvaluation eval)
1251    {
1252      if (eval.hasAcceptedAny())
1253      {
1254        bestRSs = eval.getAccepted();
1255        rsEvals.putAll(eval.getRejected());
1256        return true;
1257      }
1258      return false;
1259    }
1260
1261    /**
1262     * Sets the elected best replication server, rejecting all the other
1263     * replication servers with the supplied evaluation.
1264     *
1265     * @param bestRsId
1266     *          the serverId of the elected replication server
1267     * @param rejectedRSsEval
1268     *          the evaluation for all the rejected replication servers
1269     */
1270    private void setBestRS(int bestRsId, LocalizableMessage rejectedRSsEval)
1271    {
1272      for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
1273          this.bestRSs.entrySet().iterator(); it.hasNext();)
1274      {
1275        final Entry<Integer, ReplicationServerInfo> entry = it.next();
1276        final Integer rsId = entry.getKey();
1277        final ReplicationServerInfo rsInfo = entry.getValue();
1278        if (rsInfo.getServerId() != bestRsId)
1279        {
1280          it.remove();
1281        }
1282        rsEvals.put(rsId, rejectedRSsEval);
1283      }
1284    }
1285
1286    private void discardAll(LocalizableMessage eval)
1287    {
1288      for (Integer rsId : bestRSs.keySet())
1289      {
1290        rsEvals.put(rsId, eval);
1291      }
1292    }
1293
1294    private boolean foundBestRS()
1295    {
1296      return bestRSs.size() == 1;
1297    }
1298
1299    /**
1300     * Returns the {@link ReplicationServerInfo} for the best replication
1301     * server.
1302     *
1303     * @return the {@link ReplicationServerInfo} for the best replication server
1304     */
1305    ReplicationServerInfo getBestRS()
1306    {
1307      if (foundBestRS())
1308      {
1309        return bestRSs.values().iterator().next();
1310      }
1311      return null;
1312    }
1313
1314    /**
1315     * Returns the evaluations for all the candidate replication servers.
1316     *
1317     * @return a Map of serverId => LocalizableMessage containing the evaluation for each
1318     *         candidate replication servers.
1319     */
1320    Map<Integer, LocalizableMessage> getEvaluations()
1321    {
1322      if (foundBestRS())
1323      {
1324        final Integer bestRSServerId = getBestRS().getServerId();
1325        if (rsEvals.get(bestRSServerId) == null)
1326        {
1327          final LocalizableMessage eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
1328          rsEvals.put(bestRSServerId, eval);
1329        }
1330      }
1331      return Collections.unmodifiableMap(rsEvals);
1332    }
1333
1334    /**
1335     * Returns the evaluation for the supplied replication server Id.
1336     * <p>
1337     * Note: "unknown RS" message is returned if the supplied replication server
1338     * was not part of the candidate replication servers.
1339     *
1340     * @param rsServerId
1341     *          the supplied replication server Id
1342     * @return the evaluation {@link LocalizableMessage} for the supplied replication
1343     *         server Id
1344     */
1345    private LocalizableMessage getEvaluation(int rsServerId)
1346    {
1347      final LocalizableMessage evaluation = getEvaluations().get(rsServerId);
1348      if (evaluation != null)
1349      {
1350        return evaluation;
1351      }
1352      return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
1353    }
1354
1355    /** {@inheritDoc} */
1356    @Override
1357    public String toString()
1358    {
1359      return "Current best replication server Ids: " + bestRSs.keySet()
1360          + ", Evaluation of connected replication servers"
1361          + " (ServerId => Evaluation): " + rsEvals.keySet()
1362          + ", Any replication server not appearing here"
1363          + " could not be contacted.";
1364    }
1365  }
1366
1367  /**
1368   * Evaluation local to one filter.
1369   */
1370  private static class LocalEvaluation
1371  {
1372    private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>();
1373    private final Map<ReplicationServerInfo, LocalizableMessage> rsEvals = new HashMap<>();
1374
1375    private void accept(Integer rsId, ReplicationServerInfo rsInfo)
1376    {
1377      // forget previous eval, including undoing reject
1378      this.rsEvals.remove(rsInfo);
1379      this.accepted.put(rsId, rsInfo);
1380    }
1381
1382    private void reject(ReplicationServerInfo rsInfo, LocalizableMessage reason)
1383    {
1384      this.accepted.remove(rsInfo.getServerId()); // undo accept
1385      this.rsEvals.put(rsInfo, reason);
1386    }
1387
1388    private Map<Integer, ReplicationServerInfo> getAccepted()
1389    {
1390      return accepted;
1391    }
1392
1393    private ReplicationServerInfo[] getAcceptedRSInfos()
1394    {
1395      return accepted.values().toArray(
1396          new ReplicationServerInfo[accepted.size()]);
1397    }
1398
1399    public Map<Integer, LocalizableMessage> getRejected()
1400    {
1401      final Map<Integer, LocalizableMessage> result = new HashMap<>();
1402      for (Entry<ReplicationServerInfo, LocalizableMessage> entry : rsEvals.entrySet())
1403      {
1404        result.put(entry.getKey().getServerId(), entry.getValue());
1405      }
1406      return result;
1407    }
1408
1409    private boolean hasAcceptedAny()
1410    {
1411      return !accepted.isEmpty();
1412    }
1413
1414  }
1415
1416  /**
1417   * Returns the replication server that best fits our need so that we can
1418   * connect to it or determine if we must disconnect from current one to
1419   * re-connect to best server.
1420   * <p>
1421   * Note: this method is static for test purpose (access from unit tests)
1422   *
1423   * @param firstConnection True if we run this method for the very first
1424   * connection of the broker. False if we run this method to determine if the
1425   * replication server we are currently connected to is still the best or not.
1426   * @param rsServerId The id of the replication server we are currently
1427   * connected to. Only used when firstConnection is false.
1428   * @param myState The local server state.
1429   * @param rsInfos The list of available replication servers and their
1430   * associated information (choice will be made among them).
1431   * @param localServerId The server id for the suffix we are working for.
1432   * @param groupId The groupId we prefer being connected to if possible
1433   * @param generationId The generation id we are using
1434   * @return The computed best replication server. If the returned value is
1435   * null, the best replication server is undetermined but the local server must
1436   * disconnect (so the best replication server is another one than the current
1437   * one). Null can only be returned when firstConnection is false.
1438   */
1439  static RSEvaluations computeBestReplicationServer(
1440      boolean firstConnection, int rsServerId, ServerState myState,
1441      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
1442      byte groupId, long generationId)
1443  {
1444    final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
1445    // Shortcut, if only one server, this is the best
1446    if (evals.foundBestRS())
1447    {
1448      return evals;
1449    }
1450
1451    /**
1452     * Apply some filtering criteria to determine the best servers list from
1453     * the available ones. The ordered list of criteria is (from more important
1454     * to less important):
1455     * - replication server has the same group id as the local DS one
1456     * - replication server has the same generation id as the local DS one
1457     * - replication server is up to date regarding changes generated by the
1458     *   local DS
1459     * - replication server in the same VM as local DS one
1460     */
1461    /*
1462    The list of best replication servers is filtered with each criteria. At
1463    each criteria, the list is replaced with the filtered one if there
1464    are some servers from the filtering, otherwise, the list is left as is
1465    and the new filtering for the next criteria is applied and so on.
1466
1467    Use only servers locally configured: those are servers declared in
1468    the local configuration. When the current method is called, for
1469    sure, at least one server from the list is locally configured
1470    */
1471    filterServersLocallyConfigured(evals, localServerId);
1472    // Some servers with same group id ?
1473    filterServersWithSameGroupId(evals, localServerId, groupId);
1474    // Some servers with same generation id ?
1475    final boolean rssWithSameGenerationIdExist =
1476        filterServersWithSameGenerationId(evals, localServerId, generationId);
1477    if (rssWithSameGenerationIdExist)
1478    {
1479      // If some servers with the right generation id this is useful to
1480      // run the local DS change criteria
1481      filterServersWithAllLocalDSChanges(evals, myState, localServerId);
1482    }
1483    // Some servers in the local VM or local host?
1484    filterServersOnSameHost(evals, localServerId);
1485
1486    if (evals.foundBestRS())
1487    {
1488      return evals;
1489    }
1490
1491    /**
1492     * Now apply the choice based on the weight to the best servers list
1493     */
1494    if (firstConnection)
1495    {
1496      // We are not connected to a server yet
1497      computeBestServerForWeight(evals, -1, -1);
1498    }
1499    else
1500    {
1501      /*
1502       * We are already connected to a RS: compute the best RS as far as the
1503       * weights is concerned. If this is another one, some DS must disconnect.
1504       */
1505      computeBestServerForWeight(evals, rsServerId, localServerId);
1506    }
1507    return evals;
1508  }
1509
1510  /**
1511   * Creates a new list that contains only replication servers that are locally
1512   * configured.
1513   * @param evals The evaluation object
1514   */
1515  private static void filterServersLocallyConfigured(RSEvaluations evals,
1516      int localServerId)
1517  {
1518    final LocalEvaluation eval = new LocalEvaluation();
1519    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1520    {
1521      final Integer rsId = entry.getKey();
1522      final ReplicationServerInfo rsInfo = entry.getValue();
1523      if (rsInfo.isLocallyConfigured())
1524      {
1525        eval.accept(rsId, rsInfo);
1526      }
1527      else
1528      {
1529        eval.reject(rsInfo,
1530            NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
1531      }
1532    }
1533    evals.keepBest(eval);
1534  }
1535
1536  /**
1537   * Creates a new list that contains only replication servers that have the
1538   * passed group id, from a passed replication server list.
1539   * @param evals The evaluation object
1540   * @param groupId The group id that must match
1541   */
1542  private static void filterServersWithSameGroupId(RSEvaluations evals,
1543      int localServerId, byte groupId)
1544  {
1545    final LocalEvaluation eval = new LocalEvaluation();
1546    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1547    {
1548      final Integer rsId = entry.getKey();
1549      final ReplicationServerInfo rsInfo = entry.getValue();
1550      if (rsInfo.getGroupId() == groupId)
1551      {
1552        eval.accept(rsId, rsInfo);
1553      }
1554      else
1555      {
1556        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
1557            rsId, rsInfo.getGroupId(), localServerId, groupId));
1558      }
1559    }
1560    evals.keepBest(eval);
1561  }
1562
1563  /**
1564   * Creates a new list that contains only replication servers that have the
1565   * provided generation id, from a provided replication server list.
1566   * When the selected replication servers have no change (empty serverState)
1567   * then the 'empty'(generationId==-1) replication servers are also included
1568   * in the result list.
1569   *
1570   * @param evals The evaluation object
1571   * @param generationId The generation id that must match
1572   * @return whether some replication server passed the filter
1573   */
1574  private static boolean filterServersWithSameGenerationId(
1575      RSEvaluations evals, long localServerId, long generationId)
1576  {
1577    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1578    final LocalEvaluation eval = new LocalEvaluation();
1579    boolean emptyState = true;
1580
1581    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1582    {
1583      final Integer rsId = entry.getKey();
1584      final ReplicationServerInfo rsInfo = entry.getValue();
1585      if (rsInfo.getGenerationId() == generationId)
1586      {
1587        eval.accept(rsId, rsInfo);
1588        if (!rsInfo.serverState.isEmpty())
1589        {
1590          emptyState = false;
1591        }
1592      }
1593      else if (rsInfo.getGenerationId() == -1)
1594      {
1595        eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
1596            generationId, localServerId));
1597      }
1598      else
1599      {
1600        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
1601            rsId, rsInfo.getGenerationId(), localServerId, generationId));
1602      }
1603    }
1604
1605    if (emptyState)
1606    {
1607      // If the RS with a generationId have all an empty state,
1608      // then the 'empty'(genId=-1) RSes are also candidate
1609      for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1610      {
1611        ReplicationServerInfo rsInfo = entry.getValue();
1612        if (rsInfo.getGenerationId() == -1)
1613        {
1614          // will undo the reject of previously rejected RSs
1615          eval.accept(entry.getKey(), rsInfo);
1616        }
1617      }
1618    }
1619
1620    return evals.keepBest(eval);
1621  }
1622
1623  /**
1624   * Creates a new list that contains only replication servers that have the
1625   * latest changes from the passed DS, from a passed replication server list.
1626   * @param evals The evaluation object
1627   * @param localState The state of the local DS
1628   * @param localServerId The server id to consider for the changes
1629   */
1630  private static void filterServersWithAllLocalDSChanges(
1631      RSEvaluations evals, ServerState localState, int localServerId)
1632  {
1633    // Extract the CSN of the latest change generated by the local server
1634    final CSN localCSN = getCSN(localState, localServerId);
1635
1636    /**
1637     * Find replication servers that are up to date (or more up to date than us,
1638     * if for instance we failed and restarted, having sent some changes to the
1639     * RS but without having time to store our own state) regarding our own
1640     * server id. If some servers are more up to date, prefer this list but take
1641     * only the latest CSN.
1642     */
1643    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
1644    boolean foundRSMoreUpToDateThanLocalDS = false;
1645    CSN latestRsCSN = null;
1646    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1647    {
1648      final Integer rsId = entry.getKey();
1649      final ReplicationServerInfo rsInfo = entry.getValue();
1650      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
1651
1652      // Has this replication server the latest local change ?
1653      if (rsCSN.isOlderThan(localCSN))
1654      {
1655        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
1656            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1657      }
1658      else if (rsCSN.equals(localCSN))
1659      {
1660        // This replication server has exactly the latest change from the
1661        // local server
1662        if (!foundRSMoreUpToDateThanLocalDS)
1663        {
1664          mostUpToDateEval.accept(rsId, rsInfo);
1665        }
1666        else
1667        {
1668          mostUpToDateEval.reject(rsInfo,
1669            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1670              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1671        }
1672      }
1673      else if (rsCSN.isNewerThan(localCSN))
1674      {
1675        // This replication server is even more up to date than the local server
1676        if (latestRsCSN == null)
1677        {
1678          foundRSMoreUpToDateThanLocalDS = true;
1679          // all previous results are now outdated, reject them all
1680          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
1681              localCSN);
1682          // Initialize the latest CSN
1683          latestRsCSN = rsCSN;
1684        }
1685
1686        if (rsCSN.equals(latestRsCSN))
1687        {
1688          mostUpToDateEval.accept(rsId, rsInfo);
1689        }
1690        else if (rsCSN.isNewerThan(latestRsCSN))
1691        {
1692          // This RS is even more up to date, reject all previously accepted RSs
1693          // and store this new RS
1694          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
1695              localCSN);
1696          mostUpToDateEval.accept(rsId, rsInfo);
1697          latestRsCSN = rsCSN;
1698        }
1699        else
1700        {
1701          mostUpToDateEval.reject(rsInfo,
1702            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1703              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1704        }
1705      }
1706    }
1707    evals.keepBest(mostUpToDateEval);
1708  }
1709
1710  private static CSN getCSN(ServerState state, int serverId)
1711  {
1712    final CSN csn = state.getCSN(serverId);
1713    if (csn != null)
1714    {
1715      return csn;
1716    }
1717    return new CSN(0, 0, serverId);
1718  }
1719
1720  private static void rejectAllWithRSIsLaterThanBestRS(
1721      final LocalEvaluation eval, int localServerId, CSN localCSN)
1722  {
1723    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
1724    {
1725      final String rsCSN =
1726          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
1727      final LocalizableMessage reason =
1728          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1729            rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
1730      eval.reject(rsInfo, reason);
1731    }
1732  }
1733
1734  /**
1735   * Creates a new list that contains only replication servers that are on the
1736   * same host as the local DS, from a passed replication server list. This
1737   * method will gives priority to any replication server which is in the same
1738   * VM as this DS.
1739   *
1740   * @param evals The evaluation object
1741   */
1742  private static void filterServersOnSameHost(RSEvaluations evals,
1743      int localServerId)
1744  {
1745    /*
1746     * Initially look for all servers on the same host. If we find one in the
1747     * same VM, then narrow the search.
1748     */
1749    boolean foundRSInSameVM = false;
1750    final LocalEvaluation eval = new LocalEvaluation();
1751    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1752    {
1753      final Integer rsId = entry.getKey();
1754      final ReplicationServerInfo rsInfo = entry.getValue();
1755      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
1756      if (hp.isLocalAddress())
1757      {
1758        if (isLocalReplicationServerPort(hp.getPort()))
1759        {
1760          if (!foundRSInSameVM)
1761          {
1762            // An RS in the same VM will always have priority.
1763            // Narrow the search to only include servers in this VM.
1764            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
1765            foundRSInSameVM = true;
1766          }
1767          eval.accept(rsId, rsInfo);
1768        }
1769        else if (!foundRSInSameVM)
1770        {
1771          // OK, accept RSs on the same machine because we have not found an RS
1772          // in the same VM yet
1773          eval.accept(rsId, rsInfo);
1774        }
1775        else
1776        {
1777          // Skip: we have found some RSs in the same VM, but this RS is not.
1778          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
1779              localServerId));
1780        }
1781      }
1782      else
1783      {
1784        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
1785            localServerId));
1786      }
1787    }
1788    evals.keepBest(eval);
1789  }
1790
1791  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
1792      int localServerId)
1793  {
1794    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
1795    {
1796      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
1797          rsInfo.getServerId(), localServerId));
1798    }
1799  }
1800
1801  /**
1802   * Computes the best replication server the local server should be connected
1803   * to so that the load is correctly spread across the topology, following the
1804   * weights guidance.
1805   * Warning: This method is expected to be called with at least 2 servers in
1806   * bestServers
1807   * Note: this method is static for test purpose (access from unit tests)
1808   * @param evals The evaluation object
1809   * @param currentRsServerId The replication server the local server is
1810   *        currently connected to. -1 if the local server is not yet connected
1811   *        to any replication server.
1812   * @param localServerId The server id of the local server. This is not used
1813   *        when it is not connected to a replication server
1814   *        (currentRsServerId = -1)
1815   */
1816  static void computeBestServerForWeight(RSEvaluations evals,
1817      int currentRsServerId, int localServerId)
1818  {
1819    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1820    /*
1821     * - Compute the load goal of each RS, deducing it from the weights affected
1822     * to them.
1823     * - Compute the current load of each RS, deducing it from the DSs
1824     * currently connected to them.
1825     * - Compute the differences between the load goals and the current loads of
1826     * the RSs.
1827     */
1828    // Sum of the weights
1829    int sumOfWeights = 0;
1830    // Sum of the connected DSs
1831    int sumOfConnectedDSs = 0;
1832    for (ReplicationServerInfo rsInfo : bestServers.values())
1833    {
1834      sumOfWeights += rsInfo.getWeight();
1835      sumOfConnectedDSs += rsInfo.getConnectedDSNumber();
1836    }
1837
1838    // Distance (difference) of the current loads to the load goals of each RS:
1839    // key:server id, value: distance
1840    Map<Integer, BigDecimal> loadDistances = new HashMap<>();
1841    // Precision for the operations (number of digits after the dot)
1842    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
1843    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1844    {
1845      final Integer rsId = entry.getKey();
1846      final ReplicationServerInfo rsInfo = entry.getValue();
1847
1848      //  load goal = rs weight / sum of weights
1849      BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide(
1850          BigDecimal.valueOf(sumOfWeights), mathContext);
1851      BigDecimal currentLoadBd = BigDecimal.ZERO;
1852      if (sumOfConnectedDSs != 0)
1853      {
1854        // current load = number of connected DSs / total number of DSs
1855        int connectedDSs = rsInfo.getConnectedDSNumber();
1856        currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
1857            BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
1858      }
1859      // load distance = load goal - current load
1860      BigDecimal loadDistanceBd =
1861        loadGoalBd.subtract(currentLoadBd, mathContext);
1862      loadDistances.put(rsId, loadDistanceBd);
1863    }
1864
1865    if (currentRsServerId == -1)
1866    {
1867      // The local server is not connected yet, find best server to connect to,
1868      // taking the weights into account.
1869      computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
1870    }
1871    else
1872    {
1873      // The local server is currently connected to a RS, let's see if it must
1874      // disconnect or not, taking the weights into account.
1875      computeBestServerWhenConnected(evals, loadDistances, localServerId,
1876          currentRsServerId, sumOfWeights, sumOfConnectedDSs);
1877    }
1878  }
1879
1880  private static void computeBestServerWhenNotConnected(RSEvaluations evals,
1881      Map<Integer, BigDecimal> loadDistances, int localServerId)
1882  {
1883    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1884    /*
1885     * Find the server with the current highest distance to its load goal and
1886     * choose it. Make an exception if every server is correctly balanced,
1887     * that is every current load distances are equal to 0, in that case,
1888     * choose the server with the highest weight
1889     */
1890    int bestRsId = 0; // If all server equal, return the first one
1891    float highestDistance = Float.NEGATIVE_INFINITY;
1892    boolean allRsWithZeroDistance = true;
1893    int highestWeightRsId = -1;
1894    int highestWeight = -1;
1895    for (Integer rsId : bestServers.keySet())
1896    {
1897      float loadDistance = loadDistances.get(rsId).floatValue();
1898      if (loadDistance > highestDistance)
1899      {
1900        // This server is far more from its balance point
1901        bestRsId = rsId;
1902        highestDistance = loadDistance;
1903      }
1904      if (loadDistance != 0)
1905      {
1906        allRsWithZeroDistance = false;
1907      }
1908      int weight = bestServers.get(rsId).getWeight();
1909      if (weight > highestWeight)
1910      {
1911        // This server has a higher weight
1912        highestWeightRsId = rsId;
1913        highestWeight = weight;
1914      }
1915    }
1916    // All servers with a 0 distance ?
1917    if (allRsWithZeroDistance)
1918    {
1919      // Choose server with the highest weight
1920      bestRsId = highestWeightRsId;
1921    }
1922    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
1923        bestRsId));
1924  }
1925
1926  private static void computeBestServerWhenConnected(RSEvaluations evals,
1927      Map<Integer, BigDecimal> loadDistances, int localServerId,
1928      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
1929  {
1930    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1931    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
1932    float currentLoadDistance =
1933      loadDistances.get(currentRsServerId).floatValue();
1934    if (currentLoadDistance < 0)
1935    {
1936      /*
1937      Too much DSs connected to the current RS, compared with its load
1938      goal:
1939      Determine the potential number of DSs to disconnect from the current
1940      RS and see if the local DS is part of them: the DSs that must
1941      disconnect are those with the lowest server id.
1942      Compute the sum of the distances of the load goals of the other RSs
1943      */
1944      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
1945      for (Integer rsId : bestServers.keySet())
1946      {
1947        if (rsId != currentRsServerId)
1948        {
1949          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
1950            loadDistances.get(rsId), mathContext);
1951        }
1952      }
1953
1954      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
1955      {
1956        /*
1957        The average distance of the other RSs shows a lack of DSs.
1958        Compute the number of DSs to disconnect from the current RS,
1959        rounding to the nearest integer number. Do only this if there is
1960        no risk of yoyo effect: when the exact balance cannot be
1961        established due to the current number of DSs connected, do not
1962        disconnect a DS. A simple example where the balance cannot be
1963        reached is:
1964        - RS1 has weight 1 and 2 DSs
1965        - RS2 has weight 1 and 1 DS
1966        => disconnecting a DS from RS1 to reconnect it to RS2 would have no
1967        sense as this would lead to the reverse situation. In that case,
1968        the perfect balance cannot be reached and we must stick to the
1969        current situation, otherwise the DS would keep move between the 2
1970        RSs
1971        */
1972        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
1973          multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
1974              .floatValue();
1975        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
1976
1977        // Avoid yoyo effect
1978        if (overloadingDSsNumber == 1)
1979        {
1980          // What would be the new load distance for the current RS if
1981          // we disconnect some DSs ?
1982          ReplicationServerInfo currentReplicationServerInfo =
1983            bestServers.get(currentRsServerId);
1984
1985          int currentRsWeight = currentReplicationServerInfo.getWeight();
1986          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
1987          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
1988          BigDecimal currentRsLoadGoalBd =
1989            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
1990          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
1991          if (sumOfConnectedDSs != 0)
1992          {
1993            int connectedDSs = currentReplicationServerInfo.
1994              getConnectedDSNumber();
1995            BigDecimal potentialNewConnectedDSsBd =
1996                BigDecimal.valueOf(connectedDSs - 1);
1997            BigDecimal sumOfConnectedDSsBd =
1998                BigDecimal.valueOf(sumOfConnectedDSs);
1999            potentialCurrentRsNewLoadBd =
2000              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
2001                mathContext);
2002          }
2003          BigDecimal potentialCurrentRsNewLoadDistanceBd =
2004            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
2005              mathContext);
2006
2007          // What would be the new load distance for the other RSs ?
2008          BigDecimal additionalDsLoadBd =
2009              BigDecimal.ONE.divide(
2010                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
2011          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
2012            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
2013                  mathContext);
2014
2015          /*
2016          Now compare both values: we must not disconnect the DS if this
2017          is for going in a situation where the load distance of the other
2018          RSs is the opposite of the future load distance of the local RS
2019          or we would evaluate that we should disconnect just after being
2020          arrived on the new RS. But we should disconnect if we reach the
2021          perfect balance (both values are 0).
2022          */
2023          if (mustAvoidYoyoEffect(potentialCurrentRsNewLoadDistanceBd,
2024              potentialNewSumOfLoadDistancesOfOtherRSsBd))
2025          {
2026            // Avoid the yoyo effect, and keep the local DS connected to its
2027            // current RS
2028            evals.setBestRS(currentRsServerId,
2029                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
2030            return;
2031          }
2032        }
2033
2034        ReplicationServerInfo currentRsInfo =
2035            bestServers.get(currentRsServerId);
2036        if (isServerOverloadingRS(localServerId, currentRsInfo,
2037            overloadingDSsNumber))
2038        {
2039          // The local server is part of the DSs to disconnect
2040          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
2041              localServerId, currentRsServerId));
2042        }
2043        else
2044        {
2045          // The local server is not part of the servers to disconnect from the
2046          // current RS.
2047          evals.setBestRS(currentRsServerId,
2048              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
2049                  currentRsServerId));
2050        }
2051      } else {
2052        // The average distance of the other RSs does not show a lack of DSs:
2053        // no need to disconnect any DS from the current RS.
2054        evals.setBestRS(currentRsServerId,
2055            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
2056                currentRsServerId));
2057      }
2058    } else {
2059      // The RS load goal is reached or there are not enough DSs connected to
2060      // it to reach it: do not disconnect from this RS and return rsInfo for
2061      // this RS
2062      evals.setBestRS(currentRsServerId,
2063          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
2064              currentRsServerId));
2065    }
2066  }
2067
2068  private static boolean mustAvoidYoyoEffect(BigDecimal rsNewLoadDistance,
2069      BigDecimal otherRSsNewSumOfLoadDistances)
2070  {
2071    final MathContext roundCtx = new MathContext(6, RoundingMode.DOWN);
2072    final BigDecimal rsLoadDistance = rsNewLoadDistance.round(roundCtx);
2073    final BigDecimal otherRSsSumOfLoadDistances =
2074        otherRSsNewSumOfLoadDistances.round(roundCtx);
2075
2076    return rsLoadDistance.compareTo(BigDecimal.ZERO) != 0
2077        && rsLoadDistance.compareTo(otherRSsSumOfLoadDistances.negate()) == 0;
2078  }
2079
2080  /**
2081   * Returns whether the local DS is overloading the RS.
2082   * <p>
2083   * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
2084   * DSs connected to this RS is ordered by serverId to use a consistent
2085   * ordering across all nodes in the topology. The serverIds which index in the
2086   * List are lower than "overloadingDSsNumber" will be evicted first.
2087   * <p>
2088   * This ordering is unfair since nodes with the lower serverIds will be
2089   * evicted more often than nodes with higher serverIds. However, it is a
2090   * consistent and reliable ordering applicable anywhere in the topology.
2091   */
2092  private static boolean isServerOverloadingRS(int localServerId,
2093      ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
2094  {
2095    List<Integer> serversConnectedToCurrentRS = new ArrayList<>(currentRsInfo.getConnectedDSs());
2096    Collections.sort(serversConnectedToCurrentRS);
2097
2098    final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
2099    return idx != -1 && idx < overloadingDSsNumber;
2100  }
2101
2102  /**
2103   * Start the heartbeat monitor thread.
2104   */
2105  private void startRSHeartBeatMonitoring(ConnectedRS rs)
2106  {
2107    final long heartbeatInterval = config.getHeartbeatInterval();
2108    if (heartbeatInterval > 0)
2109    {
2110      heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(),
2111          getBaseDN().toString(), rs.session, heartbeatInterval);
2112      heartbeatMonitor.start();
2113    }
2114  }
2115
2116  /**
2117   * Stop the heartbeat monitor thread.
2118   */
2119  private synchronized void stopRSHeartBeatMonitoring()
2120  {
2121    if (heartbeatMonitor != null)
2122    {
2123      heartbeatMonitor.shutdown();
2124      heartbeatMonitor = null;
2125    }
2126  }
2127
2128  /**
2129   * Restart the ReplicationBroker.
2130   * @param infiniteTry the socket which failed
2131   */
2132  public void reStart(boolean infiniteTry)
2133  {
2134    reStart(connectedRS.get().session, infiniteTry);
2135  }
2136
2137  /**
2138   * Restart the ReplicationServer broker after a failure.
2139   *
2140   * @param failingSession the socket which failed
2141   * @param infiniteTry the socket which failed
2142   */
2143  private void reStart(Session failingSession, boolean infiniteTry)
2144  {
2145    if (failingSession != null)
2146    {
2147      failingSession.close();
2148      numLostConnections++;
2149    }
2150
2151    ConnectedRS rs = connectedRS.get();
2152    if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
2153    {
2154      rs = setConnectedRS(ConnectedRS.noConnectedRS());
2155    }
2156
2157    while (true)
2158    {
2159      // Synchronize inside the loop in order to allow shutdown.
2160      synchronized (startStopLock)
2161      {
2162        if (rs.isConnected() || shutdown)
2163        {
2164          break;
2165        }
2166
2167        try
2168        {
2169          connectAsDataServer();
2170          rs = connectedRS.get();
2171        }
2172        catch (Exception e)
2173        {
2174          logger.error(NOTE_EXCEPTION_RESTARTING_SESSION,
2175              getBaseDN(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
2176        }
2177
2178        if (rs.isConnected() || !infiniteTry)
2179        {
2180          break;
2181        }
2182      }
2183      try
2184      {
2185          Thread.sleep(500);
2186      }
2187      catch (InterruptedException ignored)
2188      {
2189        // ignore
2190      }
2191    }
2192
2193    if (logger.isTraceEnabled())
2194    {
2195      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
2196          + rs.getServerId() + ") genId=" + getGenerationID());
2197    }
2198  }
2199
2200  /**
2201   * Publish a message to the other servers.
2202   * @param msg the message to publish
2203   */
2204  public void publish(ReplicationMsg msg)
2205  {
2206    publish(msg, false, true);
2207  }
2208
2209  /**
2210   * Publish a message to the other servers.
2211   * @param msg            The message to publish.
2212   * @param retryOnFailure Whether reconnect should automatically be done.
2213   * @return               Whether publish succeeded.
2214   */
2215  boolean publish(ReplicationMsg msg, boolean retryOnFailure)
2216  {
2217    return publish(msg, false, retryOnFailure);
2218  }
2219
2220  /**
2221   * Publish a recovery message to the other servers.
2222   * @param msg the message to publish
2223   */
2224  public void publishRecovery(ReplicationMsg msg)
2225  {
2226    publish(msg, true, true);
2227  }
2228
2229  /**
2230   * Publish a message to the other servers.
2231   * @param msg the message to publish
2232   * @param recoveryMsg the message is a recovery LocalizableMessage
2233   * @param retryOnFailure whether retry should be done on failure
2234   * @return whether the message was successfully sent.
2235   */
2236  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
2237      boolean retryOnFailure)
2238  {
2239    boolean done = false;
2240
2241    while (!done && !shutdown)
2242    {
2243      if (connectionError)
2244      {
2245        /*
2246        It was not possible to connect to any replication server.
2247        Since the operation was already processed, we have no other
2248        choice than to return without sending the ReplicationMsg
2249        and relying on the resend procedure of the connect phase to
2250        fix the problem when we finally connect.
2251        */
2252
2253        if (logger.isTraceEnabled())
2254        {
2255          debugInfo("publish(): Publishing a message is not possible due to"
2256              + " existing connection error.");
2257        }
2258
2259        return false;
2260      }
2261
2262      try
2263      {
2264        /*
2265        save the session at the time when we acquire the
2266        sendwindow credit so that we can make sure later
2267        that the session did not change in between.
2268        This is necessary to make sure that we don't publish a message
2269        on a session with a credit that was acquired from a previous
2270        session.
2271        */
2272        Session currentSession;
2273        Semaphore currentWindowSemaphore;
2274        synchronized (connectPhaseLock)
2275        {
2276          currentSession = connectedRS.get().session;
2277          currentWindowSemaphore = sendWindow;
2278        }
2279
2280        /*
2281        If the Replication domain has decided that there is a need to
2282        recover some changes then it is not allowed to send this
2283        change but it will be the responsibility of the recovery thread to
2284        do it.
2285        */
2286        if (!recoveryMsg & connectRequiresRecovery)
2287        {
2288          return false;
2289        }
2290
2291        boolean credit;
2292        if (msg instanceof UpdateMsg)
2293        {
2294          /*
2295          Acquiring the window credit must be done outside of the
2296          connectPhaseLock because it can be blocking and we don't
2297          want to hold off reconnection in case the connection dropped.
2298          */
2299          credit =
2300            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
2301        }
2302        else
2303        {
2304          credit = true;
2305        }
2306
2307        if (credit)
2308        {
2309          synchronized (connectPhaseLock)
2310          {
2311            /*
2312            session may have been set to null in the connection phase
2313            when restarting the broker for example.
2314            Check the session. If it has changed, some disconnection or
2315            reconnection happened and we need to restart from scratch.
2316            */
2317            final Session session = connectedRS.get().session;
2318            if (session != null && session == currentSession)
2319            {
2320              session.publish(msg);
2321              done = true;
2322            }
2323          }
2324        }
2325        if (!credit && currentWindowSemaphore.availablePermits() == 0)
2326        {
2327          synchronized (connectPhaseLock)
2328          {
2329            /*
2330            the window is still closed.
2331            Send a WindowProbeMsg message to wake up the receiver in case the
2332            window update message was lost somehow...
2333            then loop to check again if connection was closed.
2334            */
2335            Session session = connectedRS.get().session;
2336            if (session != null)
2337            {
2338              session.publish(new WindowProbeMsg());
2339            }
2340          }
2341        }
2342      }
2343      catch (IOException e)
2344      {
2345        if (logger.isTraceEnabled())
2346        {
2347          debugInfo("publish(): IOException caught: "
2348              + stackTraceToSingleLineString(e));
2349        }
2350        if (!retryOnFailure)
2351        {
2352          return false;
2353        }
2354
2355        // The receive threads should handle reconnection or
2356        // mark this broker in error. Just retry.
2357        synchronized (connectPhaseLock)
2358        {
2359          try
2360          {
2361            connectPhaseLock.wait(100);
2362          }
2363          catch (InterruptedException ignored)
2364          {
2365            if (logger.isTraceEnabled())
2366            {
2367              debugInfo("publish(): InterruptedException caught 1: "
2368                  + stackTraceToSingleLineString(ignored));
2369            }
2370          }
2371        }
2372      }
2373      catch (InterruptedException ignored)
2374      {
2375        // just loop.
2376        if (logger.isTraceEnabled())
2377        {
2378          debugInfo("publish(): InterruptedException caught 2: "
2379              + stackTraceToSingleLineString(ignored));
2380        }
2381      }
2382    }
2383    return true;
2384  }
2385
2386  /**
2387   * Receive a message.
2388   * This method is not thread-safe and should either always be
2389   * called in a single thread or protected by a locking mechanism
2390   * before being called. This is a wrapper to the method with a boolean version
2391   * so that we do not have to modify existing tests.
2392   *
2393   * @return the received message
2394   * @throws SocketTimeoutException if the timeout set by setSoTimeout
2395   *         has expired
2396   */
2397  public ReplicationMsg receive() throws SocketTimeoutException
2398  {
2399    return receive(false, true, false);
2400  }
2401
2402  /**
2403   * Receive a message.
2404   * This method is not thread-safe and should either always be
2405   * called in a single thread or protected by a locking mechanism
2406   * before being called.
2407   *
2408   * @param reconnectToTheBestRS Whether broker will automatically switch
2409   *                             to the best suitable RS.
2410   * @param reconnectOnFailure   Whether broker will automatically reconnect
2411   *                             on failure.
2412   * @param returnOnTopoChange   Whether broker should return TopologyMsg
2413   *                             received.
2414   * @return the received message
2415   *
2416   * @throws SocketTimeoutException if the timeout set by setSoTimeout
2417   *         has expired
2418   */
2419  ReplicationMsg receive(boolean reconnectToTheBestRS,
2420      boolean reconnectOnFailure, boolean returnOnTopoChange)
2421    throws SocketTimeoutException
2422  {
2423    while (!shutdown)
2424    {
2425      ConnectedRS rs = connectedRS.get();
2426      if (reconnectOnFailure && !rs.isConnected())
2427      {
2428        // infinite try to reconnect
2429        reStart(null, true);
2430        continue;
2431      }
2432
2433      // Save session information for later in case we need it for log messages
2434      // after the session has been closed and/or failed.
2435      if (rs.session == null)
2436      {
2437        // Must be shutting down.
2438        break;
2439      }
2440
2441      final int serverId = getServerId();
2442      final DN baseDN = getBaseDN();
2443      final int previousRsServerID = rs.getServerId();
2444      try
2445      {
2446        ReplicationMsg msg = rs.session.receive();
2447        if (msg instanceof UpdateMsg)
2448        {
2449          synchronized (this)
2450          {
2451            rcvWindow--;
2452          }
2453        }
2454        if (msg instanceof WindowMsg)
2455        {
2456          final WindowMsg windowMsg = (WindowMsg) msg;
2457          sendWindow.release(windowMsg.getNumAck());
2458        }
2459        else if (msg instanceof TopologyMsg)
2460        {
2461          final TopologyMsg topoMsg = (TopologyMsg) msg;
2462          receiveTopo(topoMsg, getRsServerId());
2463          if (reconnectToTheBestRS)
2464          {
2465            // Reset wait time before next computation of best server
2466            mustRunBestServerCheckingAlgorithm = 0;
2467          }
2468
2469          // Caller wants to check what's changed
2470          if (returnOnTopoChange)
2471          {
2472            return msg;
2473          }
2474        }
2475        else if (msg instanceof StopMsg)
2476        {
2477          // RS performs a proper disconnection
2478          logger.warn(WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED, previousRsServerID, rs.replicationServer,
2479              serverId, baseDN);
2480
2481          // Try to find a suitable RS
2482          reStart(rs.session, true);
2483        }
2484        else if (msg instanceof MonitorMsg)
2485        {
2486          // This is the response to a MonitorRequest that was sent earlier or
2487          // the regular message of the monitoring publisher of the RS.
2488          MonitorMsg monitorMsg = (MonitorMsg) msg;
2489
2490          // Extract and store replicas ServerStates
2491          final Map<Integer, ServerState> newReplicaStates = new HashMap<>();
2492          for (int srvId : toIterable(monitorMsg.ldapIterator()))
2493          {
2494            newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
2495          }
2496          replicaStates = newReplicaStates;
2497
2498          // Notify the sender that the response was received.
2499          synchronized (monitorResponse)
2500          {
2501            monitorResponse.set(true);
2502            monitorResponse.notify();
2503          }
2504
2505          // Update the replication servers ServerStates with new received info
2506          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
2507          for (int srvId : toIterable(monitorMsg.rsIterator()))
2508          {
2509            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
2510            if (rsInfo != null)
2511            {
2512              rsInfo.update(monitorMsg.getRSServerState(srvId));
2513            }
2514          }
2515
2516          /*
2517          Now if it is allowed, compute the best replication server to see if
2518          it is still the one we are currently connected to. If not,
2519          disconnect properly and let the connection algorithm re-connect to
2520          best replication server
2521          */
2522          if (reconnectToTheBestRS)
2523          {
2524            mustRunBestServerCheckingAlgorithm++;
2525            if (mustRunBestServerCheckingAlgorithm == 2)
2526            {
2527              // Stable topology (no topo msg since few seconds): proceed with
2528              // best server checking.
2529              final RSEvaluations evals = computeBestReplicationServer(
2530                  false, previousRsServerID, state,
2531                  rsInfos, serverId, getGroupId(), getGenerationID());
2532              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
2533              if (previousRsServerID != -1
2534                  && (bestServerInfo == null
2535                      || bestServerInfo.getServerId() != previousRsServerID))
2536              {
2537                // The best replication server is no more the one we are
2538                // currently using. Disconnect properly then reconnect.
2539                LocalizableMessage message;
2540                if (bestServerInfo == null)
2541                {
2542                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
2543                      serverId, previousRsServerID, rs.replicationServer, baseDN);
2544                }
2545                else
2546                {
2547                  final int bestRsServerId = bestServerInfo.getServerId();
2548                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
2549                      serverId, previousRsServerID, rs.replicationServer, bestRsServerId, baseDN,
2550                      evals.getEvaluation(previousRsServerID),
2551                      evals.getEvaluation(bestRsServerId));
2552                }
2553                logger.info(message);
2554                if (logger.isTraceEnabled())
2555                {
2556                  debugInfo("best replication servers evaluation results: " + evals);
2557                }
2558                reStart(true);
2559              }
2560
2561              // Reset wait time before next computation of best server
2562              mustRunBestServerCheckingAlgorithm = 0;
2563            }
2564          }
2565        }
2566        else
2567        {
2568          return msg;
2569        }
2570      }
2571      catch (SocketTimeoutException e)
2572      {
2573        throw e;
2574      }
2575      catch (Exception e)
2576      {
2577        logger.traceException(e);
2578
2579        if (!shutdown)
2580        {
2581          if (rs.session == null || !rs.session.closeInitiated())
2582          {
2583            // We did not initiate the close on our side, log an error message.
2584            logger.error(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED,
2585                serverId, baseDN, previousRsServerID, rs.replicationServer);
2586          }
2587
2588          if (!reconnectOnFailure)
2589          {
2590            break; // does not seem necessary to explicitly disconnect ..
2591          }
2592
2593          reStart(rs.session, true);
2594        }
2595      }
2596    } // while !shutdown
2597    return null;
2598  }
2599
2600  /**
2601   * Gets the States of all the Replicas currently in the Topology. When this
2602   * method is called, a Monitoring message will be sent to the Replication
2603   * Server to which this domain is currently connected so that it computes a
2604   * table containing information about all Directory Servers in the topology.
2605   * This Computation involves communications will all the servers currently
2606   * connected and
2607   *
2608   * @return The States of all Replicas in the topology (except us)
2609   */
2610  public Map<Integer, ServerState> getReplicaStates()
2611  {
2612    monitorResponse.set(false);
2613
2614    // publish Monitor Request LocalizableMessage to the Replication Server
2615    publish(new MonitorRequestMsg(getServerId(), getRsServerId()));
2616
2617    // wait for Response up to 10 seconds.
2618    try
2619    {
2620      synchronized (monitorResponse)
2621      {
2622        if (!monitorResponse.get())
2623        {
2624          monitorResponse.wait(10000);
2625        }
2626      }
2627    } catch (InterruptedException e)
2628    {
2629      Thread.currentThread().interrupt();
2630    }
2631    return replicaStates;
2632  }
2633
2634  /**
2635   * This method allows to do the necessary computing for the window
2636   * management after treatment by the worker threads.
2637   *
2638   * This should be called once the replay thread have done their job
2639   * and the window can be open again.
2640   */
2641  public synchronized void updateWindowAfterReplay()
2642  {
2643    try
2644    {
2645      updateDoneCount++;
2646      final Session session = connectedRS.get().session;
2647      if (updateDoneCount >= halfRcvWindow && session != null)
2648      {
2649        session.publish(new WindowMsg(updateDoneCount));
2650        rcvWindow += updateDoneCount;
2651        updateDoneCount = 0;
2652      }
2653    } catch (IOException e)
2654    {
2655      // Any error on the socket will be handled by the thread calling receive()
2656      // just ignore.
2657    }
2658  }
2659
  /**
   * Stops this broker.
   * <p>
   * While holding {@code startStopLock}, this asks the domain to publish a
   * replica offline message, raises the {@code shutdown} flag, replaces the
   * connected RS with the {@code ConnectedRS.stopped()} placeholder (which,
   * through setConnectedRS(), closes the previous session when there was one
   * and it differs from the placeholder), stops the RS heartbeat monitoring and
   * the change time heartbeat publishing, and finally deregisters the
   * replication monitor.
   */
  public void stop()
  {
    if (logger.isTraceEnabled())
    {
      debugInfo("is stopping and will close the connection to RS(" + getRsServerId() + ")");
    }

    synchronized (startStopLock)
    {
      // Presumably must happen while the session to the RS is still open:
      // setConnectedRS() below closes that session.
      domain.publishReplicaOfflineMsg();
      shutdown = true;
      setConnectedRS(ConnectedRS.stopped());
      stopRSHeartBeatMonitoring();
      stopChangeTimeHeartBeatPublishing();
      // setConnectedRS() may have re-registered the monitor after closing the
      // old session, so deregister it explicitly here.
      deregisterReplicationMonitor();
    }
  }
2678
2679  /**
2680   * Set a timeout value.
2681   * With this option set to a non-zero value, calls to the receive() method
2682   * block for only this amount of time after which a
2683   * java.net.SocketTimeoutException is raised.
2684   * The Broker is valid and usable even after such an Exception is raised.
2685   *
2686   * @param timeout the specified timeout, in milliseconds.
2687   * @throws SocketException if there is an error in the underlying protocol,
2688   *         such as a TCP error.
2689   */
2690  public void setSoTimeout(int timeout) throws SocketException
2691  {
2692    this.timeout = timeout;
2693    final Session session = connectedRS.get().session;
2694    if (session != null)
2695    {
2696      session.setSoTimeout(timeout);
2697    }
2698  }
2699
2700  /**
2701   * Get the name of the replicationServer to which this broker is currently
2702   * connected.
2703   *
2704   * @return the name of the replicationServer to which this domain
2705   *         is currently connected.
2706   */
2707  public String getReplicationServer()
2708  {
2709    return connectedRS.get().replicationServer;
2710  }
2711
2712  /**
2713   * Get the maximum receive window size.
2714   *
2715   * @return The maximum receive window size.
2716   */
2717  public int getMaxRcvWindow()
2718  {
2719    return config.getWindowSize();
2720  }
2721
2722  /**
2723   * Get the current receive window size.
2724   *
2725   * @return The current receive window size.
2726   */
2727  public int getCurrentRcvWindow()
2728  {
2729    return rcvWindow;
2730  }
2731
2732  /**
2733   * Get the maximum send window size.
2734   *
2735   * @return The maximum send window size.
2736   */
2737  public int getMaxSendWindow()
2738  {
2739    return maxSendWindow;
2740  }
2741
2742  /**
2743   * Get the current send window size.
2744   *
2745   * @return The current send window size.
2746   */
2747  public int getCurrentSendWindow()
2748  {
2749    if (isConnected())
2750    {
2751      return sendWindow.availablePermits();
2752    }
2753    return 0;
2754  }
2755
2756  /**
2757   * Get the number of times the connection was lost.
2758   * @return The number of times the connection was lost.
2759   */
2760  public int getNumLostConnections()
2761  {
2762    return numLostConnections;
2763  }
2764
2765  /**
2766   * Change some configuration parameters.
2767   *
2768   * @param newConfig  The new config to use.
2769   * @return                    A boolean indicating if the changes
2770   *                            requires to restart the service.
2771   */
2772  boolean changeConfig(ReplicationDomainCfg newConfig)
2773  {
2774    // These parameters needs to be renegotiated with the ReplicationServer
2775    // so if they have changed, that requires restarting the session with
2776    // the ReplicationServer.
2777    // A new session is necessary only when information regarding
2778    // the connection is modified
2779    boolean needToRestartSession =
2780        !newConfig.getReplicationServer().equals(config.getReplicationServer())
2781        || newConfig.getWindowSize() != config.getWindowSize()
2782        || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval()
2783        || newConfig.getGroupId() != config.getGroupId();
2784
2785    this.config = newConfig;
2786    this.rcvWindow = newConfig.getWindowSize();
2787    this.halfRcvWindow = this.rcvWindow / 2;
2788
2789    return needToRestartSession;
2790  }
2791
2792  /**
2793   * Get the version of the replication protocol.
2794   * @return The version of the replication protocol.
2795   */
2796  public short getProtocolVersion()
2797  {
2798    final Session session = connectedRS.get().session;
2799    if (session != null)
2800    {
2801      return session.getProtocolVersion();
2802    }
2803    return ProtocolVersion.getCurrentVersion();
2804  }
2805
2806  /**
2807   * Check if the broker is connected to a ReplicationServer and therefore
2808   * ready to received and send Replication Messages.
2809   *
2810   * @return true if the server is connected, false if not.
2811   */
2812  public boolean isConnected()
2813  {
2814    return connectedRS.get().isConnected();
2815  }
2816
2817  /**
2818   * Determine whether the connection to the replication server is encrypted.
2819   * @return true if the connection is encrypted, false otherwise.
2820   */
2821  public boolean isSessionEncrypted()
2822  {
2823    final Session session = connectedRS.get().session;
2824    return session != null ? session.isEncrypted() : false;
2825  }
2826
2827  /**
2828   * Signals the RS we just entered a new status.
2829   * @param newStatus The status the local DS just entered
2830   */
2831  public void signalStatusChange(ServerStatus newStatus)
2832  {
2833    try
2834    {
2835      connectedRS.get().session.publish(
2836          new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus));
2837    } catch (IOException ex)
2838    {
2839      logger.error(ERR_EXCEPTION_SENDING_CS, getBaseDN(), getServerId(),
2840          ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
2841    }
2842  }
2843
2844  /**
2845   * Gets the info for DSs in the topology (except us).
2846   * @return The info for DSs in the topology (except us)
2847   */
2848  public Map<Integer, DSInfo> getReplicaInfos()
2849  {
2850    return topology.get().replicaInfos;
2851  }
2852
2853  /**
2854   * Gets the info for RSs in the topology (except the one we are connected
2855   * to).
2856   * @return The info for RSs in the topology (except the one we are connected
2857   * to)
2858   */
2859  public List<RSInfo> getRsInfos()
2860  {
2861    return toRSInfos(topology.get().rsInfos);
2862  }
2863
2864  private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
2865  {
2866    final List<RSInfo> result = new ArrayList<>();
2867    for (ReplicationServerInfo rsInfo : rsInfos.values())
2868    {
2869      result.add(rsInfo.toRSInfo());
2870    }
2871    return result;
2872  }
2873
2874  /**
2875   * Processes an incoming TopologyMsg.
2876   * Updates the structures for the local view of the topology.
2877   *
2878   * @param topoMsg
2879   *          The topology information received from RS.
2880   * @param rsServerId
2881   *          the serverId to use for the connectedDS
2882   */
2883  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
2884  {
2885    final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
2886    for (DSInfo dsInfo : newTopo.replicaInfos.values())
2887    {
2888      domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
2889          .getEclIncludesForDeletes());
2890    }
2891  }
2892
2893  private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
2894  {
2895    Topology oldTopo;
2896    Topology newTopo;
2897    do
2898    {
2899      oldTopo = topology.get();
2900      newTopo = new Topology(topoMsg, getServerId(), rsServerId,
2901              getReplicationServerUrls(), oldTopo.rsInfos);
2902    }
2903    while (!topology.compareAndSet(oldTopo, newTopo));
2904
2905    if (logger.isTraceEnabled())
2906    {
2907      final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
2908      sb.append(" received TopologyMsg=").append(topoMsg);
2909      debugInfo(sb);
2910    }
2911    return newTopo;
2912  }
2913
2914  /**
2915   * Contains the last known state of the replication topology.
2916   */
2917  static final class Topology
2918  {
2919
2920    /**
2921     * The RS's serverId that this DS was connected to when this topology state
2922     * was computed.
2923     */
2924    private final int rsServerId;
2925    /**
2926     * Info for other DSs.
2927     * <p>
2928     * Warning: does not contain info for us (for our server id)
2929     */
2930    final Map<Integer, DSInfo> replicaInfos;
2931    /**
2932     * The map of replication server info initialized at connection time and
2933     * regularly updated. This is used to decide to which best suitable
2934     * replication server one wants to connect. Key: replication server id
2935     * Value: replication server info for the matching replication server id
2936     */
2937    final Map<Integer, ReplicationServerInfo> rsInfos;
2938
2939    private Topology()
2940    {
2941      this.rsServerId = -1;
2942      this.replicaInfos = Collections.emptyMap();
2943      this.rsInfos = Collections.emptyMap();
2944    }
2945
2946    /**
2947     * Constructor to use when only the RSInfos need to be recomputed.
2948     *
2949     * @param dsInfosToKeep
2950     *          the DSInfos that will be stored as is
2951     * @param newRSInfos
2952     *          the new RSInfos from which to compute the new topology
2953     * @param dsServerId
2954     *          the DS serverId
2955     * @param rsServerId
2956     *          the current connected RS serverId
2957     * @param configuredReplicationServerUrls
2958     *          the configured replication server URLs
2959     * @param previousRsInfos
2960     *          the RSInfos computed in the previous Topology object
2961     */
2962    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
2963        int dsServerId, int rsServerId,
2964        Set<String> configuredReplicationServerUrls,
2965        Map<Integer, ReplicationServerInfo> previousRsInfos)
2966    {
2967      this.rsServerId = rsServerId;
2968      this.replicaInfos = dsInfosToKeep == null
2969          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
2970      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
2971          previousRsInfos, configuredReplicationServerUrls);
2972    }
2973
2974    /**
2975     * Constructor to use when a new TopologyMsg has been received.
2976     *
2977     * @param topoMsg
2978     *          the topology message containing the new DSInfos and RSInfos from
2979     *          which to compute the new topology
2980     * @param dsServerId
2981     *          the DS serverId
2982     * @param rsServerId
2983     *          the current connected RS serverId
2984     * @param configuredReplicationServerUrls
2985     *          the configured replication server URLs
2986     * @param previousRsInfos
2987     *          the RSInfos computed in the previous Topology object
2988     */
2989    Topology(TopologyMsg topoMsg, int dsServerId,
2990        int rsServerId, Set<String> configuredReplicationServerUrls,
2991        Map<Integer, ReplicationServerInfo> previousRsInfos)
2992    {
2993      this.rsServerId = rsServerId;
2994      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
2995      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
2996          previousRsInfos, configuredReplicationServerUrls);
2997    }
2998
2999    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
3000        int dsServerId)
3001    {
3002      final Map<Integer, DSInfo> copy = new HashMap<>(dsInfos);
3003      copy.remove(dsServerId);
3004      return Collections.unmodifiableMap(copy);
3005    }
3006
3007    private Map<Integer, ReplicationServerInfo> computeRSInfos(
3008        int dsServerId, List<RSInfo> newRsInfos,
3009        Map<Integer, ReplicationServerInfo> previousRsInfos,
3010        Set<String> configuredReplicationServerUrls)
3011    {
3012      final Map<Integer, ReplicationServerInfo> results = new HashMap<>(previousRsInfos);
3013
3014      // Update replication server info list with the received topology info
3015      final Set<Integer> rssToKeep = new HashSet<>();
3016      for (RSInfo newRSInfo : newRsInfos)
3017      {
3018        final int rsId = newRSInfo.getId();
3019        rssToKeep.add(rsId); // Mark this server as still existing
3020        Set<Integer> connectedDSs =
3021            computeDSsConnectedTo(rsId, dsServerId);
3022        ReplicationServerInfo rsInfo = results.get(rsId);
3023        if (rsInfo == null)
3024        {
3025          // New replication server, create info for it add it to the list
3026          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
3027          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
3028          results.put(rsId, rsInfo);
3029        }
3030        else
3031        {
3032          // Update the existing info for the replication server
3033          rsInfo.update(newRSInfo, connectedDSs);
3034        }
3035      }
3036
3037      // Remove any replication server that may have disappeared from the
3038      // topology
3039      results.keySet().retainAll(rssToKeep);
3040
3041      return Collections.unmodifiableMap(results);
3042    }
3043
3044    /** Computes the list of DSs connected to a particular RS. */
3045    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
3046    {
3047      final Set<Integer> connectedDSs = new HashSet<>();
3048      if (rsServerId == rsId)
3049      {
3050        /*
3051         * If we are computing connected DSs for the RS we are connected to, we
3052         * should count the local DS as the DSInfo of the local DS is not sent
3053         * by the replication server in the topology message. We must count
3054         * ourselves as a connected server.
3055         */
3056        connectedDSs.add(dsServerId);
3057      }
3058
3059      for (DSInfo dsInfo : replicaInfos.values())
3060      {
3061        if (dsInfo.getRsId() == rsId)
3062        {
3063          connectedDSs.add(dsInfo.getDsId());
3064        }
3065      }
3066
3067      return connectedDSs;
3068    }
3069
3070    /**
3071     * Sets the locally configured flag for the passed ReplicationServerInfo
3072     * object, analyzing the local configuration.
3073     *
3074     * @param rsInfo
3075     *          the Replication server to check and update
3076     * @param configuredReplicationServerUrls
3077     */
3078    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
3079        Set<String> configuredReplicationServerUrls)
3080    {
3081      // Determine if the passed ReplicationServerInfo has a URL that is present
3082      // in the locally configured replication servers
3083      String rsUrl = rsInfo.getServerURL();
3084      if (rsUrl == null)
3085      {
3086        // The ReplicationServerInfo has been generated from a server with
3087        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
3088        // ignore this server as we do not know how to connect to it
3089        rsInfo.setLocallyConfigured(false);
3090        return;
3091      }
3092      for (String serverUrl : configuredReplicationServerUrls)
3093      {
3094        if (isSameReplicationServerUrl(serverUrl, rsUrl))
3095        {
3096          // This RS is locally configured, mark this
3097          rsInfo.setLocallyConfigured(true);
3098          rsInfo.setServerURL(serverUrl);
3099          return;
3100        }
3101      }
3102      rsInfo.setLocallyConfigured(false);
3103    }
3104
3105    /** {@inheritDoc} */
3106    @Override
3107    public boolean equals(Object obj)
3108    {
3109      if (this == obj)
3110      {
3111        return true;
3112      }
3113      if (obj == null || getClass() != obj.getClass())
3114      {
3115        return false;
3116      }
3117      final Topology other = (Topology) obj;
3118      return rsServerId == other.rsServerId
3119          && Objects.equals(replicaInfos, other.replicaInfos)
3120          && Objects.equals(rsInfos, other.rsInfos)
3121          && urlsEqual1(replicaInfos, other.replicaInfos)
3122          && urlsEqual2(rsInfos, other.rsInfos);
3123    }
3124
3125    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
3126        Map<Integer, DSInfo> replicaInfos2)
3127    {
3128      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
3129      {
3130        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
3131        if (!Objects.equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
3132        {
3133          return false;
3134        }
3135      }
3136      return true;
3137    }
3138
3139    private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
3140        Map<Integer, ReplicationServerInfo> rsInfos2)
3141    {
3142      for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
3143      {
3144        ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
3145        if (!Objects.equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
3146        {
3147          return false;
3148        }
3149      }
3150      return true;
3151    }
3152
3153    /** {@inheritDoc} */
3154    @Override
3155    public int hashCode()
3156    {
3157      final int prime = 31;
3158      int result = 1;
3159      result = prime * result + rsServerId;
3160      result = prime * result
3161          + (replicaInfos == null ? 0 : replicaInfos.hashCode());
3162      result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
3163      return result;
3164    }
3165
3166    /** {@inheritDoc} */
3167    @Override
3168    public String toString()
3169    {
3170      return getClass().getSimpleName()
3171          + " rsServerId=" + rsServerId
3172          + ", replicaInfos=" + replicaInfos.values()
3173          + ", rsInfos=" + rsInfos.values();
3174    }
3175  }
3176
3177  /**
3178   * Check if the broker could not find any Replication Server and therefore
3179   * connection attempt failed.
3180   *
3181   * @return true if the server could not connect to any Replication Server.
3182   */
3183  boolean hasConnectionError()
3184  {
3185    return connectionError;
3186  }
3187
3188  /**
3189   * Starts publishing to the RS the current timestamp used in this server.
3190   */
3191  private void startChangeTimeHeartBeatPublishing(ConnectedRS rs)
3192  {
3193    // Start a CSN heartbeat thread.
3194    long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
3195    if (changeTimeHeartbeatInterval > 0)
3196    {
3197      final String threadName = "Replica DS(" + getServerId()
3198              + ") change time heartbeat publisher for domain \"" + getBaseDN()
3199              + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer;
3200
3201      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
3202          threadName, rs.session, changeTimeHeartbeatInterval, getServerId());
3203      ctHeartbeatPublisherThread.start();
3204    }
3205    else
3206    {
3207      if (logger.isTraceEnabled())
3208      {
3209        debugInfo("is not configured to send CSN heartbeat interval");
3210      }
3211    }
3212  }
3213
3214  /**
3215   * Stops publishing to the RS the current timestamp used in this server.
3216   */
3217  private synchronized void stopChangeTimeHeartBeatPublishing()
3218  {
3219    if (ctHeartbeatPublisherThread != null)
3220    {
3221      ctHeartbeatPublisherThread.shutdown();
3222      ctHeartbeatPublisherThread = null;
3223    }
3224  }
3225
3226  /**
3227   * Set the connectRequiresRecovery to the provided value.
3228   * This flag is used to indicate if a recovery of Update is necessary
3229   * after a reconnection to a RS.
3230   * It is the responsibility of the ReplicationDomain to set it during the
3231   * sessionInitiated phase.
3232   *
3233   * @param b the new value of the connectRequiresRecovery.
3234   */
3235  public void setRecoveryRequired(boolean b)
3236  {
3237    connectRequiresRecovery = b;
3238  }
3239
3240  /**
3241   * Returns whether the broker is shutting down.
3242   * @return whether the broker is shutting down.
3243   */
3244  boolean shuttingDown()
3245  {
3246    return shutdown;
3247  }
3248
3249  /**
3250   * Returns the local address of this replication domain, or the empty string
3251   * if it is not yet connected.
3252   *
3253   * @return The local address.
3254   */
3255  String getLocalUrl()
3256  {
3257    final Session session = connectedRS.get().session;
3258    return session != null ? session.getLocalUrl() : "";
3259  }
3260
3261  /**
3262   * Returns the replication monitor instance name associated with this broker.
3263   *
3264   * @return The replication monitor instance name.
3265   */
3266  String getReplicationMonitorInstanceName()
3267  {
3268    // Only invoked by replication domain so always non-null.
3269    return monitor.getMonitorInstanceName();
3270  }
3271
  /**
   * Atomically replaces the current connected RS state with {@code newRS}.
   * <p>
   * When the state actually changes and the previous state had a session, the
   * replication monitor is deregistered, the previous session is closed and
   * the monitor is registered again, because the monitor name is derived from
   * the session.
   *
   * @param newRS
   *          the new connected RS state
   * @return {@code newRS}
   */
  private ConnectedRS setConnectedRS(final ConnectedRS newRS)
  {
    final ConnectedRS oldRS = connectedRS.getAndSet(newRS);
    if (!oldRS.equals(newRS) && oldRS.session != null)
    {
      // monitor name is changing, deregister before registering again
      deregisterReplicationMonitor();
      oldRS.session.close();
      registerReplicationMonitor();
    }
    return newRS;
  }
3284
3285  /**
3286   * Must be invoked each time the session changes because, the monitor name is
3287   * dynamically created with the session name, while monitor registration is
3288   * static.
3289   *
3290   * @see #monitor
3291   */
3292  private void registerReplicationMonitor()
3293  {
3294    // The monitor should not be registered if this is a unit test
3295    // because the replication domain is null.
3296    if (monitor != null)
3297    {
3298      DirectoryServer.registerMonitorProvider(monitor);
3299    }
3300  }
3301
3302  private void deregisterReplicationMonitor()
3303  {
3304    // The monitor should not be deregistered if this is a unit test
3305    // because the replication domain is null.
3306    if (monitor != null)
3307    {
3308      DirectoryServer.deregisterMonitorProvider(monitor);
3309    }
3310  }
3311
3312  /** {@inheritDoc} */
3313  @Override
3314  public String toString()
3315  {
3316    final StringBuilder sb = new StringBuilder();
3317    sb.append(getClass().getSimpleName())
3318      .append(" \"").append(getBaseDN()).append(" ")
3319      .append(getServerId()).append("\",")
3320      .append(" groupId=").append(getGroupId())
3321      .append(", genId=").append(getGenerationID())
3322      .append(", ");
3323    connectedRS.get().toString(sb);
3324    return sb.toString();
3325  }
3326
3327  private void debugInfo(CharSequence message)
3328  {
3329    logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
3330        + " and serverId=" + getServerId() + ": " + message);
3331  }
3332}