001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import static org.opends.messages.ReplicationMessages.*;
030
031import java.io.IOException;
032import java.util.List;
033import java.util.Random;
034import java.util.concurrent.Semaphore;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import org.forgerock.i18n.LocalizableMessage;
039import org.forgerock.i18n.slf4j.LocalizedLogger;
040import org.forgerock.opendj.config.server.ConfigException;
041import org.forgerock.opendj.ldap.ResultCode;
042import org.opends.server.admin.std.server.MonitorProviderCfg;
043import org.opends.server.core.DirectoryServer;
044import org.opends.server.replication.common.AssuredMode;
045import org.opends.server.replication.common.CSN;
046import org.opends.server.replication.common.RSInfo;
047import org.opends.server.replication.common.ServerStatus;
048import org.opends.server.replication.protocol.*;
049import org.opends.server.replication.server.changelog.api.ChangelogException;
050import org.opends.server.types.Attribute;
051import org.opends.server.types.Attributes;
052import org.opends.server.types.DirectoryException;
053import org.opends.server.types.InitializationException;
054
055/**
 * This class defines a server handler that:
 * <ul>
 * <li>is a MessageHandler (see that class for more details);</li>
 * <li>handles all interaction with a peer server (RS or DS).</li>
 * </ul>
059 */
060public abstract class ServerHandler extends MessageHandler
061{
062
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Maximum time, in milliseconds, that {@code shutdown()} waits (via
   * {@code Thread.join}) for each of the ServerWriter and ServerReader threads
   * to terminate.
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;

  /**
   * The serverId of the remote server.
   */
  protected int serverId;
  /**
   * The session opened with the remote server. May be null (see the FIXME in
   * finalizeStart()); in that case no reader, writer or heartbeat thread is
   * created.
   */
  protected final Session session;

  /**
   * The serverURL of the remote server.
   */
  protected String serverURL;
  /**
   * Number of updates received from the server in assured safe read mode.
   * NOTE(review): plain int incremented with ++, unlike the AtomicInteger
   * timeout counters; confirm it is only updated from a single thread.
   */
  private int assuredSrReceivedUpdates;
  /**
   * Number of updates received from the server in assured safe read mode that
   * timed out.
   */
  private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe read mode.
   * Incremented by incrementAssuredStats() when take() returns an assured
   * safe read update. NOTE(review): plain int, not atomic.
   */
  private int assuredSrSentUpdates;
  /**
   * Number of updates sent to the server in assured safe read mode that timed
   * out.
   */
  private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates received from the server in assured safe data mode.
   * NOTE(review): plain int incremented with ++, not atomic.
   */
  private int assuredSdReceivedUpdates;
  /**
   * Number of updates received from the server in assured safe data mode that
   * timed out.
   */
  private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe data mode. Only
   * incremented when the remote server is not a data server.
   * NOTE(review): plain int, not atomic.
   */
  private int assuredSdSentUpdates;

  /**
   * Number of updates sent to the server in assured safe data mode that timed out.
   */
  private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();

  /**
   * The associated ServerWriter that sends messages to the remote server.
   * Created by finalizeStart() only when a session exists.
   */
  private ServerWriter writer;

  /**
   * The associated ServerReader that receives messages from the remote server.
   * Created by finalizeStart() only when a session exists.
   */
  private ServerReader reader;

  /**
   * Remaining credit of the receive window: decremented by decAndCheckWindow()
   * and increased by rcvWindowSizeHalf each time checkWindow() sends a
   * WindowMsg to the remote server.
   */
  private int rcvWindow;
  /**
   * Half of the initial receive window size: both the threshold below which
   * checkWindow() re-opens the window and the credit it grants.
   */
  private final int rcvWindowSizeHalf;

  /** The size of the receiving window. */
  protected final int maxRcvWindow;
  /**
   * Semaphore that the writer uses to control the flow to the remote server.
   * Created in finalizeStart() (null before, or when there is no session),
   * acquired in take() and released by updateWindow().
   */
  private Semaphore sendWindow;
  /** The initial size of the sending window, set through setSendWindowSize(). */
  private int sendWindowSize;
  /** Remote generation id. */
  protected long generationId = -1;
  /** The generation id of the hosting RS. */
  protected long localGenerationId = -1;
  /**
   * The generation id before processing a new start handshake. abortStart()
   * restores it on the domain unless its value is -100.
   */
  protected long oldGenerationId = -1;
  /** Group id of this remote server. */
  protected byte groupId = -1;
  /** The SSL encryption after the negotiation with the peer. */
  protected boolean sslEncryption;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server. When not strictly positive, finalizeStart() starts no heartbeat
   * thread; otherwise heartbeats are published every heartbeatInterval / 3.
   */
  protected long heartbeatInterval;

  /** The thread that will send heartbeats. */
  private HeartbeatThread heartbeatThread;

  /**
   * Set by shutdown() to make acquirePermitInSendWindow() stop waiting for
   * send window permits. Volatile because it is written and read by different
   * methods without locking.
   */
  private volatile boolean shutdownWriter;

  /** Weight of this remote server. */
  protected int weight = 1;
165
166  /**
167   * Creates a new server handler instance with the provided socket.
168   *
169   * @param session The Session used by the ServerHandler to
170   *                 communicate with the remote entity.
171   * @param queueSize The maximum number of update that will be kept
172   *                  in memory by this ServerHandler.
173   * @param replicationServer The hosting replication server.
174   * @param rcvWindowSize The window size to receive from the remote server.
175   */
176  public ServerHandler(
177      Session session,
178      int queueSize,
179      ReplicationServer replicationServer,
180      int rcvWindowSize)
181  {
182    super(queueSize, replicationServer);
183    this.session = session;
184    this.rcvWindowSizeHalf = rcvWindowSize / 2;
185    this.maxRcvWindow = rcvWindowSize;
186    this.rcvWindow = rcvWindowSize;
187  }
188
189  /**
190   * Abort a start procedure currently establishing.
191   * @param reason The provided reason.
192   */
193  protected void abortStart(LocalizableMessage reason)
194  {
195    // We did not recognize the message, close session as what can happen after
196    // is undetermined and we do not want the server to be disturbed
197    Session localSession = session;
198    if (localSession != null)
199    {
200      if (reason != null)
201      {
202        if (logger.isTraceEnabled())
203        {
204         logger.trace("In " + this + " closing session with err=" + reason);
205        }
206        logger.error(reason);
207      }
208
209      // This method is only called when aborting a failing handshake and
210      // not StopMsg should be sent in such situation. StopMsg are only
211      // expected when full handshake has been performed, or at end of
212      // handshake phase 1, when DS was just gathering available RS info
213      localSession.close();
214    }
215
216    releaseDomainLock();
217
218    // If generation id of domain was changed, set it back to old value
219    // We may have changed it as it was -1 and we received a value >0 from peer
220    // server and the last topo message sent may have failed being sent: in that
221    // case retrieve old value of generation id for replication server domain
222    if (oldGenerationId != -100)
223    {
224      replicationServerDomain.changeGenerationId(oldGenerationId);
225    }
226  }
227
228  /**
229   * Releases the lock on the replication server domain if it was held.
230   */
231  protected void releaseDomainLock()
232  {
233    if (replicationServerDomain.hasLock())
234    {
235      replicationServerDomain.release();
236    }
237  }
238
239  /**
240   * Check the protocol window and send WindowMsg if necessary.
241   *
242   * @throws IOException when the session becomes unavailable.
243   */
244  public synchronized void checkWindow() throws IOException
245  {
246    if (rcvWindow < rcvWindowSizeHalf)
247    {
248      WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
249      session.publish(msg);
250      rcvWindow += rcvWindowSizeHalf;
251    }
252  }
253
254  /**
255   * Decrement the protocol window, then check if it is necessary
256   * to send a WindowMsg and send it.
257   *
258   * @throws IOException when the session becomes unavailable.
259   */
260  private synchronized void decAndCheckWindow() throws IOException
261  {
262    rcvWindow--;
263    checkWindow();
264  }
265
266  /**
267   * Finalize the initialization, create reader, writer, heartbeat system
268   * and monitoring system.
269   * @throws DirectoryException When an exception is raised.
270   */
271  protected void finalizeStart() throws DirectoryException
272  {
273    // FIXME:ECL We should refactor so that a SH always have a session
274    if (session != null)
275    {
276      try
277      {
278        // Disable timeout for next communications
279        session.setSoTimeout(0);
280      }
281      catch(Exception e)
282      { /* do nothing */
283      }
284
285      // sendWindow MUST be created before starting the writer
286      sendWindow = new Semaphore(sendWindowSize);
287
288      writer = new ServerWriter(session, this, replicationServerDomain,
289          replicationServer.getDSRSShutdownSync());
290      reader = new ServerReader(session, this);
291
292      session.setName("Replication server RS(" + getReplicationServerId()
293          + ") session thread to " + this + " at "
294          + session.getReadableRemoteAddress());
295      session.start();
296      try
297      {
298        session.waitForStartup();
299      }
300      catch (InterruptedException e)
301      {
302        final LocalizableMessage message =
303            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
304        throw new DirectoryException(ResultCode.OTHER, message, e);
305      }
306      reader.start();
307      writer.start();
308
309      // Create a thread to send heartbeat messages.
310      if (heartbeatInterval > 0)
311      {
312        String threadName = "Replication server RS(" + getReplicationServerId()
313            + ") heartbeat publisher to " + this + " at "
314            + session.getReadableRemoteAddress();
315        heartbeatThread = new HeartbeatThread(threadName, session,
316            heartbeatInterval / 3);
317        heartbeatThread.start();
318      }
319    }
320
321    DirectoryServer.deregisterMonitorProvider(this);
322    DirectoryServer.registerMonitorProvider(this);
323  }
324
325  /**
326   * Sends a message.
327   *
328   * @param msg
329   *          The message to be sent.
330   * @throws IOException
331   *           When it occurs while sending the message,
332   */
333  public void send(ReplicationMsg msg) throws IOException
334  {
335    // avoid logging anything for unit tests that include a null domain.
336    if (logger.isTraceEnabled())
337    {
338      logger.trace("In "
339          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
340          + this + " publishes message:\n" + msg);
341    }
342    session.publish(msg);
343  }
344
345  /**
346   * Get the age of the older change that has not yet been replicated
347   * to the server handled by this ServerHandler.
348   * @return The age if the older change has not yet been replicated
349   *         to the server handled by this ServerHandler.
350   */
351  public long getApproxFirstMissingDate()
352  {
353    // Get the older CSN received
354    CSN olderUpdateCSN = getOlderUpdateCSN();
355    if (olderUpdateCSN != null)
356    {
357      // If not present in the local RS db,
358      // then approximate with the older update time
359      return olderUpdateCSN.getTime();
360    }
361    return 0;
362  }
363
364  /**
365   * Get the number of updates received from the server in assured safe data
366   * mode.
367   * @return The number of updates received from the server in assured safe data
368   * mode
369   */
370  public int getAssuredSdReceivedUpdates()
371  {
372    return assuredSdReceivedUpdates;
373  }
374
375  /**
376   * Get the number of updates received from the server in assured safe data
377   * mode that timed out.
378   * @return The number of updates received from the server in assured safe data
379   * mode that timed out.
380   */
381  public AtomicInteger getAssuredSdReceivedUpdatesTimeout()
382  {
383    return assuredSdReceivedUpdatesTimeout;
384  }
385
386  /**
387   * Get the number of updates sent to the server in assured safe data mode.
388   * @return The number of updates sent to the server in assured safe data mode
389   */
390  public int getAssuredSdSentUpdates()
391  {
392    return assuredSdSentUpdates;
393  }
394
395  /**
396   * Get the number of updates sent to the server in assured safe data mode that
397   * timed out.
398   * @return The number of updates sent to the server in assured safe data mode
399   * that timed out.
400   */
401  public AtomicInteger getAssuredSdSentUpdatesTimeout()
402  {
403    return assuredSdSentUpdatesTimeout;
404  }
405
406  /**
407   * Get the number of updates received from the server in assured safe read
408   * mode.
409   * @return The number of updates received from the server in assured safe read
410   * mode
411   */
412  public int getAssuredSrReceivedUpdates()
413  {
414    return assuredSrReceivedUpdates;
415  }
416
417  /**
418   * Get the number of updates received from the server in assured safe read
419   * mode that timed out.
420   * @return The number of updates received from the server in assured safe read
421   * mode that timed out.
422   */
423  public AtomicInteger getAssuredSrReceivedUpdatesTimeout()
424  {
425    return assuredSrReceivedUpdatesTimeout;
426  }
427
428  /**
429   * Get the number of updates sent to the server in assured safe read mode.
430   * @return The number of updates sent to the server in assured safe read mode
431   */
432  public int getAssuredSrSentUpdates()
433  {
434    return assuredSrSentUpdates;
435  }
436
437  /**
438   * Get the number of updates sent to the server in assured safe read mode that
439   * timed out.
440   * @return The number of updates sent to the server in assured safe read mode
441   * that timed out.
442   */
443  public AtomicInteger getAssuredSrSentUpdatesTimeout()
444  {
445    return assuredSrSentUpdatesTimeout;
446  }
447
448  /**
449   * Returns the Replication Server Domain to which belongs this server handler.
450   *
451   * @return The replication server domain.
452   */
453  public ReplicationServerDomain getDomain()
454  {
455    return replicationServerDomain;
456  }
457
458  /**
459   * Returns the value of generationId for that handler.
460   * @return The value of the generationId.
461   */
462  public long getGenerationId()
463  {
464    return generationId;
465  }
466
467  /**
468   * Gets the group id of the server represented by this object.
469   * @return The group id of the server represented by this object.
470   */
471  public byte getGroupId()
472  {
473    return groupId;
474  }
475
476  /**
477   * Get our heartbeat interval.
478   * @return Our heartbeat interval.
479   */
480  public long getHeartbeatInterval()
481  {
482    return heartbeatInterval;
483  }
484
485  /** {@inheritDoc} */
486  @Override
487  public List<Attribute> getMonitorData()
488  {
489    // Get the generic ones
490    List<Attribute> attributes = super.getMonitorData();
491
492    attributes.add(Attributes.create("server-id", String.valueOf(serverId)));
493    attributes.add(Attributes.create("domain-name", String.valueOf(getBaseDN())));
494
495    // Deprecated
496    attributes.add(Attributes.create("max-waiting-changes", String
497        .valueOf(maxQueueSize)));
498    attributes.add(Attributes.create("sent-updates", String
499        .valueOf(getOutCount())));
500    attributes.add(Attributes.create("received-updates", String
501        .valueOf(getInCount())));
502
503    // Assured counters
504    attributes.add(Attributes.create("assured-sr-received-updates", String
505        .valueOf(getAssuredSrReceivedUpdates())));
506    attributes.add(Attributes.create("assured-sr-received-updates-timeout",
507        String .valueOf(getAssuredSrReceivedUpdatesTimeout())));
508    attributes.add(Attributes.create("assured-sr-sent-updates", String
509        .valueOf(getAssuredSrSentUpdates())));
510    attributes.add(Attributes.create("assured-sr-sent-updates-timeout", String
511        .valueOf(getAssuredSrSentUpdatesTimeout())));
512    attributes.add(Attributes.create("assured-sd-received-updates", String
513        .valueOf(getAssuredSdReceivedUpdates())));
514    if (!isDataServer())
515    {
516      attributes.add(Attributes.create("assured-sd-sent-updates",
517          String.valueOf(getAssuredSdSentUpdates())));
518      attributes.add(Attributes.create("assured-sd-sent-updates-timeout",
519          String.valueOf(getAssuredSdSentUpdatesTimeout())));
520    } else
521    {
522      attributes.add(Attributes.create("assured-sd-received-updates-timeout",
523          String.valueOf(getAssuredSdReceivedUpdatesTimeout())));
524    }
525
526    // Window stats
527    attributes.add(Attributes.create("max-send-window", String.valueOf(sendWindowSize)));
528    attributes.add(Attributes.create("current-send-window", String.valueOf(sendWindow.availablePermits())));
529    attributes.add(Attributes.create("max-rcv-window", String.valueOf(maxRcvWindow)));
530    attributes.add(Attributes.create("current-rcv-window", String.valueOf(rcvWindow)));
531
532    // Encryption
533    attributes.add(Attributes.create("ssl-encryption", String.valueOf(session.isEncrypted())));
534
535    // Data generation
536    attributes.add(Attributes.create("generation-id", String.valueOf(generationId)));
537
538    return attributes;
539  }
540
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   * Implemented by concrete handler subclasses.
   *
   * @return  The name of this monitor provider.
   */
  @Override
  public abstract String getMonitorInstanceName();
549
550  /**
551   * Gets the protocol version used with this remote server.
552   * @return The protocol version used with this remote server.
553   */
554  public short getProtocolVersion()
555  {
556    return session.getProtocolVersion();
557  }
558
559  /**
560   * Get the Server Id.
561   *
562   * @return the ID of the server to which this object is linked
563   */
564  public int getServerId()
565  {
566    return serverId;
567  }
568
569  /**
570   * Retrieves the URL for this server handler.
571   *
572   * @return  The URL for this server handler, in the form of an address and
573   *          port separated by a colon.
574   */
575  public String getServerURL()
576  {
577    return serverURL;
578  }
579
  /**
   * Return the ServerStatus of the remote server; implemented by concrete
   * handler subclasses.
   * @return The server status.
   */
  protected abstract ServerStatus getStatus();
585
586  /**
587   * Increment the number of updates received from the server in assured safe
588   * data mode.
589   */
590  public void incrementAssuredSdReceivedUpdates()
591  {
592    assuredSdReceivedUpdates++;
593  }
594
595  /**
596   * Increment the number of updates received from the server in assured safe
597   * data mode that timed out.
598   */
599  public void incrementAssuredSdReceivedUpdatesTimeout()
600  {
601    assuredSdReceivedUpdatesTimeout.incrementAndGet();
602  }
603
604  /**
605   * Increment the number of updates sent to the server in assured safe data
606   * mode that timed out.
607   */
608  public void incrementAssuredSdSentUpdatesTimeout()
609  {
610    assuredSdSentUpdatesTimeout.incrementAndGet();
611  }
612
613  /**
614   * Increment the number of updates received from the server in assured safe
615   * read mode.
616   */
617  public void incrementAssuredSrReceivedUpdates()
618  {
619    assuredSrReceivedUpdates++;
620  }
621
622  /**
623   * Increment the number of updates received from the server in assured safe
624   * read mode that timed out.
625   */
626  public void incrementAssuredSrReceivedUpdatesTimeout()
627  {
628    assuredSrReceivedUpdatesTimeout.incrementAndGet();
629  }
630
631  /**
632   * Increment the number of updates sent to the server in assured safe read
633   * mode that timed out.
634   */
635  public void incrementAssuredSrSentUpdatesTimeout()
636  {
637    assuredSrSentUpdatesTimeout.incrementAndGet();
638  }
639
640  /** {@inheritDoc} */
641  @Override
642  public void initializeMonitorProvider(MonitorProviderCfg configuration)
643  throws ConfigException, InitializationException
644  {
645    // Nothing to do for now
646  }
647
  /**
   * Check if the server associated to this ServerHandler is a data server
   * in the topology. Implemented by concrete handler subclasses; the
   * complementary isReplicationServer() is derived from it.
   * @return true if the server is a data server.
   */
  public abstract boolean isDataServer();
654
655  /**
656   * Check if the server associated to this ServerHandler is a replication
657   * server.
658   * @return true if the server is a replication server.
659   */
660  public boolean isReplicationServer()
661  {
662    return !isDataServer();
663  }
664
  // The handshake phase must be performed while blocking any access to the
  // structures that keep information on connected servers, so that one can
  // safely check whether a server already exists, send a coherent snapshot of
  // the known topology to peers, update the local view of the topology, etc.
  //
  // For instance, while we connect with a peer RS, a DS may be connecting at
  // the same time, and we could publish the connected DSs to the peer RS while
  // forgetting this last DS in the TopologyMsg.
  //
  // The domain-locking methods below, and every other method that needs to
  // read or modify the structures holding the topology of the domain, should:
  // - call ReplicationServerDomain.lock()
  // - read/modify the structures
  // - call ReplicationServerDomain.release()
  //
  // More information is provided in the comment of
  // ReplicationServerDomain.lock().
681
682  /**
683   * Lock the domain without a timeout.
684   * <p>
685   * If domain already exists, lock it until handshake is finished otherwise it
686   * will be created and locked later in the method
687   *
688   * @throws DirectoryException
689   *           When an exception occurs.
690   * @throws InterruptedException
691   *           If the current thread was interrupted while waiting for the lock.
692   */
693  public void lockDomainNoTimeout() throws DirectoryException,
694      InterruptedException
695  {
696    if (!replicationServerDomain.hasLock())
697    {
698      replicationServerDomain.lock();
699    }
700  }
701
702  /**
703   * Lock the domain with a timeout.
704   * <p>
705   * Take the lock on the domain. WARNING: Here we try to acquire the lock with
706   * a timeout. This is for preventing a deadlock that may happen if there are
707   * cross connection attempts (for same domain) from this replication server
708   * and from a peer one.
709   * <p>
710   * Here is the scenario:
711   * <ol>
712   * <li>RS1 connect thread takes the domain lock and starts connection to RS2
713   * </li>
714   * <li>at the same time RS2 connect thread takes his domain lock and start
715   * connection to RS2</li>
716   * <li>RS2 listen thread starts processing received ReplServerStartMsg from
717   * RS1 and wants to acquire the lock on the domain (here) but cannot as RS2
718   * connect thread already has it</li>
719   * <li>RS1 listen thread starts processing received ReplServerStartMsg from
720   * RS2 and wants to acquire the lock on the domain (here) but cannot as RS1
721   * connect thread already has it</li>
722   * </ol>
723   * => Deadlock: 4 threads are locked.
724   * <p>
725   * To prevent threads locking in such situation, the listen threads here will
726   * both timeout trying to acquire the lock. The random time for the timeout
727   * should allow on connection attempt to be aborted whereas the other one
728   * should have time to finish in the same time.
729   * <p>
730   * Warning: the minimum time (3s) should be big enough to allow normal
731   * situation connections to terminate. The added random time should represent
732   * a big enough range so that the chance to have one listen thread timing out
733   * a lot before the peer one is great. When the first listen thread times out,
734   * the remote connect thread should release the lock and allow the peer listen
735   * thread to take the lock it was waiting for and process the connection
736   * attempt.
737   *
738   * @throws DirectoryException
739   *           When an exception occurs.
740   * @throws InterruptedException
741   *           If the current thread was interrupted while waiting for the lock.
742   */
743  public void lockDomainWithTimeout() throws DirectoryException,
744      InterruptedException
745  {
746    final Random random = new Random();
747    final int randomTime = random.nextInt(6); // Random from 0 to 5
748    // Wait at least 3 seconds + (0 to 5 seconds)
749    final long timeout = 3000 + randomTime * 1000;
750    final boolean lockAcquired = replicationServerDomain.tryLock(timeout);
751    if (!lockAcquired)
752    {
753      LocalizableMessage message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
754          getBaseDN(), serverId, session.getReadableRemoteAddress(), getReplicationServerId());
755      throw new DirectoryException(ResultCode.OTHER, message);
756    }
757  }
758
759  /**
760   * Processes a routable message.
761   *
762   * @param msg The message to be processed.
763   */
764  void process(RoutableMsg msg)
765  {
766    if (logger.isTraceEnabled())
767    {
768      logger.trace("In "
769          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
770          + this + " processes routable msg received:" + msg);
771    }
772    replicationServerDomain.process(msg, this);
773  }
774
775  /**
776   * Responds to a monitor request message.
777   *
778   * @param msg
779   *          The monitor request message.
780   */
781  void processMonitorRequestMsg(MonitorRequestMsg msg)
782  {
783    replicationServerDomain.processMonitorRequestMsg(msg, this);
784  }
785
786  /**
787   * Responds to a monitor message.
788   *
789   * @param msg
790   *          The monitor message.
791   */
792  void processMonitorMsg(MonitorMsg msg)
793  {
794    replicationServerDomain.processMonitorMsg(msg, this);
795  }
796
797  /**
798   * Processes a change time heartbeat msg.
799   *
800   * @param msg
801   *          The message to be processed.
802   * @throws DirectoryException
803   *           When an exception is raised.
804   */
805  void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException
806  {
807    if (logger.isTraceEnabled())
808    {
809      logger.trace("In "
810          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
811          + this + " processes received msg:\n" + msg);
812    }
813    replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
814  }
815
816  /**
817   * Process the reception of a WindowProbeMsg message.
818   *
819   * @throws IOException
820   *           When the session becomes unavailable.
821   */
822  public void replyToWindowProbe() throws IOException
823  {
824    if (rcvWindow > 0)
825    {
826      // The LDAP server believes that its window is closed while it is not,
827      // this means that some problem happened in the window exchange procedure!
828      // lets update the LDAP server with out current window size and hope
829      // that everything will work better in the future.
830      // TODO also log an error message.
831      session.publish(new WindowMsg(rcvWindow));
832    }
833    else
834    {
835      // Both the LDAP server and the replication server believes that the
836      // window is closed. Lets check the flowcontrol in case we
837      // can now resume operations and send a windowMessage if necessary.
838      checkWindow();
839    }
840  }
841
842  /**
843   * Sends the provided TopologyMsg to the peer server.
844   *
845   * @param topoMsg
846   *          The TopologyMsg message to be sent.
847   * @throws IOException
848   *           When it occurs while sending the message,
849   */
850  public void sendTopoInfo(TopologyMsg topoMsg) throws IOException
851  {
852    // V1 Rs do not support the TopologyMsg
853    if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
854    {
855      send(topoMsg);
856    }
857  }
858
859  /**
860   * Set a new generation ID.
861   *
862   * @param generationId The new generation ID
863   *
864   */
865  public void setGenerationId(long generationId)
866  {
867    this.generationId = generationId;
868  }
869
870  /**
871   * Sets the window size when used when sending to the remote.
872   * @param size The provided window size.
873   */
874  protected void setSendWindowSize(int size)
875  {
876    this.sendWindowSize = size;
877  }
878
879  /**
880   * Shutdown This ServerHandler.
881   */
882  @Override
883  public void shutdown()
884  {
885    shutdownWriter = true;
886    setConsumerActive(false);
887    super.shutdown();
888
889    if (session != null)
890    {
891      session.close();
892    }
893    if (heartbeatThread != null)
894    {
895      heartbeatThread.shutdown();
896    }
897
898    DirectoryServer.deregisterMonitorProvider(this);
899
900    /*
901     * Be sure to wait for ServerWriter and ServerReader death
902     * It does not matter if we try to stop a thread which is us (reader
903     * or writer), but we must not wait for our own thread death.
904     */
905    try
906    {
907      if (writer != null && !Thread.currentThread().equals(writer))
908      {
909        writer.join(SHUTDOWN_JOIN_TIMEOUT);
910      }
911      if (reader != null && !Thread.currentThread().equals(reader))
912      {
913        reader.join(SHUTDOWN_JOIN_TIMEOUT);
914      }
915    } catch (InterruptedException e)
916    {
917      // don't try anymore to join and return.
918    }
919    if (logger.isTraceEnabled())
920    {
921      logger.trace("SH.shutdowned(" + this + ")");
922    }
923  }
924
925  /**
926   * Select the next update that must be sent to the server managed by this
927   * ServerHandler.
928   *
929   * @return the next update that must be sent to the server managed by this
930   *         ServerHandler.
931   * @throws ChangelogException
932   *            If a problem occurs when reading the changelog
933   */
934  public UpdateMsg take() throws ChangelogException
935  {
936    final UpdateMsg msg = getNextMessage();
937
938    acquirePermitInSendWindow();
939
940    if (msg != null)
941    {
942      incrementOutCount();
943      if (msg.isAssured())
944      {
945        incrementAssuredStats(msg);
946      }
947      return msg;
948    }
949    return null;
950  }
951
952  private void acquirePermitInSendWindow()
953  {
954    boolean acquired = false;
955    boolean interrupted = true;
956    do
957    {
958      try
959      {
960        acquired = sendWindow.tryAcquire(500, TimeUnit.MILLISECONDS);
961        interrupted = false;
962      } catch (InterruptedException e)
963      {
964        // loop until not interrupted
965      }
966    } while ((interrupted || !acquired) && !shutdownWriter);
967  }
968
969  private void incrementAssuredStats(final UpdateMsg msg)
970  {
971    if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
972    {
973      assuredSrSentUpdates++;
974    }
975    else if (!isDataServer())
976    {
977      assuredSdSentUpdates++;
978    }
979  }
980
981  /**
982   * Creates a RSInfo structure representing this remote RS.
983   * @return The RSInfo structure representing this remote RS
984   */
985  public RSInfo toRSInfo()
986  {
987    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
988  }
989
990  /**
991   * Update the send window size based on the credit specified in the
992   * given window message.
993   *
994   * @param windowMsg The Window LocalizableMessage containing the information
995   *                  necessary for updating the window size.
996   */
997  public void updateWindow(WindowMsg windowMsg)
998  {
999    sendWindow.release(windowMsg.getNumAck());
1000  }
1001
1002  /**
1003   * Log the messages involved in the start handshake.
1004   * @param inStartMsg The message received first.
1005   * @param outStartMsg The message sent in response.
1006   */
1007  protected void logStartHandshakeRCVandSND(
1008      StartMsg inStartMsg,
1009      StartMsg outStartMsg)
1010  {
1011    if (logger.isTraceEnabled())
1012    {
1013      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1014          + ", " + getClass().getSimpleName() + " " + this + ":"
1015          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
1016          + "\nAND REPLIED:\n" + outStartMsg);
1017    }
1018  }
1019
1020  /**
1021   * Log the messages involved in the start handshake.
1022   * @param outStartMsg The message sent first.
1023   * @param inStartMsg The message received in response.
1024   */
1025  protected void logStartHandshakeSNDandRCV(
1026      StartMsg outStartMsg,
1027      StartMsg inStartMsg)
1028  {
1029    if (logger.isTraceEnabled())
1030    {
1031      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1032          + ", " + getClass().getSimpleName() + " " + this + ":"
1033          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
1034          + inStartMsg);
1035    }
1036  }
1037
1038  /**
1039   * Log the messages involved in the Topology handshake.
1040   * @param inTopoMsg The message received first.
1041   * @param outTopoMsg The message sent in response.
1042   */
1043  protected void logTopoHandshakeRCVandSND(
1044      TopologyMsg inTopoMsg,
1045      TopologyMsg outTopoMsg)
1046  {
1047    if (logger.isTraceEnabled())
1048    {
1049      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1050          + ", " + getClass().getSimpleName() + " " + this + ":"
1051          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
1052          + outTopoMsg);
1053    }
1054  }
1055
1056  /**
1057   * Log the messages involved in the Topology handshake.
1058   * @param outTopoMsg The message sent first.
1059   * @param inTopoMsg The message received in response.
1060   */
1061  protected void logTopoHandshakeSNDandRCV(
1062      TopologyMsg outTopoMsg,
1063      TopologyMsg inTopoMsg)
1064  {
1065    if (logger.isTraceEnabled())
1066    {
1067      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1068          + ", " + getClass().getSimpleName() + " " + this + ":"
1069          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
1070          + inTopoMsg);
1071    }
1072  }
1073
1074  /**
1075   * Log the messages involved in the Topology/StartSession handshake.
1076   * @param inStartSessionMsg The message received first.
1077   * @param outTopoMsg The message sent in response.
1078   */
1079  protected void logStartSessionHandshake(
1080      StartSessionMsg inStartSessionMsg,
1081      TopologyMsg outTopoMsg)
1082  {
1083    if (logger.isTraceEnabled())
1084    {
1085      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1086          + ", " + getClass().getSimpleName() + " " + this + " :"
1087          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
1088          + "\nAND REPLIED:\n" + outTopoMsg);
1089    }
1090  }
1091
1092  /**
1093   * Log stop message has been received.
1094   */
1095  protected void logStopReceived()
1096  {
1097    if (logger.isTraceEnabled())
1098    {
1099      logger.trace("In " + this.replicationServer.getMonitorInstanceName()
1100          + ", " + getClass().getSimpleName() + " " + this + " :"
1101          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
1102    }
1103  }
1104
1105  /**
1106   * Process a Ack message received.
1107   * @param ack the message received.
1108   */
1109  void processAck(AckMsg ack)
1110  {
1111    replicationServerDomain.processAck(ack, this);
1112  }
1113
1114  /**
1115   * Get the reference generation id (associated with the changes in the db).
1116   * @return the reference generation id.
1117   */
1118  public long getReferenceGenId()
1119  {
1120    return replicationServerDomain.getGenerationId();
1121  }
1122
1123  /**
1124   * Process a ResetGenerationIdMsg message received.
1125   * @param msg the message received.
1126   */
1127  void processResetGenId(ResetGenerationIdMsg msg)
1128  {
1129    replicationServerDomain.resetGenerationId(this, msg);
1130  }
1131
1132  /**
1133   * Put a new update message received.
1134   * @param update the update message received.
1135   * @throws IOException when it occurs.
1136   */
1137  public void put(UpdateMsg update) throws IOException
1138  {
1139    decAndCheckWindow();
1140    replicationServerDomain.put(update, this);
1141  }
1142
1143  /**
1144   * Stop this handler.
1145   */
1146  public void doStop()
1147  {
1148    replicationServerDomain.stopServer(this, false);
1149  }
1150
1151  /**
1152   * Creates a ReplServerStartMsg for the current ServerHandler.
1153   *
1154   * @return a new ReplServerStartMsg for the current ServerHandler.
1155   */
1156  protected ReplServerStartMsg createReplServerStartMsg()
1157  {
1158    return new ReplServerStartMsg(getReplicationServerId(),
1159        getReplicationServerURL(), getBaseDN(), maxRcvWindow,
1160        replicationServerDomain.getLatestServerState(), localGenerationId,
1161        sslEncryption, getLocalGroupId(),
1162        replicationServer.getDegradedStatusThreshold());
1163  }
1164
1165  /**
1166   * Returns a "badly disconnected" error message for this server handler.
1167   *
1168   * @return a "badly disconnected" error message for this server handler
1169   */
1170  public LocalizableMessage getBadlyDisconnectedErrorMessage()
1171  {
1172    if (isDataServer())
1173    {
1174      return ERR_DS_BADLY_DISCONNECTED.get(getReplicationServerId(),
1175          getServerId(), session.getReadableRemoteAddress(), getBaseDN());
1176    }
1177    return ERR_RS_BADLY_DISCONNECTED.get(getReplicationServerId(),
1178        getServerId(), session.getReadableRemoteAddress(), getBaseDN());
1179  }
1180}