001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server; 028 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.replication.protocol.ProtocolVersion.*; 031 032import java.io.IOException; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.concurrent.ConcurrentHashMap; 037 038import org.forgerock.i18n.LocalizableMessage; 039import org.forgerock.i18n.slf4j.LocalizedLogger; 040import org.forgerock.opendj.ldap.ResultCode; 041import org.opends.server.replication.common.DSInfo; 042import org.opends.server.replication.common.RSInfo; 043import org.opends.server.replication.common.ServerState; 044import org.opends.server.replication.common.ServerStatus; 045import org.opends.server.replication.protocol.*; 046import org.opends.server.types.*; 047 048/** 049 * This class defines a server handler, which handles all interaction with a 050 * peer replication server. 051 */ 052public class ReplicationServerHandler extends ServerHandler 053{ 054 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 055 056 /** Properties filled only if remote server is a RS. */ 057 private String serverAddressURL; 058 /** 059 * This collection will contain as many elements as there are 060 * LDAP servers connected to the remote replication server. 061 */ 062 private final Map<Integer, LightweightServerHandler> remoteDirectoryServers = new ConcurrentHashMap<>(); 063 064 /** 065 * Starts this handler based on a start message received from remote server. 066 * @param inReplServerStartMsg The start msg provided by the remote server. 067 * @return Whether the remote server requires encryption or not. 068 * @throws DirectoryException When a problem occurs. 069 */ 070 private boolean processStartFromRemote( 071 ReplServerStartMsg inReplServerStartMsg) 072 throws DirectoryException 073 { 074 try 075 { 076 short protocolVersion = getCompatibleVersion(inReplServerStartMsg 077 .getVersion()); 078 session.setProtocolVersion(protocolVersion); 079 generationId = inReplServerStartMsg.getGenerationId(); 080 serverId = inReplServerStartMsg.getServerId(); 081 serverURL = inReplServerStartMsg.getServerURL(); 082 serverAddressURL = toServerAddressURL(serverURL); 083 setBaseDNAndDomain(inReplServerStartMsg.getBaseDN(), false); 084 setInitialServerState(inReplServerStartMsg.getServerState()); 085 setSendWindowSize(inReplServerStartMsg.getWindowSize()); 086 if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) 087 { 088 // We support connection from a V1 RS 089 // Only V2 protocol has the group id in repl server start message 090 this.groupId = inReplServerStartMsg.getGroupId(); 091 } 092 093 oldGenerationId = -100; 094 } 095 catch(Exception e) 096 { 097 LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage()); 098 throw new DirectoryException(ResultCode.OTHER, message); 099 } 100 return inReplServerStartMsg.getSSLEncryption(); 101 } 102 103 private String toServerAddressURL(String serverURL) 104 { 105 final int port = HostPort.valueOf(serverURL).getPort(); 106 // Ensure correct formatting of IPv6 addresses by using a HostPort instance. 107 return new HostPort(session.getRemoteAddress(), port).toString(); 108 } 109 110 /** 111 * Sends a start message to the remote RS. 112 * 113 * @return The ReplServerStartMsg sent. 114 * @throws IOException 115 * When an exception occurs. 116 */ 117 private ReplServerStartMsg sendStartToRemote() throws IOException 118 { 119 ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg(); 120 send(outReplServerStartMsg); 121 return outReplServerStartMsg; 122 } 123 124 /** 125 * Creates a new handler object to a remote replication server. 126 * @param session The session with the remote RS. 127 * @param queueSize The queue size to manage updates to that RS. 128 * @param replicationServer The hosting local RS object. 129 * @param rcvWindowSize The receiving window size. 130 */ 131 public ReplicationServerHandler( 132 Session session, 133 int queueSize, 134 ReplicationServer replicationServer, 135 int rcvWindowSize) 136 { 137 super(session, queueSize, replicationServer, rcvWindowSize); 138 } 139 140 /** 141 * Connect the hosting RS to the RS represented by THIS handler 142 * on an outgoing connection. 143 * @param baseDN The baseDN 144 * @param sslEncryption The sslEncryption requested to the remote RS. 145 * @throws DirectoryException when an error occurs. 146 */ 147 public void connect(DN baseDN, boolean sslEncryption) 148 throws DirectoryException 149 { 150 // we are the initiator and decides of the encryption 151 this.sslEncryption = sslEncryption; 152 153 setBaseDNAndDomain(baseDN, false); 154 155 localGenerationId = replicationServerDomain.getGenerationId(); 156 oldGenerationId = localGenerationId; 157 158 try 159 { 160 lockDomainNoTimeout(); 161 162 ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(); 163 164 // Wait answer 165 ReplicationMsg msg = session.receive(); 166 167 // Reject bad responses 168 if (!(msg instanceof ReplServerStartMsg)) 169 { 170 if (msg instanceof StopMsg) 171 { 172 // Remote replication server is probably shutting down or simultaneous 173 // cross-connect detected. 174 abortStart(null); 175 } 176 else 177 { 178 LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg 179 .getClass().getCanonicalName(), "ReplServerStartMsg"); 180 abortStart(message); 181 } 182 return; 183 } 184 185 processStartFromRemote((ReplServerStartMsg) msg); 186 187 if (replicationServerDomain.isAlreadyConnectedToRS(this)) 188 { 189 // Simultaneous cross connect. 190 abortStart(null); 191 return; 192 } 193 194 /* 195 Since we are going to send the topology message before having received 196 one, we need to set the generation ID as soon as possible if it is 197 currently uninitialized. See OpenDJ-121. 198 */ 199 if (localGenerationId < 0 && generationId > 0) 200 { 201 oldGenerationId = 202 replicationServerDomain.changeGenerationId(generationId); 203 } 204 205 logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg); 206 207 // Until here session is encrypted then it depends on the negotiation 208 // The session initiator decides whether to use SSL. 209 if (!this.sslEncryption) 210 { 211 session.stopEncryption(); 212 } 213 214 if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1) 215 { 216 /* 217 Only protocol version above V1 has a phase 2 handshake 218 NOW PROCEED WITH SECOND PHASE OF HANDSHAKE: 219 TopologyMsg then TopologyMsg (with a RS) 220 221 Send our own TopologyMsg to remote RS 222 */ 223 TopologyMsg outTopoMsg = 224 replicationServerDomain.createTopologyMsgForRS(); 225 sendTopoInfo(outTopoMsg); 226 227 // wait and process Topo from remote RS 228 TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS(); 229 if (inTopoMsg == null) 230 { 231 // Simultaneous cross connect. 232 abortStart(null); 233 return; 234 } 235 236 logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg); 237 238 /* 239 FIXME: i think this should be done for all protocol version !! 240 not only those > V1 241 */ 242 replicationServerDomain.register(this); 243 244 /* 245 Process TopologyMsg sent by remote RS: store matching new info 246 (this will also warn our connected DSs of the new received info) 247 */ 248 replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false); 249 } 250 251 logger.debug(INFO_REPLICATION_SERVER_CONNECTION_TO_RS, getReplicationServerId(), getServerId(), 252 replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress()); 253 254 super.finalizeStart(); 255 } 256 catch (IOException e) 257 { 258 logger.traceException(e); 259 LocalizableMessage errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get( 260 getReplicationServerId(), 261 session.getReadableRemoteAddress()); 262 abortStart(errMessage); 263 } 264 catch (DirectoryException e) 265 { 266 logger.traceException(e); 267 abortStart(e.getMessageObject()); 268 } 269 catch (Exception e) 270 { 271 logger.traceException(e); 272 abortStart(LocalizableMessage.raw(e.getLocalizedMessage())); 273 } 274 finally 275 { 276 releaseDomainLock(); 277 } 278 } 279 280 /** 281 * Starts the handler from a remote ReplServerStart message received from 282 * the remote replication server. 283 * @param inReplServerStartMsg The provided ReplServerStart message received. 284 */ 285 public void startFromRemoteRS(ReplServerStartMsg inReplServerStartMsg) 286 { 287 localGenerationId = -1; 288 oldGenerationId = -100; 289 try 290 { 291 // The initiator decides if the session is encrypted 292 sslEncryption = processStartFromRemote(inReplServerStartMsg); 293 294 lockDomainWithTimeout(); 295 296 if (replicationServerDomain.isAlreadyConnectedToRS(this)) 297 { 298 abortStart(null); 299 return; 300 } 301 302 this.localGenerationId = replicationServerDomain.getGenerationId(); 303 ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(); 304 305 logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg); 306 307 /* 308 until here session is encrypted then it depends on the negotiation 309 The session initiator decides whether to use SSL. 310 */ 311 if (!sslEncryption) 312 { 313 session.stopEncryption(); 314 } 315 316 TopologyMsg inTopoMsg = null; 317 if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1) 318 { 319 /* 320 Only protocol version above V1 has a phase 2 handshake 321 NOW PROCEED WITH SECOND PHASE OF HANDSHAKE: 322 TopologyMsg then TopologyMsg (with a RS) 323 wait and process Topo from remote RS 324 */ 325 inTopoMsg = waitAndProcessTopoFromRemoteRS(); 326 if (inTopoMsg == null) 327 { 328 // Simultaneous cross connect. 329 abortStart(null); 330 return; 331 } 332 333 // send our own TopologyMsg to remote RS 334 TopologyMsg outTopoMsg = replicationServerDomain 335 .createTopologyMsgForRS(); 336 sendTopoInfo(outTopoMsg); 337 338 logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg); 339 } 340 else 341 { 342 // Terminate connection from a V1 RS 343 344 // if the remote RS and the local RS have the same genID 345 // then it's ok and nothing else to do 346 if (generationId == localGenerationId) 347 { 348 if (logger.isTraceEnabled()) 349 { 350 logger.trace("In " + replicationServer.getMonitorInstanceName() 351 + " " + this + " RS V1 with serverID=" + serverId 352 + " is connected with the right generation ID"); 353 } 354 } else 355 { 356 checkGenerationId(); 357 } 358 /* 359 Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1 360 all the servers of the topology. We prefer not not send a TopologyMsg 361 for giving partial/false information to the V2 servers as for 362 instance we don't have the connected DS of the V1 RS...When the V1 363 RS will be upgraded in his turn, topo info will be sent and accurate. 364 That way, there is no risk to have false/incomplete information in 365 other servers. 366 */ 367 } 368 369 replicationServerDomain.register(this); 370 371 // Process TopologyMsg sent by remote RS: store matching new info 372 // (this will also warn our connected DSs of the new received info) 373 if (inTopoMsg!=null) 374 { 375 replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false); 376 } 377 378 logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_RS, getReplicationServerId(), getServerId(), 379 replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress()); 380 381 super.finalizeStart(); 382 } 383 catch (IOException e) 384 { 385 logger.traceException(e); 386 abortStart(ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get( 387 inReplServerStartMsg.getServerId(), replicationServer.getServerId())); 388 } 389 catch (DirectoryException e) 390 { 391 logger.traceException(e); 392 abortStart(e.getMessageObject()); 393 } 394 catch (Exception e) 395 { 396 logger.traceException(e); 397 abortStart(LocalizableMessage.raw(e.getLocalizedMessage())); 398 } 399 finally 400 { 401 releaseDomainLock(); 402 } 403 } 404 405 /** 406 * Wait receiving the TopologyMsg from the remote RS and process it. 407 * @return the topologyMsg received or {@code null} if stop was received. 408 * @throws DirectoryException 409 */ 410 private TopologyMsg waitAndProcessTopoFromRemoteRS() 411 throws DirectoryException 412 { 413 ReplicationMsg msg; 414 try 415 { 416 msg = session.receive(); 417 } 418 catch(Exception e) 419 { 420 LocalizableMessage message = LocalizableMessage.raw(e.getLocalizedMessage()); 421 throw new DirectoryException(ResultCode.OTHER, message); 422 } 423 424 if (!(msg instanceof TopologyMsg)) 425 { 426 if (msg instanceof StopMsg) 427 { 428 // Remote replication server is probably shutting down, or cross 429 // connection attempt. 430 return null; 431 } 432 433 LocalizableMessage message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get( 434 msg.getClass().getCanonicalName(), "TopologyMsg"); 435 throw new DirectoryException(ResultCode.OTHER, message); 436 } 437 438 // Remote RS sent his topo msg 439 TopologyMsg inTopoMsg = (TopologyMsg) msg; 440 441 /* Store remote RS weight if it has one. 442 * For protocol version < 4, use default value of 1 for weight 443 */ 444 if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) 445 { 446 // List should only contain RS info for sender 447 RSInfo rsInfo = inTopoMsg.getRsInfos().get(0); 448 weight = rsInfo.getWeight(); 449 } 450 451 /* 452 if the remote RS and the local RS have the same genID 453 then it's ok and nothing else to do 454 */ 455 if (generationId == localGenerationId) 456 { 457 if (logger.isTraceEnabled()) 458 { 459 logger.trace("In " + replicationServer.getMonitorInstanceName() 460 + " RS with serverID=" + serverId 461 + " is connected with the right generation ID, same as local =" 462 + generationId); 463 } 464 } 465 else 466 { 467 checkGenerationId(); 468 } 469 470 return inTopoMsg; 471 } 472 473 /** 474 * Checks local generation ID against the remote RS one, 475 * and logs Warning messages if needed. 476 */ 477 private void checkGenerationId() 478 { 479 if (localGenerationId <= 0) 480 { 481 // The local RS is not initialized - take the one received 482 // WARNING: Must be done before computing topo message to send to peer 483 // server as topo message must embed valid generation id for our server 484 oldGenerationId = 485 replicationServerDomain.changeGenerationId(generationId); 486 return; 487 } 488 489 // the local RS is initialized 490 if (generationId > 0 491 // the remote RS is initialized. If not, there's nothing to do anyway. 492 && generationId != localGenerationId) 493 { 494 /* Either: 495 * 496 * 1) The 2 RS have different generationID 497 * replicationServerDomain.getGenerationIdSavedStatus() == true 498 * 499 * if the present RS has received changes regarding its gen ID and so will 500 * not change without a reset then we are just degrading the peer. 501 * 502 * 2) This RS has never received any changes for the current gen ID. 503 * 504 * Example case: 505 * - we are in RS1 506 * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) 507 * - RS1 has genId1 from LS1 /genId1 comes from data in suffix 508 * - we are in RS1 and we receive a START msg from RS2 509 * - Each RS keeps its genID / is degraded and when LS2 510 * will be populated from LS1 everything will become ok. 511 * 512 * Issue: 513 * FIXME : Would it be a good idea in some cases to just set the gen ID 514 * received from the peer RS specially if the peer has a non null state 515 * and we have a null state ? 516 * replicationServerDomain.setGenerationId(generationId, false); 517 */ 518 logger.warn(WARN_BAD_GENERATION_ID_FROM_RS, serverId, session.getReadableRemoteAddress(), generationId, 519 getBaseDN(), getReplicationServerId(), localGenerationId); 520 } 521 } 522 523 /** {@inheritDoc} */ 524 @Override 525 public boolean isDataServer() 526 { 527 return false; 528 } 529 530 /** 531 * Add the DSinfos of the connected Directory Servers 532 * to the List of DSInfo provided as a parameter. 533 * 534 * @param dsInfos The List of DSInfo that should be updated 535 * with the DSInfo for the remoteDirectoryServers 536 * connected to this ServerHandler. 537 */ 538 public void addDSInfos(List<DSInfo> dsInfos) 539 { 540 synchronized (remoteDirectoryServers) 541 { 542 for (LightweightServerHandler ls : remoteDirectoryServers.values()) 543 { 544 dsInfos.add(ls.toDSInfo()); 545 } 546 } 547 } 548 549 /** 550 * Shutdown This ServerHandler. 551 */ 552 @Override 553 public void shutdown() 554 { 555 super.shutdown(); 556 clearRemoteLSHandlers(); 557 } 558 559 private void clearRemoteLSHandlers() 560 { 561 synchronized (remoteDirectoryServers) 562 { 563 for (LightweightServerHandler lsh : remoteDirectoryServers.values()) 564 { 565 lsh.stopHandler(); 566 } 567 remoteDirectoryServers.clear(); 568 } 569 } 570 571 /** 572 * Stores topology information received from a peer RS and that must be kept 573 * in RS handler. 574 * 575 * @param topoMsg The received topology message 576 */ 577 public void processTopoInfoFromRS(TopologyMsg topoMsg) 578 { 579 // List should only contain RS info for sender 580 final RSInfo rsInfo = topoMsg.getRsInfos().get(0); 581 generationId = rsInfo.getGenerationId(); 582 groupId = rsInfo.getGroupId(); 583 weight = rsInfo.getWeight(); 584 585 synchronized (remoteDirectoryServers) 586 { 587 clearRemoteLSHandlers(); 588 589 // Creates the new structure according to the message received. 590 for (DSInfo dsInfo : topoMsg.getReplicaInfos().values()) 591 { 592 // For each DS connected to the peer RS 593 DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId); 594 LightweightServerHandler lsh = 595 new LightweightServerHandler(this, clonedDSInfo); 596 lsh.startHandler(); 597 remoteDirectoryServers.put(lsh.getServerId(), lsh); 598 } 599 } 600 } 601 602 /** 603 * When this handler is connected to a replication server, specifies if 604 * a wanted server is connected to this replication server. 605 * 606 * @param serverId The server we want to know if it is connected 607 * to the replication server represented by this handler. 608 * @return boolean True is the wanted server is connected to the server 609 * represented by this handler. 610 */ 611 public boolean isRemoteLDAPServer(int serverId) 612 { 613 synchronized (remoteDirectoryServers) 614 { 615 for (LightweightServerHandler server : remoteDirectoryServers.values()) 616 { 617 if (serverId == server.getServerId()) 618 { 619 return true; 620 } 621 } 622 return false; 623 } 624 } 625 626 /** 627 * When the handler is connected to a replication server, specifies the 628 * replication server has remote LDAP servers connected to it. 629 * 630 * @return boolean True is the replication server has remote LDAP servers 631 * connected to it. 632 */ 633 public boolean hasRemoteLDAPServers() 634 { 635 return !remoteDirectoryServers.isEmpty(); 636 } 637 638 /** 639 * Return a Set containing the servers known by this replicationServer. 640 * @return a set containing the servers known by this replicationServer. 641 */ 642 public Set<Integer> getConnectedDirectoryServerIds() 643 { 644 return remoteDirectoryServers.keySet(); 645 } 646 647 /** {@inheritDoc} */ 648 @Override 649 public String getMonitorInstanceName() 650 { 651 return "Connected replication server RS(" + serverId + ") " + serverURL 652 + ",cn=" + replicationServerDomain.getMonitorInstanceName(); 653 } 654 655 /** {@inheritDoc} */ 656 @Override 657 public List<Attribute> getMonitorData() 658 { 659 // Get the generic ones 660 List<Attribute> attributes = super.getMonitorData(); 661 662 // Add the specific RS ones 663 attributes.add(Attributes.create("Replication-Server", serverURL)); 664 665 ReplicationDomainMonitorData md = 666 replicationServerDomain.getDomainMonitorData(); 667 668 // Missing changes 669 attributes.add(Attributes.create("missing-changes", 670 String.valueOf(md.getMissingChangesRS(serverId)))); 671 672 // get the Server State 673 ServerState state = md.getRSStates(serverId); 674 if (state != null) 675 { 676 AttributeBuilder builder = new AttributeBuilder("server-state"); 677 builder.addAllStrings(state.toStringSet()); 678 attributes.add(builder.toAttribute()); 679 } 680 681 return attributes; 682 } 683 684 /** {@inheritDoc} */ 685 @Override 686 public String toString() 687 { 688 if (serverId != 0) 689 { 690 return "Replication server RS(" + serverId + ") for domain \"" 691 + replicationServerDomain.getBaseDN() + "\""; 692 } 693 return "Unknown server"; 694 } 695 696 /** 697 * Gets the status of the connected DS. 698 * @return The status of the connected DS. 699 */ 700 @Override 701 public ServerStatus getStatus() 702 { 703 return ServerStatus.INVALID_STATUS; 704 } 705 706 /** 707 * Retrieves the Address URL for this server handler. 708 * 709 * @return The Address URL for this server handler, 710 * in the form of an IP address and port separated by a colon. 711 */ 712 public String getServerAddressURL() 713 { 714 return serverAddressURL; 715 } 716 717 /** 718 * Receives a topology msg. 719 * @param topoMsg The message received. 720 * @throws DirectoryException when it occurs. 721 * @throws IOException when it occurs. 722 */ 723 public void receiveTopoInfoFromRS(TopologyMsg topoMsg) 724 throws DirectoryException, IOException 725 { 726 replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); 727 } 728 729}