001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import static org.opends.messages.ConfigMessages.*;
030import static org.opends.messages.ReplicationMessages.*;
031import static org.opends.server.util.StaticUtils.*;
032
033import java.io.IOException;
034import java.net.*;
035import java.util.*;
036import java.util.concurrent.CopyOnWriteArraySet;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import org.forgerock.i18n.LocalizableMessage;
040import org.forgerock.i18n.slf4j.LocalizedLogger;
041import org.forgerock.opendj.config.server.ConfigChangeResult;
042import org.forgerock.opendj.config.server.ConfigException;
043import org.forgerock.opendj.ldap.ResultCode;
044import org.forgerock.opendj.ldap.SearchScope;
045import org.opends.server.admin.server.ConfigurationChangeListener;
046import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
047import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.ConflictBehavior;
048import org.opends.server.admin.std.server.ReplicationServerCfg;
049import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
050import org.opends.server.api.VirtualAttributeProvider;
051import org.opends.server.backends.ChangelogBackend;
052import org.opends.server.core.DirectoryServer;
053import org.opends.server.replication.common.CSN;
054import org.opends.server.replication.common.MultiDomainServerState;
055import org.opends.server.replication.common.ServerState;
056import org.opends.server.replication.plugin.MultimasterReplication;
057import org.opends.server.replication.protocol.ReplServerStartMsg;
058import org.opends.server.replication.protocol.ReplSessionSecurity;
059import org.opends.server.replication.protocol.ReplicationMsg;
060import org.opends.server.replication.protocol.ServerStartMsg;
061import org.opends.server.replication.protocol.Session;
062import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
063import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
064import org.opends.server.replication.server.changelog.api.ChangelogDB;
065import org.opends.server.replication.server.changelog.api.ChangelogException;
066import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
067import org.opends.server.replication.server.changelog.file.FileChangelogDB;
068import org.opends.server.replication.server.changelog.je.JEChangelogDB;
069import org.opends.server.replication.service.DSRSShutdownSync;
070import org.opends.server.types.AttributeType;
071import org.opends.server.types.DN;
072import org.opends.server.types.DirectoryException;
073import org.opends.server.types.HostPort;
074import org.opends.server.types.SearchFilter;
075import org.opends.server.types.VirtualAttributeRule;
076
077/**
078 * ReplicationServer Listener. This singleton is the main object of the
079 * replication server. It waits for the incoming connections and create listener
080 * and publisher objects for connection with LDAP servers and with replication
081 * servers It is responsible for creating the replication server
082 * replicationServerDomain and managing it
083 */
084public final class ReplicationServer
085  implements ConfigurationChangeListener<ReplicationServerCfg>
086{
087  private String serverURL;
088
089  private ServerSocket listenSocket;
090  private Thread listenThread;
091  private Thread connectThread;
092
093  /** The current configuration of this replication server. */
094  private ReplicationServerCfg config;
095  private final DSRSShutdownSync dsrsShutdownSync;
096
097  /**
098   * This table is used to store the list of dn for which we are currently
099   * handling servers.
100   */
101  private final Map<DN, ReplicationServerDomain> baseDNs = new HashMap<>();
102
103  /** The database storing the changes. */
104  private final ChangelogDB changelogDB;
105
106  /** The backend that allow to search the changes (external changelog). */
107  private ChangelogBackend changelogBackend;
108
109  private final AtomicBoolean shutdown = new AtomicBoolean();
110  private boolean stopListen;
111  private final ReplSessionSecurity replSessionSecurity;
112
113  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
114
115  /** To know whether a domain is enabled for the external changelog. */
116  private final ECLEnabledDomainPredicate domainPredicate;
117
118  /**
119   * This is required for unit testing, so that we can keep track of all the
120   * replication servers which are running in the VM.
121   */
122  private static final Set<Integer> localPorts = new CopyOnWriteArraySet<>();
123
124  /** Monitors for synchronizing domain creation with the connect thread. */
125  private final Object domainTicketLock = new Object();
126  private final Object connectThreadLock = new Object();
127  private long domainTicket;
128
129  /**
130   * Holds the list of all replication servers instantiated in this VM.
131   * This allows to perform clean up of the RS databases in unit tests.
132   */
133  private static final List<ReplicationServer> allInstances = new ArrayList<>();
134
135  /**
136   * Creates a new Replication server using the provided configuration entry.
137   *
138   * @param cfg The configuration of this replication server.
139   * @throws ConfigException When Configuration is invalid.
140   */
141  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
142  {
143    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
144  }
145
146  /**
147   * Creates a new Replication server using the provided configuration entry and shutdown
148   * synchronization object.
149   *
150   * @param cfg The configuration of this replication server.
151   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
152   * @throws ConfigException When Configuration is invalid.
153   */
154  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
155  {
156    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
157  }
158
159  /**
160   * Creates a new Replication server using the provided configuration entry, shutdown
161   * synchronization object and domain predicate.
162   *
163   * @param cfg The configuration of this replication server.
164   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
165   * @param predicate Indicates whether a domain is enabled for the external changelog.
166   * @throws ConfigException When Configuration is invalid.
167   */
168  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
169      final ECLEnabledDomainPredicate predicate) throws ConfigException
170  {
171    this.config = cfg;
172    this.dsrsShutdownSync = dsrsShutdownSync;
173    this.domainPredicate = predicate;
174
175    enableExternalChangeLog();
176    ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
177    logger.trace("Using %s as DB implementation for changelog DB", dbImpl);
178    if (dbImpl == ReplicationDBImplementation.JE)
179    {
180      this.changelogDB = new JEChangelogDB(this, cfg);
181    }
182    else
183    {
184      this.changelogDB = new FileChangelogDB(this, cfg);
185    }
186
187    replSessionSecurity = new ReplSessionSecurity();
188    initialize();
189    cfg.addChangeListener(this);
190
191    localPorts.add(getReplicationPort());
192
193    // Keep track of this new instance
194    allInstances.add(this);
195  }
196
197  private Set<HostPort> getConfiguredRSAddresses()
198  {
199    final Set<HostPort> results = new HashSet<>();
200    for (String serverAddress : this.config.getReplicationServer())
201    {
202      results.add(HostPort.valueOf(serverAddress));
203    }
204    return results;
205  }
206
207  /**
208   * Get the list of every replication servers instantiated in the current VM.
209   * @return The list of every replication servers instantiated in the current
210   * VM.
211   */
212  public static List<ReplicationServer> getAllInstances()
213  {
214    return allInstances;
215  }
216
217  /**
218   * The run method for the Listen thread.
219   * This thread accept incoming connections on the replication server
220   * ports from other replication servers or from LDAP servers
221   * and spawn further thread responsible for handling those connections
222   */
223  void runListen()
224  {
225    logger.info(NOTE_REPLICATION_SERVER_LISTENING,
226        getServerId(),
227        listenSocket.getInetAddress().getHostAddress(),
228        listenSocket.getLocalPort());
229
230    while (!shutdown.get() && !stopListen)
231    {
232      // Wait on the replicationServer port.
233      // Read incoming messages and create LDAP or ReplicationServer listener
234      // and Publisher.
235      try
236      {
237        Session session;
238        Socket newSocket = null;
239        try
240        {
241          newSocket = listenSocket.accept();
242          newSocket.setTcpNoDelay(true);
243          newSocket.setKeepAlive(true);
244          int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
245          session = replSessionSecurity.createServerSession(newSocket,
246              timeoutMS);
247          if (session == null) // Error, go back to accept
248          {
249            continue;
250          }
251        }
252        catch (Exception e)
253        {
254          // If problems happen during the SSL handshake, it is necessary
255          // to close the socket to free the associated resources.
256          if (newSocket != null)
257          {
258            newSocket.close();
259          }
260          continue;
261        }
262
263        ReplicationMsg msg = session.receive();
264
265        final int queueSize = this.config.getQueueSize();
266        final int rcvWindow = this.config.getWindowSize();
267        if (msg instanceof ServerStartMsg)
268        {
269          DataServerHandler dsHandler = new DataServerHandler(
270              session, queueSize, this, rcvWindow);
271          dsHandler.startFromRemoteDS((ServerStartMsg) msg);
272        }
273        else if (msg instanceof ReplServerStartMsg)
274        {
275          ReplicationServerHandler rsHandler = new ReplicationServerHandler(
276              session, queueSize, this, rcvWindow);
277          rsHandler.startFromRemoteRS((ReplServerStartMsg) msg);
278        }
279        else
280        {
281          // We did not recognize the message, close session as what
282          // can happen after is undetermined and we do not want the server to
283          // be disturbed
284          session.close();
285          return;
286        }
287      }
288      catch (Exception e)
289      {
290        // The socket has probably been closed as part of the
291        // shutdown or changing the port number process.
292        // Just log debug information and loop.
293        // Do not log the message during shutdown.
294        logger.traceException(e);
295        if (!shutdown.get())
296        {
297          logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage());
298        }
299      }
300    }
301  }
302
303  /**
304   * This method manages the connection with the other replication servers.
305   * It periodically checks that this replication server is indeed connected
306   * to all the other replication servers and if not attempts to
307   * make the connection.
308   */
309  void runConnect()
310  {
311    synchronized (connectThreadLock)
312    {
313      while (!shutdown.get())
314      {
315        HostPort localAddress = HostPort.localAddress(getReplicationPort());
316        for (ReplicationServerDomain domain : getReplicationServerDomains())
317        {
318          /*
319           * If there are N RSs configured then we will usually be connected to
320           * N-1 of them, since one of them is usually this RS. However, we
321           * cannot guarantee this since the configuration may not contain this
322           * RS.
323           */
324          final Set<HostPort> connectedRSAddresses =
325              getConnectedRSAddresses(domain);
326          for (HostPort rsAddress : getConfiguredRSAddresses())
327          {
328            if (connectedRSAddresses.contains(rsAddress))
329            {
330              continue; // Skip: already connected.
331            }
332
333            // FIXME: this will need changing if we ever support listening on
334            // specific addresses.
335            if (rsAddress.equals(localAddress))
336            {
337              continue; // Skip: avoid connecting to self.
338            }
339
340            connect(rsAddress, domain.getBaseDN());
341          }
342        }
343
344        // Notify any threads waiting with domain tickets after each iteration.
345        synchronized (domainTicketLock)
346        {
347          domainTicket++;
348          domainTicketLock.notifyAll();
349        }
350
351        // Retry each second.
352        final int randomizer = (int) (Math.random() * 100);
353        try
354        {
355          // Releases lock, allows threads to get domain ticket.
356          connectThreadLock.wait(1000 + randomizer);
357        }
358        catch (InterruptedException e)
359        {
360          // Signaled to shutdown.
361          return;
362        }
363      }
364    }
365  }
366
367  private Set<HostPort> getConnectedRSAddresses(ReplicationServerDomain domain)
368  {
369    Set<HostPort> results = new HashSet<>();
370    for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values())
371    {
372      results.add(HostPort.valueOf(rsHandler.getServerAddressURL()));
373    }
374    return results;
375  }
376
377  /**
378   * Establish a connection to the server with the address and port.
379   *
380   * @param remoteServerAddress
381   *          The address and port for the server
382   * @param baseDN
383   *          The baseDN of the connection
384   */
385  private void connect(HostPort remoteServerAddress, DN baseDN)
386  {
387    boolean sslEncryption = replSessionSecurity.isSslEncryption();
388
389    if (logger.isTraceEnabled())
390    {
391      logger.trace("RS " + getMonitorInstanceName() + " connects to "
392          + remoteServerAddress);
393    }
394
395    Socket socket = new Socket();
396    Session session = null;
397    try
398    {
399      socket.setTcpNoDelay(true);
400      if (config.getSourceAddress() != null)
401      {
402        InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0);
403        socket.bind(local);
404      }
405      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
406      socket.connect(remoteServerAddress.toInetSocketAddress(), timeoutMS);
407      session = replSessionSecurity.createClientSession(socket, timeoutMS);
408
409      ReplicationServerHandler rsHandler = new ReplicationServerHandler(
410          session, config.getQueueSize(), this, config.getWindowSize());
411      rsHandler.connect(baseDN, sslEncryption);
412    }
413    catch (Exception e)
414    {
415      logger.traceException(e);
416      close(session);
417      close(socket);
418    }
419  }
420
421  /** Initialization function for the replicationServer. */
422  private void initialize()
423  {
424    shutdown.set(false);
425
426    try
427    {
428      this.changelogDB.initializeDB();
429
430      setServerURL();
431      listenSocket = new ServerSocket();
432      listenSocket.bind(new InetSocketAddress(getReplicationPort()));
433
434      // creates working threads: we must first connect, then start to listen.
435      if (logger.isTraceEnabled())
436      {
437        logger.trace("RS " + getMonitorInstanceName() + " creates connect thread");
438      }
439      connectThread = new ReplicationServerConnectThread(this);
440      connectThread.start();
441
442      if (logger.isTraceEnabled())
443      {
444        logger.trace("RS " + getMonitorInstanceName() + " creates listen thread");
445      }
446
447      listenThread = new ReplicationServerListenThread(this);
448      listenThread.start();
449
450      if (logger.isTraceEnabled())
451      {
452        logger.trace("RS " + getMonitorInstanceName() + " successfully initialized");
453      }
454    } catch (UnknownHostException e)
455    {
456      logger.error(ERR_UNKNOWN_HOSTNAME);
457    } catch (IOException e)
458    {
459      logger.error(ERR_COULD_NOT_BIND_CHANGELOG, getReplicationPort(), e.getMessage());
460    }
461  }
462
463  /**
464   * Enable the external changelog if it is not already enabled.
465   * <p>
466   * The external changelog is provided by the changelog backend.
467   *
468   * @throws ConfigException
469   *            If an error occurs.
470   */
471  private void enableExternalChangeLog() throws ConfigException
472  {
473    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
474    {
475      // Backend has already been created and initialized
476      // This can occurs in tests
477      return;
478    }
479    try
480    {
481      changelogBackend = new ChangelogBackend(this, domainPredicate);
482      changelogBackend.openBackend();
483      try
484      {
485        DirectoryServer.registerBackend(changelogBackend);
486      }
487      catch (Exception e)
488      {
489        logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
490            getExceptionMessage(e)));
491      }
492
493      registerVirtualAttributeRules();
494    }
495    catch (Exception e)
496    {
497      // TODO : I18N with correct message + what kind of exception should we really throw ?
498      // (Directory/Initialization/Config Exception)
499      throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e);
500    }
501  }
502
503  private void shutdownExternalChangelog()
504  {
505    if (changelogBackend != null)
506    {
507      DirectoryServer.deregisterBackend(changelogBackend);
508      changelogBackend.finalizeBackend();
509      changelogBackend = null;
510    }
511    deregisterVirtualAttributeRules();
512  }
513
514  private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
515  {
516    final List<VirtualAttributeRule> rules = new ArrayList<>();
517    rules.add(buildVirtualAttributeRule("lastexternalchangelogcookie", new LastCookieVirtualProvider(this)));
518    rules.add(buildVirtualAttributeRule("firstchangenumber", new FirstChangeNumberVirtualAttributeProvider(this)));
519    rules.add(buildVirtualAttributeRule("lastchangenumber", new LastChangeNumberVirtualAttributeProvider(this)));
520    rules.add(buildVirtualAttributeRule("changelog", new ChangelogBaseDNVirtualAttributeProvider()));
521    return rules;
522  }
523
524  private void registerVirtualAttributeRules() throws DirectoryException {
525    for (VirtualAttributeRule rule : getVirtualAttributesRules())
526    {
527      DirectoryServer.registerVirtualAttribute(rule);
528    }
529  }
530
531  private void deregisterVirtualAttributeRules()
532  {
533    try
534    {
535      for (VirtualAttributeRule rule : getVirtualAttributesRules())
536      {
537        DirectoryServer.deregisterVirtualAttribute(rule);
538      }
539    }
540    catch (DirectoryException e)
541    {
542      // Should never happen
543      throw new RuntimeException(e);
544    }
545  }
546
547  private static VirtualAttributeRule buildVirtualAttributeRule(String attrName,
548      VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> provider)
549      throws DirectoryException
550  {
551    ConflictBehavior conflictBehavior = ConflictBehavior.VIRTUAL_OVERRIDES_REAL;
552
553    try
554    {
555      Set<DN> baseDNs = Collections.singleton(DN.valueOf(""));
556      Set<DN> groupDNs = Collections.emptySet();
557      Set<SearchFilter> filters = Collections.singleton(SearchFilter.objectClassPresent());
558
559      // To avoid the configuration in cn=config just
560      // create a rule and register it into the DirectoryServer
561      provider.initializeVirtualAttributeProvider(null);
562
563      AttributeType attributeType = DirectoryServer.getAttributeType(attrName);
564      return new VirtualAttributeRule(attributeType, provider,
565            baseDNs, SearchScope.BASE_OBJECT,
566            groupDNs, filters, conflictBehavior);
567    }
568    catch (Exception e)
569    {
570      LocalizableMessage message =
571        NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e);
572      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e);
573    }
574  }
575
576  /**
577   * Get the ReplicationServerDomain associated to the base DN given in
578   * parameter.
579   *
580   * @param baseDN
581   *          The base DN for which the ReplicationServerDomain must be
582   *          returned.
583   * @return The ReplicationServerDomain associated to the base DN given in
584   *         parameter.
585   */
586  public ReplicationServerDomain getReplicationServerDomain(DN baseDN)
587  {
588    return getReplicationServerDomain(baseDN, false);
589  }
590
591  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
592  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
593  {
594    Set<DN> domains = null;
595    synchronized (baseDNs)
596    {
597      domains = new HashSet<>(baseDNs.keySet());
598    }
599    domains.removeAll(excludedBaseDNs);
600    return domains;
601  }
602
603  /**
604   * Validate that provided cookie is coherent with this replication server,
605   * when ignoring the provided set of DNs.
606   * <p>
607   * The cookie is coherent if and only if it exactly has the set of DNs corresponding to
608   * the replication domains, and the states in the cookie are not older than oldest states
609   * in the server.
610   *
611   * @param cookie
612   *            The multi domain state (cookie) to validate.
613   * @param ignoredBaseDNs
614   *            The set of DNs to ignore when validating
615   * @throws DirectoryException
616   *            If the cookie is not valid
617   */
618  public void validateCookie(MultiDomainServerState cookie, Set<DN> ignoredBaseDNs) throws DirectoryException
619  {
620    final Set<DN> activeDomains = getDNsOfActiveDomainsInServer(ignoredBaseDNs);
621    final Set<DN> cookieDomains = getDNsOfCookie(cookie);
622
623    checkNoUnknownDomainIsProvidedInCookie(cookie, activeDomains, cookieDomains);
624    checkCookieIsNotOutdated(cookie, activeDomains);
625  }
626
627  private Set<DN> getDNsOfCookie(MultiDomainServerState cookie)
628  {
629    final Set<DN> cookieDomains = new HashSet<>();
630    for (final DN dn : cookie)
631    {
632      cookieDomains.add(dn);
633    }
634    return cookieDomains;
635  }
636
637  private Set<DN> getDNsOfActiveDomainsInServer(final Set<DN> ignoredBaseDNs) throws DirectoryException
638  {
639    final Set<DN> activeDomains = new HashSet<>();
640    for (final DN dn : getDomainDNs(ignoredBaseDNs))
641    {
642      final ServerState lastServerState = getReplicationServerDomain(dn).getLatestServerState();
643      if (!lastServerState.isEmpty())
644      {
645         activeDomains.add(dn);
646      }
647    }
648    return activeDomains;
649  }
650
651  private void checkNoUnknownDomainIsProvidedInCookie(final MultiDomainServerState cookie, final Set<DN> activeDomains,
652      final Set<DN> cookieDomains) throws DirectoryException
653  {
654    if (!activeDomains.containsAll(cookieDomains))
655    {
656      final Set<DN> unknownCookieDomains = new HashSet<>(cookieDomains);
657      unknownCookieDomains.removeAll(activeDomains);
658      final StringBuilder currentStartingCookie = new StringBuilder();
659      for (DN domainDN : activeDomains) {
660        currentStartingCookie.append(domainDN).append(":").append(cookie.getServerState(domainDN)).append(";");
661      }
662      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
663          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
664              unknownCookieDomains.toString(), currentStartingCookie));
665    }
666  }
667
668  private void checkCookieIsNotOutdated(final MultiDomainServerState cookie, final Set<DN> activeDomains)
669      throws DirectoryException
670  {
671    for (DN dn : activeDomains)
672    {
673      if (isCookieOutdatedForDomain(cookie, dn))
674      {
675        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
676            ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(dn.toString()));
677      }
678    }
679  }
680
681  /** Check that provided cookie is not outdated compared to the oldest state of a domain. */
682  private boolean isCookieOutdatedForDomain(MultiDomainServerState cookie, DN domainDN)
683  {
684    final ServerState providedState = cookie.getServerState(domainDN);
685    if (providedState == null)
686    {
687      // missing domains do not invalidate a cookie.
688      // results will include all the changes of the missing domains
689      return false;
690    }
691    final ServerState domainOldestState = getReplicationServerDomain(domainDN).getOldestState();
692    for (final CSN oldestCsn : domainOldestState)
693    {
694      final CSN providedCsn = providedState.getCSN(oldestCsn.getServerId());
695      if (providedCsn != null && providedCsn.isOlderThan(oldestCsn))
696      {
697        return true;
698      }
699    }
700    return false;
701  }
702
703  /**
704   * Get the ReplicationServerDomain associated to the base DN given in
705   * parameter.
706   *
707   * @param baseDN The base DN for which the ReplicationServerDomain must be
708   * returned.
709   * @param create Specifies whether to create the ReplicationServerDomain if
710   *        it does not already exist.
711   * @return The ReplicationServerDomain associated to the base DN given in
712   *         parameter.
713   */
714  public ReplicationServerDomain getReplicationServerDomain(DN baseDN,
715      boolean create)
716  {
717    synchronized (baseDNs)
718    {
719      ReplicationServerDomain domain = baseDNs.get(baseDN);
720      if (domain == null && create) {
721        domain = new ReplicationServerDomain(baseDN, this);
722        baseDNs.put(baseDN, domain);
723      }
724      return domain;
725    }
726  }
727
728  /**
729   * Waits for connections to this ReplicationServer.
730   */
731  void waitConnections()
732  {
733    // Acquire a domain ticket and wait for a complete cycle of the connect
734    // thread.
735    final long myDomainTicket;
736    synchronized (connectThreadLock)
737    {
738      // Connect thread must be waiting.
739      synchronized (domainTicketLock)
740      {
741        // Determine the ticket which will be used in the next connect thread
742        // iteration.
743        myDomainTicket = domainTicket + 1;
744      }
745
746      // Wake up connect thread.
747      connectThreadLock.notify();
748    }
749
750    // Wait until the connect thread has processed next connect phase.
751    synchronized (domainTicketLock)
752    {
753      while (myDomainTicket > domainTicket && !shutdown.get())
754      {
755        try
756        {
757          // Wait with timeout so that we detect shutdown.
758          domainTicketLock.wait(500);
759        }
760        catch (InterruptedException e)
761        {
762          // Can't do anything with this.
763          Thread.currentThread().interrupt();
764        }
765      }
766    }
767  }
768
769  /**
770   * Shutdown the Replication Server service and all its connections.
771   */
772  public void shutdown()
773  {
774    localPorts.remove(getReplicationPort());
775
776    if (!shutdown.compareAndSet(false, true))
777    {
778      return;
779    }
780
781    // shutdown the connect thread
782    if (connectThread != null)
783    {
784      connectThread.interrupt();
785    }
786
787    // shutdown the listener thread
788    close(listenSocket);
789    if (listenThread != null)
790    {
791      listenThread.interrupt();
792    }
793
794    // shutdown all the replication domains
795    for (ReplicationServerDomain domain : getReplicationServerDomains())
796    {
797      domain.shutdown();
798    }
799
800    shutdownExternalChangelog();
801
802    try
803    {
804      this.changelogDB.shutdownDB();
805    }
806    catch (ChangelogException ignored)
807    {
808      logger.traceException(ignored);
809    }
810
811    // Remove this instance from the global instance list
812    allInstances.remove(this);
813  }
814
815  /**
816   * Retrieves the time after which changes must be deleted from the
817   * persistent storage (in milliseconds).
818   *
819   * @return  The time after which changes must be deleted from the
820   *          persistent storage (in milliseconds).
821   */
822  public long getPurgeDelay()
823  {
824    return this.config.getReplicationPurgeDelay() * 1000;
825  }
826
827  /**
828   * Check if the provided configuration is acceptable for add.
829   *
830   * @param configuration The configuration to check.
831   * @param unacceptableReasons When the configuration is not acceptable, this
832   *                            table is use to return the reasons why this
833   *                            configuration is not acceptable.
834   *
835   * @return true if the configuration is acceptable, false other wise.
836   */
837  public static boolean isConfigurationAcceptable(
838      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
839  {
840    int port = configuration.getReplicationPort();
841
842    try
843    {
844      ServerSocket tmpSocket = new ServerSocket();
845      tmpSocket.bind(new InetSocketAddress(port));
846      tmpSocket.close();
847      return true;
848    }
849    catch (Exception e)
850    {
851      LocalizableMessage message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage());
852      unacceptableReasons.add(message);
853      return false;
854    }
855  }
856
857  /** {@inheritDoc} */
858  @Override
859  public ConfigChangeResult applyConfigurationChange(
860      ReplicationServerCfg configuration)
861  {
862    final ConfigChangeResult ccr = new ConfigChangeResult();
863
864    // Some of those properties change don't need specific code.
865    // They will be applied for next connections. Some others have immediate effect
866    final Set<HostPort> oldRSAddresses = getConfiguredRSAddresses();
867
868    final ReplicationServerCfg oldConfig = this.config;
869    this.config = configuration;
870
871    disconnectRemovedReplicationServers(oldRSAddresses);
872
873    final long newPurgeDelay = config.getReplicationPurgeDelay();
874    if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
875    {
876      this.changelogDB.setPurgeDelay(getPurgeDelay());
877    }
878    final boolean computeCN = config.isComputeChangeNumber();
879    if (computeCN != oldConfig.isComputeChangeNumber())
880    {
881      try
882      {
883        this.changelogDB.setComputeChangeNumber(computeCN);
884      }
885      catch (ChangelogException e)
886      {
887        logger.traceException(e);
888        ccr.setResultCode(ResultCode.OPERATIONS_ERROR);
889      }
890    }
891
892    // changing the listen port requires to stop the listen thread
893    // and restart it.
894    if (getReplicationPort() != oldConfig.getReplicationPort())
895    {
896      stopListen = true;
897      try
898      {
899        listenSocket.close();
900        listenThread.join();
901        stopListen = false;
902
903        setServerURL();
904        listenSocket = new ServerSocket();
905        listenSocket.bind(new InetSocketAddress(getReplicationPort()));
906
907        listenThread = new ReplicationServerListenThread(this);
908        listenThread.start();
909      }
910      catch (IOException e)
911      {
912        logger.traceException(e);
913        logger.error(ERR_COULD_NOT_CLOSE_THE_SOCKET, e);
914      }
915      catch (InterruptedException e)
916      {
917        logger.traceException(e);
918        logger.error(ERR_COULD_NOT_STOP_LISTEN_THREAD, e);
919      }
920    }
921
922    // Update period value for monitoring publishers
923    if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
924    {
925      for (ReplicationServerDomain domain : getReplicationServerDomains())
926      {
927        domain.updateMonitoringPeriod(config.getMonitoringPeriod());
928      }
929    }
930
931    // Changed the group id ?
932    if (config.getGroupId() != oldConfig.getGroupId())
933    {
934      // Have a new group id: Disconnect every servers.
935      for (ReplicationServerDomain domain : getReplicationServerDomains())
936      {
937        domain.stopAllServers(true);
938      }
939    }
940
941    // Set a potential new weight
942    if (oldConfig.getWeight() != config.getWeight())
943    {
944      // Broadcast the new weight the the whole topology. This will make some
945      // DSs reconnect (if needed) to other RSs according to the new weight of
946      // this RS.
947      broadcastConfigChange();
948    }
949
950    final String newDir = config.getReplicationDBDirectory();
951    if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory()))
952    {
953      ccr.setAdminActionRequired(true);
954    }
955    return ccr;
956  }
957
958  /**
959   * Try and set a sensible URL for this replication server. Since we are
960   * listening on all addresses there are a couple of potential candidates:
961   * <ol>
962   * <li>a matching server URL in the replication server's configuration,</li>
963   * <li>hostname local address.</li>
964   * </ol>
965   */
966  private void setServerURL() throws UnknownHostException
967  {
968    /*
969     * First try the set of configured replication servers to see if one of them
970     * is this replication server (this should always be the case).
971     */
972    for (HostPort rsAddress : getConfiguredRSAddresses())
973    {
974      /*
975       * No need validate the string format because the admin framework has
976       * already done it.
977       */
978      if (rsAddress.getPort() == getReplicationPort()
979          && rsAddress.isLocalAddress())
980      {
981        serverURL = rsAddress.toString();
982        return;
983      }
984    }
985
986    // Fall-back to the machine hostname.
987    final String host = InetAddress.getLocalHost().getHostName();
988    // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
989    serverURL = new HostPort(host, getReplicationPort()).toString();
990  }
991
992  /**
993   * Broadcast a configuration change that just happened to the whole topology
994   * by sending a TopologyMsg to every entity in the topology.
995   */
996  private void broadcastConfigChange()
997  {
998    for (ReplicationServerDomain domain : getReplicationServerDomains())
999    {
1000      domain.sendTopoInfoToAll();
1001    }
1002  }
1003
1004  /** {@inheritDoc} */
1005  @Override
1006  public boolean isConfigurationChangeAcceptable(
1007      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
1008  {
1009    return true;
1010  }
1011
1012  /**
1013   * Get the value of generationId for the replication replicationServerDomain
1014   * associated with the provided baseDN.
1015   *
1016   * @param baseDN The baseDN of the replicationServerDomain.
1017   * @return The value of the generationID.
1018   */
1019  public long getGenerationId(DN baseDN)
1020  {
1021    final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
1022    return rsd != null ? rsd.getGenerationId() : -1;
1023  }
1024
1025  /**
1026   * Get the serverId for this replication server.
1027   *
1028   * @return The value of the serverId.
1029   *
1030   */
1031  public int getServerId()
1032  {
1033    return this.config.getReplicationServerId();
1034  }
1035
1036  /**
1037   * Do what needed when the config object related to this replication server
1038   * is deleted from the server configuration.
1039   */
1040  public void remove()
1041  {
1042    if (logger.isTraceEnabled())
1043    {
1044      logger.trace("RS " + getMonitorInstanceName() + " starts removing");
1045    }
1046    shutdown();
1047  }
1048
1049  /**
1050   * Returns an iterator on the list of replicationServerDomain.
1051   * Returns null if none.
1052   * @return the iterator.
1053   */
1054  public Iterator<ReplicationServerDomain> getDomainIterator()
1055  {
1056    return getReplicationServerDomains().iterator();
1057  }
1058
1059  /**
1060   * Get the assured mode timeout.
1061   * <p>
1062   * It is the Timeout (in milliseconds) when waiting for acknowledgments.
1063   *
1064   * @return The assured mode timeout.
1065   */
1066  public long getAssuredTimeout()
1067  {
1068    return this.config.getAssuredTimeout();
1069  }
1070
1071  /**
1072   * Get The replication server group id.
1073   * @return The replication server group id.
1074   */
1075  public byte getGroupId()
1076  {
1077    return (byte) this.config.getGroupId();
1078  }
1079
1080  /**
1081   * Get the degraded status threshold value for status analyzer.
1082   * <p>
1083   * The degraded status threshold is the number of pending changes for a DS,
1084   * considered as threshold value to put the DS in DEGRADED_STATUS. If value is
1085   * 0, status analyzer is disabled.
1086   *
1087   * @return The degraded status threshold value for status analyzer.
1088   */
1089  public int getDegradedStatusThreshold()
1090  {
1091    return this.config.getDegradedStatusThreshold();
1092  }
1093
1094  /**
1095   * Get the monitoring publisher period value.
1096   * <p>
1097   * It is the number of milliseconds to wait before sending new monitoring
1098   * messages. If value is 0, monitoring publisher is disabled.
1099   *
1100   * @return the monitoring publisher period value.
1101   */
1102  public long getMonitoringPublisherPeriod()
1103  {
1104    return this.config.getMonitoringPeriod();
1105  }
1106
1107  /**
1108   * Compute the list of replication servers that are not any more connected to
1109   * this Replication Server and stop the corresponding handlers.
1110   *
1111   * @param oldRSAddresses
1112   *          the old list of configured replication servers addresses.
1113   */
1114  private void disconnectRemovedReplicationServers(Set<HostPort> oldRSAddresses)
1115  {
1116    final Collection<HostPort> serversToDisconnect = new ArrayList<>();
1117
1118    final Set<HostPort> newRSAddresses = getConfiguredRSAddresses();
1119    for (HostPort oldRSAddress : oldRSAddresses)
1120    {
1121      if (!newRSAddresses.contains(oldRSAddress))
1122      {
1123        serversToDisconnect.add(oldRSAddress);
1124      }
1125    }
1126
1127    if (serversToDisconnect.isEmpty())
1128    {
1129      return;
1130    }
1131
1132    for (ReplicationServerDomain domain: getReplicationServerDomains())
1133    {
1134      domain.stopReplicationServers(serversToDisconnect);
1135    }
1136  }
1137
1138  /**
1139   * Retrieves a printable name for this Replication Server Instance.
1140   *
1141   * @return A printable name for this Replication Server Instance.
1142   */
1143  public String getMonitorInstanceName()
1144  {
1145    return "Replication Server " + getReplicationPort() + " " + getServerId();
1146  }
1147
1148  /**
1149   * Retrieves the port used by this ReplicationServer.
1150   *
1151   * @return The port used by this ReplicationServer.
1152   */
1153  public int getReplicationPort()
1154  {
1155    return config.getReplicationPort();
1156  }
1157
1158  /**
1159   * Getter on the server URL.
1160   * @return the server URL.
1161   */
1162  public String getServerURL()
1163  {
1164    return this.serverURL;
1165  }
1166
1167  /**
1168   * WARNING : only use this methods for tests purpose.
1169   *
1170   * Add the Replication Server given as a parameter in the list
1171   * of local replication servers.
1172   *
1173   * @param server The server to be added.
1174   */
1175  public static void onlyForTestsAddlocalReplicationServer(String server)
1176  {
1177    localPorts.add(HostPort.valueOf(server).getPort());
1178  }
1179
1180  /**
1181   * WARNING : only use this methods for tests purpose.
1182   *
1183   * Clear the list of local Replication Servers
1184   *
1185   */
1186  public static void onlyForTestsClearLocalReplicationServerList()
1187  {
1188    localPorts.clear();
1189  }
1190
1191  /**
1192   * Returns {@code true} if the provided port is one of the ports that this
1193   * replication server is listening on.
1194   *
1195   * @param port
1196   *          The port to be checked.
1197   * @return {@code true} if the provided port is one of the ports that this
1198   *         replication server is listening on.
1199   */
1200  public static boolean isLocalReplicationServerPort(int port)
1201  {
1202    return localPorts.contains(port);
1203  }
1204
1205  /**
1206   * Get (or create) a handler on the {@link ChangeNumberIndexDB} for external
1207   * changelog.
1208   *
1209   * @return the handler.
1210   */
1211  ChangeNumberIndexDB getChangeNumberIndexDB()
1212  {
1213    return this.changelogDB.getChangeNumberIndexDB();
1214  }
1215
1216  /**
1217   * Returns the oldest change number in the change number index DB.
1218   *
1219   * @return the oldest change number in the change number index DB
1220   * @throws DirectoryException
1221   *           When a problem happens
1222   */
1223  public long getOldestChangeNumber() throws DirectoryException
1224  {
1225    try
1226    {
1227      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
1228      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
1229      if (oldestRecord != null)
1230      {
1231        return oldestRecord.getChangeNumber();
1232      }
1233      // database is empty
1234      return cnIndexDB.getLastGeneratedChangeNumber();
1235    }
1236    catch (ChangelogException e)
1237    {
1238      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
1239    }
1240  }
1241
1242  /**
1243   * Returns the newest change number in the change number index DB.
1244   *
1245   * @return the newest change number in the change number index DB
1246   * @throws DirectoryException
1247   *           When a problem happens
1248   */
1249  public long getNewestChangeNumber() throws DirectoryException
1250  {
1251    try
1252    {
1253      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
1254      final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord();
1255      if (newestRecord != null)
1256      {
1257        return newestRecord.getChangeNumber();
1258      }
1259      // database is empty
1260      return cnIndexDB.getLastGeneratedChangeNumber();
1261    }
1262    catch (ChangelogException e)
1263    {
1264      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
1265    }
1266  }
1267
1268  /**
1269   * Returns the newest cookie value.
1270   *
1271   * @param excludedBaseDNs
1272   *          The set of baseDNs excluded from ECL.
1273   * @return the newest cookie value.
1274   */
1275  public MultiDomainServerState getNewestECLCookie(Set<DN> excludedBaseDNs)
1276  {
1277    // Initialize start state for all running domains with empty state
1278    final MultiDomainServerState result = new MultiDomainServerState();
1279    for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
1280    {
1281      if (!excludedBaseDNs.contains(rsDomain.getBaseDN()))
1282      {
1283        final ServerState latestDBServerState = rsDomain.getLatestServerState();
1284        if (!latestDBServerState.isEmpty())
1285        {
1286          result.replace(rsDomain.getBaseDN(), latestDBServerState);
1287        }
1288      }
1289    }
1290    return result;
1291  }
1292
1293  /**
1294   * Gets the weight affected to the replication server.
1295   * <p>
1296   * Each replication server of the topology has a weight. When combined
1297   * together, the weights of the replication servers of a same group can be
1298   * translated to a percentage that determines the quantity of directory
1299   * servers of the topology that should be connected to a replication server.
1300   * <p>
1301   * For instance imagine a topology with 3 replication servers (with the same
1302   * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
1303   * RS1 should have 25% of the directory servers connected in the topology, RS2
1304   * 25%, and RS3 50%. This may be useful if the replication servers of the
1305   * topology have a different power and one wants to spread the load between
1306   * the replication servers according to their power.
1307   *
1308   * @return the weight
1309   */
1310  public int getWeight()
1311  {
1312    return this.config.getWeight();
1313  }
1314
1315  private Collection<ReplicationServerDomain> getReplicationServerDomains()
1316  {
1317    synchronized (baseDNs)
1318    {
1319      return new ArrayList<>(baseDNs.values());
1320    }
1321  }
1322
1323  /**
1324   * Returns the changelogDB.
1325   *
1326   * @return the changelogDB.
1327   */
1328  public ChangelogDB getChangelogDB()
1329  {
1330    return this.changelogDB;
1331  }
1332
1333  /**
1334   * Returns the synchronization object for shutdown of combined DS/RS instances.
1335   *
1336   * @return the synchronization object for shutdown of combined DS/RS instances.
1337   */
1338  DSRSShutdownSync getDSRSShutdownSync()
1339  {
1340    return dsrsShutdownSync;
1341  }
1342
1343  /** {@inheritDoc} */
1344  @Override
1345  public String toString()
1346  {
1347    return "RS(" + getServerId() + ") on " + serverURL + ", domains="
1348        + baseDNs.keySet();
1349  }
1350
1351}