001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2009-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2014 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import java.io.IOException;
030
031import org.opends.server.api.DirectoryThread;
032import org.forgerock.i18n.slf4j.LocalizedLogger;
033import org.opends.server.replication.protocol.MonitorMsg;
034
035/**
036 * This thread regularly publishes monitoring information:
037 * - it sends monitoring messages regarding the direct topology (directly
038 *   connected DSs and RSs) to the connected RSs
039 * - it sends monitoring messages regarding the whole topology (also includes
040 *   the local RS) to the connected DSs
041 * Note: as of today, monitoring messages mainly contains the server state of
042 * the entities.
043 */
044public class MonitoringPublisher extends DirectoryThread
045{
046  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
047
048  /** The replication domain we send monitoring for. */
049  private final ReplicationServerDomain domain;
050
051  /** Sleep time (in ms) before sending new monitoring messages. */
052  private volatile long period;
053
054  private final Object shutdownLock = new Object();
055
056  /**
057   * Create a monitoring publisher.
058   * @param replicationServerDomain The ReplicationServerDomain the monitoring
059   *        publisher is for.
060   * @param period The sleep time to use
061   */
062  public MonitoringPublisher(ReplicationServerDomain replicationServerDomain,
063    long period)
064  {
065    super("Replication server RS("
066        + replicationServerDomain.getLocalRSServerId()
067        + ") monitor publisher for domain \""
068        + replicationServerDomain.getBaseDN() + "\"");
069
070    this.domain = replicationServerDomain;
071    this.period = period;
072  }
073
074  /**
075   * Run method for the monitoring publisher.
076   */
077  @Override
078  public void run()
079  {
080    if (logger.isTraceEnabled())
081    {
082      logger.trace(getMessage("Monitoring publisher starting."));
083    }
084
085    try
086    {
087      while (!isShutdownInitiated())
088      {
089        // Send global topology information to peer DSs
090        final int senderId = domain.getLocalRSServerId();
091        final MonitorMsg monitorMsg =
092            domain.createGlobalTopologyMonitorMsg(senderId, 0);
093
094        for (ServerHandler serverHandler : domain.getConnectedDSs().values())
095        {
096          // send() can be long operation, check for shutdown between each calls
097          if (isShutdownInitiated())
098          {
099            break;
100          }
101          try
102          {
103            serverHandler.send(monitorMsg);
104          }
105          catch (IOException e)
106          {
107            // Server is disconnecting ? Forget it
108          }
109        }
110
111        synchronized (shutdownLock)
112        {
113          // double check to ensure the call to notify() was not missed
114          if (!isShutdownInitiated())
115          {
116            shutdownLock.wait(period);
117          }
118        }
119      }
120    }
121    catch (InterruptedException e)
122    {
123      logger.trace(getMessage(
124          "Monitoring publisher has been interrupted while sleeping."));
125    }
126
127    logger.trace(getMessage("Monitoring publisher is terminated."));
128  }
129
130
131
132  /**
133   * Stops the thread.
134   */
135  public void shutdown()
136  {
137    initiateShutdown();
138    synchronized (shutdownLock)
139    {
140      shutdownLock.notifyAll();
141    }
142    if (logger.isTraceEnabled())
143    {
144      logger.trace(getMessage("Shutting down monitoring publisher."));
145    }
146  }
147
148  /**
149   * Waits for thread death. If not terminated within 2 seconds,
150   * forces interruption
151   */
152  public void waitForShutdown()
153  {
154    try
155    {
156      // Here, "this" is the monitoring publisher thread
157      join(2000);
158    }
159    catch (InterruptedException e)
160    {
161      // exit the loop if this thread is interrupted.
162    }
163  }
164
165  /**
166   * Sets the period value.
167   * @param period The new period value.
168   */
169  public void setPeriod(long period)
170  {
171    if (logger.isTraceEnabled())
172    {
173      logger.trace(getMessage(
174          "Monitoring publisher changing period value to " + period));
175    }
176
177    this.period = period;
178  }
179
180  private String getMessage(String message)
181  {
182    return "In RS " + domain.getLocalRSServerId() + ", for base dn "
183        + domain.getBaseDN() + ": " + message;
184  }
185}