001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.service; 028 029import java.io.IOException; 030import java.math.BigDecimal; 031import java.math.MathContext; 032import java.math.RoundingMode; 033import java.net.*; 034import java.util.*; 035import java.util.Map.Entry; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.Semaphore; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicReference; 041 042import org.forgerock.i18n.LocalizableMessage; 043import org.forgerock.i18n.slf4j.LocalizedLogger; 044import org.forgerock.util.Utils; 045import org.opends.server.admin.std.server.ReplicationDomainCfg; 046import org.opends.server.core.DirectoryServer; 047import org.opends.server.replication.common.*; 048import org.opends.server.replication.plugin.MultimasterReplication; 049import org.opends.server.replication.protocol.*; 050import org.opends.server.types.DN; 
051import org.opends.server.types.HostPort; 052 053import static org.opends.messages.ReplicationMessages.*; 054import static org.opends.server.replication.protocol.ProtocolVersion.*; 055import static org.opends.server.replication.server.ReplicationServer.*; 056import static org.opends.server.util.StaticUtils.*; 057 058/** 059 * The broker for Multi-master Replication. 060 */ 061public class ReplicationBroker 062{ 063 064 /** 065 * Immutable class containing information about whether the broker is 066 * connected to an RS and data associated to this connected RS. 067 */ 068 // @Immutable 069 private static final class ConnectedRS 070 { 071 072 private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS( 073 NO_CONNECTED_SERVER); 074 075 /** The info of the RS we are connected to. */ 076 private final ReplicationServerInfo rsInfo; 077 /** Contains a connected session to the RS if any exist, null otherwise. */ 078 private final Session session; 079 private final String replicationServer; 080 081 private ConnectedRS(String replicationServer) 082 { 083 this.rsInfo = null; 084 this.session = null; 085 this.replicationServer = replicationServer; 086 } 087 088 private ConnectedRS(ReplicationServerInfo rsInfo, Session session) 089 { 090 this.rsInfo = rsInfo; 091 this.session = session; 092 this.replicationServer = session != null ? 093 session.getReadableRemoteAddress() 094 : NO_CONNECTED_SERVER; 095 } 096 097 private static ConnectedRS stopped() 098 { 099 return new ConnectedRS("stopped"); 100 } 101 102 private static ConnectedRS noConnectedRS() 103 { 104 return NO_CONNECTED_RS; 105 } 106 107 public int getServerId() 108 { 109 return rsInfo != null ? rsInfo.getServerId() : -1; 110 } 111 112 private byte getGroupId() 113 { 114 return rsInfo != null ? 
rsInfo.getGroupId() : -1; 115 } 116 117 private boolean isConnected() 118 { 119 return session != null; 120 } 121 122 /** {@inheritDoc} */ 123 @Override 124 public String toString() 125 { 126 final StringBuilder sb = new StringBuilder(); 127 toString(sb); 128 return sb.toString(); 129 } 130 131 public void toString(StringBuilder sb) 132 { 133 sb.append("connected=").append(isConnected()).append(", "); 134 if (!isConnected()) 135 { 136 sb.append("no connectedRS"); 137 } 138 else 139 { 140 sb.append("connectedRS(serverId=").append(rsInfo.getServerId()) 141 .append(", serverUrl=").append(rsInfo.getServerURL()) 142 .append(", groupId=").append(rsInfo.getGroupId()) 143 .append(")"); 144 } 145 } 146 147 } 148 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 149 private volatile boolean shutdown; 150 private final Object startStopLock = new Object(); 151 private volatile ReplicationDomainCfg config; 152 /** 153 * String reported under CSN=monitor when there is no connected RS. 154 */ 155 static final String NO_CONNECTED_SERVER = "Not connected"; 156 private final ServerState state; 157 private Semaphore sendWindow; 158 private int maxSendWindow; 159 private int rcvWindow = 100; 160 private int halfRcvWindow = rcvWindow / 2; 161 private int timeout; 162 private final ReplSessionSecurity replSessionSecurity; 163 /** 164 * The RS this DS is currently connected to. 165 * <p> 166 * Always use {@link #setConnectedRS(ConnectedRS)} to set a new 167 * connected RS. 168 */ 169 // @NotNull // for the reference 170 private final AtomicReference<ConnectedRS> connectedRS = new AtomicReference<>(ConnectedRS.noConnectedRS()); 171 /** Our replication domain. */ 172 private final ReplicationDomain domain; 173 /** 174 * This object is used as a conditional event to be notified about 175 * the reception of monitor information from the Replication Server. 
176 */ 177 private final AtomicBoolean monitorResponse = new AtomicBoolean(false); 178 /** 179 * A Map containing the ServerStates of all the replicas in the topology 180 * as seen by the ReplicationServer the last time it was polled or the last 181 * time it published monitoring information. 182 */ 183 private Map<Integer, ServerState> replicaStates = new HashMap<>(); 184 /** A thread to monitor heartbeats on the session. */ 185 private HeartbeatMonitor heartbeatMonitor; 186 /** The number of times the connection was lost. */ 187 private int numLostConnections; 188 /** 189 * When the broker cannot connect to any replication server 190 * it log an error and keeps continuing every second. 191 * This boolean is set when the first failure happens and is used 192 * to avoid repeating the error message for further failure to connect 193 * and to know that it is necessary to print a new message when the broker 194 * finally succeed to connect. 195 */ 196 private volatile boolean connectionError; 197 private final Object connectPhaseLock = new Object(); 198 /** 199 * The thread that publishes messages to the RS containing the current 200 * change time of this DS. 201 */ 202 private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; 203 /* 204 * Properties for the last topology info received from the network. 205 */ 206 /** Contains the last known state of the replication topology. */ 207 private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology()); 208 /** <pre>@GuardedBy("this")</pre>. */ 209 private volatile int updateDoneCount; 210 private volatile boolean connectRequiresRecovery; 211 212 /** 213 * This integer defines when the best replication server checking algorithm 214 * should be engaged. 215 * Every time a monitoring message (each monitoring publisher period) is 216 * received, it is incremented. When it reaches 2, we run the checking 217 * algorithm to see if we must reconnect to another best replication server. 
218 * Then we reset the value to 0. But when a topology message is received, the 219 * integer is reset to 0. This ensures that we wait at least one monitoring 220 * publisher period before running the algorithm, but also that we wait at 221 * least for a monitoring period after the last received topology message 222 * (topology stabilization). 223 */ 224 private int mustRunBestServerCheckingAlgorithm; 225 226 /** 227 * The monitor provider for this replication domain. 228 * <p> 229 * The name of the monitor includes the local address and must therefore be 230 * re-registered every time the session is re-established or destroyed. The 231 * monitor provider can only be created (i.e. non-null) if there is a 232 * replication domain, which is not the case in unit tests. 233 */ 234 private final ReplicationMonitor monitor; 235 236 /** 237 * Creates a new ReplicationServer Broker for a particular ReplicationDomain. 238 * 239 * @param replicationDomain The replication domain that is creating us. 240 * @param state The ServerState that should be used by this broker 241 * when negotiating the session with the replicationServer. 242 * @param config The configuration to use. 243 * @param replSessionSecurity The session security configuration. 244 */ 245 public ReplicationBroker(ReplicationDomain replicationDomain, 246 ServerState state, ReplicationDomainCfg config, 247 ReplSessionSecurity replSessionSecurity) 248 { 249 this.domain = replicationDomain; 250 this.state = state; 251 this.config = config; 252 this.replSessionSecurity = replSessionSecurity; 253 this.rcvWindow = getMaxRcvWindow(); 254 this.halfRcvWindow = rcvWindow / 2; 255 256 /* 257 * Only create a monitor if there is a replication domain (this is not the 258 * case in some unit tests). 259 */ 260 this.monitor = replicationDomain != null ? new ReplicationMonitor( 261 replicationDomain) : null; 262 registerReplicationMonitor(); 263 } 264 265 /** 266 * Start the ReplicationBroker. 
267 */ 268 public void start() 269 { 270 synchronized (startStopLock) 271 { 272 shutdown = false; 273 this.rcvWindow = getMaxRcvWindow(); 274 connectAsDataServer(); 275 } 276 } 277 278 /** 279 * Gets the group id of the RS we are connected to. 280 * @return The group id of the RS we are connected to 281 */ 282 public byte getRsGroupId() 283 { 284 return connectedRS.get().getGroupId(); 285 } 286 287 /** 288 * Gets the server id of the RS we are connected to. 289 * @return The server id of the RS we are connected to 290 */ 291 public int getRsServerId() 292 { 293 return connectedRS.get().getServerId(); 294 } 295 296 /** 297 * Gets the server id. 298 * @return The server id 299 */ 300 public int getServerId() 301 { 302 return config.getServerId(); 303 } 304 305 private DN getBaseDN() 306 { 307 return config.getBaseDN(); 308 } 309 310 private Set<String> getReplicationServerUrls() 311 { 312 return config.getReplicationServer(); 313 } 314 315 private byte getGroupId() 316 { 317 return (byte) config.getGroupId(); 318 } 319 320 /** 321 * Gets the server id. 322 * @return The server id 323 */ 324 private long getGenerationID() 325 { 326 return domain.getGenerationID(); 327 } 328 329 /** 330 * Set the generation id - for test purpose. 331 * @param generationID The generation id 332 */ 333 public void setGenerationID(long generationID) 334 { 335 domain.setGenerationID(generationID); 336 } 337 338 /** 339 * Compares 2 replication servers addresses and returns true if they both 340 * represent the same replication server instance. 341 * @param rs1Url Replication server 1 address 342 * @param rs2Url Replication server 2 address 343 * @return True if both replication server addresses represent the same 344 * replication server instance, false otherwise. 
345 */ 346 private static boolean isSameReplicationServerUrl(String rs1Url, 347 String rs2Url) 348 { 349 try 350 { 351 final HostPort hp1 = HostPort.valueOf(rs1Url); 352 final HostPort hp2 = HostPort.valueOf(rs2Url); 353 return hp1.isEquivalentTo(hp2); 354 } 355 catch (RuntimeException ex) 356 { 357 // Not a RS url or not a valid port number: should not happen 358 return false; 359 } 360 } 361 362 /** 363 * Bag class for keeping info we get from a replication server in order to 364 * compute the best one to connect to. This is in fact a wrapper to a 365 * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be 366 * updated with a info coming from received topology messages or monitoring 367 * messages. 368 */ 369 static class ReplicationServerInfo 370 { 371 private RSInfo rsInfo; 372 private final short protocolVersion; 373 private final DN baseDN; 374 private final int windowSize; 375 // @NotNull 376 private final ServerState serverState; 377 private final boolean sslEncryption; 378 private final int degradedStatusThreshold; 379 /** Keeps the 0 value if created with a ReplServerStartMsg. */ 380 private int connectedDSNumber; 381 // @NotNull 382 private Set<Integer> connectedDSs; 383 /** 384 * Is this RS locally configured? (the RS is recognized as a usable server). 385 */ 386 private boolean locallyConfigured = true; 387 388 /** 389 * Create a new instance of ReplicationServerInfo wrapping the passed 390 * message. 391 * @param msg LocalizableMessage to wrap. 392 * @param newServerURL Override serverURL. 393 * @return The new instance wrapping the passed message. 394 * @throws IllegalArgumentException If the passed message has an unexpected 395 * type. 
396 */ 397 private static ReplicationServerInfo newInstance( 398 ReplicationMsg msg, String newServerURL) throws IllegalArgumentException 399 { 400 final ReplicationServerInfo rsInfo = newInstance(msg); 401 rsInfo.setServerURL(newServerURL); 402 return rsInfo; 403 } 404 405 /** 406 * Create a new instance of ReplicationServerInfo wrapping the passed 407 * message. 408 * @param msg LocalizableMessage to wrap. 409 * @return The new instance wrapping the passed message. 410 * @throws IllegalArgumentException If the passed message has an unexpected 411 * type. 412 */ 413 static ReplicationServerInfo newInstance(ReplicationMsg msg) 414 throws IllegalArgumentException 415 { 416 if (msg instanceof ReplServerStartMsg) 417 { 418 // RS uses protocol V3 or lower 419 return new ReplicationServerInfo((ReplServerStartMsg) msg); 420 } 421 else if (msg instanceof ReplServerStartDSMsg) 422 { 423 // RS uses protocol V4 or higher 424 return new ReplicationServerInfo((ReplServerStartDSMsg) msg); 425 } 426 427 // Unsupported message type: should not happen 428 throw new IllegalArgumentException("Unexpected PDU type: " 429 + msg.getClass().getName() + ":\n" + msg); 430 } 431 432 /** 433 * Constructs a ReplicationServerInfo object wrapping a 434 * {@link ReplServerStartMsg}. 435 * 436 * @param msg 437 * The {@link ReplServerStartMsg} this object will wrap. 438 */ 439 private ReplicationServerInfo(ReplServerStartMsg msg) 440 { 441 this.protocolVersion = msg.getVersion(); 442 this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), 443 msg.getGenerationId(), msg.getGroupId(), 1); 444 this.baseDN = msg.getBaseDN(); 445 this.windowSize = msg.getWindowSize(); 446 final ServerState ss = msg.getServerState(); 447 this.serverState = ss != null ? 
ss : new ServerState(); 448 this.sslEncryption = msg.getSSLEncryption(); 449 this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); 450 } 451 452 /** 453 * Constructs a ReplicationServerInfo object wrapping a 454 * {@link ReplServerStartDSMsg}. 455 * 456 * @param msg 457 * The {@link ReplServerStartDSMsg} this object will wrap. 458 */ 459 private ReplicationServerInfo(ReplServerStartDSMsg msg) 460 { 461 this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), 462 msg.getGenerationId(), msg.getGroupId(), msg.getWeight()); 463 this.protocolVersion = msg.getVersion(); 464 this.baseDN = msg.getBaseDN(); 465 this.windowSize = msg.getWindowSize(); 466 final ServerState ss = msg.getServerState(); 467 this.serverState = ss != null ? ss : new ServerState(); 468 this.sslEncryption = msg.getSSLEncryption(); 469 this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); 470 this.connectedDSNumber = msg.getConnectedDSNumber(); 471 } 472 473 /** 474 * Constructs a new replication server info with the passed RSInfo internal 475 * values and the passed connected DSs. 476 * 477 * @param rsInfo 478 * The RSinfo to use for the update 479 * @param connectedDSs 480 * The new connected DSs 481 */ 482 ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs) 483 { 484 this.rsInfo = 485 new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo 486 .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 487 this.protocolVersion = 0; 488 this.baseDN = null; 489 this.windowSize = 0; 490 this.connectedDSs = connectedDSs; 491 this.connectedDSNumber = connectedDSs.size(); 492 this.sslEncryption = false; 493 this.degradedStatusThreshold = -1; 494 this.serverState = new ServerState(); 495 } 496 497 /** 498 * Get the server state. 499 * @return The server state 500 */ 501 public ServerState getServerState() 502 { 503 return serverState; 504 } 505 506 /** 507 * Get the group id. 
508 * @return The group id 509 */ 510 public byte getGroupId() 511 { 512 return rsInfo.getGroupId(); 513 } 514 515 /** 516 * Get the server protocol version. 517 * @return the protocolVersion 518 */ 519 public short getProtocolVersion() 520 { 521 return protocolVersion; 522 } 523 524 /** 525 * Get the generation id. 526 * @return the generationId 527 */ 528 public long getGenerationId() 529 { 530 return rsInfo.getGenerationId(); 531 } 532 533 /** 534 * Get the server id. 535 * @return the serverId 536 */ 537 public int getServerId() 538 { 539 return rsInfo.getId(); 540 } 541 542 /** 543 * Get the server URL. 544 * @return the serverURL 545 */ 546 public String getServerURL() 547 { 548 return rsInfo.getServerUrl(); 549 } 550 551 /** 552 * Get the base DN. 553 * 554 * @return the base DN 555 */ 556 public DN getBaseDN() 557 { 558 return baseDN; 559 } 560 561 /** 562 * Get the window size. 563 * @return the windowSize 564 */ 565 public int getWindowSize() 566 { 567 return windowSize; 568 } 569 570 /** 571 * Get the ssl encryption. 572 * @return the sslEncryption 573 */ 574 public boolean isSslEncryption() 575 { 576 return sslEncryption; 577 } 578 579 /** 580 * Get the degraded status threshold. 581 * @return the degradedStatusThreshold 582 */ 583 public int getDegradedStatusThreshold() 584 { 585 return degradedStatusThreshold; 586 } 587 588 /** 589 * Get the weight. 590 * @return the weight. Null if this object is a wrapper for 591 * a ReplServerStartMsg. 592 */ 593 public int getWeight() 594 { 595 return rsInfo.getWeight(); 596 } 597 598 /** 599 * Get the connected DS number. 600 * @return the connectedDSNumber. Null if this object is a wrapper for 601 * a ReplServerStartMsg. 602 */ 603 public int getConnectedDSNumber() 604 { 605 return connectedDSNumber; 606 } 607 608 /** 609 * Converts the object to a RSInfo object. 610 * @return The RSInfo object matching this object. 
611 */ 612 RSInfo toRSInfo() 613 { 614 return rsInfo; 615 } 616 617 /** 618 * Updates replication server info with the passed RSInfo internal values 619 * and the passed connected DSs. 620 * @param rsInfo The RSinfo to use for the update 621 * @param connectedDSs The new connected DSs 622 */ 623 private void update(RSInfo rsInfo, Set<Integer> connectedDSs) 624 { 625 this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(), 626 rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 627 this.connectedDSs = connectedDSs; 628 this.connectedDSNumber = connectedDSs.size(); 629 } 630 631 private void setServerURL(String newServerURL) 632 { 633 rsInfo = new RSInfo(rsInfo.getId(), newServerURL, 634 rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 635 } 636 637 /** 638 * Updates replication server info with the passed server state. 639 * @param serverState The ServerState to use for the update 640 */ 641 private void update(ServerState serverState) 642 { 643 this.serverState.update(serverState); 644 } 645 646 /** 647 * Get the getConnectedDSs. 648 * @return the getConnectedDSs 649 */ 650 public Set<Integer> getConnectedDSs() 651 { 652 return connectedDSs; 653 } 654 655 /** 656 * Gets the locally configured status for this RS. 657 * @return the locallyConfigured 658 */ 659 public boolean isLocallyConfigured() 660 { 661 return locallyConfigured; 662 } 663 664 /** 665 * Sets the locally configured status for this RS. 666 * @param locallyConfigured the locallyConfigured to set 667 */ 668 public void setLocallyConfigured(boolean locallyConfigured) 669 { 670 this.locallyConfigured = locallyConfigured; 671 } 672 673 /** 674 * Returns a string representation of this object. 675 * @return A string representation of this object. 
676 */ 677 @Override 678 public String toString() 679 { 680 return "ReplServerInfo Url:" + getServerURL() 681 + " ServerId:" + getServerId() 682 + " GroupId:" + getGroupId() 683 + " connectedDSs:" + connectedDSs; 684 } 685 } 686 687 /** 688 * Contacts all replication servers to get information from them and being 689 * able to choose the more suitable. 690 * @return the collected information. 691 */ 692 private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo() 693 { 694 final Map<Integer, ReplicationServerInfo> rsInfos = new ConcurrentSkipListMap<>(); 695 696 for (String serverUrl : getReplicationServerUrls()) 697 { 698 // Connect to server + get and store info about it 699 final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false); 700 final ReplicationServerInfo rsInfo = rs.rsInfo; 701 if (rsInfo != null) 702 { 703 rsInfos.put(rsInfo.getServerId(), rsInfo); 704 } 705 } 706 707 return rsInfos; 708 } 709 710 /** 711 * Connect to a ReplicationServer. 712 * 713 * Handshake sequences between a DS and a RS is divided into 2 logical 714 * consecutive phases (phase 1 and phase 2). DS always initiates connection 715 * and always sends first message: 716 * 717 * DS<->RS: 718 * ------- 719 * 720 * phase 1: 721 * DS --- ServerStartMsg ---> RS 722 * DS <--- ReplServerStartDSMsg --- RS 723 * phase 2: 724 * DS --- StartSessionMsg ---> RS 725 * DS <--- TopologyMsg --- RS 726 * 727 * Before performing a full handshake sequence, DS searches for best suitable 728 * RS by making only phase 1 handshake to every RS he knows then closing 729 * connection. This allows to gather information on available RSs and then 730 * decide with which RS the full handshake (phase 1 then phase 2) will be 731 * finally performed. 732 * 733 * @throws NumberFormatException address was invalid 734 */ 735 private void connectAsDataServer() 736 { 737 /* 738 * If a first connect or a connection failure occur, we go through here. 
739 * force status machine to NOT_CONNECTED_STATUS so that monitoring can see 740 * that we are not connected. 741 */ 742 domain.toNotConnectedStatus(); 743 744 /* 745 Stop any existing heartbeat monitor and changeTime publisher 746 from a previous session. 747 */ 748 stopRSHeartBeatMonitoring(); 749 stopChangeTimeHeartBeatPublishing(); 750 mustRunBestServerCheckingAlgorithm = 0; 751 752 synchronized (connectPhaseLock) 753 { 754 final int serverId = getServerId(); 755 final DN baseDN = getBaseDN(); 756 757 /* 758 * Connect to each replication server and get their ServerState then find 759 * out which one is the best to connect to. 760 */ 761 if (logger.isTraceEnabled()) 762 { 763 debugInfo("phase 1 : will perform PhaseOneH with each RS in order to elect the preferred one"); 764 } 765 766 // Get info from every available replication servers 767 Map<Integer, ReplicationServerInfo> rsInfos = 768 collectReplicationServersInfo(); 769 computeNewTopology(toRSInfos(rsInfos)); 770 771 if (rsInfos.isEmpty()) 772 { 773 setConnectedRS(ConnectedRS.noConnectedRS()); 774 } 775 else 776 { 777 // At least one server answered, find the best one. 
778 RSEvaluations evals = computeBestReplicationServer(true, -1, state, 779 rsInfos, serverId, getGroupId(), getGenerationID()); 780 781 // Best found, now initialize connection to this one (handshake phase 1) 782 if (logger.isTraceEnabled()) 783 { 784 debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" + evals.getBestRS()); 785 } 786 787 final ConnectedRS electedRS = performPhaseOneHandshake( 788 evals.getBestRS().getServerURL(), true); 789 final ReplicationServerInfo electedRsInfo = electedRS.rsInfo; 790 if (electedRsInfo != null) 791 { 792 /* 793 Update replication server info with potentially more up to date 794 data (server state for instance may have changed) 795 */ 796 rsInfos.put(electedRsInfo.getServerId(), electedRsInfo); 797 798 // Handshake phase 1 exchange went well 799 800 // Compute in which status we are starting the session to tell the RS 801 final ServerStatus initStatus = computeInitialServerStatus( 802 electedRsInfo.getGenerationId(), electedRsInfo.getServerState(), 803 electedRsInfo.getDegradedStatusThreshold(), getGenerationID()); 804 805 // Perform session start (handshake phase 2) 806 final TopologyMsg topologyMsg = 807 performPhaseTwoHandshake(electedRS, initStatus); 808 809 if (topologyMsg != null) // Handshake phase 2 exchange went well 810 { 811 connectToReplicationServer(electedRS, initStatus, topologyMsg); 812 } // Could perform handshake phase 2 with best 813 } // Could perform handshake phase 1 with best 814 } 815 816 // connectedRS has been updated by calls above, reload it 817 final ConnectedRS rs = connectedRS.get(); 818 if (rs.isConnected()) 819 { 820 connectPhaseLock.notify(); 821 822 final long rsGenId = rs.rsInfo.getGenerationId(); 823 final int rsServerId = rs.rsInfo.getServerId(); 824 if (rsGenId == getGenerationID() || rsGenId == -1) 825 { 826 logger.info(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG, serverId, rsServerId, baseDN, 827 rs.replicationServer, getGenerationID()); 828 } 829 else 830 { 831 
logger.warn(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG, serverId, rsServerId, baseDN, 832 rs.replicationServer, getGenerationID(), rsGenId); 833 } 834 } 835 else 836 { 837 // This server could not find any replicationServer. 838 // It's going to start in degraded mode. Log a message. 839 if (!connectionError) 840 { 841 connectionError = true; 842 connectPhaseLock.notify(); 843 844 if (!rsInfos.isEmpty()) 845 { 846 logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN, 847 Utils.joinAsString(", ", rsInfos.keySet())); 848 } 849 else 850 { 851 logger.warn(WARN_NO_AVAILABLE_CHANGELOGS, serverId, baseDN); 852 } 853 } 854 } 855 } 856 } 857 858 private void computeNewTopology(List<RSInfo> newRSInfos) 859 { 860 final int rsServerId = getRsServerId(); 861 862 Topology oldTopo; 863 Topology newTopo; 864 do 865 { 866 oldTopo = topology.get(); 867 newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(), 868 rsServerId, getReplicationServerUrls(), oldTopo.rsInfos); 869 } 870 while (!topology.compareAndSet(oldTopo, newTopo)); 871 872 if (logger.isTraceEnabled()) 873 { 874 debugInfo(topologyChange(rsServerId, oldTopo, newTopo)); 875 } 876 } 877 878 private StringBuilder topologyChange(int rsServerId, Topology oldTopo, 879 Topology newTopo) 880 { 881 final StringBuilder sb = new StringBuilder(); 882 sb.append("rsServerId=").append(rsServerId); 883 if (newTopo.equals(oldTopo)) 884 { 885 sb.append(", unchangedTopology=").append(newTopo); 886 } 887 else 888 { 889 sb.append(", oldTopology=").append(oldTopo); 890 sb.append(", newTopology=").append(newTopo); 891 } 892 return sb; 893 } 894 895 /** 896 * Connects to a replication server. 
897 * 898 * @param rs 899 * the Replication Server to connect to 900 * @param initStatus 901 * The status to enter the state machine with 902 * @param topologyMsg 903 * the message containing the topology information 904 */ 905 private void connectToReplicationServer(ConnectedRS rs, 906 ServerStatus initStatus, TopologyMsg topologyMsg) 907 { 908 final DN baseDN = getBaseDN(); 909 final ReplicationServerInfo rsInfo = rs.rsInfo; 910 911 boolean connectCompleted = false; 912 try 913 { 914 maxSendWindow = rsInfo.getWindowSize(); 915 916 receiveTopo(topologyMsg, rs.getServerId()); 917 918 /* 919 Log a message to let the administrator know that the failure was resolved. 920 Wake up all the thread that were waiting on the window 921 on the previous connection. 922 */ 923 connectionError = false; 924 if (sendWindow != null) 925 { 926 /* 927 * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding 928 * this semaphore will get blocked when they acquire it. However, we 929 * also need to make sure that we don't overflow the semaphore by 930 * releasing too many permits. 931 */ 932 final int MAX_PERMITS = Integer.MAX_VALUE >>> 2; 933 if (sendWindow.availablePermits() < MAX_PERMITS) 934 { 935 /* 936 * At least 2^29 acquisitions would need to occur for this to be 937 * insufficient. In addition, at least 2^30 releases would need to 938 * occur for this to potentially overflow. Hopefully this is unlikely 939 * to happen. 940 */ 941 sendWindow.release(MAX_PERMITS); 942 } 943 } 944 sendWindow = new Semaphore(maxSendWindow); 945 rcvWindow = getMaxRcvWindow(); 946 947 domain.sessionInitiated(initStatus, rsInfo.getServerState()); 948 949 final byte groupId = getGroupId(); 950 if (rs.getGroupId() != groupId) 951 { 952 /* 953 Connected to replication server with wrong group id: 954 warn user and start heartbeat monitor to recover when a server 955 with the right group id shows up. 
*/
        logger.warn(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID,
            groupId, rs.getServerId(), rsInfo.getServerURL(), rs.getGroupId(), baseDN, getServerId());
      }
      startRSHeartBeatMonitoring(rs);
      if (rsInfo.getProtocolVersion() >=
        ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        startChangeTimeHeartBeatPublishing(rs);
      }
      connectCompleted = true;
    }
    catch (Exception e)
    {
      logger.error(ERR_COMPUTING_FAKE_OPS, baseDN, rsInfo.getServerURL(),
          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
    }
    finally
    {
      if (!connectCompleted)
      {
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
    }
  }

  /**
   * Determines the status we are starting with according to our state and the
   * RS state.
   * <p>
   * The result is:
   * <ul>
   * <li>NORMAL_STATUS when the RS has no generation id (-1);</li>
   * <li>BAD_GEN_ID_STATUS when the RS and local generation ids differ;</li>
   * <li>DEGRADED_STATUS when the generation ids match, the threshold is
   * positive and the number of changes we are late reaches it;</li>
   * <li>NORMAL_STATUS otherwise.</li>
   * </ul>
   *
   * @param rsGenId The generation id of the RS
   * @param rsState The server state of the RS
   * @param degradedStatusThreshold The degraded status threshold of the RS
   * @param dsGenId The local generation id
   * @return The initial status
   */
  private ServerStatus computeInitialServerStatus(long rsGenId,
    ServerState rsState, int degradedStatusThreshold, long dsGenId)
  {
    if (rsGenId == -1)
    {
      // RS has no generation id
      return ServerStatus.NORMAL_STATUS;
    }
    else if (rsGenId != dsGenId)
    {
      // DS and RS do not have same generation id
      return ServerStatus.BAD_GEN_ID_STATUS;
    }
    else
    {
      /*
      DS and RS have same generation id

      Determine if we are late or not to replay changes. RS uses a
      threshold value for pending changes to be replayed by a DS to
      determine if the DS is in normal status or in degraded status.
      Let's compare the local and remote server state using this threshold
      value to determine if we are late or not
      */

      // Presumably the number of changes known by the RS state but not yet
      // by the local state ("changes late" in the trace below) - confirm in
      // ServerState.diffChanges.
      int nChanges = ServerState.diffChanges(rsState, state);
      if (logger.isTraceEnabled())
      {
        debugInfo("computed " + nChanges + " changes late.");
      }

      /*
      Only a positive threshold enables degradation: we start degraded when
      the number of changes we are late reaches the threshold.
      */
      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
      {
        return ServerStatus.DEGRADED_STATUS;
      }
      // degradedStatusThreshold value of '0' means no degrading system used
      // (no threshold): force normal status
      return ServerStatus.NORMAL_STATUS;
    }
  }



  /**
   * Connect to the provided server performing the first phase handshake (start
   * messages exchange) and return the reply message from the replication
   * server, wrapped in a ReplicationServerInfo object.
   *
   * @param serverURL
   *          Server to connect to.
   * @param keepSession
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @return On success, a ConnectedRS wrapping the replication server info,
   *         holding the new session only when keepSession is true (otherwise
   *         the session is closed). On failure (connection refused, timeout,
   *         base DN mismatch, any other exception), the value returned by
   *         {@code setConnectedRS(ConnectedRS.noConnectedRS())}.
   *         NOTE(review): that reset also happens when keepSession is false
   *         (probing only); confirm this never clobbers an established
   *         connection.
   */
  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
  {
    Session newSession = null;
    Socket socket = null;
    // Set only once the whole phase one exchange succeeded; drives cleanup
    // and error logging in the finally block.
    boolean hasConnected = false;
    LocalizableMessage errorMessage = null;

    try
    {
      // Open a socket connection to the next candidate.
      socket = new Socket();
      // Hard-coded receive buffer size (1,000,000 bytes)
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      if (config.getSourceAddress() != null)
      {
        // Bind to the configured source address, on an ephemeral port (0)
        InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0);
        socket.bind(local);
      }
      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
      boolean isSslEncryption = replSessionSecurity.isSslEncryption();

      // Send our ServerStartMsg.
      final HostPort hp = new HostPort(
          socket.getLocalAddress().getHostName(), socket.getLocalPort());
      final String url = hp.toString();
      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
          getGenerationID(), isSslEncryption, getGroupId());
      newSession.publish(serverStartMsg);

      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
      // come back.
      ReplicationMsg msg = newSession.receive();
      if (logger.isTraceEnabled())
      {
        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
            + msg);
      }

      // Wrap received message in a server info object
      final ReplicationServerInfo replServerInfo =
          ReplicationServerInfo.newInstance(msg, serverURL);

      // Sanity check: the RS must serve the same base DN as this broker.
      // The error message is logged by the finally block.
      final DN repDN = replServerInfo.getBaseDN();
      if (!getBaseDN().equals(repDN))
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDN, getBaseDN());
        return setConnectedRS(ConnectedRS.noConnectedRS());
      }

      /*
       * We have sent our own protocol version to the replication server. The
       * replication server will use the same one (or an older one if it is an
       * old replication server).
       */
      newSession.setProtocolVersion(
          getCompatibleVersion(replServerInfo.getProtocolVersion()));

      if (!isSslEncryption)
      {
        newSession.stopEncryption();
      }

      hasConnected = true;

      if (keepSession)
      {
        // cannot store it yet,
        // only store after a successful phase two handshake
        return new ConnectedRS(replServerInfo, newSession);
      }
      return new ConnectedRS(replServerInfo, null);
    }
    catch (ConnectException e)
    {
      logger.traceException(e);
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), serverURL, getBaseDN());
    }
    catch (SocketTimeoutException e)
    {
      logger.traceException(e);
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), serverURL, getBaseDN());
    }
    catch (Exception e)
    {
      logger.traceException(e);
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), serverURL, getBaseDN(), stackTraceToSingleLineString(e));
    }
    finally
    {
      // Release the session/socket unless they are handed over to the caller
      // for the phase two handshake.
      if (!hasConnected || !keepSession)
      {
        close(newSession);
        close(socket);
      }

      // The attempt failed (no listener, timeout, base DN mismatch or another
      // error). Presumably connectionError is set once an error was already
      // reported - confirm.
      if (!hasConnected && errorMessage != null && !connectionError)
      {
        // There was no server waiting on this host:port
        // Log a notice and will try the next replicationServer in the list
        if (keepSession) // Log error message only for final connection
        {
          // log the error message only once to avoid overflowing the error log
          logger.error(errorMessage);
        }

        logger.trace(errorMessage);
      }
    }
    return setConnectedRS(ConnectedRS.noConnectedRS());
  }

  /**
   * Performs the second phase handshake (send StartSessionMsg and receive
   * TopologyMsg messages exchange) and return the reply message from the
   * replication server.
   *
   * @param electedRS Server we are connecting with.
1177 * @param initStatus The status we are starting with 1178 * @return The ReplServerStartMsg the server replied. Null if could not 1179 * get an answer. 1180 */ 1181 private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS, 1182 ServerStatus initStatus) 1183 { 1184 try 1185 { 1186 // Send our StartSessionMsg. 1187 final StartSessionMsg startSessionMsg; 1188 startSessionMsg = new StartSessionMsg( 1189 initStatus, 1190 domain.getRefUrls(), 1191 domain.isAssured(), 1192 domain.getAssuredMode(), 1193 domain.getAssuredSdLevel()); 1194 startSessionMsg.setEclIncludes( 1195 domain.getEclIncludes(domain.getServerId()), 1196 domain.getEclIncludesForDeletes(domain.getServerId())); 1197 final Session session = electedRS.session; 1198 session.publish(startSessionMsg); 1199 1200 // Read the TopologyMsg that should come back. 1201 final TopologyMsg topologyMsg = (TopologyMsg) session.receive(); 1202 1203 if (logger.isTraceEnabled()) 1204 { 1205 debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg 1206 + "\nAND RECEIVED:\n" + topologyMsg); 1207 } 1208 1209 // Alright set the timeout to the desired value 1210 session.setSoTimeout(timeout); 1211 setConnectedRS(electedRS); 1212 return topologyMsg; 1213 } 1214 catch (Exception e) 1215 { 1216 logger.error(WARN_EXCEPTION_STARTING_SESSION_PHASE, 1217 getServerId(), electedRS.rsInfo.getServerURL(), getBaseDN(), stackTraceToSingleLineString(e)); 1218 1219 setConnectedRS(ConnectedRS.noConnectedRS()); 1220 return null; 1221 } 1222 } 1223 1224 /** 1225 * Class holding evaluation results for electing the best replication server 1226 * for the local directory server. 1227 */ 1228 static class RSEvaluations 1229 { 1230 private final int localServerId; 1231 private Map<Integer, ReplicationServerInfo> bestRSs; 1232 private final Map<Integer, LocalizableMessage> rsEvals = new HashMap<>(); 1233 1234 /** 1235 * Ctor. 
1236 * 1237 * @param localServerId 1238 * the serverId for the local directory server 1239 * @param rsInfos 1240 * a Map of serverId => {@link ReplicationServerInfo} with all the 1241 * candidate replication servers 1242 */ 1243 RSEvaluations(int localServerId, 1244 Map<Integer, ReplicationServerInfo> rsInfos) 1245 { 1246 this.localServerId = localServerId; 1247 this.bestRSs = rsInfos; 1248 } 1249 1250 private boolean keepBest(LocalEvaluation eval) 1251 { 1252 if (eval.hasAcceptedAny()) 1253 { 1254 bestRSs = eval.getAccepted(); 1255 rsEvals.putAll(eval.getRejected()); 1256 return true; 1257 } 1258 return false; 1259 } 1260 1261 /** 1262 * Sets the elected best replication server, rejecting all the other 1263 * replication servers with the supplied evaluation. 1264 * 1265 * @param bestRsId 1266 * the serverId of the elected replication server 1267 * @param rejectedRSsEval 1268 * the evaluation for all the rejected replication servers 1269 */ 1270 private void setBestRS(int bestRsId, LocalizableMessage rejectedRSsEval) 1271 { 1272 for (Iterator<Entry<Integer, ReplicationServerInfo>> it = 1273 this.bestRSs.entrySet().iterator(); it.hasNext();) 1274 { 1275 final Entry<Integer, ReplicationServerInfo> entry = it.next(); 1276 final Integer rsId = entry.getKey(); 1277 final ReplicationServerInfo rsInfo = entry.getValue(); 1278 if (rsInfo.getServerId() != bestRsId) 1279 { 1280 it.remove(); 1281 } 1282 rsEvals.put(rsId, rejectedRSsEval); 1283 } 1284 } 1285 1286 private void discardAll(LocalizableMessage eval) 1287 { 1288 for (Integer rsId : bestRSs.keySet()) 1289 { 1290 rsEvals.put(rsId, eval); 1291 } 1292 } 1293 1294 private boolean foundBestRS() 1295 { 1296 return bestRSs.size() == 1; 1297 } 1298 1299 /** 1300 * Returns the {@link ReplicationServerInfo} for the best replication 1301 * server. 
1302 * 1303 * @return the {@link ReplicationServerInfo} for the best replication server 1304 */ 1305 ReplicationServerInfo getBestRS() 1306 { 1307 if (foundBestRS()) 1308 { 1309 return bestRSs.values().iterator().next(); 1310 } 1311 return null; 1312 } 1313 1314 /** 1315 * Returns the evaluations for all the candidate replication servers. 1316 * 1317 * @return a Map of serverId => LocalizableMessage containing the evaluation for each 1318 * candidate replication servers. 1319 */ 1320 Map<Integer, LocalizableMessage> getEvaluations() 1321 { 1322 if (foundBestRS()) 1323 { 1324 final Integer bestRSServerId = getBestRS().getServerId(); 1325 if (rsEvals.get(bestRSServerId) == null) 1326 { 1327 final LocalizableMessage eval = NOTE_BEST_RS.get(bestRSServerId, localServerId); 1328 rsEvals.put(bestRSServerId, eval); 1329 } 1330 } 1331 return Collections.unmodifiableMap(rsEvals); 1332 } 1333 1334 /** 1335 * Returns the evaluation for the supplied replication server Id. 1336 * <p> 1337 * Note: "unknown RS" message is returned if the supplied replication server 1338 * was not part of the candidate replication servers. 
1339 * 1340 * @param rsServerId 1341 * the supplied replication server Id 1342 * @return the evaluation {@link LocalizableMessage} for the supplied replication 1343 * server Id 1344 */ 1345 private LocalizableMessage getEvaluation(int rsServerId) 1346 { 1347 final LocalizableMessage evaluation = getEvaluations().get(rsServerId); 1348 if (evaluation != null) 1349 { 1350 return evaluation; 1351 } 1352 return NOTE_UNKNOWN_RS.get(rsServerId, localServerId); 1353 } 1354 1355 /** {@inheritDoc} */ 1356 @Override 1357 public String toString() 1358 { 1359 return "Current best replication server Ids: " + bestRSs.keySet() 1360 + ", Evaluation of connected replication servers" 1361 + " (ServerId => Evaluation): " + rsEvals.keySet() 1362 + ", Any replication server not appearing here" 1363 + " could not be contacted."; 1364 } 1365 } 1366 1367 /** 1368 * Evaluation local to one filter. 1369 */ 1370 private static class LocalEvaluation 1371 { 1372 private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>(); 1373 private final Map<ReplicationServerInfo, LocalizableMessage> rsEvals = new HashMap<>(); 1374 1375 private void accept(Integer rsId, ReplicationServerInfo rsInfo) 1376 { 1377 // forget previous eval, including undoing reject 1378 this.rsEvals.remove(rsInfo); 1379 this.accepted.put(rsId, rsInfo); 1380 } 1381 1382 private void reject(ReplicationServerInfo rsInfo, LocalizableMessage reason) 1383 { 1384 this.accepted.remove(rsInfo.getServerId()); // undo accept 1385 this.rsEvals.put(rsInfo, reason); 1386 } 1387 1388 private Map<Integer, ReplicationServerInfo> getAccepted() 1389 { 1390 return accepted; 1391 } 1392 1393 private ReplicationServerInfo[] getAcceptedRSInfos() 1394 { 1395 return accepted.values().toArray( 1396 new ReplicationServerInfo[accepted.size()]); 1397 } 1398 1399 public Map<Integer, LocalizableMessage> getRejected() 1400 { 1401 final Map<Integer, LocalizableMessage> result = new HashMap<>(); 1402 for (Entry<ReplicationServerInfo, 
LocalizableMessage> entry : rsEvals.entrySet()) 1403 { 1404 result.put(entry.getKey().getServerId(), entry.getValue()); 1405 } 1406 return result; 1407 } 1408 1409 private boolean hasAcceptedAny() 1410 { 1411 return !accepted.isEmpty(); 1412 } 1413 1414 } 1415 1416 /** 1417 * Returns the replication server that best fits our need so that we can 1418 * connect to it or determine if we must disconnect from current one to 1419 * re-connect to best server. 1420 * <p> 1421 * Note: this method is static for test purpose (access from unit tests) 1422 * 1423 * @param firstConnection True if we run this method for the very first 1424 * connection of the broker. False if we run this method to determine if the 1425 * replication server we are currently connected to is still the best or not. 1426 * @param rsServerId The id of the replication server we are currently 1427 * connected to. Only used when firstConnection is false. 1428 * @param myState The local server state. 1429 * @param rsInfos The list of available replication servers and their 1430 * associated information (choice will be made among them). 1431 * @param localServerId The server id for the suffix we are working for. 1432 * @param groupId The groupId we prefer being connected to if possible 1433 * @param generationId The generation id we are using 1434 * @return The computed best replication server. If the returned value is 1435 * null, the best replication server is undetermined but the local server must 1436 * disconnect (so the best replication server is another one than the current 1437 * one). Null can only be returned when firstConnection is false. 
1438 */ 1439 static RSEvaluations computeBestReplicationServer( 1440 boolean firstConnection, int rsServerId, ServerState myState, 1441 Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, 1442 byte groupId, long generationId) 1443 { 1444 final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos); 1445 // Shortcut, if only one server, this is the best 1446 if (evals.foundBestRS()) 1447 { 1448 return evals; 1449 } 1450 1451 /** 1452 * Apply some filtering criteria to determine the best servers list from 1453 * the available ones. The ordered list of criteria is (from more important 1454 * to less important): 1455 * - replication server has the same group id as the local DS one 1456 * - replication server has the same generation id as the local DS one 1457 * - replication server is up to date regarding changes generated by the 1458 * local DS 1459 * - replication server in the same VM as local DS one 1460 */ 1461 /* 1462 The list of best replication servers is filtered with each criteria. At 1463 each criteria, the list is replaced with the filtered one if there 1464 are some servers from the filtering, otherwise, the list is left as is 1465 and the new filtering for the next criteria is applied and so on. 1466 1467 Use only servers locally configured: those are servers declared in 1468 the local configuration. When the current method is called, for 1469 sure, at least one server from the list is locally configured 1470 */ 1471 filterServersLocallyConfigured(evals, localServerId); 1472 // Some servers with same group id ? 1473 filterServersWithSameGroupId(evals, localServerId, groupId); 1474 // Some servers with same generation id ? 
1475 final boolean rssWithSameGenerationIdExist = 1476 filterServersWithSameGenerationId(evals, localServerId, generationId); 1477 if (rssWithSameGenerationIdExist) 1478 { 1479 // If some servers with the right generation id this is useful to 1480 // run the local DS change criteria 1481 filterServersWithAllLocalDSChanges(evals, myState, localServerId); 1482 } 1483 // Some servers in the local VM or local host? 1484 filterServersOnSameHost(evals, localServerId); 1485 1486 if (evals.foundBestRS()) 1487 { 1488 return evals; 1489 } 1490 1491 /** 1492 * Now apply the choice based on the weight to the best servers list 1493 */ 1494 if (firstConnection) 1495 { 1496 // We are not connected to a server yet 1497 computeBestServerForWeight(evals, -1, -1); 1498 } 1499 else 1500 { 1501 /* 1502 * We are already connected to a RS: compute the best RS as far as the 1503 * weights is concerned. If this is another one, some DS must disconnect. 1504 */ 1505 computeBestServerForWeight(evals, rsServerId, localServerId); 1506 } 1507 return evals; 1508 } 1509 1510 /** 1511 * Creates a new list that contains only replication servers that are locally 1512 * configured. 1513 * @param evals The evaluation object 1514 */ 1515 private static void filterServersLocallyConfigured(RSEvaluations evals, 1516 int localServerId) 1517 { 1518 final LocalEvaluation eval = new LocalEvaluation(); 1519 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1520 { 1521 final Integer rsId = entry.getKey(); 1522 final ReplicationServerInfo rsInfo = entry.getValue(); 1523 if (rsInfo.isLocallyConfigured()) 1524 { 1525 eval.accept(rsId, rsInfo); 1526 } 1527 else 1528 { 1529 eval.reject(rsInfo, 1530 NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId)); 1531 } 1532 } 1533 evals.keepBest(eval); 1534 } 1535 1536 /** 1537 * Creates a new list that contains only replication servers that have the 1538 * passed group id, from a passed replication server list. 
1539 * @param evals The evaluation object 1540 * @param groupId The group id that must match 1541 */ 1542 private static void filterServersWithSameGroupId(RSEvaluations evals, 1543 int localServerId, byte groupId) 1544 { 1545 final LocalEvaluation eval = new LocalEvaluation(); 1546 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1547 { 1548 final Integer rsId = entry.getKey(); 1549 final ReplicationServerInfo rsInfo = entry.getValue(); 1550 if (rsInfo.getGroupId() == groupId) 1551 { 1552 eval.accept(rsId, rsInfo); 1553 } 1554 else 1555 { 1556 eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get( 1557 rsId, rsInfo.getGroupId(), localServerId, groupId)); 1558 } 1559 } 1560 evals.keepBest(eval); 1561 } 1562 1563 /** 1564 * Creates a new list that contains only replication servers that have the 1565 * provided generation id, from a provided replication server list. 1566 * When the selected replication servers have no change (empty serverState) 1567 * then the 'empty'(generationId==-1) replication servers are also included 1568 * in the result list. 
1569 * 1570 * @param evals The evaluation object 1571 * @param generationId The generation id that must match 1572 * @return whether some replication server passed the filter 1573 */ 1574 private static boolean filterServersWithSameGenerationId( 1575 RSEvaluations evals, long localServerId, long generationId) 1576 { 1577 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1578 final LocalEvaluation eval = new LocalEvaluation(); 1579 boolean emptyState = true; 1580 1581 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1582 { 1583 final Integer rsId = entry.getKey(); 1584 final ReplicationServerInfo rsInfo = entry.getValue(); 1585 if (rsInfo.getGenerationId() == generationId) 1586 { 1587 eval.accept(rsId, rsInfo); 1588 if (!rsInfo.serverState.isEmpty()) 1589 { 1590 emptyState = false; 1591 } 1592 } 1593 else if (rsInfo.getGenerationId() == -1) 1594 { 1595 eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId, 1596 generationId, localServerId)); 1597 } 1598 else 1599 { 1600 eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get( 1601 rsId, rsInfo.getGenerationId(), localServerId, generationId)); 1602 } 1603 } 1604 1605 if (emptyState) 1606 { 1607 // If the RS with a generationId have all an empty state, 1608 // then the 'empty'(genId=-1) RSes are also candidate 1609 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1610 { 1611 ReplicationServerInfo rsInfo = entry.getValue(); 1612 if (rsInfo.getGenerationId() == -1) 1613 { 1614 // will undo the reject of previously rejected RSs 1615 eval.accept(entry.getKey(), rsInfo); 1616 } 1617 } 1618 } 1619 1620 return evals.keepBest(eval); 1621 } 1622 1623 /** 1624 * Creates a new list that contains only replication servers that have the 1625 * latest changes from the passed DS, from a passed replication server list. 
* @param evals The evaluation object
   * @param localState The state of the local DS
   * @param localServerId The server id to consider for the changes
   */
  private static void filterServersWithAllLocalDSChanges(
      RSEvaluations evals, ServerState localState, int localServerId)
  {
    // Extract the CSN of the latest change generated by the local server
    final CSN localCSN = getCSN(localState, localServerId);

    /*
     * Find replication servers that are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. If some servers are more up to date, prefer this list but take
     * only the latest CSN.
     *
     * Note: foundRSMoreUpToDateThanLocalDS becomes true exactly when
     * latestRsCSN gets its first non-null value.
     */
    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
    boolean foundRSMoreUpToDateThanLocalDS = false;
    CSN latestRsCSN = null;
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);

      // Has this replication server the latest local change ?
      if (rsCSN.isOlderThan(localCSN))
      {
        // The RS misses some local changes ("later" in the message key
        // appears to mean "late")
        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
      }
      else if (rsCSN.equals(localCSN))
      {
        // This replication server has exactly the latest change from the
        // local server
        if (!foundRSMoreUpToDateThanLocalDS)
        {
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else
        {
          // Another RS already has newer local changes than we do
          mostUpToDateEval.reject(rsInfo,
            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
      else if (rsCSN.isNewerThan(localCSN))
      {
        // This replication server is even more up to date than the local server
        if (latestRsCSN == null)
        {
          foundRSMoreUpToDateThanLocalDS = true;
          // all previous results are now outdated, reject them all
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          // Initialize the latest CSN
          latestRsCSN = rsCSN;
        }

        if (rsCSN.equals(latestRsCSN))
        {
          // Ties with the most up to date RS found so far: keep both
          mostUpToDateEval.accept(rsId, rsInfo);
        }
        else if (rsCSN.isNewerThan(latestRsCSN))
        {
          // This RS is even more up to date, reject all previously accepted RSs
          // and store this new RS
          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
              localCSN);
          mostUpToDateEval.accept(rsId, rsInfo);
          latestRsCSN = rsCSN;
        }
        else
        {
          // Newer than us, but older than the best RS found so far
          mostUpToDateEval.reject(rsInfo,
            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
        }
      }
    }
    evals.keepBest(mostUpToDateEval);
  }

  /**
   * Returns the CSN recorded for serverId in the supplied state, or, when
   * there is none, a placeholder {@code new CSN(0, 0, serverId)} (presumably
   * the oldest possible CSN for that server - confirm in CSN).
   */
  private static CSN getCSN(ServerState state, int serverId)
  {
    final CSN csn = state.getCSN(serverId);
    if (csn != null)
    {
      return csn;
    }
    return new CSN(0, 0, serverId);
  }

  /**
   * Rejects every replication server currently accepted by the supplied
   * evaluation, because another replication server is more up to date.
   * Iterates over a snapshot array, so rejecting while iterating is safe.
   */
  private static void rejectAllWithRSIsLaterThanBestRS(
      final LocalEvaluation eval, int localServerId, CSN localCSN)
  {
    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
    {
      final String rsCSN =
          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
      final LocalizableMessage reason =
          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
            rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
      eval.reject(rsInfo, reason);
    }
  }

  /**
   * Creates a new list that contains only replication servers that are on the
   * same host as the local DS, from a passed replication server list. This
   * method will gives priority to any replication server which is in the same
   * VM as this DS.
   *
   * @param evals The evaluation object
   * @param localServerId The server id of the local DS (used in messages)
   */
  private static void filterServersOnSameHost(RSEvaluations evals,
      int localServerId)
  {
    /*
     * Initially look for all servers on the same host. If we find one in the
     * same VM, then narrow the search.
     */
    boolean foundRSInSameVM = false;
    final LocalEvaluation eval = new LocalEvaluation();
    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
    {
      final Integer rsId = entry.getKey();
      final ReplicationServerInfo rsInfo = entry.getValue();
      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
      if (hp.isLocalAddress())
      {
        // Presumably true when a replication server of this VM listens on
        // that port - confirm in isLocalReplicationServerPort
        if (isLocalReplicationServerPort(hp.getPort()))
        {
          if (!foundRSInSameVM)
          {
            // An RS in the same VM will always have priority.
            // Narrow the search to only include servers in this VM.
            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
            foundRSInSameVM = true;
          }
          eval.accept(rsId, rsInfo);
        }
        else if (!foundRSInSameVM)
        {
          // OK, accept RSs on the same machine because we have not found an RS
          // in the same VM yet
          eval.accept(rsId, rsInfo);
        }
        else
        {
          // Skip: we have found some RSs in the same VM, but this RS is not.
          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
              localServerId));
        }
      }
      else
      {
        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
            localServerId));
      }
    }
    evals.keepBest(eval);
  }

  /**
   * Rejects every replication server currently accepted by the supplied
   * evaluation as not running in the same VM as the local DS.
   */
  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
      int localServerId)
  {
    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
    {
      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
          rsInfo.getServerId(), localServerId));
    }
  }

  /**
   * Computes the best replication server the local server should be connected
   * to so that the load is correctly spread across the topology, following the
   * weights guidance.
   * Warning: This method is expected to be called with at least 2 servers in
   * bestServers
   * Note: this method is static for test purpose (access from unit tests)
   * @param evals The evaluation object
   * @param currentRsServerId The replication server the local server is
   *        currently connected to. -1 if the local server is not yet connected
   *        to any replication server.
   * @param localServerId The server id of the local server.
This is not used 1813 * when it is not connected to a replication server 1814 * (currentRsServerId = -1) 1815 */ 1816 static void computeBestServerForWeight(RSEvaluations evals, 1817 int currentRsServerId, int localServerId) 1818 { 1819 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1820 /* 1821 * - Compute the load goal of each RS, deducing it from the weights affected 1822 * to them. 1823 * - Compute the current load of each RS, deducing it from the DSs 1824 * currently connected to them. 1825 * - Compute the differences between the load goals and the current loads of 1826 * the RSs. 1827 */ 1828 // Sum of the weights 1829 int sumOfWeights = 0; 1830 // Sum of the connected DSs 1831 int sumOfConnectedDSs = 0; 1832 for (ReplicationServerInfo rsInfo : bestServers.values()) 1833 { 1834 sumOfWeights += rsInfo.getWeight(); 1835 sumOfConnectedDSs += rsInfo.getConnectedDSNumber(); 1836 } 1837 1838 // Distance (difference) of the current loads to the load goals of each RS: 1839 // key:server id, value: distance 1840 Map<Integer, BigDecimal> loadDistances = new HashMap<>(); 1841 // Precision for the operations (number of digits after the dot) 1842 final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); 1843 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1844 { 1845 final Integer rsId = entry.getKey(); 1846 final ReplicationServerInfo rsInfo = entry.getValue(); 1847 1848 // load goal = rs weight / sum of weights 1849 BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide( 1850 BigDecimal.valueOf(sumOfWeights), mathContext); 1851 BigDecimal currentLoadBd = BigDecimal.ZERO; 1852 if (sumOfConnectedDSs != 0) 1853 { 1854 // current load = number of connected DSs / total number of DSs 1855 int connectedDSs = rsInfo.getConnectedDSNumber(); 1856 currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( 1857 BigDecimal.valueOf(sumOfConnectedDSs), mathContext); 1858 } 1859 // load distance = load 
goal - current load 1860 BigDecimal loadDistanceBd = 1861 loadGoalBd.subtract(currentLoadBd, mathContext); 1862 loadDistances.put(rsId, loadDistanceBd); 1863 } 1864 1865 if (currentRsServerId == -1) 1866 { 1867 // The local server is not connected yet, find best server to connect to, 1868 // taking the weights into account. 1869 computeBestServerWhenNotConnected(evals, loadDistances, localServerId); 1870 } 1871 else 1872 { 1873 // The local server is currently connected to a RS, let's see if it must 1874 // disconnect or not, taking the weights into account. 1875 computeBestServerWhenConnected(evals, loadDistances, localServerId, 1876 currentRsServerId, sumOfWeights, sumOfConnectedDSs); 1877 } 1878 } 1879 1880 private static void computeBestServerWhenNotConnected(RSEvaluations evals, 1881 Map<Integer, BigDecimal> loadDistances, int localServerId) 1882 { 1883 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1884 /* 1885 * Find the server with the current highest distance to its load goal and 1886 * choose it. 
Make an exception if every server is correctly balanced, 1887 * that is every current load distances are equal to 0, in that case, 1888 * choose the server with the highest weight 1889 */ 1890 int bestRsId = 0; // If all server equal, return the first one 1891 float highestDistance = Float.NEGATIVE_INFINITY; 1892 boolean allRsWithZeroDistance = true; 1893 int highestWeightRsId = -1; 1894 int highestWeight = -1; 1895 for (Integer rsId : bestServers.keySet()) 1896 { 1897 float loadDistance = loadDistances.get(rsId).floatValue(); 1898 if (loadDistance > highestDistance) 1899 { 1900 // This server is far more from its balance point 1901 bestRsId = rsId; 1902 highestDistance = loadDistance; 1903 } 1904 if (loadDistance != 0) 1905 { 1906 allRsWithZeroDistance = false; 1907 } 1908 int weight = bestServers.get(rsId).getWeight(); 1909 if (weight > highestWeight) 1910 { 1911 // This server has a higher weight 1912 highestWeightRsId = rsId; 1913 highestWeight = weight; 1914 } 1915 } 1916 // All servers with a 0 distance ? 
    if (allRsWithZeroDistance)
    {
      // Choose server with the highest weight
      bestRsId = highestWeightRsId;
    }
    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
        bestRsId));
  }

  /**
   * Decides whether the local DS should stay connected to the RS it is
   * currently connected to, or leave it so that DSs get rebalanced between
   * RSs. The decision is recorded in {@code evals}: either the current RS is
   * set as the best RS, or {@code evals.discardAll(...)} is called when the
   * local DS is one of the DSs that must disconnect from an overloaded RS.
   *
   * @param evals
   *          the evaluations to update; its {@code bestRSs} map provides the
   *          candidate RSs
   * @param loadDistances
   *          load distance per RS serverId. A negative value for the current
   *          RS means it has more DSs connected than its load goal. NOTE(review):
   *          must contain an entry for {@code currentRsServerId} and for every
   *          key of {@code evals.bestRSs}, otherwise a NullPointerException is
   *          thrown.
   * @param localServerId
   *          the serverId of the local DS
   * @param currentRsServerId
   *          the serverId of the RS the local DS is currently connected to
   * @param sumOfWeights
   *          the sum of the RS weights, used to compute the load goal of the
   *          current RS
   * @param sumOfConnectedDSs
   *          the total number of connected DSs
   */
  private static void computeBestServerWhenConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    // 32 significant digits keeps the divisions below precise enough for the
    // equality tests performed in mustAvoidYoyoEffect().
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
    if (currentLoadDistance < 0)
    {
      /*
      Too much DSs connected to the current RS, compared with its load
      goal:
      Determine the potential number of DSs to disconnect from the current
      RS and see if the local DS is part of them: the DSs that must
      disconnect are those with the lowest server id.
      Compute the sum of the distances of the load goals of the other RSs
      */
      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
      for (Integer rsId : bestServers.keySet())
      {
        // rsId is unboxed here, so this is a value (not reference) comparison.
        if (rsId != currentRsServerId)
        {
          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
              loadDistances.get(rsId), mathContext);
        }
      }

      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
      {
        /*
        The average distance of the other RSs shows a lack of DSs.
        Compute the number of DSs to disconnect from the current RS,
        rounding to the nearest integer number. Do only this if there is
        no risk of yoyo effect: when the exact balance cannot be
        established due to the current number of DSs connected, do not
        disconnect a DS. A simple example where the balance cannot be
        reached is:
        - RS1 has weight 1 and 2 DSs
        - RS2 has weight 1 and 1 DS
        => disconnecting a DS from RS1 to reconnect it to RS2 would have no
        sense as this would lead to the reverse situation. In that case,
        the perfect balance cannot be reached and we must stick to the
        current situation, otherwise the DS would keep move between the 2
        RSs
        */
        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
          multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
          .floatValue();
        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);

        // Avoid yoyo effect
        // (the check is only performed when exactly one DS would have to move)
        if (overloadingDSsNumber == 1)
        {
          // What would be the new load distance for the current RS if
          // we disconnect some DSs ?
          ReplicationServerInfo currentReplicationServerInfo =
            bestServers.get(currentRsServerId);

          int currentRsWeight = currentReplicationServerInfo.getWeight();
          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
          BigDecimal currentRsLoadGoalBd =
            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
          if (sumOfConnectedDSs != 0)
          {
            int connectedDSs = currentReplicationServerInfo.
              getConnectedDSNumber();
            BigDecimal potentialNewConnectedDSsBd =
                BigDecimal.valueOf(connectedDSs - 1);
            BigDecimal sumOfConnectedDSsBd =
                BigDecimal.valueOf(sumOfConnectedDSs);
            potentialCurrentRsNewLoadBd =
              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
          }
          BigDecimal potentialCurrentRsNewLoadDistanceBd =
            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);

          // What would be the new load distance for the other RSs ?
          // sumOfConnectedDSs cannot be 0 here: overloadingDSsNumber == 1 is
          // the rounded product (sum of distances * sumOfConnectedDSs), which
          // would be 0 if sumOfConnectedDSs were 0.
          BigDecimal additionalDsLoadBd =
              BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                  mathContext);

          /*
          Now compare both values: we must not disconnect the DS if this
          is for going in a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS
          or we would evaluate that we should disconnect just after being
          arrived on the new RS. But we should disconnect if we reach the
          perfect balance (both values are 0).
          */
          if (mustAvoidYoyoEffect(potentialCurrentRsNewLoadDistanceBd,
              potentialNewSumOfLoadDistancesOfOtherRSsBd))
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            evals.setBestRS(currentRsServerId,
                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
            return;
          }
        }

        ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
        if (isServerOverloadingRS(localServerId, currentRsInfo,
            overloadingDSsNumber))
        {
          // The local server is part of the DSs to disconnect
          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
              localServerId, currentRsServerId));
        }
        else
        {
          // The local server is not part of the servers to disconnect from the
          // current RS.
          evals.setBestRS(currentRsServerId,
              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
                  currentRsServerId));
        }
      } else {
        // The average distance of the other RSs does not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
        evals.setBestRS(currentRsServerId,
            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
                currentRsServerId));
      }
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
      evals.setBestRS(currentRsServerId,
          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
              currentRsServerId));
    }
  }

  /**
   * Returns whether moving the local DS would merely reverse the imbalance
   * (the "yoyo effect").
   * <p>
   * Both values are first rounded to 6 significant digits toward zero. The
   * result is true when the rounded new load distance of the RS is non-zero
   * and is exactly the negation of the rounded new sum of load distances of
   * the other RSs. When both are zero (perfect balance) the result is false.
   * {@link BigDecimal#compareTo} is used so that differences in scale are
   * ignored.
   *
   * @param rsNewLoadDistance
   *          the load distance the current RS would have after the move
   * @param otherRSsNewSumOfLoadDistances
   *          the sum of load distances the other RSs would have after the move
   * @return true if the DS must stay connected to avoid the yoyo effect
   */
  private static boolean mustAvoidYoyoEffect(BigDecimal rsNewLoadDistance,
      BigDecimal otherRSsNewSumOfLoadDistances)
  {
    final MathContext roundCtx = new MathContext(6, RoundingMode.DOWN);
    final BigDecimal rsLoadDistance = rsNewLoadDistance.round(roundCtx);
    final BigDecimal otherRSsSumOfLoadDistances =
        otherRSsNewSumOfLoadDistances.round(roundCtx);

    return rsLoadDistance.compareTo(BigDecimal.ZERO) != 0
        && rsLoadDistance.compareTo(otherRSsSumOfLoadDistances.negate()) == 0;
  }

  /**
   * Returns whether the local DS is overloading the RS.
   * <p>
   * There are "overloadingDSsNumber" DSs overloading the RS. The list of
   * DSs connected to this RS is ordered by serverId to use a consistent
   * ordering across all nodes in the topology. The serverIds whose index in the
   * list is lower than "overloadingDSsNumber" will be evicted first.
   * <p>
   * This ordering is unfair since nodes with the lower serverIds will be
   * evicted more often than nodes with higher serverIds. However, it is a
   * consistent and reliable ordering applicable anywhere in the topology.
2091 */ 2092 private static boolean isServerOverloadingRS(int localServerId, 2093 ReplicationServerInfo currentRsInfo, int overloadingDSsNumber) 2094 { 2095 List<Integer> serversConnectedToCurrentRS = new ArrayList<>(currentRsInfo.getConnectedDSs()); 2096 Collections.sort(serversConnectedToCurrentRS); 2097 2098 final int idx = serversConnectedToCurrentRS.indexOf(localServerId); 2099 return idx != -1 && idx < overloadingDSsNumber; 2100 } 2101 2102 /** 2103 * Start the heartbeat monitor thread. 2104 */ 2105 private void startRSHeartBeatMonitoring(ConnectedRS rs) 2106 { 2107 final long heartbeatInterval = config.getHeartbeatInterval(); 2108 if (heartbeatInterval > 0) 2109 { 2110 heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(), 2111 getBaseDN().toString(), rs.session, heartbeatInterval); 2112 heartbeatMonitor.start(); 2113 } 2114 } 2115 2116 /** 2117 * Stop the heartbeat monitor thread. 2118 */ 2119 private synchronized void stopRSHeartBeatMonitoring() 2120 { 2121 if (heartbeatMonitor != null) 2122 { 2123 heartbeatMonitor.shutdown(); 2124 heartbeatMonitor = null; 2125 } 2126 } 2127 2128 /** 2129 * Restart the ReplicationBroker. 2130 * @param infiniteTry the socket which failed 2131 */ 2132 public void reStart(boolean infiniteTry) 2133 { 2134 reStart(connectedRS.get().session, infiniteTry); 2135 } 2136 2137 /** 2138 * Restart the ReplicationServer broker after a failure. 
   * <p>
   * Closes the failing session (if any) and counts it as a lost connection,
   * then tries to connect again via {@code connectAsDataServer()}, waiting
   * 500 ms between attempts.
   *
   * @param failingSession the session which failed, or null when there is
   *          none to close
   * @param infiniteTry whether to keep retrying until connected (or shut
   *          down); when false a single connection attempt is made
   */
  private void reStart(Session failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
    {
      failingSession.close();
      numLostConnections++;
    }

    ConnectedRS rs = connectedRS.get();
    // Only forget the connected RS if the failing session is still the
    // current one. NOTE(review): ConnectedRS.equals() is not visible here;
    // noConnectedRS() returns a shared singleton.
    if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
    {
      rs = setConnectedRS(ConnectedRS.noConnectedRS());
    }

    while (true)
    {
      // Synchronize inside the loop in order to allow shutdown.
      synchronized (startStopLock)
      {
        if (rs.isConnected() || shutdown)
        {
          break;
        }

        try
        {
          connectAsDataServer();
          rs = connectedRS.get();
        }
        catch (Exception e)
        {
          logger.error(NOTE_EXCEPTION_RESTARTING_SESSION,
              getBaseDN(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
        }

        if (rs.isConnected() || !infiniteTry)
        {
          break;
        }
      }
      // Back off outside startStopLock so that stop() can acquire it.
      try
      {
        Thread.sleep(500);
      }
      catch (InterruptedException ignored)
      {
        // ignore
        // NOTE(review): the interrupt status is cleared and not restored.
        // Restoring it here would make Thread.sleep() throw immediately on
        // every following iteration, turning the retry loop into a busy loop.
      }
    }

    if (logger.isTraceEnabled())
    {
      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
          + rs.getServerId() + ") genId=" + getGenerationID());
    }
  }

  /**
   * Publish a (non-recovery) message to the other servers, retrying on
   * failure. The success flag of the underlying publish is discarded.
   * @param msg the message to publish
   */
  public void publish(ReplicationMsg msg)
  {
    publish(msg, false, true);
  }

  /**
   * Publish a (non-recovery) message to the other servers.
   * @param msg The message to publish.
   * @param retryOnFailure Whether reconnect should automatically be done.
   * @return Whether publish succeeded.
   */
  boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return publish(msg, false, retryOnFailure);
  }

  /**
   * Publish a recovery message to the other servers.
2222 * @param msg the message to publish 2223 */ 2224 public void publishRecovery(ReplicationMsg msg) 2225 { 2226 publish(msg, true, true); 2227 } 2228 2229 /** 2230 * Publish a message to the other servers. 2231 * @param msg the message to publish 2232 * @param recoveryMsg the message is a recovery LocalizableMessage 2233 * @param retryOnFailure whether retry should be done on failure 2234 * @return whether the message was successfully sent. 2235 */ 2236 private boolean publish(ReplicationMsg msg, boolean recoveryMsg, 2237 boolean retryOnFailure) 2238 { 2239 boolean done = false; 2240 2241 while (!done && !shutdown) 2242 { 2243 if (connectionError) 2244 { 2245 /* 2246 It was not possible to connect to any replication server. 2247 Since the operation was already processed, we have no other 2248 choice than to return without sending the ReplicationMsg 2249 and relying on the resend procedure of the connect phase to 2250 fix the problem when we finally connect. 2251 */ 2252 2253 if (logger.isTraceEnabled()) 2254 { 2255 debugInfo("publish(): Publishing a message is not possible due to" 2256 + " existing connection error."); 2257 } 2258 2259 return false; 2260 } 2261 2262 try 2263 { 2264 /* 2265 save the session at the time when we acquire the 2266 sendwindow credit so that we can make sure later 2267 that the session did not change in between. 2268 This is necessary to make sure that we don't publish a message 2269 on a session with a credit that was acquired from a previous 2270 session. 2271 */ 2272 Session currentSession; 2273 Semaphore currentWindowSemaphore; 2274 synchronized (connectPhaseLock) 2275 { 2276 currentSession = connectedRS.get().session; 2277 currentWindowSemaphore = sendWindow; 2278 } 2279 2280 /* 2281 If the Replication domain has decided that there is a need to 2282 recover some changes then it is not allowed to send this 2283 change but it will be the responsibility of the recovery thread to 2284 do it. 
2285 */ 2286 if (!recoveryMsg & connectRequiresRecovery) 2287 { 2288 return false; 2289 } 2290 2291 boolean credit; 2292 if (msg instanceof UpdateMsg) 2293 { 2294 /* 2295 Acquiring the window credit must be done outside of the 2296 connectPhaseLock because it can be blocking and we don't 2297 want to hold off reconnection in case the connection dropped. 2298 */ 2299 credit = 2300 currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS); 2301 } 2302 else 2303 { 2304 credit = true; 2305 } 2306 2307 if (credit) 2308 { 2309 synchronized (connectPhaseLock) 2310 { 2311 /* 2312 session may have been set to null in the connection phase 2313 when restarting the broker for example. 2314 Check the session. If it has changed, some disconnection or 2315 reconnection happened and we need to restart from scratch. 2316 */ 2317 final Session session = connectedRS.get().session; 2318 if (session != null && session == currentSession) 2319 { 2320 session.publish(msg); 2321 done = true; 2322 } 2323 } 2324 } 2325 if (!credit && currentWindowSemaphore.availablePermits() == 0) 2326 { 2327 synchronized (connectPhaseLock) 2328 { 2329 /* 2330 the window is still closed. 2331 Send a WindowProbeMsg message to wake up the receiver in case the 2332 window update message was lost somehow... 2333 then loop to check again if connection was closed. 2334 */ 2335 Session session = connectedRS.get().session; 2336 if (session != null) 2337 { 2338 session.publish(new WindowProbeMsg()); 2339 } 2340 } 2341 } 2342 } 2343 catch (IOException e) 2344 { 2345 if (logger.isTraceEnabled()) 2346 { 2347 debugInfo("publish(): IOException caught: " 2348 + stackTraceToSingleLineString(e)); 2349 } 2350 if (!retryOnFailure) 2351 { 2352 return false; 2353 } 2354 2355 // The receive threads should handle reconnection or 2356 // mark this broker in error. Just retry. 
2357 synchronized (connectPhaseLock) 2358 { 2359 try 2360 { 2361 connectPhaseLock.wait(100); 2362 } 2363 catch (InterruptedException ignored) 2364 { 2365 if (logger.isTraceEnabled()) 2366 { 2367 debugInfo("publish(): InterruptedException caught 1: " 2368 + stackTraceToSingleLineString(ignored)); 2369 } 2370 } 2371 } 2372 } 2373 catch (InterruptedException ignored) 2374 { 2375 // just loop. 2376 if (logger.isTraceEnabled()) 2377 { 2378 debugInfo("publish(): InterruptedException caught 2: " 2379 + stackTraceToSingleLineString(ignored)); 2380 } 2381 } 2382 } 2383 return true; 2384 } 2385 2386 /** 2387 * Receive a message. 2388 * This method is not thread-safe and should either always be 2389 * called in a single thread or protected by a locking mechanism 2390 * before being called. This is a wrapper to the method with a boolean version 2391 * so that we do not have to modify existing tests. 2392 * 2393 * @return the received message 2394 * @throws SocketTimeoutException if the timeout set by setSoTimeout 2395 * has expired 2396 */ 2397 public ReplicationMsg receive() throws SocketTimeoutException 2398 { 2399 return receive(false, true, false); 2400 } 2401 2402 /** 2403 * Receive a message. 2404 * This method is not thread-safe and should either always be 2405 * called in a single thread or protected by a locking mechanism 2406 * before being called. 2407 * 2408 * @param reconnectToTheBestRS Whether broker will automatically switch 2409 * to the best suitable RS. 2410 * @param reconnectOnFailure Whether broker will automatically reconnect 2411 * on failure. 2412 * @param returnOnTopoChange Whether broker should return TopologyMsg 2413 * received. 
   * <p>
   * {@link WindowMsg}, {@link StopMsg} and {@link MonitorMsg} are consumed
   * internally and never returned. A {@link TopologyMsg} is returned only
   * when {@code returnOnTopoChange} is true. Any other message (including
   * {@link UpdateMsg}, which also decrements the receive window) is returned.
   *
   * @return the received message, or null if the broker is shut down, has no
   *         session, or a failure occurred while {@code reconnectOnFailure}
   *         is false
   *
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   */
  ReplicationMsg receive(boolean reconnectToTheBestRS,
      boolean reconnectOnFailure, boolean returnOnTopoChange)
      throws SocketTimeoutException
  {
    while (!shutdown)
    {
      ConnectedRS rs = connectedRS.get();
      if (reconnectOnFailure && !rs.isConnected())
      {
        // infinite try to reconnect
        reStart(null, true);
        continue;
      }

      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      if (rs.session == null)
      {
        // Must be shutting down.
        break;
      }

      final int serverId = getServerId();
      final DN baseDN = getBaseDN();
      final int previousRsServerID = rs.getServerId();
      try
      {
        ReplicationMsg msg = rs.session.receive();
        if (msg instanceof UpdateMsg)
        {
          // Each received update consumes one slot of the receive window;
          // updateWindowAfterReplay() gives the slots back.
          synchronized (this)
          {
            rcvWindow--;
          }
        }
        if (msg instanceof WindowMsg)
        {
          // The RS acknowledged updates: open our send window accordingly.
          final WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else if (msg instanceof TopologyMsg)
        {
          final TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg, getRsServerId());
          if (reconnectToTheBestRS)
          {
            // Reset wait time before next computation of best server
            mustRunBestServerCheckingAlgorithm = 0;
          }

          // Caller wants to check what's changed
          if (returnOnTopoChange)
          {
            return msg;
          }
        }
        else if (msg instanceof StopMsg)
        {
          // RS performs a proper disconnection
          logger.warn(WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED, previousRsServerID, rs.replicationServer,
              serverId, baseDN);

          // Try to find a suitable RS
          reStart(rs.session, true);
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
          MonitorMsg monitorMsg = (MonitorMsg) msg;

          // Extract and store replicas ServerStates
          final Map<Integer, ServerState> newReplicaStates = new HashMap<>();
          for (int srvId : toIterable(monitorMsg.ldapIterator()))
          {
            newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
          }
          replicaStates = newReplicaStates;

          // Notify the sender that the response was received.
          // (getReplicaStates() waits on monitorResponse for this.)
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }

          // Update the replication servers ServerStates with new received info
          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
          for (int srvId : toIterable(monitorMsg.rsIterator()))
          {
            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
            if (rsInfo != null)
            {
              rsInfo.update(monitorMsg.getRSServerState(srvId));
            }
          }

          /*
          Now if it is allowed, compute the best replication server to see if
          it is still the one we are currently connected to. If not,
          disconnect properly and let the connection algorithm re-connect to
          best replication server
          */
          if (reconnectToTheBestRS)
          {
            // The counter is reset by every TopologyMsg, so the check below
            // runs on the second MonitorMsg received without a TopologyMsg
            // in between.
            mustRunBestServerCheckingAlgorithm++;
            if (mustRunBestServerCheckingAlgorithm == 2)
            {
              // Stable topology (no topo msg since few seconds): proceed with
              // best server checking.
              final RSEvaluations evals = computeBestReplicationServer(
                  false, previousRsServerID, state,
                  rsInfos, serverId, getGroupId(), getGenerationID());
              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
              if (previousRsServerID != -1
                  && (bestServerInfo == null
                      || bestServerInfo.getServerId() != previousRsServerID))
              {
                // The best replication server is no more the one we are
                // currently using. Disconnect properly then reconnect.
                LocalizableMessage message;
                if (bestServerInfo == null)
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, previousRsServerID, rs.replicationServer, baseDN);
                }
                else
                {
                  final int bestRsServerId = bestServerInfo.getServerId();
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID, rs.replicationServer, bestRsServerId, baseDN,
                      evals.getEvaluation(previousRsServerID),
                      evals.getEvaluation(bestRsServerId));
                }
                logger.info(message);
                if (logger.isTraceEnabled())
                {
                  debugInfo("best replication servers evaluation results: " + evals);
                }
                reStart(true);
              }

              // Reset wait time before next computation of best server
              mustRunBestServerCheckingAlgorithm = 0;
            }
          }
        }
        else
        {
          return msg;
        }
      }
      catch (SocketTimeoutException e)
      {
        // Timeouts configured through setSoTimeout() are reported to the caller.
        throw e;
      }
      catch (Exception e)
      {
        logger.traceException(e);

        if (!shutdown)
        {
          if (rs.session == null || !rs.session.closeInitiated())
          {
            // We did not initiate the close on our side, log an error message.
            logger.error(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED,
                serverId, baseDN, previousRsServerID, rs.replicationServer);
          }

          if (!reconnectOnFailure)
          {
            break; // does not seem necessary to explicitly disconnect ..
          }

          reStart(rs.session, true);
        }
      }
    } // while !shutdown
    return null;
  }

  /**
   * Gets the States of all the Replicas currently in the Topology. When this
   * method is called, a Monitoring message will be sent to the Replication
   * Server to which this domain is currently connected so that it computes a
   * table containing information about all Directory Servers in the topology.
2605 * This Computation involves communications will all the servers currently 2606 * connected and 2607 * 2608 * @return The States of all Replicas in the topology (except us) 2609 */ 2610 public Map<Integer, ServerState> getReplicaStates() 2611 { 2612 monitorResponse.set(false); 2613 2614 // publish Monitor Request LocalizableMessage to the Replication Server 2615 publish(new MonitorRequestMsg(getServerId(), getRsServerId())); 2616 2617 // wait for Response up to 10 seconds. 2618 try 2619 { 2620 synchronized (monitorResponse) 2621 { 2622 if (!monitorResponse.get()) 2623 { 2624 monitorResponse.wait(10000); 2625 } 2626 } 2627 } catch (InterruptedException e) 2628 { 2629 Thread.currentThread().interrupt(); 2630 } 2631 return replicaStates; 2632 } 2633 2634 /** 2635 * This method allows to do the necessary computing for the window 2636 * management after treatment by the worker threads. 2637 * 2638 * This should be called once the replay thread have done their job 2639 * and the window can be open again. 2640 */ 2641 public synchronized void updateWindowAfterReplay() 2642 { 2643 try 2644 { 2645 updateDoneCount++; 2646 final Session session = connectedRS.get().session; 2647 if (updateDoneCount >= halfRcvWindow && session != null) 2648 { 2649 session.publish(new WindowMsg(updateDoneCount)); 2650 rcvWindow += updateDoneCount; 2651 updateDoneCount = 0; 2652 } 2653 } catch (IOException e) 2654 { 2655 // Any error on the socket will be handled by the thread calling receive() 2656 // just ignore. 2657 } 2658 } 2659 2660 /** Stop the server. 
*/ 2661 public void stop() 2662 { 2663 if (logger.isTraceEnabled()) 2664 { 2665 debugInfo("is stopping and will close the connection to RS(" + getRsServerId() + ")"); 2666 } 2667 2668 synchronized (startStopLock) 2669 { 2670 domain.publishReplicaOfflineMsg(); 2671 shutdown = true; 2672 setConnectedRS(ConnectedRS.stopped()); 2673 stopRSHeartBeatMonitoring(); 2674 stopChangeTimeHeartBeatPublishing(); 2675 deregisterReplicationMonitor(); 2676 } 2677 } 2678 2679 /** 2680 * Set a timeout value. 2681 * With this option set to a non-zero value, calls to the receive() method 2682 * block for only this amount of time after which a 2683 * java.net.SocketTimeoutException is raised. 2684 * The Broker is valid and usable even after such an Exception is raised. 2685 * 2686 * @param timeout the specified timeout, in milliseconds. 2687 * @throws SocketException if there is an error in the underlying protocol, 2688 * such as a TCP error. 2689 */ 2690 public void setSoTimeout(int timeout) throws SocketException 2691 { 2692 this.timeout = timeout; 2693 final Session session = connectedRS.get().session; 2694 if (session != null) 2695 { 2696 session.setSoTimeout(timeout); 2697 } 2698 } 2699 2700 /** 2701 * Get the name of the replicationServer to which this broker is currently 2702 * connected. 2703 * 2704 * @return the name of the replicationServer to which this domain 2705 * is currently connected. 2706 */ 2707 public String getReplicationServer() 2708 { 2709 return connectedRS.get().replicationServer; 2710 } 2711 2712 /** 2713 * Get the maximum receive window size. 2714 * 2715 * @return The maximum receive window size. 2716 */ 2717 public int getMaxRcvWindow() 2718 { 2719 return config.getWindowSize(); 2720 } 2721 2722 /** 2723 * Get the current receive window size. 2724 * 2725 * @return The current receive window size. 2726 */ 2727 public int getCurrentRcvWindow() 2728 { 2729 return rcvWindow; 2730 } 2731 2732 /** 2733 * Get the maximum send window size. 
2734 * 2735 * @return The maximum send window size. 2736 */ 2737 public int getMaxSendWindow() 2738 { 2739 return maxSendWindow; 2740 } 2741 2742 /** 2743 * Get the current send window size. 2744 * 2745 * @return The current send window size. 2746 */ 2747 public int getCurrentSendWindow() 2748 { 2749 if (isConnected()) 2750 { 2751 return sendWindow.availablePermits(); 2752 } 2753 return 0; 2754 } 2755 2756 /** 2757 * Get the number of times the connection was lost. 2758 * @return The number of times the connection was lost. 2759 */ 2760 public int getNumLostConnections() 2761 { 2762 return numLostConnections; 2763 } 2764 2765 /** 2766 * Change some configuration parameters. 2767 * 2768 * @param newConfig The new config to use. 2769 * @return A boolean indicating if the changes 2770 * requires to restart the service. 2771 */ 2772 boolean changeConfig(ReplicationDomainCfg newConfig) 2773 { 2774 // These parameters needs to be renegotiated with the ReplicationServer 2775 // so if they have changed, that requires restarting the session with 2776 // the ReplicationServer. 2777 // A new session is necessary only when information regarding 2778 // the connection is modified 2779 boolean needToRestartSession = 2780 !newConfig.getReplicationServer().equals(config.getReplicationServer()) 2781 || newConfig.getWindowSize() != config.getWindowSize() 2782 || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval() 2783 || newConfig.getGroupId() != config.getGroupId(); 2784 2785 this.config = newConfig; 2786 this.rcvWindow = newConfig.getWindowSize(); 2787 this.halfRcvWindow = this.rcvWindow / 2; 2788 2789 return needToRestartSession; 2790 } 2791 2792 /** 2793 * Get the version of the replication protocol. 2794 * @return The version of the replication protocol. 
2795 */ 2796 public short getProtocolVersion() 2797 { 2798 final Session session = connectedRS.get().session; 2799 if (session != null) 2800 { 2801 return session.getProtocolVersion(); 2802 } 2803 return ProtocolVersion.getCurrentVersion(); 2804 } 2805 2806 /** 2807 * Check if the broker is connected to a ReplicationServer and therefore 2808 * ready to received and send Replication Messages. 2809 * 2810 * @return true if the server is connected, false if not. 2811 */ 2812 public boolean isConnected() 2813 { 2814 return connectedRS.get().isConnected(); 2815 } 2816 2817 /** 2818 * Determine whether the connection to the replication server is encrypted. 2819 * @return true if the connection is encrypted, false otherwise. 2820 */ 2821 public boolean isSessionEncrypted() 2822 { 2823 final Session session = connectedRS.get().session; 2824 return session != null ? session.isEncrypted() : false; 2825 } 2826 2827 /** 2828 * Signals the RS we just entered a new status. 2829 * @param newStatus The status the local DS just entered 2830 */ 2831 public void signalStatusChange(ServerStatus newStatus) 2832 { 2833 try 2834 { 2835 connectedRS.get().session.publish( 2836 new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus)); 2837 } catch (IOException ex) 2838 { 2839 logger.error(ERR_EXCEPTION_SENDING_CS, getBaseDN(), getServerId(), 2840 ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); 2841 } 2842 } 2843 2844 /** 2845 * Gets the info for DSs in the topology (except us). 2846 * @return The info for DSs in the topology (except us) 2847 */ 2848 public Map<Integer, DSInfo> getReplicaInfos() 2849 { 2850 return topology.get().replicaInfos; 2851 } 2852 2853 /** 2854 * Gets the info for RSs in the topology (except the one we are connected 2855 * to). 
2856 * @return The info for RSs in the topology (except the one we are connected 2857 * to) 2858 */ 2859 public List<RSInfo> getRsInfos() 2860 { 2861 return toRSInfos(topology.get().rsInfos); 2862 } 2863 2864 private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos) 2865 { 2866 final List<RSInfo> result = new ArrayList<>(); 2867 for (ReplicationServerInfo rsInfo : rsInfos.values()) 2868 { 2869 result.add(rsInfo.toRSInfo()); 2870 } 2871 return result; 2872 } 2873 2874 /** 2875 * Processes an incoming TopologyMsg. 2876 * Updates the structures for the local view of the topology. 2877 * 2878 * @param topoMsg 2879 * The topology information received from RS. 2880 * @param rsServerId 2881 * the serverId to use for the connectedDS 2882 */ 2883 private void receiveTopo(TopologyMsg topoMsg, int rsServerId) 2884 { 2885 final Topology newTopo = computeNewTopology(topoMsg, rsServerId); 2886 for (DSInfo dsInfo : newTopo.replicaInfos.values()) 2887 { 2888 domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo 2889 .getEclIncludesForDeletes()); 2890 } 2891 } 2892 2893 private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId) 2894 { 2895 Topology oldTopo; 2896 Topology newTopo; 2897 do 2898 { 2899 oldTopo = topology.get(); 2900 newTopo = new Topology(topoMsg, getServerId(), rsServerId, 2901 getReplicationServerUrls(), oldTopo.rsInfos); 2902 } 2903 while (!topology.compareAndSet(oldTopo, newTopo)); 2904 2905 if (logger.isTraceEnabled()) 2906 { 2907 final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo); 2908 sb.append(" received TopologyMsg=").append(topoMsg); 2909 debugInfo(sb); 2910 } 2911 return newTopo; 2912 } 2913 2914 /** 2915 * Contains the last known state of the replication topology. 2916 */ 2917 static final class Topology 2918 { 2919 2920 /** 2921 * The RS's serverId that this DS was connected to when this topology state 2922 * was computed. 
     */
    private final int rsServerId;
    /**
     * Info for other DSs.
     * <p>
     * Warning: does not contain info for us (for our server id)
     */
    final Map<Integer, DSInfo> replicaInfos;
    /**
     * The map of replication server info initialized at connection time and
     * regularly updated. This is used to decide to which best suitable
     * replication server one wants to connect. Key: replication server id
     * Value: replication server info for the matching replication server id
     */
    final Map<Integer, ReplicationServerInfo> rsInfos;

    /** Creates an empty topology, not connected to any RS (rsServerId -1). */
    private Topology()
    {
      this.rsServerId = -1;
      this.replicaInfos = Collections.emptyMap();
      this.rsInfos = Collections.emptyMap();
    }

    /**
     * Constructor to use when only the RSInfos need to be recomputed.
     *
     * @param dsInfosToKeep
     *          the DSInfos that will be stored as is (null means none)
     * @param newRSInfos
     *          the new RSInfos from which to compute the new topology
     * @param dsServerId
     *          the DS serverId
     * @param rsServerId
     *          the current connected RS serverId
     * @param configuredReplicationServerUrls
     *          the configured replication server URLs
     * @param previousRsInfos
     *          the RSInfos computed in the previous Topology object
     */
    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
        int dsServerId, int rsServerId,
        Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      // Assignment order matters: computeRSInfos() (via
      // computeDSsConnectedTo()) reads rsServerId and replicaInfos.
      this.rsServerId = rsServerId;
      this.replicaInfos = dsInfosToKeep == null
          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
          previousRsInfos, configuredReplicationServerUrls);
    }

    /**
     * Constructor to use when a new TopologyMsg has been received.
     *
     * @param topoMsg
     *          the topology message containing the new DSInfos and RSInfos from
     *          which to compute the new topology
     * @param dsServerId
     *          the DS serverId
     * @param rsServerId
     *          the current connected RS serverId
     * @param configuredReplicationServerUrls
     *          the configured replication server URLs
     * @param previousRsInfos
     *          the RSInfos computed in the previous Topology object
     */
    Topology(TopologyMsg topoMsg, int dsServerId,
        int rsServerId, Set<String> configuredReplicationServerUrls,
        Map<Integer, ReplicationServerInfo> previousRsInfos)
    {
      // Assignment order matters: computeRSInfos() (via
      // computeDSsConnectedTo()) reads rsServerId and replicaInfos.
      this.rsServerId = rsServerId;
      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
          previousRsInfos, configuredReplicationServerUrls);
    }

    /**
     * Returns an unmodifiable copy of {@code dsInfos} without the entry of the
     * local DS.
     */
    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
        int dsServerId)
    {
      final Map<Integer, DSInfo> copy = new HashMap<>(dsInfos);
      copy.remove(dsServerId);
      return Collections.unmodifiableMap(copy);
    }

    /**
     * Computes the RS info map of this topology: RSs already known from
     * {@code previousRsInfos} are updated in place, new RSs get a fresh
     * ReplicationServerInfo (with its locally-configured flag set), and RSs no
     * longer listed in {@code newRsInfos} are dropped.
     * <p>
     * NOTE(review): updated ReplicationServerInfo objects are the same
     * instances as in {@code previousRsInfos}, so the previous Topology sees
     * these updates too.
     *
     * @return an unmodifiable map keyed by RS serverId
     */
    private Map<Integer, ReplicationServerInfo> computeRSInfos(
        int dsServerId, List<RSInfo> newRsInfos,
        Map<Integer, ReplicationServerInfo> previousRsInfos,
        Set<String> configuredReplicationServerUrls)
    {
      final Map<Integer, ReplicationServerInfo> results = new HashMap<>(previousRsInfos);

      // Update replication server info list with the received topology info
      final Set<Integer> rssToKeep = new HashSet<>();
      for (RSInfo newRSInfo : newRsInfos)
      {
        final int rsId = newRSInfo.getId();
        rssToKeep.add(rsId); // Mark this server as still existing
        Set<Integer> connectedDSs =
            computeDSsConnectedTo(rsId, dsServerId);
        ReplicationServerInfo rsInfo = results.get(rsId);
        if (rsInfo == null)
        {
          // New replication server, create info for it add it to the list
          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
          results.put(rsId, rsInfo);
        }
        else
        {
          // Update the existing info for the replication server
          rsInfo.update(newRSInfo, connectedDSs);
        }
      }

      // Remove any replication server that may have disappeared from the
      // topology
      results.keySet().retainAll(rssToKeep);

      return Collections.unmodifiableMap(results);
    }

    /**
     * Computes the set of DSs connected to a particular RS, including the
     * local DS when {@code rsId} is the RS this topology was computed for.
     */
    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
    {
      final Set<Integer> connectedDSs = new HashSet<>();
      if (rsServerId == rsId)
      {
        /*
         * If we are computing connected DSs for the RS we are connected to, we
         * should count the local DS as the DSInfo of the local DS is not sent
         * by the replication server in the topology message. We must count
         * ourselves as a connected server.
         */
        connectedDSs.add(dsServerId);
      }

      for (DSInfo dsInfo : replicaInfos.values())
      {
        if (dsInfo.getRsId() == rsId)
        {
          connectedDSs.add(dsInfo.getDsId());
        }
      }

      return connectedDSs;
    }

    /**
     * Sets the locally configured flag for the passed ReplicationServerInfo
     * object, analyzing the local configuration. When a matching configured
     * URL is found, the server URL of {@code rsInfo} is replaced by the
     * configured form.
     *
     * @param rsInfo
     *          the Replication server to check and update
     * @param configuredReplicationServerUrls
     *          the replication server URLs present in the local configuration
     */
    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
        Set<String> configuredReplicationServerUrls)
    {
      // Determine if the passed ReplicationServerInfo has a URL that is present
      // in the locally configured replication servers
      String rsUrl = rsInfo.getServerURL();
      if (rsUrl == null)
      {
        // The ReplicationServerInfo has been generated from a server with
        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
        // ignore this server as we do not know how to connect to it
        rsInfo.setLocallyConfigured(false);
        return;
      }
      for (String serverUrl : configuredReplicationServerUrls)
      {
        if (isSameReplicationServerUrl(serverUrl, rsUrl))
        {
          // This RS is locally configured, mark this
          rsInfo.setLocallyConfigured(true);
          rsInfo.setServerURL(serverUrl);
          return;
        }
      }
      rsInfo.setLocallyConfigured(false);
    }

    /** {@inheritDoc} */
    @Override
    public boolean equals(Object obj)
    {
      if (this == obj)
      {
        return true;
      }
      if (obj == null || getClass() != obj.getClass())
      {
        return false;
      }
      final Topology other = (Topology) obj;
      // The map comparisons run first: urlsEqual1() relies on both
      // replicaInfos maps having the same keys.
      return rsServerId == other.rsServerId
          && Objects.equals(replicaInfos, other.replicaInfos)
          && Objects.equals(rsInfos, other.rsInfos)
          && urlsEqual1(replicaInfos, other.replicaInfos)
          && urlsEqual2(rsInfos, other.rsInfos);
    }

    /**
     * Returns whether every DSInfo of the first map has the same DS URL as the
     * DSInfo with the same key in the second map. NOTE(review): assumes the
     * second map contains every key of the first (ensured by the caller
     * equals()); otherwise a NullPointerException is thrown.
     */
    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
        Map<Integer, DSInfo> replicaInfos2)
    {
      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
      {
        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
        if (!Objects.equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
        {
          return false;
        }
      }
      return true;
    }
3139 private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1, 3140 Map<Integer, ReplicationServerInfo> rsInfos2) 3141 { 3142 for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet()) 3143 { 3144 ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey()); 3145 if (!Objects.equals(entry.getValue().getServerURL(), rsInfo.getServerURL())) 3146 { 3147 return false; 3148 } 3149 } 3150 return true; 3151 } 3152 3153 /** {@inheritDoc} */ 3154 @Override 3155 public int hashCode() 3156 { 3157 final int prime = 31; 3158 int result = 1; 3159 result = prime * result + rsServerId; 3160 result = prime * result 3161 + (replicaInfos == null ? 0 : replicaInfos.hashCode()); 3162 result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode()); 3163 return result; 3164 } 3165 3166 /** {@inheritDoc} */ 3167 @Override 3168 public String toString() 3169 { 3170 return getClass().getSimpleName() 3171 + " rsServerId=" + rsServerId 3172 + ", replicaInfos=" + replicaInfos.values() 3173 + ", rsInfos=" + rsInfos.values(); 3174 } 3175 } 3176 3177 /** 3178 * Check if the broker could not find any Replication Server and therefore 3179 * connection attempt failed. 3180 * 3181 * @return true if the server could not connect to any Replication Server. 3182 */ 3183 boolean hasConnectionError() 3184 { 3185 return connectionError; 3186 } 3187 3188 /** 3189 * Starts publishing to the RS the current timestamp used in this server. 3190 */ 3191 private void startChangeTimeHeartBeatPublishing(ConnectedRS rs) 3192 { 3193 // Start a CSN heartbeat thread. 
3194 long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval(); 3195 if (changeTimeHeartbeatInterval > 0) 3196 { 3197 final String threadName = "Replica DS(" + getServerId() 3198 + ") change time heartbeat publisher for domain \"" + getBaseDN() 3199 + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer; 3200 3201 ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( 3202 threadName, rs.session, changeTimeHeartbeatInterval, getServerId()); 3203 ctHeartbeatPublisherThread.start(); 3204 } 3205 else 3206 { 3207 if (logger.isTraceEnabled()) 3208 { 3209 debugInfo("is not configured to send CSN heartbeat interval"); 3210 } 3211 } 3212 } 3213 3214 /** 3215 * Stops publishing to the RS the current timestamp used in this server. 3216 */ 3217 private synchronized void stopChangeTimeHeartBeatPublishing() 3218 { 3219 if (ctHeartbeatPublisherThread != null) 3220 { 3221 ctHeartbeatPublisherThread.shutdown(); 3222 ctHeartbeatPublisherThread = null; 3223 } 3224 } 3225 3226 /** 3227 * Set the connectRequiresRecovery to the provided value. 3228 * This flag is used to indicate if a recovery of Update is necessary 3229 * after a reconnection to a RS. 3230 * It is the responsibility of the ReplicationDomain to set it during the 3231 * sessionInitiated phase. 3232 * 3233 * @param b the new value of the connectRequiresRecovery. 3234 */ 3235 public void setRecoveryRequired(boolean b) 3236 { 3237 connectRequiresRecovery = b; 3238 } 3239 3240 /** 3241 * Returns whether the broker is shutting down. 3242 * @return whether the broker is shutting down. 3243 */ 3244 boolean shuttingDown() 3245 { 3246 return shutdown; 3247 } 3248 3249 /** 3250 * Returns the local address of this replication domain, or the empty string 3251 * if it is not yet connected. 3252 * 3253 * @return The local address. 3254 */ 3255 String getLocalUrl() 3256 { 3257 final Session session = connectedRS.get().session; 3258 return session != null ? 
session.getLocalUrl() : ""; 3259 } 3260 3261 /** 3262 * Returns the replication monitor instance name associated with this broker. 3263 * 3264 * @return The replication monitor instance name. 3265 */ 3266 String getReplicationMonitorInstanceName() 3267 { 3268 // Only invoked by replication domain so always non-null. 3269 return monitor.getMonitorInstanceName(); 3270 } 3271 3272 private ConnectedRS setConnectedRS(final ConnectedRS newRS) 3273 { 3274 final ConnectedRS oldRS = connectedRS.getAndSet(newRS); 3275 if (!oldRS.equals(newRS) && oldRS.session != null) 3276 { 3277 // monitor name is changing, deregister before registering again 3278 deregisterReplicationMonitor(); 3279 oldRS.session.close(); 3280 registerReplicationMonitor(); 3281 } 3282 return newRS; 3283 } 3284 3285 /** 3286 * Must be invoked each time the session changes because, the monitor name is 3287 * dynamically created with the session name, while monitor registration is 3288 * static. 3289 * 3290 * @see #monitor 3291 */ 3292 private void registerReplicationMonitor() 3293 { 3294 // The monitor should not be registered if this is a unit test 3295 // because the replication domain is null. 3296 if (monitor != null) 3297 { 3298 DirectoryServer.registerMonitorProvider(monitor); 3299 } 3300 } 3301 3302 private void deregisterReplicationMonitor() 3303 { 3304 // The monitor should not be deregistered if this is a unit test 3305 // because the replication domain is null. 
3306 if (monitor != null) 3307 { 3308 DirectoryServer.deregisterMonitorProvider(monitor); 3309 } 3310 } 3311 3312 /** {@inheritDoc} */ 3313 @Override 3314 public String toString() 3315 { 3316 final StringBuilder sb = new StringBuilder(); 3317 sb.append(getClass().getSimpleName()) 3318 .append(" \"").append(getBaseDN()).append(" ") 3319 .append(getServerId()).append("\",") 3320 .append(" groupId=").append(getGroupId()) 3321 .append(", genId=").append(getGenerationID()) 3322 .append(", "); 3323 connectedRS.get().toString(sb); 3324 return sb.toString(); 3325 } 3326 3327 private void debugInfo(CharSequence message) 3328 { 3329 logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN() 3330 + " and serverId=" + getServerId() + ": " + message); 3331 } 3332}