001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.replication.common.ServerStatus.*;
031import static org.opends.server.replication.common.StatusMachine.*;
032import static org.opends.server.replication.protocol.ProtocolVersion.*;
033
034import java.io.IOException;
035import java.util.*;
036
037import org.forgerock.i18n.LocalizableMessage;
038import org.forgerock.i18n.slf4j.LocalizedLogger;
039import org.forgerock.opendj.ldap.ResultCode;
040import org.opends.server.replication.common.*;
041import org.opends.server.replication.protocol.*;
042import org.opends.server.types.*;
043
/**
 * This class defines a server handler, which handles all interaction with a
 * peer data server (DS) connected to this replication server (RS).
 */
048public class DataServerHandler extends ServerHandler
049{
050
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Temporary generationId received in handshake/phase1, and used after
   * handshake/phase2.
   */
  private long tmpGenerationId;

  /** Status of this DS (only used if this server handler represents a DS). */
  private ServerStatus status = ServerStatus.INVALID_STATUS;

  /** Referrals URLs this DS is exporting. */
  private List<String> refUrls = new ArrayList<>();
  /** Assured replication enabled on DS or not. */
  private boolean assuredFlag;
  /** DS assured mode (relevant if assured replication enabled). */
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  /** DS safe data level (relevant if assured mode is safe data). */
  private byte safeDataLevel = -1;
  /**
   * ECL includes announced by the DS: empty until overwritten with the value
   * carried by the StartSessionMsg received during the handshake.
   */
  private Set<String> eclIncludes = new HashSet<>();
  /**
   * ECL includes for deletes announced by the DS: empty until overwritten with
   * the value carried by the StartSessionMsg received during the handshake.
   */
  private Set<String> eclIncludesForDeletes = new HashSet<>();
072
073  /**
074   * Creates a new data server handler.
075   * @param session The session opened with the remote data server.
076   * @param queueSize The queue size.
077   * @param replicationServer The hosting RS.
078   * @param rcvWindowSize The receiving window size.
079   */
080  public DataServerHandler(
081      Session session,
082      int queueSize,
083      ReplicationServer replicationServer,
084      int rcvWindowSize)
085  {
086    super(session, queueSize, replicationServer, rcvWindowSize);
087  }
088
089  /**
090   * Order the peer DS server to change his status or close the connection
091   * according to the requested new generation id.
092   * @param newGenId The new generation id to take into account
093   * @throws IOException If IO error occurred.
094   */
095  public void changeStatusForResetGenId(long newGenId) throws IOException
096  {
097    StatusMachineEvent event = getStatusMachineEvent(newGenId);
098    if (event == null)
099    {
100      return;
101    }
102
103    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
104        && status == ServerStatus.FULL_UPDATE_STATUS)
105    {
106      // Prevent useless error message (full update status cannot lead to bad gen status)
107      logger.info(NOTE_BAD_GEN_ID_IN_FULL_UPDATE, replicationServer.getServerId(),
108          getBaseDN(), serverId, generationId, newGenId);
109      return;
110    }
111
112    changeStatus(event, "for reset gen id");
113  }
114
115  private StatusMachineEvent getStatusMachineEvent(long newGenId)
116  {
117    if (newGenId == -1)
118    {
119      // The generation id is being made invalid, let's put the DS
120      // into BAD_GEN_ID_STATUS
121      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
122    }
123    if (newGenId != generationId)
124    {
125      // This server has a bad generation id compared to new reference one,
126      // let's put it into BAD_GEN_ID_STATUS
127      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
128    }
129
130    if (status != ServerStatus.BAD_GEN_ID_STATUS)
131    {
132      if (logger.isTraceEnabled())
133      {
134        logger.trace("In RS " + replicationServer.getServerId()
135            + ", DS " + getServerId() + " for baseDN=" + getBaseDN()
136            + " has already generation id " + newGenId
137            + " so no ChangeStatusMsg sent to him.");
138      }
139      return null;
140    }
141
142    // This server has the good new reference generation id.
143    // Close connection with him to force his reconnection: DS will
144    // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
145
146    if (logger.isTraceEnabled())
147    {
148      logger.trace("In RS " + replicationServer.getServerId()
149          + ", closing connection to DS " + getServerId() + " for baseDN=" + getBaseDN()
150          + " to force reconnection as new local generationId"
151          + " and remote one match and DS is in bad gen id: " + newGenId);
152    }
153
154    // Connection closure must not be done calling RSD.stopHandler() as it
155    // would rewait the RSD lock that we already must have entering this
156    // method. This would lead to a reentrant lock which we do not want.
157    // So simply close the session, this will make the hang up appear
158    // after the reader thread that took the RSD lock releases it.
159    if (session != null
160        // V4 protocol introduced a StopMsg to properly close the
161        // connection between servers
162        && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
163    {
164      try
165      {
166        session.publish(new StopMsg());
167      }
168      catch (IOException ioe)
169      {
170        // Anyway, going to close session, so nothing to do
171      }
172    }
173
174    // NOT_CONNECTED_STATUS is the last one in RS session life: handler
175    // will soon disappear after this method call...
176    status = ServerStatus.NOT_CONNECTED_STATUS;
177    return null;
178  }
179
180  /**
181   * Change the status according to the event.
182   *
183   * @param event
184   *          The event to be used for new status computation
185   * @return The new status of the DS
186   * @throws IOException
187   *           When raised by the underlying session
188   */
189  public ServerStatus changeStatus(StatusMachineEvent event) throws IOException
190  {
191    return changeStatus(event, "from status analyzer");
192  }
193
194  private ServerStatus changeStatus(StatusMachineEvent event, String origin)
195      throws IOException
196  {
197    // Check state machine allows this new status (Sanity check)
198    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
199    if (newStatus == ServerStatus.INVALID_STATUS)
200    {
201      logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event);
202      // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
203      // versa. We may be trying to change the status while another status has
204      // just been entered: e.g a full update has just been engaged.
205      // In that case, just ignore attempt to change the status
206      return newStatus;
207    }
208
209    // Send message requesting to change the DS status
210    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
211
212    if (logger.isTraceEnabled())
213    {
214      logger.trace("In RS " + replicationServer.getServerId()
215          + " Sending change status " + origin + " to " + getServerId()
216          + " for baseDN=" + getBaseDN() + ":\n" + csMsg);
217    }
218
219    session.publish(csMsg);
220
221    status = newStatus;
222
223    return newStatus;
224  }
225
226  /** {@inheritDoc} */
227  @Override
228  public List<Attribute> getMonitorData()
229  {
230    // Get the generic ones
231    List<Attribute> attributes = super.getMonitorData();
232
233    // Add the specific DS ones
234    attributes.add(Attributes.create("replica", serverURL));
235    attributes.add(Attributes.create("connected-to",
236        this.replicationServer.getMonitorInstanceName()));
237
238    ReplicationDomainMonitorData md =
239        replicationServerDomain.getDomainMonitorData();
240
241    // Oldest missing update
242    long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
243    if (approxFirstMissingDate > 0)
244    {
245      Date date = new Date(approxFirstMissingDate);
246      attributes.add(Attributes.create(
247          "approx-older-change-not-synchronized", date.toString()));
248      attributes.add(Attributes.create(
249          "approx-older-change-not-synchronized-millis", String
250          .valueOf(approxFirstMissingDate)));
251    }
252
253    // Missing changes
254    attributes.add(Attributes.create("missing-changes",
255        String.valueOf(md.getMissingChanges(serverId))));
256
257    // Replication delay
258    attributes.add(Attributes.create("approximate-delay",
259        String.valueOf(md.getApproxDelay(serverId))));
260
261    /* get the Server State */
262    ServerState state = md.getLDAPServerState(serverId);
263    if (state != null)
264    {
265      AttributeBuilder builder = new AttributeBuilder("server-state");
266      builder.addAllStrings(state.toStringSet());
267      attributes.add(builder.toAttribute());
268    }
269
270    return attributes;
271  }
272
273  /** {@inheritDoc} */
274  @Override
275  public String getMonitorInstanceName()
276  {
277    return "Connected directory server DS(" + serverId + ") " + serverURL
278        + ",cn=" + replicationServerDomain.getMonitorInstanceName();
279  }
280
281  /**
282   * Gets the status of the connected DS.
283   * @return The status of the connected DS.
284   */
285  @Override
286  public ServerStatus getStatus()
287  {
288    return status;
289  }
290
291  /** {@inheritDoc} */
292  @Override
293  public boolean isDataServer()
294  {
295    return true;
296  }
297
298  /**
299   * Process message of a remote server changing his status.
300   * @param csMsg The message containing the new status
301   * @return The new server status of the DS
302   */
303  public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
304  {
305    // Get the status the DS just entered
306    ServerStatus reqStatus = csMsg.getNewStatus();
307    // Translate new status to a state machine event
308    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
309    if (event == StatusMachineEvent.INVALID_EVENT)
310    {
311      logger.error(ERR_RS_INVALID_NEW_STATUS, reqStatus, getBaseDN(), serverId);
312      return ServerStatus.INVALID_STATUS;
313    }
314
315    // Check state machine allows this new status
316    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
317    if (newStatus == ServerStatus.INVALID_STATUS)
318    {
319      logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event);
320      return ServerStatus.INVALID_STATUS;
321    }
322
323    status = newStatus;
324    return status;
325  }
326
327  /**
328   * Processes a start message received from a remote data server.
329   * @param serverStartMsg The provided start message received.
330   * @return flag specifying whether the remote server requests encryption.
331   * @throws DirectoryException raised when an error occurs.
332   */
333  public boolean processStartFromRemote(ServerStartMsg serverStartMsg)
334  throws DirectoryException
335  {
336    session
337        .setProtocolVersion(getCompatibleVersion(serverStartMsg.getVersion()));
338    tmpGenerationId = serverStartMsg.getGenerationId();
339    serverId = serverStartMsg.getServerId();
340    serverURL = serverStartMsg.getServerURL();
341    groupId = serverStartMsg.getGroupId();
342    heartbeatInterval = serverStartMsg.getHeartbeatInterval();
343
344    // generic stuff
345    setBaseDNAndDomain(serverStartMsg.getBaseDN(), true);
346    setInitialServerState(serverStartMsg.getServerState());
347    setSendWindowSize(serverStartMsg.getWindowSize());
348
349    if (heartbeatInterval < 0)
350    {
351      heartbeatInterval = 0;
352    }
353    return serverStartMsg.getSSLEncryption();
354  }
355
356  /** Send our own TopologyMsg to DS. */
357  private TopologyMsg sendTopoToRemoteDS() throws IOException
358  {
359    TopologyMsg outTopoMsg = replicationServerDomain
360        .createTopologyMsgForDS(this.serverId);
361    sendTopoInfo(outTopoMsg);
362    return outTopoMsg;
363  }
364
365  /**
366   * Starts the handler from a remote ServerStart message received from
367   * the remote data server.
368   * @param inServerStartMsg The provided ServerStart message received.
369   */
370  public void startFromRemoteDS(ServerStartMsg inServerStartMsg)
371  {
372    try
373    {
374      // initializations
375      localGenerationId = -1;
376      oldGenerationId = -100;
377
378      // processes the ServerStart message received
379      boolean sessionInitiatorSSLEncryption =
380        processStartFromRemote(inServerStartMsg);
381
382      /**
383       * Hack to be sure that if a server disconnects and reconnect, we
384       * let the reader thread see the closure and cleanup any reference
385       * to old connection. This must be done before taking the domain lock so
386       * that the reader thread has a chance to stop the handler.
387       *
388       * TODO: This hack should be removed and disconnection/reconnection
389       * properly dealt with.
390       */
391      if (replicationServerDomain.getConnectedDSs()
392          .containsKey(inServerStartMsg.getServerId()))
393      {
394        try {
395          Thread.sleep(100);
396        }
397        catch(Exception e){
398          abortStart(null);
399          return;
400        }
401      }
402
403      lockDomainNoTimeout();
404
405      localGenerationId = replicationServerDomain.getGenerationId();
406      oldGenerationId = localGenerationId;
407
408      if (replicationServerDomain.isAlreadyConnectedToDS(this))
409      {
410        abortStart(null);
411        return;
412      }
413
414      try
415      {
416        StartMsg outStartMsg = sendStartToRemote();
417
418        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
419
420        // The session initiator decides whether to use SSL.
421        // Until here session is encrypted then it depends on the negotiation
422        if (!sessionInitiatorSSLEncryption)
423        {
424          session.stopEncryption();
425        }
426
427        // wait and process StartSessionMsg from remote RS
428        StartSessionMsg inStartSessionMsg =
429          waitAndProcessStartSessionFromRemoteDS();
430        if (inStartSessionMsg == null)
431        {
432          // DS wants to properly close the connection (DS sent a StopMsg)
433          logStopReceived();
434          abortStart(null);
435          return;
436        }
437
438        // Send our own TopologyMsg to remote DS
439        TopologyMsg outTopoMsg = sendTopoToRemoteDS();
440
441        logStartSessionHandshake(inStartSessionMsg, outTopoMsg);
442      }
443      catch(IOException e)
444      {
445        LocalizableMessage errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
446            inServerStartMsg.getServerId(), replicationServer.getServerId());
447        throw new DirectoryException(ResultCode.OTHER, errMessage);
448      }
449      catch (Exception e)
450      {
451        // We do not need to support DS V1 connection, we just accept RS V1
452        // connection:
453        // We just trash the message, log the event for debug purpose and close
454        // the connection
455        throw new DirectoryException(ResultCode.OTHER, null, null);
456      }
457
458      replicationServerDomain.register(this);
459
460      logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_DS, getReplicationServerId(), getServerId(),
461              replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress());
462
463      super.finalizeStart();
464    }
465    catch(DirectoryException de)
466    {
467      abortStart(de.getMessageObject());
468    }
469    catch(Exception e)
470    {
471      abortStart(null);
472    }
473    finally
474    {
475      releaseDomainLock();
476    }
477  }
478
479  /**
480   * Sends a start message to the remote DS.
481   *
482   * @return The StartMsg sent.
483   * @throws IOException
484   *           When an exception occurs.
485   */
486  private StartMsg sendStartToRemote() throws IOException
487  {
488    final StartMsg startMsg;
489
490    // Before V4 protocol, we sent a ReplServerStartMsg
491    if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
492    {
493      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
494      startMsg = createReplServerStartMsg();
495    }
496    else
497    {
498      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
499      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
500          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
501          replicationServerDomain.getLatestServerState(),
502          localGenerationId, sslEncryption, getLocalGroupId(),
503          replicationServer.getDegradedStatusThreshold(),
504          replicationServer.getWeight(),
505          replicationServerDomain.getConnectedDSs().size());
506    }
507
508    send(startMsg);
509    return startMsg;
510  }
511
512  /**
513   * Creates a DSInfo structure representing this remote DS.
514   * @return The DSInfo structure representing this remote DS
515   */
516  public DSInfo toDSInfo()
517  {
518    return new DSInfo(serverId, serverURL, getReplicationServerId(),
519        generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId,
520        refUrls, eclIncludes, eclIncludesForDeletes, getProtocolVersion());
521  }
522
523  /** {@inheritDoc} */
524  @Override
525  public String toString()
526  {
527    if (serverId != 0)
528    {
529      return "Replica DS(" + serverId + ") for domain \""
530          + replicationServerDomain.getBaseDN() + "\"";
531    }
532    return "Unknown server";
533  }
534
535  /**
536   * Wait receiving the StartSessionMsg from the remote DS and process it, or
537   * receiving a StopMsg to properly stop the handshake procedure.
538   * @return the startSessionMsg received or null DS sent a stop message to
539   *         not finish the handshake.
540   * @throws Exception
541   */
542  private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
543      throws Exception
544  {
545    ReplicationMsg msg = session.receive();
546
547    if (msg instanceof StopMsg)
548    {
549      // DS wants to stop handshake (was just for handshake phase one for RS
550      // choice). Return null to make the session be terminated.
551      return null;
552    } else if (!(msg instanceof StartSessionMsg))
553    {
554      LocalizableMessage message = LocalizableMessage.raw(
555          "Protocol error: StartSessionMsg required." + msg + " received.");
556      abortStart(message);
557      return null;
558    }
559
560    // Process StartSessionMsg sent by remote DS
561    StartSessionMsg startSessionMsg = (StartSessionMsg) msg;
562
563    this.status = startSessionMsg.getStatus();
564    // Sanity check: is it a valid initial status?
565    if (!isValidInitialStatus(this.status))
566    {
567      throw new DirectoryException(ResultCode.OTHER,
568          ERR_RS_INVALID_INIT_STATUS.get( this.status, getBaseDN(), serverId));
569    }
570
571    this.refUrls = startSessionMsg.getReferralsURLs();
572    this.assuredFlag = startSessionMsg.isAssured();
573    this.assuredMode = startSessionMsg.getAssuredMode();
574    this.safeDataLevel = startSessionMsg.getSafeDataLevel();
575    this.eclIncludes = startSessionMsg.getEclIncludes();
576    this.eclIncludesForDeletes = startSessionMsg.getEclIncludesForDeletes();
577
578    /*
579     * If we have already a generationID set for the domain
580     * then
581     *   if the connecting replica has not the same
582     *   then it is degraded locally and notified by an error message
583     * else
584     *   we set the generationID from the one received
585     *   (unsaved yet on disk . will be set with the 1rst change
586     * received)
587     */
588    generationId = tmpGenerationId;
589    if (localGenerationId > 0)
590    {
591      if (generationId != localGenerationId)
592      {
593        logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(),
594            generationId, getBaseDN(), getReplicationServerId(), localGenerationId);
595      }
596    }
597    else
598    {
599      // We are an empty ReplicationServer
600      if (generationId > 0 && !getServerState().isEmpty())
601      {
602        // If the LDAP server has already sent changes
603        // it is not expected to connect to an empty RS
604        logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(),
605            generationId, getBaseDN(), getReplicationServerId(), localGenerationId);
606      }
607      else
608      {
609        // The local RS is not initialized - take the one received
610        // WARNING: Must be done before computing topo message to send
611        // to peer server as topo message must embed valid generation id
612        // for our server
613        oldGenerationId = replicationServerDomain.changeGenerationId(generationId);
614      }
615    }
616    return startSessionMsg;
617  }
618
619  /**
620   * Process message of a remote server changing his status.
621   * @param csMsg The message containing the new status
622   */
623  public void receiveNewStatus(ChangeStatusMsg csMsg)
624  {
625    replicationServerDomain.processNewStatus(this, csMsg);
626  }
627}