001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server; 028 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.replication.common.ServerStatus.*; 031import static org.opends.server.replication.common.StatusMachine.*; 032import static org.opends.server.replication.protocol.ProtocolVersion.*; 033 034import java.io.IOException; 035import java.util.*; 036 037import org.forgerock.i18n.LocalizableMessage; 038import org.forgerock.i18n.slf4j.LocalizedLogger; 039import org.forgerock.opendj.ldap.ResultCode; 040import org.opends.server.replication.common.*; 041import org.opends.server.replication.protocol.*; 042import org.opends.server.types.*; 043 044/** 045 * This class defines a server handler, which handles all interaction with a 046 * peer server (RS or DS). 047 */ 048public class DataServerHandler extends ServerHandler 049{ 050 051 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 052 053 /** 054 * Temporary generationId received in handshake/phase1, and used after 055 * handshake/phase2. 056 */ 057 private long tmpGenerationId; 058 059 /** Status of this DS (only used if this server handler represents a DS). */ 060 private ServerStatus status = ServerStatus.INVALID_STATUS; 061 062 /** Referrals URLs this DS is exporting. */ 063 private List<String> refUrls = new ArrayList<>(); 064 /** Assured replication enabled on DS or not. */ 065 private boolean assuredFlag; 066 /** DS assured mode (relevant if assured replication enabled). */ 067 private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; 068 /** DS safe data level (relevant if assured mode is safe data). */ 069 private byte safeDataLevel = -1; 070 private Set<String> eclIncludes = new HashSet<>(); 071 private Set<String> eclIncludesForDeletes = new HashSet<>(); 072 073 /** 074 * Creates a new data server handler. 075 * @param session The session opened with the remote data server. 076 * @param queueSize The queue size. 077 * @param replicationServer The hosting RS. 078 * @param rcvWindowSize The receiving window size. 079 */ 080 public DataServerHandler( 081 Session session, 082 int queueSize, 083 ReplicationServer replicationServer, 084 int rcvWindowSize) 085 { 086 super(session, queueSize, replicationServer, rcvWindowSize); 087 } 088 089 /** 090 * Order the peer DS server to change his status or close the connection 091 * according to the requested new generation id. 092 * @param newGenId The new generation id to take into account 093 * @throws IOException If IO error occurred. 094 */ 095 public void changeStatusForResetGenId(long newGenId) throws IOException 096 { 097 StatusMachineEvent event = getStatusMachineEvent(newGenId); 098 if (event == null) 099 { 100 return; 101 } 102 103 if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT 104 && status == ServerStatus.FULL_UPDATE_STATUS) 105 { 106 // Prevent useless error message (full update status cannot lead to bad gen status) 107 logger.info(NOTE_BAD_GEN_ID_IN_FULL_UPDATE, replicationServer.getServerId(), 108 getBaseDN(), serverId, generationId, newGenId); 109 return; 110 } 111 112 changeStatus(event, "for reset gen id"); 113 } 114 115 private StatusMachineEvent getStatusMachineEvent(long newGenId) 116 { 117 if (newGenId == -1) 118 { 119 // The generation id is being made invalid, let's put the DS 120 // into BAD_GEN_ID_STATUS 121 return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT; 122 } 123 if (newGenId != generationId) 124 { 125 // This server has a bad generation id compared to new reference one, 126 // let's put it into BAD_GEN_ID_STATUS 127 return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT; 128 } 129 130 if (status != ServerStatus.BAD_GEN_ID_STATUS) 131 { 132 if (logger.isTraceEnabled()) 133 { 134 logger.trace("In RS " + replicationServer.getServerId() 135 + ", DS " + getServerId() + " for baseDN=" + getBaseDN() 136 + " has already generation id " + newGenId 137 + " so no ChangeStatusMsg sent to him."); 138 } 139 return null; 140 } 141 142 // This server has the good new reference generation id. 143 // Close connection with him to force his reconnection: DS will 144 // reconnect in NORMAL_STATUS or DEGRADED_STATUS. 145 146 if (logger.isTraceEnabled()) 147 { 148 logger.trace("In RS " + replicationServer.getServerId() 149 + ", closing connection to DS " + getServerId() + " for baseDN=" + getBaseDN() 150 + " to force reconnection as new local generationId" 151 + " and remote one match and DS is in bad gen id: " + newGenId); 152 } 153 154 // Connection closure must not be done calling RSD.stopHandler() as it 155 // would rewait the RSD lock that we already must have entering this 156 // method. This would lead to a reentrant lock which we do not want. 157 // So simply close the session, this will make the hang up appear 158 // after the reader thread that took the RSD lock releases it. 159 if (session != null 160 // V4 protocol introduced a StopMsg to properly close the 161 // connection between servers 162 && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) 163 { 164 try 165 { 166 session.publish(new StopMsg()); 167 } 168 catch (IOException ioe) 169 { 170 // Anyway, going to close session, so nothing to do 171 } 172 } 173 174 // NOT_CONNECTED_STATUS is the last one in RS session life: handler 175 // will soon disappear after this method call... 176 status = ServerStatus.NOT_CONNECTED_STATUS; 177 return null; 178 } 179 180 /** 181 * Change the status according to the event. 182 * 183 * @param event 184 * The event to be used for new status computation 185 * @return The new status of the DS 186 * @throws IOException 187 * When raised by the underlying session 188 */ 189 public ServerStatus changeStatus(StatusMachineEvent event) throws IOException 190 { 191 return changeStatus(event, "from status analyzer"); 192 } 193 194 private ServerStatus changeStatus(StatusMachineEvent event, String origin) 195 throws IOException 196 { 197 // Check state machine allows this new status (Sanity check) 198 ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); 199 if (newStatus == ServerStatus.INVALID_STATUS) 200 { 201 logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event); 202 // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice 203 // versa. We may be trying to change the status while another status has 204 // just been entered: e.g a full update has just been engaged. 205 // In that case, just ignore attempt to change the status 206 return newStatus; 207 } 208 209 // Send message requesting to change the DS status 210 ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS); 211 212 if (logger.isTraceEnabled()) 213 { 214 logger.trace("In RS " + replicationServer.getServerId() 215 + " Sending change status " + origin + " to " + getServerId() 216 + " for baseDN=" + getBaseDN() + ":\n" + csMsg); 217 } 218 219 session.publish(csMsg); 220 221 status = newStatus; 222 223 return newStatus; 224 } 225 226 /** {@inheritDoc} */ 227 @Override 228 public List<Attribute> getMonitorData() 229 { 230 // Get the generic ones 231 List<Attribute> attributes = super.getMonitorData(); 232 233 // Add the specific DS ones 234 attributes.add(Attributes.create("replica", serverURL)); 235 attributes.add(Attributes.create("connected-to", 236 this.replicationServer.getMonitorInstanceName())); 237 238 ReplicationDomainMonitorData md = 239 replicationServerDomain.getDomainMonitorData(); 240 241 // Oldest missing update 242 long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); 243 if (approxFirstMissingDate > 0) 244 { 245 Date date = new Date(approxFirstMissingDate); 246 attributes.add(Attributes.create( 247 "approx-older-change-not-synchronized", date.toString())); 248 attributes.add(Attributes.create( 249 "approx-older-change-not-synchronized-millis", String 250 .valueOf(approxFirstMissingDate))); 251 } 252 253 // Missing changes 254 attributes.add(Attributes.create("missing-changes", 255 String.valueOf(md.getMissingChanges(serverId)))); 256 257 // Replication delay 258 attributes.add(Attributes.create("approximate-delay", 259 String.valueOf(md.getApproxDelay(serverId)))); 260 261 /* get the Server State */ 262 ServerState state = md.getLDAPServerState(serverId); 263 if (state != null) 264 { 265 AttributeBuilder builder = new AttributeBuilder("server-state"); 266 builder.addAllStrings(state.toStringSet()); 267 attributes.add(builder.toAttribute()); 268 } 269 270 return attributes; 271 } 272 273 /** {@inheritDoc} */ 274 @Override 275 public String getMonitorInstanceName() 276 { 277 return "Connected directory server DS(" + serverId + ") " + serverURL 278 + ",cn=" + replicationServerDomain.getMonitorInstanceName(); 279 } 280 281 /** 282 * Gets the status of the connected DS. 283 * @return The status of the connected DS. 284 */ 285 @Override 286 public ServerStatus getStatus() 287 { 288 return status; 289 } 290 291 /** {@inheritDoc} */ 292 @Override 293 public boolean isDataServer() 294 { 295 return true; 296 } 297 298 /** 299 * Process message of a remote server changing his status. 300 * @param csMsg The message containing the new status 301 * @return The new server status of the DS 302 */ 303 public ServerStatus processNewStatus(ChangeStatusMsg csMsg) 304 { 305 // Get the status the DS just entered 306 ServerStatus reqStatus = csMsg.getNewStatus(); 307 // Translate new status to a state machine event 308 StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); 309 if (event == StatusMachineEvent.INVALID_EVENT) 310 { 311 logger.error(ERR_RS_INVALID_NEW_STATUS, reqStatus, getBaseDN(), serverId); 312 return ServerStatus.INVALID_STATUS; 313 } 314 315 // Check state machine allows this new status 316 ServerStatus newStatus = StatusMachine.computeNewStatus(status, event); 317 if (newStatus == ServerStatus.INVALID_STATUS) 318 { 319 logger.error(ERR_RS_CANNOT_CHANGE_STATUS, getBaseDN(), serverId, status, event); 320 return ServerStatus.INVALID_STATUS; 321 } 322 323 status = newStatus; 324 return status; 325 } 326 327 /** 328 * Processes a start message received from a remote data server. 329 * @param serverStartMsg The provided start message received. 330 * @return flag specifying whether the remote server requests encryption. 331 * @throws DirectoryException raised when an error occurs. 332 */ 333 public boolean processStartFromRemote(ServerStartMsg serverStartMsg) 334 throws DirectoryException 335 { 336 session 337 .setProtocolVersion(getCompatibleVersion(serverStartMsg.getVersion())); 338 tmpGenerationId = serverStartMsg.getGenerationId(); 339 serverId = serverStartMsg.getServerId(); 340 serverURL = serverStartMsg.getServerURL(); 341 groupId = serverStartMsg.getGroupId(); 342 heartbeatInterval = serverStartMsg.getHeartbeatInterval(); 343 344 // generic stuff 345 setBaseDNAndDomain(serverStartMsg.getBaseDN(), true); 346 setInitialServerState(serverStartMsg.getServerState()); 347 setSendWindowSize(serverStartMsg.getWindowSize()); 348 349 if (heartbeatInterval < 0) 350 { 351 heartbeatInterval = 0; 352 } 353 return serverStartMsg.getSSLEncryption(); 354 } 355 356 /** Send our own TopologyMsg to DS. */ 357 private TopologyMsg sendTopoToRemoteDS() throws IOException 358 { 359 TopologyMsg outTopoMsg = replicationServerDomain 360 .createTopologyMsgForDS(this.serverId); 361 sendTopoInfo(outTopoMsg); 362 return outTopoMsg; 363 } 364 365 /** 366 * Starts the handler from a remote ServerStart message received from 367 * the remote data server. 368 * @param inServerStartMsg The provided ServerStart message received. 369 */ 370 public void startFromRemoteDS(ServerStartMsg inServerStartMsg) 371 { 372 try 373 { 374 // initializations 375 localGenerationId = -1; 376 oldGenerationId = -100; 377 378 // processes the ServerStart message received 379 boolean sessionInitiatorSSLEncryption = 380 processStartFromRemote(inServerStartMsg); 381 382 /** 383 * Hack to be sure that if a server disconnects and reconnect, we 384 * let the reader thread see the closure and cleanup any reference 385 * to old connection. This must be done before taking the domain lock so 386 * that the reader thread has a chance to stop the handler. 387 * 388 * TODO: This hack should be removed and disconnection/reconnection 389 * properly dealt with. 390 */ 391 if (replicationServerDomain.getConnectedDSs() 392 .containsKey(inServerStartMsg.getServerId())) 393 { 394 try { 395 Thread.sleep(100); 396 } 397 catch(Exception e){ 398 abortStart(null); 399 return; 400 } 401 } 402 403 lockDomainNoTimeout(); 404 405 localGenerationId = replicationServerDomain.getGenerationId(); 406 oldGenerationId = localGenerationId; 407 408 if (replicationServerDomain.isAlreadyConnectedToDS(this)) 409 { 410 abortStart(null); 411 return; 412 } 413 414 try 415 { 416 StartMsg outStartMsg = sendStartToRemote(); 417 418 logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg); 419 420 // The session initiator decides whether to use SSL. 421 // Until here session is encrypted then it depends on the negotiation 422 if (!sessionInitiatorSSLEncryption) 423 { 424 session.stopEncryption(); 425 } 426 427 // wait and process StartSessionMsg from remote RS 428 StartSessionMsg inStartSessionMsg = 429 waitAndProcessStartSessionFromRemoteDS(); 430 if (inStartSessionMsg == null) 431 { 432 // DS wants to properly close the connection (DS sent a StopMsg) 433 logStopReceived(); 434 abortStart(null); 435 return; 436 } 437 438 // Send our own TopologyMsg to remote DS 439 TopologyMsg outTopoMsg = sendTopoToRemoteDS(); 440 441 logStartSessionHandshake(inStartSessionMsg, outTopoMsg); 442 } 443 catch(IOException e) 444 { 445 LocalizableMessage errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get( 446 inServerStartMsg.getServerId(), replicationServer.getServerId()); 447 throw new DirectoryException(ResultCode.OTHER, errMessage); 448 } 449 catch (Exception e) 450 { 451 // We do not need to support DS V1 connection, we just accept RS V1 452 // connection: 453 // We just trash the message, log the event for debug purpose and close 454 // the connection 455 throw new DirectoryException(ResultCode.OTHER, null, null); 456 } 457 458 replicationServerDomain.register(this); 459 460 logger.debug(INFO_REPLICATION_SERVER_CONNECTION_FROM_DS, getReplicationServerId(), getServerId(), 461 replicationServerDomain.getBaseDN(), session.getReadableRemoteAddress()); 462 463 super.finalizeStart(); 464 } 465 catch(DirectoryException de) 466 { 467 abortStart(de.getMessageObject()); 468 } 469 catch(Exception e) 470 { 471 abortStart(null); 472 } 473 finally 474 { 475 releaseDomainLock(); 476 } 477 } 478 479 /** 480 * Sends a start message to the remote DS. 481 * 482 * @return The StartMsg sent. 483 * @throws IOException 484 * When an exception occurs. 485 */ 486 private StartMsg sendStartToRemote() throws IOException 487 { 488 final StartMsg startMsg; 489 490 // Before V4 protocol, we sent a ReplServerStartMsg 491 if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) 492 { 493 // Peer DS uses protocol < V4 : send it a ReplServerStartMsg 494 startMsg = createReplServerStartMsg(); 495 } 496 else 497 { 498 // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg 499 startMsg = new ReplServerStartDSMsg(getReplicationServerId(), 500 getReplicationServerURL(), getBaseDN(), maxRcvWindow, 501 replicationServerDomain.getLatestServerState(), 502 localGenerationId, sslEncryption, getLocalGroupId(), 503 replicationServer.getDegradedStatusThreshold(), 504 replicationServer.getWeight(), 505 replicationServerDomain.getConnectedDSs().size()); 506 } 507 508 send(startMsg); 509 return startMsg; 510 } 511 512 /** 513 * Creates a DSInfo structure representing this remote DS. 514 * @return The DSInfo structure representing this remote DS 515 */ 516 public DSInfo toDSInfo() 517 { 518 return new DSInfo(serverId, serverURL, getReplicationServerId(), 519 generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId, 520 refUrls, eclIncludes, eclIncludesForDeletes, getProtocolVersion()); 521 } 522 523 /** {@inheritDoc} */ 524 @Override 525 public String toString() 526 { 527 if (serverId != 0) 528 { 529 return "Replica DS(" + serverId + ") for domain \"" 530 + replicationServerDomain.getBaseDN() + "\""; 531 } 532 return "Unknown server"; 533 } 534 535 /** 536 * Wait receiving the StartSessionMsg from the remote DS and process it, or 537 * receiving a StopMsg to properly stop the handshake procedure. 538 * @return the startSessionMsg received or null DS sent a stop message to 539 * not finish the handshake. 540 * @throws Exception 541 */ 542 private StartSessionMsg waitAndProcessStartSessionFromRemoteDS() 543 throws Exception 544 { 545 ReplicationMsg msg = session.receive(); 546 547 if (msg instanceof StopMsg) 548 { 549 // DS wants to stop handshake (was just for handshake phase one for RS 550 // choice). Return null to make the session be terminated. 551 return null; 552 } else if (!(msg instanceof StartSessionMsg)) 553 { 554 LocalizableMessage message = LocalizableMessage.raw( 555 "Protocol error: StartSessionMsg required." + msg + " received."); 556 abortStart(message); 557 return null; 558 } 559 560 // Process StartSessionMsg sent by remote DS 561 StartSessionMsg startSessionMsg = (StartSessionMsg) msg; 562 563 this.status = startSessionMsg.getStatus(); 564 // Sanity check: is it a valid initial status? 565 if (!isValidInitialStatus(this.status)) 566 { 567 throw new DirectoryException(ResultCode.OTHER, 568 ERR_RS_INVALID_INIT_STATUS.get( this.status, getBaseDN(), serverId)); 569 } 570 571 this.refUrls = startSessionMsg.getReferralsURLs(); 572 this.assuredFlag = startSessionMsg.isAssured(); 573 this.assuredMode = startSessionMsg.getAssuredMode(); 574 this.safeDataLevel = startSessionMsg.getSafeDataLevel(); 575 this.eclIncludes = startSessionMsg.getEclIncludes(); 576 this.eclIncludesForDeletes = startSessionMsg.getEclIncludesForDeletes(); 577 578 /* 579 * If we have already a generationID set for the domain 580 * then 581 * if the connecting replica has not the same 582 * then it is degraded locally and notified by an error message 583 * else 584 * we set the generationID from the one received 585 * (unsaved yet on disk . will be set with the 1rst change 586 * received) 587 */ 588 generationId = tmpGenerationId; 589 if (localGenerationId > 0) 590 { 591 if (generationId != localGenerationId) 592 { 593 logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(), 594 generationId, getBaseDN(), getReplicationServerId(), localGenerationId); 595 } 596 } 597 else 598 { 599 // We are an empty ReplicationServer 600 if (generationId > 0 && !getServerState().isEmpty()) 601 { 602 // If the LDAP server has already sent changes 603 // it is not expected to connect to an empty RS 604 logger.warn(WARN_BAD_GENERATION_ID_FROM_DS, serverId, session.getReadableRemoteAddress(), 605 generationId, getBaseDN(), getReplicationServerId(), localGenerationId); 606 } 607 else 608 { 609 // The local RS is not initialized - take the one received 610 // WARNING: Must be done before computing topo message to send 611 // to peer server as topo message must embed valid generation id 612 // for our server 613 oldGenerationId = replicationServerDomain.changeGenerationId(generationId); 614 } 615 } 616 return startSessionMsg; 617 } 618 619 /** 620 * Process message of a remote server changing his status. 621 * @param csMsg The message containing the new status 622 */ 623 public void receiveNewStatus(ChangeStatusMsg csMsg) 624 { 625 replicationServerDomain.processNewStatus(this, csMsg); 626 } 627}