001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.replication.protocol.ProtocolVersion.*;
031
032import java.io.IOException;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ConcurrentHashMap;
037
038import org.forgerock.i18n.LocalizableMessage;
039import org.forgerock.i18n.slf4j.LocalizedLogger;
040import org.forgerock.opendj.ldap.ResultCode;
041import org.opends.server.replication.common.DSInfo;
042import org.opends.server.replication.common.RSInfo;
043import org.opends.server.replication.common.ServerState;
044import org.opends.server.replication.common.ServerStatus;
045import org.opends.server.replication.protocol.*;
046import org.opends.server.types.*;
047
048/**
049 * This class defines a server handler, which handles all interaction with a
050 * peer replication server.
051 */
052public class ReplicationServerHandler extends ServerHandler
053{
054  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
055
056  /** Properties filled only if remote server is a RS. */
057  private String serverAddressURL;
058  /**
059   * This collection will contain as many elements as there are
060   * LDAP servers connected to the remote replication server.
061   */
062  private final Map<Integer, LightweightServerHandler> remoteDirectoryServers = new ConcurrentHashMap<>();
063
064  /**
065   * Starts this handler based on a start message received from remote server.
066   * @param inReplServerStartMsg The start msg provided by the remote server.
067   * @return Whether the remote server requires encryption or not.
068   * @throws DirectoryException When a problem occurs.
069   */
070  private boolean processStartFromRemote(
071        ReplServerStartMsg inReplServerStartMsg)
072        throws DirectoryException
073  {
074    try
075    {
076      short protocolVersion = getCompatibleVersion(inReplServerStartMsg
077          .getVersion());
078      session.setProtocolVersion(protocolVersion);
079      generationId = inReplServerStartMsg.getGenerationId();
080      serverId = inReplServerStartMsg.getServerId();
081      serverURL = inReplServerStartMsg.getServerURL();
082      serverAddressURL = toServerAddressURL(serverURL);
083      setBaseDNAndDomain(inReplServerStartMsg.getBaseDN(), false);
084      setInitialServerState(inReplServerStartMsg.getServerState());
085      setSendWindowSize(inReplServerStartMsg.getWindowSize());
086      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
087      {
088        // We support connection from a V1 RS
089        // Only V2 protocol has the group id in repl server start message
090        this.groupId = inReplServerStartMsg.getGroupId();
091      }
092
093      oldGenerationId = -100;
094    }
095    catch(Exception e)
096    {
097      LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage());
098      throw new DirectoryException(ResultCode.OTHER, message);
099    }
100    return inReplServerStartMsg.getSSLEncryption();
101  }
102
103  private String toServerAddressURL(String serverURL)
104  {
105    final int port = HostPort.valueOf(serverURL).getPort();
106    // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
107    return new HostPort(session.getRemoteAddress(), port).toString();
108  }
109
110  /**
111   * Sends a start message to the remote RS.
112   *
113   * @return The ReplServerStartMsg sent.
114   * @throws IOException
115   *           When an exception occurs.
116   */
117  private ReplServerStartMsg sendStartToRemote() throws IOException
118  {
119    ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg();
120    send(outReplServerStartMsg);
121    return outReplServerStartMsg;
122  }
123
124  /**
125   * Creates a new handler object to a remote replication server.
126   * @param session The session with the remote RS.
127   * @param queueSize The queue size to manage updates to that RS.
128   * @param replicationServer The hosting local RS object.
129   * @param rcvWindowSize The receiving window size.
130   */
131  public ReplicationServerHandler(
132      Session session,
133      int queueSize,
134      ReplicationServer replicationServer,
135      int rcvWindowSize)
136  {
137    super(session, queueSize, replicationServer, rcvWindowSize);
138  }
139
140  /**
141   * Connect the hosting RS to the RS represented by THIS handler
142   * on an outgoing connection.
143   * @param baseDN The baseDN
144   * @param sslEncryption The sslEncryption requested to the remote RS.
145   * @throws DirectoryException when an error occurs.
146   */
147  public void connect(DN baseDN, boolean sslEncryption)
148      throws DirectoryException
149  {
150    // we are the initiator and decides of the encryption
151    this.sslEncryption = sslEncryption;
152
153    setBaseDNAndDomain(baseDN, false);
154
155    localGenerationId = replicationServerDomain.getGenerationId();
156    oldGenerationId = localGenerationId;
157
158    try
159    {
160      lockDomainNoTimeout();
161
162      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
163
164      // Wait answer
165      ReplicationMsg msg = session.receive();
166
167      // Reject bad responses
168      if (!(msg instanceof ReplServerStartMsg))
169      {
170        if (msg instanceof StopMsg)
171        {
172          // Remote replication server is probably shutting down or simultaneous
173          // cross-connect detected.
174          abortStart(null);
175        }
176        else
177        {
178          LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
179              .getClass().getCanonicalName(), "ReplServerStartMsg");
180          abortStart(message);
181        }
182        return;
183      }
184
185      processStartFromRemote((ReplServerStartMsg) msg);
186
187      if (replicationServerDomain.isAlreadyConnectedToRS(this))
188      {
189        // Simultaneous cross connect.
190        abortStart(null);
191        return;
192      }
193
194      /*
195      Since we are going to send the topology message before having received
196      one, we need to set the generation ID as soon as possible if it is
197      currently uninitialized. See OpenDJ-121.
198      */
199      if (localGenerationId < 0 && generationId > 0)
200      {
201        oldGenerationId =
202            replicationServerDomain.changeGenerationId(generationId);
203      }
204
205      logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg);
206
207      // Until here session is encrypted then it depends on the negotiation
208      // The session initiator decides whether to use SSL.
209      if (!this.sslEncryption)
210      {
211        session.stopEncryption();
212      }
213
214      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
215      {
216        /*
217        Only protocol version above V1 has a phase 2 handshake
218        NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
219        TopologyMsg then TopologyMsg (with a RS)
220
221        Send our own TopologyMsg to remote RS
222        */
223        TopologyMsg outTopoMsg =
224            replicationServerDomain.createTopologyMsgForRS();
225        sendTopoInfo(outTopoMsg);
226
227        // wait and process Topo from remote RS
228        TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS();
229        if (inTopoMsg == null)
230        {
231          // Simultaneous cross connect.
232          abortStart(null);
233          return;
234        }
235
236        logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
237
238        /*
239        FIXME: i think this should be done for all protocol version !!
240        not only those > V1
241        */
242        replicationServerDomain.register(this);
243
244        /*
245        Process TopologyMsg sent by remote RS: store matching new info
246        (this will also warn our connected DSs of the new received info)
247        */
248        replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
249      }
250
251      logger.debug(INFO_REPLICATION_SERVER_CONNECTION_TO_RS, getReplicationServerId(), getServerId(),
252          replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress());
253
254      super.finalizeStart();
255    }
256    catch (IOException e)
257    {
258      logger.traceException(e);
259      LocalizableMessage errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
260          getReplicationServerId(),
261          session.getReadableRemoteAddress());
262      abortStart(errMessage);
263    }
264    catch (DirectoryException e)
265    {
266      logger.traceException(e);
267      abortStart(e.getMessageObject());
268    }
269    catch (Exception e)
270    {
271      logger.traceException(e);
272      abortStart(LocalizableMessage.raw(e.getLocalizedMessage()));
273    }
274    finally
275    {
276      releaseDomainLock();
277    }
278  }
279
280  /**
281   * Starts the handler from a remote ReplServerStart message received from
282   * the remote replication server.
283   * @param inReplServerStartMsg The provided ReplServerStart message received.
284   */
285  public void startFromRemoteRS(ReplServerStartMsg inReplServerStartMsg)
286  {
287    localGenerationId = -1;
288    oldGenerationId = -100;
289    try
290    {
291      // The initiator decides if the session is encrypted
292      sslEncryption = processStartFromRemote(inReplServerStartMsg);
293
294      lockDomainWithTimeout();
295
296      if (replicationServerDomain.isAlreadyConnectedToRS(this))
297      {
298        abortStart(null);
299        return;
300      }
301
302      this.localGenerationId = replicationServerDomain.getGenerationId();
303      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
304
305      logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
306
307      /*
308      until here session is encrypted then it depends on the negotiation
309      The session initiator decides whether to use SSL.
310      */
311      if (!sslEncryption)
312      {
313        session.stopEncryption();
314      }
315
316      TopologyMsg inTopoMsg = null;
317      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
318      {
319        /*
320        Only protocol version above V1 has a phase 2 handshake
321        NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
322        TopologyMsg then TopologyMsg (with a RS)
323        wait and process Topo from remote RS
324        */
325        inTopoMsg = waitAndProcessTopoFromRemoteRS();
326        if (inTopoMsg == null)
327        {
328          // Simultaneous cross connect.
329          abortStart(null);
330          return;
331        }
332
333        // send our own TopologyMsg to remote RS
334        TopologyMsg outTopoMsg = replicationServerDomain
335            .createTopologyMsgForRS();
336        sendTopoInfo(outTopoMsg);
337
338        logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
339      }
340      else
341      {
342        // Terminate connection from a V1 RS
343
344        // if the remote RS and the local RS have the same genID
345        // then it's ok and nothing else to do
346        if (generationId == localGenerationId)
347        {
348          if (logger.isTraceEnabled())
349          {
350            logger.trace("In " + replicationServer.getMonitorInstanceName()
351                + " " + this + " RS V1 with serverID=" + serverId
352                + " is connected with the right generation ID");
353          }
354        } else
355        {
356          checkGenerationId();
357        }
358        /*
359        Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1
360        all the servers of the topology. We prefer not not send a TopologyMsg
361        for giving partial/false information to the V2 servers as for
362        instance we don't have the connected DS of the V1 RS...When the V1
363        RS will be upgraded in his turn, topo info will be sent and accurate.
364        That way, there is  no risk to have false/incomplete information in
365        other servers.
366        */
367      }
368
369      replicationServerDomain.register(this);
370
371      // Process TopologyMsg sent by remote RS: store matching new info
372      // (this will also warn our connected DSs of the new received info)
373      if (inTopoMsg!=null)
374      {
375        replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
376      }
377
378      logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_RS, getReplicationServerId(), getServerId(),
379          replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress());
380
381      super.finalizeStart();
382    }
383    catch (IOException e)
384    {
385      logger.traceException(e);
386      abortStart(ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
387          inReplServerStartMsg.getServerId(), replicationServer.getServerId()));
388    }
389    catch (DirectoryException e)
390    {
391      logger.traceException(e);
392      abortStart(e.getMessageObject());
393    }
394    catch (Exception e)
395    {
396      logger.traceException(e);
397      abortStart(LocalizableMessage.raw(e.getLocalizedMessage()));
398    }
399    finally
400    {
401      releaseDomainLock();
402    }
403  }
404
405  /**
406   * Wait receiving the TopologyMsg from the remote RS and process it.
407   * @return the topologyMsg received or {@code null} if stop was received.
408   * @throws DirectoryException
409   */
410  private TopologyMsg waitAndProcessTopoFromRemoteRS()
411      throws DirectoryException
412  {
413    ReplicationMsg msg;
414    try
415    {
416      msg = session.receive();
417    }
418    catch(Exception e)
419    {
420      LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage());
421      throw new DirectoryException(ResultCode.OTHER, message);
422    }
423
424    if (!(msg instanceof TopologyMsg))
425    {
426      if (msg instanceof StopMsg)
427      {
428        // Remote replication server is probably shutting down, or cross
429        // connection attempt.
430        return null;
431      }
432
433      LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(
434          msg.getClass().getCanonicalName(), "TopologyMsg");
435      throw new DirectoryException(ResultCode.OTHER, message);
436    }
437
438    // Remote RS sent his topo msg
439    TopologyMsg inTopoMsg = (TopologyMsg) msg;
440
441    /* Store remote RS weight if it has one.
442     * For protocol version < 4, use default value of 1 for weight
443     */
444    if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
445    {
446      // List should only contain RS info for sender
447      RSInfo rsInfo = inTopoMsg.getRsInfos().get(0);
448      weight = rsInfo.getWeight();
449    }
450
451    /*
452    if the remote RS and the local RS have the same genID
453    then it's ok and nothing else to do
454    */
455    if (generationId == localGenerationId)
456    {
457      if (logger.isTraceEnabled())
458      {
459        logger.trace("In " + replicationServer.getMonitorInstanceName()
460            + " RS with serverID=" + serverId
461            + " is connected with the right generation ID, same as local ="
462            + generationId);
463      }
464    }
465    else
466    {
467      checkGenerationId();
468    }
469
470    return inTopoMsg;
471  }
472
473  /**
474   * Checks local generation ID against the remote RS one,
475   * and logs Warning messages if needed.
476   */
477  private void checkGenerationId()
478  {
479    if (localGenerationId <= 0)
480    {
481      // The local RS is not initialized - take the one received
482      // WARNING: Must be done before computing topo message to send to peer
483      // server as topo message must embed valid generation id for our server
484      oldGenerationId =
485          replicationServerDomain.changeGenerationId(generationId);
486      return;
487    }
488
489    // the local RS is initialized
490    if (generationId > 0
491        // the remote RS is initialized. If not, there's nothing to do anyway.
492        && generationId != localGenerationId)
493    {
494      /* Either:
495       *
496       * 1) The 2 RS have different generationID
497       * replicationServerDomain.getGenerationIdSavedStatus() == true
498       *
499       * if the present RS has received changes regarding its gen ID and so will
500       * not change without a reset then we are just degrading the peer.
501       *
502       * 2) This RS has never received any changes for the current gen ID.
503       *
504       * Example case:
505       * - we are in RS1
506       * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
507       * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
508       * - we are in RS1 and we receive a START msg from RS2
509       * - Each RS keeps its genID / is degraded and when LS2
510       * will be populated from LS1 everything will become ok.
511       *
512       * Issue:
513       * FIXME : Would it be a good idea in some cases to just set the gen ID
514       * received from the peer RS specially if the peer has a non null state
515       * and we have a null state ?
516       * replicationServerDomain.setGenerationId(generationId, false);
517       */
518      logger.warn(WARN_BAD_GENERATION_ID_FROM_RS, serverId, session.getReadableRemoteAddress(), generationId,
519          getBaseDN(), getReplicationServerId(), localGenerationId);
520    }
521  }
522
523  /** {@inheritDoc} */
524  @Override
525  public boolean isDataServer()
526  {
527    return false;
528  }
529
530  /**
531   * Add the DSinfos of the connected Directory Servers
532   * to the List of DSInfo provided as a parameter.
533   *
534   * @param dsInfos The List of DSInfo that should be updated
535   *                with the DSInfo for the remoteDirectoryServers
536   *                connected to this ServerHandler.
537   */
538  public void addDSInfos(List<DSInfo> dsInfos)
539  {
540    synchronized (remoteDirectoryServers)
541    {
542      for (LightweightServerHandler ls : remoteDirectoryServers.values())
543      {
544        dsInfos.add(ls.toDSInfo());
545      }
546    }
547  }
548
549  /**
550   * Shutdown This ServerHandler.
551   */
552  @Override
553  public void shutdown()
554  {
555    super.shutdown();
556    clearRemoteLSHandlers();
557  }
558
559  private void clearRemoteLSHandlers()
560  {
561    synchronized (remoteDirectoryServers)
562    {
563      for (LightweightServerHandler lsh : remoteDirectoryServers.values())
564      {
565        lsh.stopHandler();
566      }
567      remoteDirectoryServers.clear();
568    }
569  }
570
571  /**
572   * Stores topology information received from a peer RS and that must be kept
573   * in RS handler.
574   *
575   * @param topoMsg The received topology message
576   */
577  public void processTopoInfoFromRS(TopologyMsg topoMsg)
578  {
579    // List should only contain RS info for sender
580    final RSInfo rsInfo = topoMsg.getRsInfos().get(0);
581    generationId = rsInfo.getGenerationId();
582    groupId = rsInfo.getGroupId();
583    weight = rsInfo.getWeight();
584
585    synchronized (remoteDirectoryServers)
586    {
587      clearRemoteLSHandlers();
588
589      // Creates the new structure according to the message received.
590      for (DSInfo dsInfo : topoMsg.getReplicaInfos().values())
591      {
592        // For each DS connected to the peer RS
593        DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId);
594        LightweightServerHandler lsh =
595            new LightweightServerHandler(this, clonedDSInfo);
596        lsh.startHandler();
597        remoteDirectoryServers.put(lsh.getServerId(), lsh);
598      }
599    }
600  }
601
602  /**
603   * When this handler is connected to a replication server, specifies if
604   * a wanted server is connected to this replication server.
605   *
606   * @param serverId The server we want to know if it is connected
607   * to the replication server represented by this handler.
608   * @return boolean True is the wanted server is connected to the server
609   * represented by this handler.
610   */
611  public boolean isRemoteLDAPServer(int serverId)
612  {
613    synchronized (remoteDirectoryServers)
614    {
615      for (LightweightServerHandler server : remoteDirectoryServers.values())
616      {
617        if (serverId == server.getServerId())
618        {
619          return true;
620        }
621      }
622      return false;
623    }
624  }
625
626  /**
627   * When the handler is connected to a replication server, specifies the
628   * replication server has remote LDAP servers connected to it.
629   *
630   * @return boolean True is the replication server has remote LDAP servers
631   * connected to it.
632   */
633  public boolean hasRemoteLDAPServers()
634  {
635    return !remoteDirectoryServers.isEmpty();
636  }
637
638  /**
639   * Return a Set containing the servers known by this replicationServer.
640   * @return a set containing the servers known by this replicationServer.
641   */
642  public Set<Integer> getConnectedDirectoryServerIds()
643  {
644    return remoteDirectoryServers.keySet();
645  }
646
647  /** {@inheritDoc} */
648  @Override
649  public String getMonitorInstanceName()
650  {
651    return "Connected replication server RS(" + serverId + ") " + serverURL
652        + ",cn=" + replicationServerDomain.getMonitorInstanceName();
653  }
654
655  /** {@inheritDoc} */
656  @Override
657  public List<Attribute> getMonitorData()
658  {
659    // Get the generic ones
660    List<Attribute> attributes = super.getMonitorData();
661
662    // Add the specific RS ones
663    attributes.add(Attributes.create("Replication-Server", serverURL));
664
665    ReplicationDomainMonitorData md =
666        replicationServerDomain.getDomainMonitorData();
667
668    // Missing changes
669    attributes.add(Attributes.create("missing-changes",
670        String.valueOf(md.getMissingChangesRS(serverId))));
671
672    // get the Server State
673    ServerState state = md.getRSStates(serverId);
674    if (state != null)
675    {
676      AttributeBuilder builder = new AttributeBuilder("server-state");
677      builder.addAllStrings(state.toStringSet());
678      attributes.add(builder.toAttribute());
679    }
680
681    return attributes;
682  }
683
684  /** {@inheritDoc} */
685  @Override
686  public String toString()
687  {
688    if (serverId != 0)
689    {
690      return "Replication server RS(" + serverId + ") for domain \""
691          + replicationServerDomain.getBaseDN() + "\"";
692    }
693    return "Unknown server";
694  }
695
696  /**
697   * Gets the status of the connected DS.
698   * @return The status of the connected DS.
699   */
700  @Override
701  public ServerStatus getStatus()
702  {
703    return ServerStatus.INVALID_STATUS;
704  }
705
706  /**
707   * Retrieves the Address URL for this server handler.
708   *
709   * @return  The Address URL for this server handler,
710   *          in the form of an IP address and port separated by a colon.
711   */
712  public String getServerAddressURL()
713  {
714    return serverAddressURL;
715  }
716
717  /**
718   * Receives a topology msg.
719   * @param topoMsg The message received.
720   * @throws DirectoryException when it occurs.
721   * @throws IOException when it occurs.
722   */
723  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
724  throws DirectoryException, IOException
725  {
726    replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
727  }
728
729}