001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server; 028 029import static org.opends.messages.ReplicationMessages.*; 030 031import java.io.IOException; 032import java.util.List; 033import java.util.Random; 034import java.util.concurrent.Semaphore; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import org.forgerock.i18n.LocalizableMessage; 039import org.forgerock.i18n.slf4j.LocalizedLogger; 040import org.forgerock.opendj.config.server.ConfigException; 041import org.forgerock.opendj.ldap.ResultCode; 042import org.opends.server.admin.std.server.MonitorProviderCfg; 043import org.opends.server.core.DirectoryServer; 044import org.opends.server.replication.common.AssuredMode; 045import org.opends.server.replication.common.CSN; 046import org.opends.server.replication.common.RSInfo; 047import org.opends.server.replication.common.ServerStatus; 048import org.opends.server.replication.protocol.*; 049import org.opends.server.replication.server.changelog.api.ChangelogException; 050import org.opends.server.types.Attribute; 051import org.opends.server.types.Attributes; 052import org.opends.server.types.DirectoryException; 053import org.opends.server.types.InitializationException; 054 055/** 056 * This class defines a server handler : 057 * - that is a MessageHandler (see this class for more details) 058 * - that handles all interaction with a peer server (RS or DS). 059 */ 060public abstract class ServerHandler extends MessageHandler 061{ 062 063 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 064 065 /** 066 * Time during which the server will wait for existing thread to stop 067 * during the shutdownWriter. 068 */ 069 private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; 070 071 /** 072 * The serverId of the remote server. 073 */ 074 protected int serverId; 075 /** 076 * The session opened with the remote server. 077 */ 078 protected final Session session; 079 080 /** 081 * The serverURL of the remote server. 082 */ 083 protected String serverURL; 084 /** 085 * Number of updates received from the server in assured safe read mode. 086 */ 087 private int assuredSrReceivedUpdates; 088 /** 089 * Number of updates received from the server in assured safe read mode that 090 * timed out. 091 */ 092 private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); 093 /** 094 * Number of updates sent to the server in assured safe read mode. 095 */ 096 private int assuredSrSentUpdates; 097 /** 098 * Number of updates sent to the server in assured safe read mode that timed 099 * out. 100 */ 101 private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); 102 /** 103 * Number of updates received from the server in assured safe data mode. 104 */ 105 private int assuredSdReceivedUpdates; 106 /** 107 * Number of updates received from the server in assured safe data mode that 108 * timed out. 109 */ 110 private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); 111 /** 112 * Number of updates sent to the server in assured safe data mode. 113 */ 114 private int assuredSdSentUpdates; 115 116 /** 117 * Number of updates sent to the server in assured safe data mode that timed out. 118 */ 119 private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); 120 121 /** 122 * The associated ServerWriter that sends messages to the remote server. 123 */ 124 private ServerWriter writer; 125 126 /** 127 * The associated ServerReader that receives messages from the remote server. 128 */ 129 private ServerReader reader; 130 131 /** Window. */ 132 private int rcvWindow; 133 private final int rcvWindowSizeHalf; 134 135 /** The size of the receiving window. */ 136 protected final int maxRcvWindow; 137 /** Semaphore that the writer uses to control the flow to the remote server. */ 138 private Semaphore sendWindow; 139 /** The initial size of the sending window. */ 140 private int sendWindowSize; 141 /** Remote generation id. */ 142 protected long generationId = -1; 143 /** The generation id of the hosting RS. */ 144 protected long localGenerationId = -1; 145 /** The generation id before processing a new start handshake. */ 146 protected long oldGenerationId = -1; 147 /** Group id of this remote server. */ 148 protected byte groupId = -1; 149 /** The SSL encryption after the negotiation with the peer. */ 150 protected boolean sslEncryption; 151 /** 152 * The time in milliseconds between heartbeats from the replication 153 * server. Zero means heartbeats are off. 154 */ 155 protected long heartbeatInterval; 156 157 /** The thread that will send heartbeats. */ 158 private HeartbeatThread heartbeatThread; 159 160 /** Set when ServerWriter is stopping. */ 161 private volatile boolean shutdownWriter; 162 163 /** Weight of this remote server. */ 164 protected int weight = 1; 165 166 /** 167 * Creates a new server handler instance with the provided socket. 168 * 169 * @param session The Session used by the ServerHandler to 170 * communicate with the remote entity. 171 * @param queueSize The maximum number of update that will be kept 172 * in memory by this ServerHandler. 173 * @param replicationServer The hosting replication server. 174 * @param rcvWindowSize The window size to receive from the remote server. 175 */ 176 public ServerHandler( 177 Session session, 178 int queueSize, 179 ReplicationServer replicationServer, 180 int rcvWindowSize) 181 { 182 super(queueSize, replicationServer); 183 this.session = session; 184 this.rcvWindowSizeHalf = rcvWindowSize / 2; 185 this.maxRcvWindow = rcvWindowSize; 186 this.rcvWindow = rcvWindowSize; 187 } 188 189 /** 190 * Abort a start procedure currently establishing. 191 * @param reason The provided reason. 192 */ 193 protected void abortStart(LocalizableMessage reason) 194 { 195 // We did not recognize the message, close session as what can happen after 196 // is undetermined and we do not want the server to be disturbed 197 Session localSession = session; 198 if (localSession != null) 199 { 200 if (reason != null) 201 { 202 if (logger.isTraceEnabled()) 203 { 204 logger.trace("In " + this + " closing session with err=" + reason); 205 } 206 logger.error(reason); 207 } 208 209 // This method is only called when aborting a failing handshake and 210 // not StopMsg should be sent in such situation. StopMsg are only 211 // expected when full handshake has been performed, or at end of 212 // handshake phase 1, when DS was just gathering available RS info 213 localSession.close(); 214 } 215 216 releaseDomainLock(); 217 218 // If generation id of domain was changed, set it back to old value 219 // We may have changed it as it was -1 and we received a value >0 from peer 220 // server and the last topo message sent may have failed being sent: in that 221 // case retrieve old value of generation id for replication server domain 222 if (oldGenerationId != -100) 223 { 224 replicationServerDomain.changeGenerationId(oldGenerationId); 225 } 226 } 227 228 /** 229 * Releases the lock on the replication server domain if it was held. 230 */ 231 protected void releaseDomainLock() 232 { 233 if (replicationServerDomain.hasLock()) 234 { 235 replicationServerDomain.release(); 236 } 237 } 238 239 /** 240 * Check the protocol window and send WindowMsg if necessary. 241 * 242 * @throws IOException when the session becomes unavailable. 243 */ 244 public synchronized void checkWindow() throws IOException 245 { 246 if (rcvWindow < rcvWindowSizeHalf) 247 { 248 WindowMsg msg = new WindowMsg(rcvWindowSizeHalf); 249 session.publish(msg); 250 rcvWindow += rcvWindowSizeHalf; 251 } 252 } 253 254 /** 255 * Decrement the protocol window, then check if it is necessary 256 * to send a WindowMsg and send it. 257 * 258 * @throws IOException when the session becomes unavailable. 259 */ 260 private synchronized void decAndCheckWindow() throws IOException 261 { 262 rcvWindow--; 263 checkWindow(); 264 } 265 266 /** 267 * Finalize the initialization, create reader, writer, heartbeat system 268 * and monitoring system. 269 * @throws DirectoryException When an exception is raised. 270 */ 271 protected void finalizeStart() throws DirectoryException 272 { 273 // FIXME:ECL We should refactor so that a SH always have a session 274 if (session != null) 275 { 276 try 277 { 278 // Disable timeout for next communications 279 session.setSoTimeout(0); 280 } 281 catch(Exception e) 282 { /* do nothing */ 283 } 284 285 // sendWindow MUST be created before starting the writer 286 sendWindow = new Semaphore(sendWindowSize); 287 288 writer = new ServerWriter(session, this, replicationServerDomain, 289 replicationServer.getDSRSShutdownSync()); 290 reader = new ServerReader(session, this); 291 292 session.setName("Replication server RS(" + getReplicationServerId() 293 + ") session thread to " + this + " at " 294 + session.getReadableRemoteAddress()); 295 session.start(); 296 try 297 { 298 session.waitForStartup(); 299 } 300 catch (InterruptedException e) 301 { 302 final LocalizableMessage message = 303 ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName()); 304 throw new DirectoryException(ResultCode.OTHER, message, e); 305 } 306 reader.start(); 307 writer.start(); 308 309 // Create a thread to send heartbeat messages. 310 if (heartbeatInterval > 0) 311 { 312 String threadName = "Replication server RS(" + getReplicationServerId() 313 + ") heartbeat publisher to " + this + " at " 314 + session.getReadableRemoteAddress(); 315 heartbeatThread = new HeartbeatThread(threadName, session, 316 heartbeatInterval / 3); 317 heartbeatThread.start(); 318 } 319 } 320 321 DirectoryServer.deregisterMonitorProvider(this); 322 DirectoryServer.registerMonitorProvider(this); 323 } 324 325 /** 326 * Sends a message. 327 * 328 * @param msg 329 * The message to be sent. 330 * @throws IOException 331 * When it occurs while sending the message, 332 */ 333 public void send(ReplicationMsg msg) throws IOException 334 { 335 // avoid logging anything for unit tests that include a null domain. 336 if (logger.isTraceEnabled()) 337 { 338 logger.trace("In " 339 + replicationServerDomain.getLocalRSMonitorInstanceName() + " " 340 + this + " publishes message:\n" + msg); 341 } 342 session.publish(msg); 343 } 344 345 /** 346 * Get the age of the older change that has not yet been replicated 347 * to the server handled by this ServerHandler. 348 * @return The age if the older change has not yet been replicated 349 * to the server handled by this ServerHandler. 350 */ 351 public long getApproxFirstMissingDate() 352 { 353 // Get the older CSN received 354 CSN olderUpdateCSN = getOlderUpdateCSN(); 355 if (olderUpdateCSN != null) 356 { 357 // If not present in the local RS db, 358 // then approximate with the older update time 359 return olderUpdateCSN.getTime(); 360 } 361 return 0; 362 } 363 364 /** 365 * Get the number of updates received from the server in assured safe data 366 * mode. 367 * @return The number of updates received from the server in assured safe data 368 * mode 369 */ 370 public int getAssuredSdReceivedUpdates() 371 { 372 return assuredSdReceivedUpdates; 373 } 374 375 /** 376 * Get the number of updates received from the server in assured safe data 377 * mode that timed out. 378 * @return The number of updates received from the server in assured safe data 379 * mode that timed out. 380 */ 381 public AtomicInteger getAssuredSdReceivedUpdatesTimeout() 382 { 383 return assuredSdReceivedUpdatesTimeout; 384 } 385 386 /** 387 * Get the number of updates sent to the server in assured safe data mode. 388 * @return The number of updates sent to the server in assured safe data mode 389 */ 390 public int getAssuredSdSentUpdates() 391 { 392 return assuredSdSentUpdates; 393 } 394 395 /** 396 * Get the number of updates sent to the server in assured safe data mode that 397 * timed out. 398 * @return The number of updates sent to the server in assured safe data mode 399 * that timed out. 400 */ 401 public AtomicInteger getAssuredSdSentUpdatesTimeout() 402 { 403 return assuredSdSentUpdatesTimeout; 404 } 405 406 /** 407 * Get the number of updates received from the server in assured safe read 408 * mode. 409 * @return The number of updates received from the server in assured safe read 410 * mode 411 */ 412 public int getAssuredSrReceivedUpdates() 413 { 414 return assuredSrReceivedUpdates; 415 } 416 417 /** 418 * Get the number of updates received from the server in assured safe read 419 * mode that timed out. 420 * @return The number of updates received from the server in assured safe read 421 * mode that timed out. 422 */ 423 public AtomicInteger getAssuredSrReceivedUpdatesTimeout() 424 { 425 return assuredSrReceivedUpdatesTimeout; 426 } 427 428 /** 429 * Get the number of updates sent to the server in assured safe read mode. 430 * @return The number of updates sent to the server in assured safe read mode 431 */ 432 public int getAssuredSrSentUpdates() 433 { 434 return assuredSrSentUpdates; 435 } 436 437 /** 438 * Get the number of updates sent to the server in assured safe read mode that 439 * timed out. 440 * @return The number of updates sent to the server in assured safe read mode 441 * that timed out. 442 */ 443 public AtomicInteger getAssuredSrSentUpdatesTimeout() 444 { 445 return assuredSrSentUpdatesTimeout; 446 } 447 448 /** 449 * Returns the Replication Server Domain to which belongs this server handler. 450 * 451 * @return The replication server domain. 452 */ 453 public ReplicationServerDomain getDomain() 454 { 455 return replicationServerDomain; 456 } 457 458 /** 459 * Returns the value of generationId for that handler. 460 * @return The value of the generationId. 461 */ 462 public long getGenerationId() 463 { 464 return generationId; 465 } 466 467 /** 468 * Gets the group id of the server represented by this object. 469 * @return The group id of the server represented by this object. 470 */ 471 public byte getGroupId() 472 { 473 return groupId; 474 } 475 476 /** 477 * Get our heartbeat interval. 478 * @return Our heartbeat interval. 479 */ 480 public long getHeartbeatInterval() 481 { 482 return heartbeatInterval; 483 } 484 485 /** {@inheritDoc} */ 486 @Override 487 public List<Attribute> getMonitorData() 488 { 489 // Get the generic ones 490 List<Attribute> attributes = super.getMonitorData(); 491 492 attributes.add(Attributes.create("server-id", String.valueOf(serverId))); 493 attributes.add(Attributes.create("domain-name", String.valueOf(getBaseDN()))); 494 495 // Deprecated 496 attributes.add(Attributes.create("max-waiting-changes", String 497 .valueOf(maxQueueSize))); 498 attributes.add(Attributes.create("sent-updates", String 499 .valueOf(getOutCount()))); 500 attributes.add(Attributes.create("received-updates", String 501 .valueOf(getInCount()))); 502 503 // Assured counters 504 attributes.add(Attributes.create("assured-sr-received-updates", String 505 .valueOf(getAssuredSrReceivedUpdates()))); 506 attributes.add(Attributes.create("assured-sr-received-updates-timeout", 507 String .valueOf(getAssuredSrReceivedUpdatesTimeout()))); 508 attributes.add(Attributes.create("assured-sr-sent-updates", String 509 .valueOf(getAssuredSrSentUpdates()))); 510 attributes.add(Attributes.create("assured-sr-sent-updates-timeout", String 511 .valueOf(getAssuredSrSentUpdatesTimeout()))); 512 attributes.add(Attributes.create("assured-sd-received-updates", String 513 .valueOf(getAssuredSdReceivedUpdates()))); 514 if (!isDataServer()) 515 { 516 attributes.add(Attributes.create("assured-sd-sent-updates", 517 String.valueOf(getAssuredSdSentUpdates()))); 518 attributes.add(Attributes.create("assured-sd-sent-updates-timeout", 519 String.valueOf(getAssuredSdSentUpdatesTimeout()))); 520 } else 521 { 522 attributes.add(Attributes.create("assured-sd-received-updates-timeout", 523 String.valueOf(getAssuredSdReceivedUpdatesTimeout()))); 524 } 525 526 // Window stats 527 attributes.add(Attributes.create("max-send-window", String.valueOf(sendWindowSize))); 528 attributes.add(Attributes.create("current-send-window", String.valueOf(sendWindow.availablePermits()))); 529 attributes.add(Attributes.create("max-rcv-window", String.valueOf(maxRcvWindow))); 530 attributes.add(Attributes.create("current-rcv-window", String.valueOf(rcvWindow))); 531 532 // Encryption 533 attributes.add(Attributes.create("ssl-encryption", String.valueOf(session.isEncrypted()))); 534 535 // Data generation 536 attributes.add(Attributes.create("generation-id", String.valueOf(generationId))); 537 538 return attributes; 539 } 540 541 /** 542 * Retrieves the name of this monitor provider. It should be unique among all 543 * monitor providers, including all instances of the same monitor provider. 544 * 545 * @return The name of this monitor provider. 546 */ 547 @Override 548 public abstract String getMonitorInstanceName(); 549 550 /** 551 * Gets the protocol version used with this remote server. 552 * @return The protocol version used with this remote server. 553 */ 554 public short getProtocolVersion() 555 { 556 return session.getProtocolVersion(); 557 } 558 559 /** 560 * Get the Server Id. 561 * 562 * @return the ID of the server to which this object is linked 563 */ 564 public int getServerId() 565 { 566 return serverId; 567 } 568 569 /** 570 * Retrieves the URL for this server handler. 571 * 572 * @return The URL for this server handler, in the form of an address and 573 * port separated by a colon. 574 */ 575 public String getServerURL() 576 { 577 return serverURL; 578 } 579 580 /** 581 * Return the ServerStatus. 582 * @return The server status. 583 */ 584 protected abstract ServerStatus getStatus(); 585 586 /** 587 * Increment the number of updates received from the server in assured safe 588 * data mode. 589 */ 590 public void incrementAssuredSdReceivedUpdates() 591 { 592 assuredSdReceivedUpdates++; 593 } 594 595 /** 596 * Increment the number of updates received from the server in assured safe 597 * data mode that timed out. 598 */ 599 public void incrementAssuredSdReceivedUpdatesTimeout() 600 { 601 assuredSdReceivedUpdatesTimeout.incrementAndGet(); 602 } 603 604 /** 605 * Increment the number of updates sent to the server in assured safe data 606 * mode that timed out. 607 */ 608 public void incrementAssuredSdSentUpdatesTimeout() 609 { 610 assuredSdSentUpdatesTimeout.incrementAndGet(); 611 } 612 613 /** 614 * Increment the number of updates received from the server in assured safe 615 * read mode. 616 */ 617 public void incrementAssuredSrReceivedUpdates() 618 { 619 assuredSrReceivedUpdates++; 620 } 621 622 /** 623 * Increment the number of updates received from the server in assured safe 624 * read mode that timed out. 625 */ 626 public void incrementAssuredSrReceivedUpdatesTimeout() 627 { 628 assuredSrReceivedUpdatesTimeout.incrementAndGet(); 629 } 630 631 /** 632 * Increment the number of updates sent to the server in assured safe read 633 * mode that timed out. 634 */ 635 public void incrementAssuredSrSentUpdatesTimeout() 636 { 637 assuredSrSentUpdatesTimeout.incrementAndGet(); 638 } 639 640 /** {@inheritDoc} */ 641 @Override 642 public void initializeMonitorProvider(MonitorProviderCfg configuration) 643 throws ConfigException, InitializationException 644 { 645 // Nothing to do for now 646 } 647 648 /** 649 * Check if the server associated to this ServerHandler is a data server 650 * in the topology. 651 * @return true if the server is a data server. 652 */ 653 public abstract boolean isDataServer(); 654 655 /** 656 * Check if the server associated to this ServerHandler is a replication 657 * server. 658 * @return true if the server is a replication server. 659 */ 660 public boolean isReplicationServer() 661 { 662 return !isDataServer(); 663 } 664 665 // The handshake phase must be done by blocking any access to structures 666 // keeping info on connected servers, so that one can safely check for 667 // pre-existence of a server, send a coherent snapshot of known topology to 668 // peers, update the local view of the topology... 669 // 670 // For instance a kind of problem could be that while we connect with a 671 // peer RS, a DS is connecting at the same time and we could publish the 672 // connected DSs to the peer RS forgetting this last DS in the TopologyMsg. 673 // 674 // This method and every others that need to read/make changes to the 675 // structures holding topology for the domain should: 676 // - call ReplicationServerDomain.lock() 677 // - read/modify structures 678 // - call ReplicationServerDomain.release() 679 // 680 // More information is provided in comment of ReplicationServerDomain.lock() 681 682 /** 683 * Lock the domain without a timeout. 684 * <p> 685 * If domain already exists, lock it until handshake is finished otherwise it 686 * will be created and locked later in the method 687 * 688 * @throws DirectoryException 689 * When an exception occurs. 690 * @throws InterruptedException 691 * If the current thread was interrupted while waiting for the lock. 692 */ 693 public void lockDomainNoTimeout() throws DirectoryException, 694 InterruptedException 695 { 696 if (!replicationServerDomain.hasLock()) 697 { 698 replicationServerDomain.lock(); 699 } 700 } 701 702 /** 703 * Lock the domain with a timeout. 704 * <p> 705 * Take the lock on the domain. WARNING: Here we try to acquire the lock with 706 * a timeout. This is for preventing a deadlock that may happen if there are 707 * cross connection attempts (for same domain) from this replication server 708 * and from a peer one. 709 * <p> 710 * Here is the scenario: 711 * <ol> 712 * <li>RS1 connect thread takes the domain lock and starts connection to RS2 713 * </li> 714 * <li>at the same time RS2 connect thread takes his domain lock and start 715 * connection to RS2</li> 716 * <li>RS2 listen thread starts processing received ReplServerStartMsg from 717 * RS1 and wants to acquire the lock on the domain (here) but cannot as RS2 718 * connect thread already has it</li> 719 * <li>RS1 listen thread starts processing received ReplServerStartMsg from 720 * RS2 and wants to acquire the lock on the domain (here) but cannot as RS1 721 * connect thread already has it</li> 722 * </ol> 723 * => Deadlock: 4 threads are locked. 724 * <p> 725 * To prevent threads locking in such situation, the listen threads here will 726 * both timeout trying to acquire the lock. The random time for the timeout 727 * should allow on connection attempt to be aborted whereas the other one 728 * should have time to finish in the same time. 729 * <p> 730 * Warning: the minimum time (3s) should be big enough to allow normal 731 * situation connections to terminate. The added random time should represent 732 * a big enough range so that the chance to have one listen thread timing out 733 * a lot before the peer one is great. When the first listen thread times out, 734 * the remote connect thread should release the lock and allow the peer listen 735 * thread to take the lock it was waiting for and process the connection 736 * attempt. 737 * 738 * @throws DirectoryException 739 * When an exception occurs. 740 * @throws InterruptedException 741 * If the current thread was interrupted while waiting for the lock. 742 */ 743 public void lockDomainWithTimeout() throws DirectoryException, 744 InterruptedException 745 { 746 final Random random = new Random(); 747 final int randomTime = random.nextInt(6); // Random from 0 to 5 748 // Wait at least 3 seconds + (0 to 5 seconds) 749 final long timeout = 3000 + randomTime * 1000; 750 final boolean lockAcquired = replicationServerDomain.tryLock(timeout); 751 if (!lockAcquired) 752 { 753 LocalizableMessage message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get( 754 getBaseDN(), serverId, session.getReadableRemoteAddress(), getReplicationServerId()); 755 throw new DirectoryException(ResultCode.OTHER, message); 756 } 757 } 758 759 /** 760 * Processes a routable message. 761 * 762 * @param msg The message to be processed. 763 */ 764 void process(RoutableMsg msg) 765 { 766 if (logger.isTraceEnabled()) 767 { 768 logger.trace("In " 769 + replicationServerDomain.getLocalRSMonitorInstanceName() + " " 770 + this + " processes routable msg received:" + msg); 771 } 772 replicationServerDomain.process(msg, this); 773 } 774 775 /** 776 * Responds to a monitor request message. 777 * 778 * @param msg 779 * The monitor request message. 780 */ 781 void processMonitorRequestMsg(MonitorRequestMsg msg) 782 { 783 replicationServerDomain.processMonitorRequestMsg(msg, this); 784 } 785 786 /** 787 * Responds to a monitor message. 788 * 789 * @param msg 790 * The monitor message. 791 */ 792 void processMonitorMsg(MonitorMsg msg) 793 { 794 replicationServerDomain.processMonitorMsg(msg, this); 795 } 796 797 /** 798 * Processes a change time heartbeat msg. 799 * 800 * @param msg 801 * The message to be processed. 802 * @throws DirectoryException 803 * When an exception is raised. 804 */ 805 void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException 806 { 807 if (logger.isTraceEnabled()) 808 { 809 logger.trace("In " 810 + replicationServerDomain.getLocalRSMonitorInstanceName() + " " 811 + this + " processes received msg:\n" + msg); 812 } 813 replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg); 814 } 815 816 /** 817 * Process the reception of a WindowProbeMsg message. 818 * 819 * @throws IOException 820 * When the session becomes unavailable. 821 */ 822 public void replyToWindowProbe() throws IOException 823 { 824 if (rcvWindow > 0) 825 { 826 // The LDAP server believes that its window is closed while it is not, 827 // this means that some problem happened in the window exchange procedure! 828 // lets update the LDAP server with out current window size and hope 829 // that everything will work better in the future. 830 // TODO also log an error message. 831 session.publish(new WindowMsg(rcvWindow)); 832 } 833 else 834 { 835 // Both the LDAP server and the replication server believes that the 836 // window is closed. Lets check the flowcontrol in case we 837 // can now resume operations and send a windowMessage if necessary. 838 checkWindow(); 839 } 840 } 841 842 /** 843 * Sends the provided TopologyMsg to the peer server. 844 * 845 * @param topoMsg 846 * The TopologyMsg message to be sent. 847 * @throws IOException 848 * When it occurs while sending the message, 849 */ 850 public void sendTopoInfo(TopologyMsg topoMsg) throws IOException 851 { 852 // V1 Rs do not support the TopologyMsg 853 if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1) 854 { 855 send(topoMsg); 856 } 857 } 858 859 /** 860 * Set a new generation ID. 861 * 862 * @param generationId The new generation ID 863 * 864 */ 865 public void setGenerationId(long generationId) 866 { 867 this.generationId = generationId; 868 } 869 870 /** 871 * Sets the window size when used when sending to the remote. 872 * @param size The provided window size. 873 */ 874 protected void setSendWindowSize(int size) 875 { 876 this.sendWindowSize = size; 877 } 878 879 /** 880 * Shutdown This ServerHandler. 881 */ 882 @Override 883 public void shutdown() 884 { 885 shutdownWriter = true; 886 setConsumerActive(false); 887 super.shutdown(); 888 889 if (session != null) 890 { 891 session.close(); 892 } 893 if (heartbeatThread != null) 894 { 895 heartbeatThread.shutdown(); 896 } 897 898 DirectoryServer.deregisterMonitorProvider(this); 899 900 /* 901 * Be sure to wait for ServerWriter and ServerReader death 902 * It does not matter if we try to stop a thread which is us (reader 903 * or writer), but we must not wait for our own thread death. 904 */ 905 try 906 { 907 if (writer != null && !Thread.currentThread().equals(writer)) 908 { 909 writer.join(SHUTDOWN_JOIN_TIMEOUT); 910 } 911 if (reader != null && !Thread.currentThread().equals(reader)) 912 { 913 reader.join(SHUTDOWN_JOIN_TIMEOUT); 914 } 915 } catch (InterruptedException e) 916 { 917 // don't try anymore to join and return. 918 } 919 if (logger.isTraceEnabled()) 920 { 921 logger.trace("SH.shutdowned(" + this + ")"); 922 } 923 } 924 925 /** 926 * Select the next update that must be sent to the server managed by this 927 * ServerHandler. 928 * 929 * @return the next update that must be sent to the server managed by this 930 * ServerHandler. 931 * @throws ChangelogException 932 * If a problem occurs when reading the changelog 933 */ 934 public UpdateMsg take() throws ChangelogException 935 { 936 final UpdateMsg msg = getNextMessage(); 937 938 acquirePermitInSendWindow(); 939 940 if (msg != null) 941 { 942 incrementOutCount(); 943 if (msg.isAssured()) 944 { 945 incrementAssuredStats(msg); 946 } 947 return msg; 948 } 949 return null; 950 } 951 952 private void acquirePermitInSendWindow() 953 { 954 boolean acquired = false; 955 boolean interrupted = true; 956 do 957 { 958 try 959 { 960 acquired = sendWindow.tryAcquire(500, TimeUnit.MILLISECONDS); 961 interrupted = false; 962 } catch (InterruptedException e) 963 { 964 // loop until not interrupted 965 } 966 } while ((interrupted || !acquired) && !shutdownWriter); 967 } 968 969 private void incrementAssuredStats(final UpdateMsg msg) 970 { 971 if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) 972 { 973 assuredSrSentUpdates++; 974 } 975 else if (!isDataServer()) 976 { 977 assuredSdSentUpdates++; 978 } 979 } 980 981 /** 982 * Creates a RSInfo structure representing this remote RS. 983 * @return The RSInfo structure representing this remote RS 984 */ 985 public RSInfo toRSInfo() 986 { 987 return new RSInfo(serverId, serverURL, generationId, groupId, weight); 988 } 989 990 /** 991 * Update the send window size based on the credit specified in the 992 * given window message. 993 * 994 * @param windowMsg The Window LocalizableMessage containing the information 995 * necessary for updating the window size. 996 */ 997 public void updateWindow(WindowMsg windowMsg) 998 { 999 sendWindow.release(windowMsg.getNumAck()); 1000 } 1001 1002 /** 1003 * Log the messages involved in the start handshake. 1004 * @param inStartMsg The message received first. 1005 * @param outStartMsg The message sent in response. 1006 */ 1007 protected void logStartHandshakeRCVandSND( 1008 StartMsg inStartMsg, 1009 StartMsg outStartMsg) 1010 { 1011 if (logger.isTraceEnabled()) 1012 { 1013 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1014 + ", " + getClass().getSimpleName() + " " + this + ":" 1015 + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg 1016 + "\nAND REPLIED:\n" + outStartMsg); 1017 } 1018 } 1019 1020 /** 1021 * Log the messages involved in the start handshake. 1022 * @param outStartMsg The message sent first. 1023 * @param inStartMsg The message received in response. 1024 */ 1025 protected void logStartHandshakeSNDandRCV( 1026 StartMsg outStartMsg, 1027 StartMsg inStartMsg) 1028 { 1029 if (logger.isTraceEnabled()) 1030 { 1031 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1032 + ", " + getClass().getSimpleName() + " " + this + ":" 1033 + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n" 1034 + inStartMsg); 1035 } 1036 } 1037 1038 /** 1039 * Log the messages involved in the Topology handshake. 1040 * @param inTopoMsg The message received first. 1041 * @param outTopoMsg The message sent in response. 1042 */ 1043 protected void logTopoHandshakeRCVandSND( 1044 TopologyMsg inTopoMsg, 1045 TopologyMsg outTopoMsg) 1046 { 1047 if (logger.isTraceEnabled()) 1048 { 1049 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1050 + ", " + getClass().getSimpleName() + " " + this + ":" 1051 + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n" 1052 + outTopoMsg); 1053 } 1054 } 1055 1056 /** 1057 * Log the messages involved in the Topology handshake. 1058 * @param outTopoMsg The message sent first. 1059 * @param inTopoMsg The message received in response. 1060 */ 1061 protected void logTopoHandshakeSNDandRCV( 1062 TopologyMsg outTopoMsg, 1063 TopologyMsg inTopoMsg) 1064 { 1065 if (logger.isTraceEnabled()) 1066 { 1067 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1068 + ", " + getClass().getSimpleName() + " " + this + ":" 1069 + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n" 1070 + inTopoMsg); 1071 } 1072 } 1073 1074 /** 1075 * Log the messages involved in the Topology/StartSession handshake. 1076 * @param inStartSessionMsg The message received first. 1077 * @param outTopoMsg The message sent in response. 1078 */ 1079 protected void logStartSessionHandshake( 1080 StartSessionMsg inStartSessionMsg, 1081 TopologyMsg outTopoMsg) 1082 { 1083 if (logger.isTraceEnabled()) 1084 { 1085 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1086 + ", " + getClass().getSimpleName() + " " + this + " :" 1087 + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg 1088 + "\nAND REPLIED:\n" + outTopoMsg); 1089 } 1090 } 1091 1092 /** 1093 * Log stop message has been received. 1094 */ 1095 protected void logStopReceived() 1096 { 1097 if (logger.isTraceEnabled()) 1098 { 1099 logger.trace("In " + this.replicationServer.getMonitorInstanceName() 1100 + ", " + getClass().getSimpleName() + " " + this + " :" 1101 + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE"); 1102 } 1103 } 1104 1105 /** 1106 * Process a Ack message received. 1107 * @param ack the message received. 1108 */ 1109 void processAck(AckMsg ack) 1110 { 1111 replicationServerDomain.processAck(ack, this); 1112 } 1113 1114 /** 1115 * Get the reference generation id (associated with the changes in the db). 1116 * @return the reference generation id. 1117 */ 1118 public long getReferenceGenId() 1119 { 1120 return replicationServerDomain.getGenerationId(); 1121 } 1122 1123 /** 1124 * Process a ResetGenerationIdMsg message received. 1125 * @param msg the message received. 1126 */ 1127 void processResetGenId(ResetGenerationIdMsg msg) 1128 { 1129 replicationServerDomain.resetGenerationId(this, msg); 1130 } 1131 1132 /** 1133 * Put a new update message received. 1134 * @param update the update message received. 1135 * @throws IOException when it occurs. 1136 */ 1137 public void put(UpdateMsg update) throws IOException 1138 { 1139 decAndCheckWindow(); 1140 replicationServerDomain.put(update, this); 1141 } 1142 1143 /** 1144 * Stop this handler. 1145 */ 1146 public void doStop() 1147 { 1148 replicationServerDomain.stopServer(this, false); 1149 } 1150 1151 /** 1152 * Creates a ReplServerStartMsg for the current ServerHandler. 1153 * 1154 * @return a new ReplServerStartMsg for the current ServerHandler. 1155 */ 1156 protected ReplServerStartMsg createReplServerStartMsg() 1157 { 1158 return new ReplServerStartMsg(getReplicationServerId(), 1159 getReplicationServerURL(), getBaseDN(), maxRcvWindow, 1160 replicationServerDomain.getLatestServerState(), localGenerationId, 1161 sslEncryption, getLocalGroupId(), 1162 replicationServer.getDegradedStatusThreshold()); 1163 } 1164 1165 /** 1166 * Returns a "badly disconnected" error message for this server handler. 1167 * 1168 * @return a "badly disconnected" error message for this server handler 1169 */ 1170 public LocalizableMessage getBadlyDisconnectedErrorMessage() 1171 { 1172 if (isDataServer()) 1173 { 1174 return ERR_DS_BADLY_DISCONNECTED.get(getReplicationServerId(), 1175 getServerId(), session.getReadableRemoteAddress(), getBaseDN()); 1176 } 1177 return ERR_RS_BADLY_DISCONNECTED.get(getReplicationServerId(), 1178 getServerId(), session.getReadableRemoteAddress(), getBaseDN()); 1179 } 1180}