001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server; 028 029import static org.opends.messages.ConfigMessages.*; 030import static org.opends.messages.ReplicationMessages.*; 031import static org.opends.server.util.StaticUtils.*; 032 033import java.io.IOException; 034import java.net.*; 035import java.util.*; 036import java.util.concurrent.CopyOnWriteArraySet; 037import java.util.concurrent.atomic.AtomicBoolean; 038 039import org.forgerock.i18n.LocalizableMessage; 040import org.forgerock.i18n.slf4j.LocalizedLogger; 041import org.forgerock.opendj.config.server.ConfigChangeResult; 042import org.forgerock.opendj.config.server.ConfigException; 043import org.forgerock.opendj.ldap.ResultCode; 044import org.forgerock.opendj.ldap.SearchScope; 045import org.opends.server.admin.server.ConfigurationChangeListener; 046import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; 047import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.ConflictBehavior; 048import org.opends.server.admin.std.server.ReplicationServerCfg; 049import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; 050import org.opends.server.api.VirtualAttributeProvider; 051import org.opends.server.backends.ChangelogBackend; 052import org.opends.server.core.DirectoryServer; 053import org.opends.server.replication.common.CSN; 054import org.opends.server.replication.common.MultiDomainServerState; 055import org.opends.server.replication.common.ServerState; 056import org.opends.server.replication.plugin.MultimasterReplication; 057import org.opends.server.replication.protocol.ReplServerStartMsg; 058import org.opends.server.replication.protocol.ReplSessionSecurity; 059import org.opends.server.replication.protocol.ReplicationMsg; 060import org.opends.server.replication.protocol.ServerStartMsg; 061import org.opends.server.replication.protocol.Session; 062import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 063import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 064import org.opends.server.replication.server.changelog.api.ChangelogDB; 065import org.opends.server.replication.server.changelog.api.ChangelogException; 066import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; 067import org.opends.server.replication.server.changelog.file.FileChangelogDB; 068import org.opends.server.replication.server.changelog.je.JEChangelogDB; 069import org.opends.server.replication.service.DSRSShutdownSync; 070import org.opends.server.types.AttributeType; 071import org.opends.server.types.DN; 072import org.opends.server.types.DirectoryException; 073import org.opends.server.types.HostPort; 074import org.opends.server.types.SearchFilter; 075import org.opends.server.types.VirtualAttributeRule; 076 077/** 078 * ReplicationServer Listener. This singleton is the main object of the 079 * replication server. It waits for the incoming connections and create listener 080 * and publisher objects for connection with LDAP servers and with replication 081 * servers It is responsible for creating the replication server 082 * replicationServerDomain and managing it 083 */ 084public final class ReplicationServer 085 implements ConfigurationChangeListener<ReplicationServerCfg> 086{ 087 private String serverURL; 088 089 private ServerSocket listenSocket; 090 private Thread listenThread; 091 private Thread connectThread; 092 093 /** The current configuration of this replication server. */ 094 private ReplicationServerCfg config; 095 private final DSRSShutdownSync dsrsShutdownSync; 096 097 /** 098 * This table is used to store the list of dn for which we are currently 099 * handling servers. 100 */ 101 private final Map<DN, ReplicationServerDomain> baseDNs = new HashMap<>(); 102 103 /** The database storing the changes. */ 104 private final ChangelogDB changelogDB; 105 106 /** The backend that allow to search the changes (external changelog). */ 107 private ChangelogBackend changelogBackend; 108 109 private final AtomicBoolean shutdown = new AtomicBoolean(); 110 private boolean stopListen; 111 private final ReplSessionSecurity replSessionSecurity; 112 113 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 114 115 /** To know whether a domain is enabled for the external changelog. */ 116 private final ECLEnabledDomainPredicate domainPredicate; 117 118 /** 119 * This is required for unit testing, so that we can keep track of all the 120 * replication servers which are running in the VM. 121 */ 122 private static final Set<Integer> localPorts = new CopyOnWriteArraySet<>(); 123 124 /** Monitors for synchronizing domain creation with the connect thread. */ 125 private final Object domainTicketLock = new Object(); 126 private final Object connectThreadLock = new Object(); 127 private long domainTicket; 128 129 /** 130 * Holds the list of all replication servers instantiated in this VM. 131 * This allows to perform clean up of the RS databases in unit tests. 132 */ 133 private static final List<ReplicationServer> allInstances = new ArrayList<>(); 134 135 /** 136 * Creates a new Replication server using the provided configuration entry. 137 * 138 * @param cfg The configuration of this replication server. 139 * @throws ConfigException When Configuration is invalid. 140 */ 141 public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException 142 { 143 this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()); 144 } 145 146 /** 147 * Creates a new Replication server using the provided configuration entry and shutdown 148 * synchronization object. 149 * 150 * @param cfg The configuration of this replication server. 151 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. 152 * @throws ConfigException When Configuration is invalid. 153 */ 154 public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException 155 { 156 this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate()); 157 } 158 159 /** 160 * Creates a new Replication server using the provided configuration entry, shutdown 161 * synchronization object and domain predicate. 162 * 163 * @param cfg The configuration of this replication server. 164 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. 165 * @param predicate Indicates whether a domain is enabled for the external changelog. 166 * @throws ConfigException When Configuration is invalid. 167 */ 168 public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync, 169 final ECLEnabledDomainPredicate predicate) throws ConfigException 170 { 171 this.config = cfg; 172 this.dsrsShutdownSync = dsrsShutdownSync; 173 this.domainPredicate = predicate; 174 175 enableExternalChangeLog(); 176 ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation(); 177 logger.trace("Using %s as DB implementation for changelog DB", dbImpl); 178 if (dbImpl == ReplicationDBImplementation.JE) 179 { 180 this.changelogDB = new JEChangelogDB(this, cfg); 181 } 182 else 183 { 184 this.changelogDB = new FileChangelogDB(this, cfg); 185 } 186 187 replSessionSecurity = new ReplSessionSecurity(); 188 initialize(); 189 cfg.addChangeListener(this); 190 191 localPorts.add(getReplicationPort()); 192 193 // Keep track of this new instance 194 allInstances.add(this); 195 } 196 197 private Set<HostPort> getConfiguredRSAddresses() 198 { 199 final Set<HostPort> results = new HashSet<>(); 200 for (String serverAddress : this.config.getReplicationServer()) 201 { 202 results.add(HostPort.valueOf(serverAddress)); 203 } 204 return results; 205 } 206 207 /** 208 * Get the list of every replication servers instantiated in the current VM. 209 * @return The list of every replication servers instantiated in the current 210 * VM. 211 */ 212 public static List<ReplicationServer> getAllInstances() 213 { 214 return allInstances; 215 } 216 217 /** 218 * The run method for the Listen thread. 219 * This thread accept incoming connections on the replication server 220 * ports from other replication servers or from LDAP servers 221 * and spawn further thread responsible for handling those connections 222 */ 223 void runListen() 224 { 225 logger.info(NOTE_REPLICATION_SERVER_LISTENING, 226 getServerId(), 227 listenSocket.getInetAddress().getHostAddress(), 228 listenSocket.getLocalPort()); 229 230 while (!shutdown.get() && !stopListen) 231 { 232 // Wait on the replicationServer port. 233 // Read incoming messages and create LDAP or ReplicationServer listener 234 // and Publisher. 235 try 236 { 237 Session session; 238 Socket newSocket = null; 239 try 240 { 241 newSocket = listenSocket.accept(); 242 newSocket.setTcpNoDelay(true); 243 newSocket.setKeepAlive(true); 244 int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); 245 session = replSessionSecurity.createServerSession(newSocket, 246 timeoutMS); 247 if (session == null) // Error, go back to accept 248 { 249 continue; 250 } 251 } 252 catch (Exception e) 253 { 254 // If problems happen during the SSL handshake, it is necessary 255 // to close the socket to free the associated resources. 256 if (newSocket != null) 257 { 258 newSocket.close(); 259 } 260 continue; 261 } 262 263 ReplicationMsg msg = session.receive(); 264 265 final int queueSize = this.config.getQueueSize(); 266 final int rcvWindow = this.config.getWindowSize(); 267 if (msg instanceof ServerStartMsg) 268 { 269 DataServerHandler dsHandler = new DataServerHandler( 270 session, queueSize, this, rcvWindow); 271 dsHandler.startFromRemoteDS((ServerStartMsg) msg); 272 } 273 else if (msg instanceof ReplServerStartMsg) 274 { 275 ReplicationServerHandler rsHandler = new ReplicationServerHandler( 276 session, queueSize, this, rcvWindow); 277 rsHandler.startFromRemoteRS((ReplServerStartMsg) msg); 278 } 279 else 280 { 281 // We did not recognize the message, close session as what 282 // can happen after is undetermined and we do not want the server to 283 // be disturbed 284 session.close(); 285 return; 286 } 287 } 288 catch (Exception e) 289 { 290 // The socket has probably been closed as part of the 291 // shutdown or changing the port number process. 292 // Just log debug information and loop. 293 // Do not log the message during shutdown. 294 logger.traceException(e); 295 if (!shutdown.get()) 296 { 297 logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage()); 298 } 299 } 300 } 301 } 302 303 /** 304 * This method manages the connection with the other replication servers. 305 * It periodically checks that this replication server is indeed connected 306 * to all the other replication servers and if not attempts to 307 * make the connection. 308 */ 309 void runConnect() 310 { 311 synchronized (connectThreadLock) 312 { 313 while (!shutdown.get()) 314 { 315 HostPort localAddress = HostPort.localAddress(getReplicationPort()); 316 for (ReplicationServerDomain domain : getReplicationServerDomains()) 317 { 318 /* 319 * If there are N RSs configured then we will usually be connected to 320 * N-1 of them, since one of them is usually this RS. However, we 321 * cannot guarantee this since the configuration may not contain this 322 * RS. 323 */ 324 final Set<HostPort> connectedRSAddresses = 325 getConnectedRSAddresses(domain); 326 for (HostPort rsAddress : getConfiguredRSAddresses()) 327 { 328 if (connectedRSAddresses.contains(rsAddress)) 329 { 330 continue; // Skip: already connected. 331 } 332 333 // FIXME: this will need changing if we ever support listening on 334 // specific addresses. 335 if (rsAddress.equals(localAddress)) 336 { 337 continue; // Skip: avoid connecting to self. 338 } 339 340 connect(rsAddress, domain.getBaseDN()); 341 } 342 } 343 344 // Notify any threads waiting with domain tickets after each iteration. 345 synchronized (domainTicketLock) 346 { 347 domainTicket++; 348 domainTicketLock.notifyAll(); 349 } 350 351 // Retry each second. 352 final int randomizer = (int) (Math.random() * 100); 353 try 354 { 355 // Releases lock, allows threads to get domain ticket. 356 connectThreadLock.wait(1000 + randomizer); 357 } 358 catch (InterruptedException e) 359 { 360 // Signaled to shutdown. 361 return; 362 } 363 } 364 } 365 } 366 367 private Set<HostPort> getConnectedRSAddresses(ReplicationServerDomain domain) 368 { 369 Set<HostPort> results = new HashSet<>(); 370 for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values()) 371 { 372 results.add(HostPort.valueOf(rsHandler.getServerAddressURL())); 373 } 374 return results; 375 } 376 377 /** 378 * Establish a connection to the server with the address and port. 379 * 380 * @param remoteServerAddress 381 * The address and port for the server 382 * @param baseDN 383 * The baseDN of the connection 384 */ 385 private void connect(HostPort remoteServerAddress, DN baseDN) 386 { 387 boolean sslEncryption = replSessionSecurity.isSslEncryption(); 388 389 if (logger.isTraceEnabled()) 390 { 391 logger.trace("RS " + getMonitorInstanceName() + " connects to " 392 + remoteServerAddress); 393 } 394 395 Socket socket = new Socket(); 396 Session session = null; 397 try 398 { 399 socket.setTcpNoDelay(true); 400 if (config.getSourceAddress() != null) 401 { 402 InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0); 403 socket.bind(local); 404 } 405 int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); 406 socket.connect(remoteServerAddress.toInetSocketAddress(), timeoutMS); 407 session = replSessionSecurity.createClientSession(socket, timeoutMS); 408 409 ReplicationServerHandler rsHandler = new ReplicationServerHandler( 410 session, config.getQueueSize(), this, config.getWindowSize()); 411 rsHandler.connect(baseDN, sslEncryption); 412 } 413 catch (Exception e) 414 { 415 logger.traceException(e); 416 close(session); 417 close(socket); 418 } 419 } 420 421 /** Initialization function for the replicationServer. */ 422 private void initialize() 423 { 424 shutdown.set(false); 425 426 try 427 { 428 this.changelogDB.initializeDB(); 429 430 setServerURL(); 431 listenSocket = new ServerSocket(); 432 listenSocket.bind(new InetSocketAddress(getReplicationPort())); 433 434 // creates working threads: we must first connect, then start to listen. 435 if (logger.isTraceEnabled()) 436 { 437 logger.trace("RS " + getMonitorInstanceName() + " creates connect thread"); 438 } 439 connectThread = new ReplicationServerConnectThread(this); 440 connectThread.start(); 441 442 if (logger.isTraceEnabled()) 443 { 444 logger.trace("RS " + getMonitorInstanceName() + " creates listen thread"); 445 } 446 447 listenThread = new ReplicationServerListenThread(this); 448 listenThread.start(); 449 450 if (logger.isTraceEnabled()) 451 { 452 logger.trace("RS " + getMonitorInstanceName() + " successfully initialized"); 453 } 454 } catch (UnknownHostException e) 455 { 456 logger.error(ERR_UNKNOWN_HOSTNAME); 457 } catch (IOException e) 458 { 459 logger.error(ERR_COULD_NOT_BIND_CHANGELOG, getReplicationPort(), e.getMessage()); 460 } 461 } 462 463 /** 464 * Enable the external changelog if it is not already enabled. 465 * <p> 466 * The external changelog is provided by the changelog backend. 467 * 468 * @throws ConfigException 469 * If an error occurs. 470 */ 471 private void enableExternalChangeLog() throws ConfigException 472 { 473 if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID)) 474 { 475 // Backend has already been created and initialized 476 // This can occurs in tests 477 return; 478 } 479 try 480 { 481 changelogBackend = new ChangelogBackend(this, domainPredicate); 482 changelogBackend.openBackend(); 483 try 484 { 485 DirectoryServer.registerBackend(changelogBackend); 486 } 487 catch (Exception e) 488 { 489 logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(), 490 getExceptionMessage(e))); 491 } 492 493 registerVirtualAttributeRules(); 494 } 495 catch (Exception e) 496 { 497 // TODO : I18N with correct message + what kind of exception should we really throw ? 498 // (Directory/Initialization/Config Exception) 499 throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e); 500 } 501 } 502 503 private void shutdownExternalChangelog() 504 { 505 if (changelogBackend != null) 506 { 507 DirectoryServer.deregisterBackend(changelogBackend); 508 changelogBackend.finalizeBackend(); 509 changelogBackend = null; 510 } 511 deregisterVirtualAttributeRules(); 512 } 513 514 private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException 515 { 516 final List<VirtualAttributeRule> rules = new ArrayList<>(); 517 rules.add(buildVirtualAttributeRule("lastexternalchangelogcookie", new LastCookieVirtualProvider(this))); 518 rules.add(buildVirtualAttributeRule("firstchangenumber", new FirstChangeNumberVirtualAttributeProvider(this))); 519 rules.add(buildVirtualAttributeRule("lastchangenumber", new LastChangeNumberVirtualAttributeProvider(this))); 520 rules.add(buildVirtualAttributeRule("changelog", new ChangelogBaseDNVirtualAttributeProvider())); 521 return rules; 522 } 523 524 private void registerVirtualAttributeRules() throws DirectoryException { 525 for (VirtualAttributeRule rule : getVirtualAttributesRules()) 526 { 527 DirectoryServer.registerVirtualAttribute(rule); 528 } 529 } 530 531 private void deregisterVirtualAttributeRules() 532 { 533 try 534 { 535 for (VirtualAttributeRule rule : getVirtualAttributesRules()) 536 { 537 DirectoryServer.deregisterVirtualAttribute(rule); 538 } 539 } 540 catch (DirectoryException e) 541 { 542 // Should never happen 543 throw new RuntimeException(e); 544 } 545 } 546 547 private static VirtualAttributeRule buildVirtualAttributeRule(String attrName, 548 VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> provider) 549 throws DirectoryException 550 { 551 ConflictBehavior conflictBehavior = ConflictBehavior.VIRTUAL_OVERRIDES_REAL; 552 553 try 554 { 555 Set<DN> baseDNs = Collections.singleton(DN.valueOf("")); 556 Set<DN> groupDNs = Collections.emptySet(); 557 Set<SearchFilter> filters = Collections.singleton(SearchFilter.objectClassPresent()); 558 559 // To avoid the configuration in cn=config just 560 // create a rule and register it into the DirectoryServer 561 provider.initializeVirtualAttributeProvider(null); 562 563 AttributeType attributeType = DirectoryServer.getAttributeType(attrName); 564 return new VirtualAttributeRule(attributeType, provider, 565 baseDNs, SearchScope.BASE_OBJECT, 566 groupDNs, filters, conflictBehavior); 567 } 568 catch (Exception e) 569 { 570 LocalizableMessage message = 571 NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e); 572 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e); 573 } 574 } 575 576 /** 577 * Get the ReplicationServerDomain associated to the base DN given in 578 * parameter. 579 * 580 * @param baseDN 581 * The base DN for which the ReplicationServerDomain must be 582 * returned. 583 * @return The ReplicationServerDomain associated to the base DN given in 584 * parameter. 585 */ 586 public ReplicationServerDomain getReplicationServerDomain(DN baseDN) 587 { 588 return getReplicationServerDomain(baseDN, false); 589 } 590 591 /** Returns the replicated domain DNs minus the provided set of excluded DNs. */ 592 private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException 593 { 594 Set<DN> domains = null; 595 synchronized (baseDNs) 596 { 597 domains = new HashSet<>(baseDNs.keySet()); 598 } 599 domains.removeAll(excludedBaseDNs); 600 return domains; 601 } 602 603 /** 604 * Validate that provided cookie is coherent with this replication server, 605 * when ignoring the provided set of DNs. 606 * <p> 607 * The cookie is coherent if and only if it exactly has the set of DNs corresponding to 608 * the replication domains, and the states in the cookie are not older than oldest states 609 * in the server. 610 * 611 * @param cookie 612 * The multi domain state (cookie) to validate. 613 * @param ignoredBaseDNs 614 * The set of DNs to ignore when validating 615 * @throws DirectoryException 616 * If the cookie is not valid 617 */ 618 public void validateCookie(MultiDomainServerState cookie, Set<DN> ignoredBaseDNs) throws DirectoryException 619 { 620 final Set<DN> activeDomains = getDNsOfActiveDomainsInServer(ignoredBaseDNs); 621 final Set<DN> cookieDomains = getDNsOfCookie(cookie); 622 623 checkNoUnknownDomainIsProvidedInCookie(cookie, activeDomains, cookieDomains); 624 checkCookieIsNotOutdated(cookie, activeDomains); 625 } 626 627 private Set<DN> getDNsOfCookie(MultiDomainServerState cookie) 628 { 629 final Set<DN> cookieDomains = new HashSet<>(); 630 for (final DN dn : cookie) 631 { 632 cookieDomains.add(dn); 633 } 634 return cookieDomains; 635 } 636 637 private Set<DN> getDNsOfActiveDomainsInServer(final Set<DN> ignoredBaseDNs) throws DirectoryException 638 { 639 final Set<DN> activeDomains = new HashSet<>(); 640 for (final DN dn : getDomainDNs(ignoredBaseDNs)) 641 { 642 final ServerState lastServerState = getReplicationServerDomain(dn).getLatestServerState(); 643 if (!lastServerState.isEmpty()) 644 { 645 activeDomains.add(dn); 646 } 647 } 648 return activeDomains; 649 } 650 651 private void checkNoUnknownDomainIsProvidedInCookie(final MultiDomainServerState cookie, final Set<DN> activeDomains, 652 final Set<DN> cookieDomains) throws DirectoryException 653 { 654 if (!activeDomains.containsAll(cookieDomains)) 655 { 656 final Set<DN> unknownCookieDomains = new HashSet<>(cookieDomains); 657 unknownCookieDomains.removeAll(activeDomains); 658 final StringBuilder currentStartingCookie = new StringBuilder(); 659 for (DN domainDN : activeDomains) { 660 currentStartingCookie.append(domainDN).append(":").append(cookie.getServerState(domainDN)).append(";"); 661 } 662 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 663 ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( 664 unknownCookieDomains.toString(), currentStartingCookie)); 665 } 666 } 667 668 private void checkCookieIsNotOutdated(final MultiDomainServerState cookie, final Set<DN> activeDomains) 669 throws DirectoryException 670 { 671 for (DN dn : activeDomains) 672 { 673 if (isCookieOutdatedForDomain(cookie, dn)) 674 { 675 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 676 ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(dn.toString())); 677 } 678 } 679 } 680 681 /** Check that provided cookie is not outdated compared to the oldest state of a domain. */ 682 private boolean isCookieOutdatedForDomain(MultiDomainServerState cookie, DN domainDN) 683 { 684 final ServerState providedState = cookie.getServerState(domainDN); 685 if (providedState == null) 686 { 687 // missing domains do not invalidate a cookie. 688 // results will include all the changes of the missing domains 689 return false; 690 } 691 final ServerState domainOldestState = getReplicationServerDomain(domainDN).getOldestState(); 692 for (final CSN oldestCsn : domainOldestState) 693 { 694 final CSN providedCsn = providedState.getCSN(oldestCsn.getServerId()); 695 if (providedCsn != null && providedCsn.isOlderThan(oldestCsn)) 696 { 697 return true; 698 } 699 } 700 return false; 701 } 702 703 /** 704 * Get the ReplicationServerDomain associated to the base DN given in 705 * parameter. 706 * 707 * @param baseDN The base DN for which the ReplicationServerDomain must be 708 * returned. 709 * @param create Specifies whether to create the ReplicationServerDomain if 710 * it does not already exist. 711 * @return The ReplicationServerDomain associated to the base DN given in 712 * parameter. 713 */ 714 public ReplicationServerDomain getReplicationServerDomain(DN baseDN, 715 boolean create) 716 { 717 synchronized (baseDNs) 718 { 719 ReplicationServerDomain domain = baseDNs.get(baseDN); 720 if (domain == null && create) { 721 domain = new ReplicationServerDomain(baseDN, this); 722 baseDNs.put(baseDN, domain); 723 } 724 return domain; 725 } 726 } 727 728 /** 729 * Waits for connections to this ReplicationServer. 730 */ 731 void waitConnections() 732 { 733 // Acquire a domain ticket and wait for a complete cycle of the connect 734 // thread. 735 final long myDomainTicket; 736 synchronized (connectThreadLock) 737 { 738 // Connect thread must be waiting. 739 synchronized (domainTicketLock) 740 { 741 // Determine the ticket which will be used in the next connect thread 742 // iteration. 743 myDomainTicket = domainTicket + 1; 744 } 745 746 // Wake up connect thread. 747 connectThreadLock.notify(); 748 } 749 750 // Wait until the connect thread has processed next connect phase. 751 synchronized (domainTicketLock) 752 { 753 while (myDomainTicket > domainTicket && !shutdown.get()) 754 { 755 try 756 { 757 // Wait with timeout so that we detect shutdown. 758 domainTicketLock.wait(500); 759 } 760 catch (InterruptedException e) 761 { 762 // Can't do anything with this. 763 Thread.currentThread().interrupt(); 764 } 765 } 766 } 767 } 768 769 /** 770 * Shutdown the Replication Server service and all its connections. 771 */ 772 public void shutdown() 773 { 774 localPorts.remove(getReplicationPort()); 775 776 if (!shutdown.compareAndSet(false, true)) 777 { 778 return; 779 } 780 781 // shutdown the connect thread 782 if (connectThread != null) 783 { 784 connectThread.interrupt(); 785 } 786 787 // shutdown the listener thread 788 close(listenSocket); 789 if (listenThread != null) 790 { 791 listenThread.interrupt(); 792 } 793 794 // shutdown all the replication domains 795 for (ReplicationServerDomain domain : getReplicationServerDomains()) 796 { 797 domain.shutdown(); 798 } 799 800 shutdownExternalChangelog(); 801 802 try 803 { 804 this.changelogDB.shutdownDB(); 805 } 806 catch (ChangelogException ignored) 807 { 808 logger.traceException(ignored); 809 } 810 811 // Remove this instance from the global instance list 812 allInstances.remove(this); 813 } 814 815 /** 816 * Retrieves the time after which changes must be deleted from the 817 * persistent storage (in milliseconds). 818 * 819 * @return The time after which changes must be deleted from the 820 * persistent storage (in milliseconds). 821 */ 822 public long getPurgeDelay() 823 { 824 return this.config.getReplicationPurgeDelay() * 1000; 825 } 826 827 /** 828 * Check if the provided configuration is acceptable for add. 829 * 830 * @param configuration The configuration to check. 831 * @param unacceptableReasons When the configuration is not acceptable, this 832 * table is use to return the reasons why this 833 * configuration is not acceptable. 834 * 835 * @return true if the configuration is acceptable, false other wise. 836 */ 837 public static boolean isConfigurationAcceptable( 838 ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons) 839 { 840 int port = configuration.getReplicationPort(); 841 842 try 843 { 844 ServerSocket tmpSocket = new ServerSocket(); 845 tmpSocket.bind(new InetSocketAddress(port)); 846 tmpSocket.close(); 847 return true; 848 } 849 catch (Exception e) 850 { 851 LocalizableMessage message = ERR_COULD_NOT_BIND_CHANGELOG.get(port, e.getMessage()); 852 unacceptableReasons.add(message); 853 return false; 854 } 855 } 856 857 /** {@inheritDoc} */ 858 @Override 859 public ConfigChangeResult applyConfigurationChange( 860 ReplicationServerCfg configuration) 861 { 862 final ConfigChangeResult ccr = new ConfigChangeResult(); 863 864 // Some of those properties change don't need specific code. 865 // They will be applied for next connections. Some others have immediate effect 866 final Set<HostPort> oldRSAddresses = getConfiguredRSAddresses(); 867 868 final ReplicationServerCfg oldConfig = this.config; 869 this.config = configuration; 870 871 disconnectRemovedReplicationServers(oldRSAddresses); 872 873 final long newPurgeDelay = config.getReplicationPurgeDelay(); 874 if (newPurgeDelay != oldConfig.getReplicationPurgeDelay()) 875 { 876 this.changelogDB.setPurgeDelay(getPurgeDelay()); 877 } 878 final boolean computeCN = config.isComputeChangeNumber(); 879 if (computeCN != oldConfig.isComputeChangeNumber()) 880 { 881 try 882 { 883 this.changelogDB.setComputeChangeNumber(computeCN); 884 } 885 catch (ChangelogException e) 886 { 887 logger.traceException(e); 888 ccr.setResultCode(ResultCode.OPERATIONS_ERROR); 889 } 890 } 891 892 // changing the listen port requires to stop the listen thread 893 // and restart it. 894 if (getReplicationPort() != oldConfig.getReplicationPort()) 895 { 896 stopListen = true; 897 try 898 { 899 listenSocket.close(); 900 listenThread.join(); 901 stopListen = false; 902 903 setServerURL(); 904 listenSocket = new ServerSocket(); 905 listenSocket.bind(new InetSocketAddress(getReplicationPort())); 906 907 listenThread = new ReplicationServerListenThread(this); 908 listenThread.start(); 909 } 910 catch (IOException e) 911 { 912 logger.traceException(e); 913 logger.error(ERR_COULD_NOT_CLOSE_THE_SOCKET, e); 914 } 915 catch (InterruptedException e) 916 { 917 logger.traceException(e); 918 logger.error(ERR_COULD_NOT_STOP_LISTEN_THREAD, e); 919 } 920 } 921 922 // Update period value for monitoring publishers 923 if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod()) 924 { 925 for (ReplicationServerDomain domain : getReplicationServerDomains()) 926 { 927 domain.updateMonitoringPeriod(config.getMonitoringPeriod()); 928 } 929 } 930 931 // Changed the group id ? 932 if (config.getGroupId() != oldConfig.getGroupId()) 933 { 934 // Have a new group id: Disconnect every servers. 935 for (ReplicationServerDomain domain : getReplicationServerDomains()) 936 { 937 domain.stopAllServers(true); 938 } 939 } 940 941 // Set a potential new weight 942 if (oldConfig.getWeight() != config.getWeight()) 943 { 944 // Broadcast the new weight the the whole topology. This will make some 945 // DSs reconnect (if needed) to other RSs according to the new weight of 946 // this RS. 947 broadcastConfigChange(); 948 } 949 950 final String newDir = config.getReplicationDBDirectory(); 951 if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory())) 952 { 953 ccr.setAdminActionRequired(true); 954 } 955 return ccr; 956 } 957 958 /** 959 * Try and set a sensible URL for this replication server. Since we are 960 * listening on all addresses there are a couple of potential candidates: 961 * <ol> 962 * <li>a matching server URL in the replication server's configuration,</li> 963 * <li>hostname local address.</li> 964 * </ol> 965 */ 966 private void setServerURL() throws UnknownHostException 967 { 968 /* 969 * First try the set of configured replication servers to see if one of them 970 * is this replication server (this should always be the case). 971 */ 972 for (HostPort rsAddress : getConfiguredRSAddresses()) 973 { 974 /* 975 * No need validate the string format because the admin framework has 976 * already done it. 977 */ 978 if (rsAddress.getPort() == getReplicationPort() 979 && rsAddress.isLocalAddress()) 980 { 981 serverURL = rsAddress.toString(); 982 return; 983 } 984 } 985 986 // Fall-back to the machine hostname. 987 final String host = InetAddress.getLocalHost().getHostName(); 988 // Ensure correct formatting of IPv6 addresses by using a HostPort instance. 989 serverURL = new HostPort(host, getReplicationPort()).toString(); 990 } 991 992 /** 993 * Broadcast a configuration change that just happened to the whole topology 994 * by sending a TopologyMsg to every entity in the topology. 995 */ 996 private void broadcastConfigChange() 997 { 998 for (ReplicationServerDomain domain : getReplicationServerDomains()) 999 { 1000 domain.sendTopoInfoToAll(); 1001 } 1002 } 1003 1004 /** {@inheritDoc} */ 1005 @Override 1006 public boolean isConfigurationChangeAcceptable( 1007 ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons) 1008 { 1009 return true; 1010 } 1011 1012 /** 1013 * Get the value of generationId for the replication replicationServerDomain 1014 * associated with the provided baseDN. 1015 * 1016 * @param baseDN The baseDN of the replicationServerDomain. 1017 * @return The value of the generationID. 1018 */ 1019 public long getGenerationId(DN baseDN) 1020 { 1021 final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN); 1022 return rsd != null ? rsd.getGenerationId() : -1; 1023 } 1024 1025 /** 1026 * Get the serverId for this replication server. 1027 * 1028 * @return The value of the serverId. 1029 * 1030 */ 1031 public int getServerId() 1032 { 1033 return this.config.getReplicationServerId(); 1034 } 1035 1036 /** 1037 * Do what needed when the config object related to this replication server 1038 * is deleted from the server configuration. 1039 */ 1040 public void remove() 1041 { 1042 if (logger.isTraceEnabled()) 1043 { 1044 logger.trace("RS " + getMonitorInstanceName() + " starts removing"); 1045 } 1046 shutdown(); 1047 } 1048 1049 /** 1050 * Returns an iterator on the list of replicationServerDomain. 1051 * Returns null if none. 1052 * @return the iterator. 1053 */ 1054 public Iterator<ReplicationServerDomain> getDomainIterator() 1055 { 1056 return getReplicationServerDomains().iterator(); 1057 } 1058 1059 /** 1060 * Get the assured mode timeout. 1061 * <p> 1062 * It is the Timeout (in milliseconds) when waiting for acknowledgments. 1063 * 1064 * @return The assured mode timeout. 1065 */ 1066 public long getAssuredTimeout() 1067 { 1068 return this.config.getAssuredTimeout(); 1069 } 1070 1071 /** 1072 * Get The replication server group id. 1073 * @return The replication server group id. 1074 */ 1075 public byte getGroupId() 1076 { 1077 return (byte) this.config.getGroupId(); 1078 } 1079 1080 /** 1081 * Get the degraded status threshold value for status analyzer. 1082 * <p> 1083 * The degraded status threshold is the number of pending changes for a DS, 1084 * considered as threshold value to put the DS in DEGRADED_STATUS. If value is 1085 * 0, status analyzer is disabled. 1086 * 1087 * @return The degraded status threshold value for status analyzer. 1088 */ 1089 public int getDegradedStatusThreshold() 1090 { 1091 return this.config.getDegradedStatusThreshold(); 1092 } 1093 1094 /** 1095 * Get the monitoring publisher period value. 1096 * <p> 1097 * It is the number of milliseconds to wait before sending new monitoring 1098 * messages. If value is 0, monitoring publisher is disabled. 1099 * 1100 * @return the monitoring publisher period value. 1101 */ 1102 public long getMonitoringPublisherPeriod() 1103 { 1104 return this.config.getMonitoringPeriod(); 1105 } 1106 1107 /** 1108 * Compute the list of replication servers that are not any more connected to 1109 * this Replication Server and stop the corresponding handlers. 1110 * 1111 * @param oldRSAddresses 1112 * the old list of configured replication servers addresses. 1113 */ 1114 private void disconnectRemovedReplicationServers(Set<HostPort> oldRSAddresses) 1115 { 1116 final Collection<HostPort> serversToDisconnect = new ArrayList<>(); 1117 1118 final Set<HostPort> newRSAddresses = getConfiguredRSAddresses(); 1119 for (HostPort oldRSAddress : oldRSAddresses) 1120 { 1121 if (!newRSAddresses.contains(oldRSAddress)) 1122 { 1123 serversToDisconnect.add(oldRSAddress); 1124 } 1125 } 1126 1127 if (serversToDisconnect.isEmpty()) 1128 { 1129 return; 1130 } 1131 1132 for (ReplicationServerDomain domain: getReplicationServerDomains()) 1133 { 1134 domain.stopReplicationServers(serversToDisconnect); 1135 } 1136 } 1137 1138 /** 1139 * Retrieves a printable name for this Replication Server Instance. 1140 * 1141 * @return A printable name for this Replication Server Instance. 1142 */ 1143 public String getMonitorInstanceName() 1144 { 1145 return "Replication Server " + getReplicationPort() + " " + getServerId(); 1146 } 1147 1148 /** 1149 * Retrieves the port used by this ReplicationServer. 1150 * 1151 * @return The port used by this ReplicationServer. 1152 */ 1153 public int getReplicationPort() 1154 { 1155 return config.getReplicationPort(); 1156 } 1157 1158 /** 1159 * Getter on the server URL. 1160 * @return the server URL. 1161 */ 1162 public String getServerURL() 1163 { 1164 return this.serverURL; 1165 } 1166 1167 /** 1168 * WARNING : only use this methods for tests purpose. 1169 * 1170 * Add the Replication Server given as a parameter in the list 1171 * of local replication servers. 1172 * 1173 * @param server The server to be added. 1174 */ 1175 public static void onlyForTestsAddlocalReplicationServer(String server) 1176 { 1177 localPorts.add(HostPort.valueOf(server).getPort()); 1178 } 1179 1180 /** 1181 * WARNING : only use this methods for tests purpose. 1182 * 1183 * Clear the list of local Replication Servers 1184 * 1185 */ 1186 public static void onlyForTestsClearLocalReplicationServerList() 1187 { 1188 localPorts.clear(); 1189 } 1190 1191 /** 1192 * Returns {@code true} if the provided port is one of the ports that this 1193 * replication server is listening on. 1194 * 1195 * @param port 1196 * The port to be checked. 1197 * @return {@code true} if the provided port is one of the ports that this 1198 * replication server is listening on. 1199 */ 1200 public static boolean isLocalReplicationServerPort(int port) 1201 { 1202 return localPorts.contains(port); 1203 } 1204 1205 /** 1206 * Get (or create) a handler on the {@link ChangeNumberIndexDB} for external 1207 * changelog. 1208 * 1209 * @return the handler. 1210 */ 1211 ChangeNumberIndexDB getChangeNumberIndexDB() 1212 { 1213 return this.changelogDB.getChangeNumberIndexDB(); 1214 } 1215 1216 /** 1217 * Returns the oldest change number in the change number index DB. 1218 * 1219 * @return the oldest change number in the change number index DB 1220 * @throws DirectoryException 1221 * When a problem happens 1222 */ 1223 public long getOldestChangeNumber() throws DirectoryException 1224 { 1225 try 1226 { 1227 final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); 1228 final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); 1229 if (oldestRecord != null) 1230 { 1231 return oldestRecord.getChangeNumber(); 1232 } 1233 // database is empty 1234 return cnIndexDB.getLastGeneratedChangeNumber(); 1235 } 1236 catch (ChangelogException e) 1237 { 1238 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e); 1239 } 1240 } 1241 1242 /** 1243 * Returns the newest change number in the change number index DB. 1244 * 1245 * @return the newest change number in the change number index DB 1246 * @throws DirectoryException 1247 * When a problem happens 1248 */ 1249 public long getNewestChangeNumber() throws DirectoryException 1250 { 1251 try 1252 { 1253 final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); 1254 final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord(); 1255 if (newestRecord != null) 1256 { 1257 return newestRecord.getChangeNumber(); 1258 } 1259 // database is empty 1260 return cnIndexDB.getLastGeneratedChangeNumber(); 1261 } 1262 catch (ChangelogException e) 1263 { 1264 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e); 1265 } 1266 } 1267 1268 /** 1269 * Returns the newest cookie value. 1270 * 1271 * @param excludedBaseDNs 1272 * The set of baseDNs excluded from ECL. 1273 * @return the newest cookie value. 1274 */ 1275 public MultiDomainServerState getNewestECLCookie(Set<DN> excludedBaseDNs) 1276 { 1277 // Initialize start state for all running domains with empty state 1278 final MultiDomainServerState result = new MultiDomainServerState(); 1279 for (ReplicationServerDomain rsDomain : getReplicationServerDomains()) 1280 { 1281 if (!excludedBaseDNs.contains(rsDomain.getBaseDN())) 1282 { 1283 final ServerState latestDBServerState = rsDomain.getLatestServerState(); 1284 if (!latestDBServerState.isEmpty()) 1285 { 1286 result.replace(rsDomain.getBaseDN(), latestDBServerState); 1287 } 1288 } 1289 } 1290 return result; 1291 } 1292 1293 /** 1294 * Gets the weight affected to the replication server. 1295 * <p> 1296 * Each replication server of the topology has a weight. When combined 1297 * together, the weights of the replication servers of a same group can be 1298 * translated to a percentage that determines the quantity of directory 1299 * servers of the topology that should be connected to a replication server. 1300 * <p> 1301 * For instance imagine a topology with 3 replication servers (with the same 1302 * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that 1303 * RS1 should have 25% of the directory servers connected in the topology, RS2 1304 * 25%, and RS3 50%. This may be useful if the replication servers of the 1305 * topology have a different power and one wants to spread the load between 1306 * the replication servers according to their power. 1307 * 1308 * @return the weight 1309 */ 1310 public int getWeight() 1311 { 1312 return this.config.getWeight(); 1313 } 1314 1315 private Collection<ReplicationServerDomain> getReplicationServerDomains() 1316 { 1317 synchronized (baseDNs) 1318 { 1319 return new ArrayList<>(baseDNs.values()); 1320 } 1321 } 1322 1323 /** 1324 * Returns the changelogDB. 1325 * 1326 * @return the changelogDB. 1327 */ 1328 public ChangelogDB getChangelogDB() 1329 { 1330 return this.changelogDB; 1331 } 1332 1333 /** 1334 * Returns the synchronization object for shutdown of combined DS/RS instances. 1335 * 1336 * @return the synchronization object for shutdown of combined DS/RS instances. 1337 */ 1338 DSRSShutdownSync getDSRSShutdownSync() 1339 { 1340 return dsrsShutdownSync; 1341 } 1342 1343 /** {@inheritDoc} */ 1344 @Override 1345 public String toString() 1346 { 1347 return "RS(" + getServerId() + ") on " + serverURL + ", domains=" 1348 + baseDNs.keySet(); 1349 } 1350 1351}