001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Timer; 038import java.util.TimerTask; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042import java.util.concurrent.locks.ReentrantLock; 043 044import org.forgerock.i18n.LocalizableMessage; 045import org.forgerock.i18n.LocalizableMessageBuilder; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import org.forgerock.opendj.ldap.ResultCode; 048import org.opends.server.admin.std.server.MonitorProviderCfg; 049import org.opends.server.api.MonitorProvider; 050import org.opends.server.core.DirectoryServer; 051import org.opends.server.replication.common.CSN; 052import org.opends.server.replication.common.DSInfo; 053import org.opends.server.replication.common.RSInfo; 054import org.opends.server.replication.common.ServerState; 055import org.opends.server.replication.common.ServerStatus; 056import org.opends.server.replication.common.StatusMachineEvent; 057import org.opends.server.replication.protocol.AckMsg; 058import org.opends.server.replication.protocol.ChangeStatusMsg; 059import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; 060import org.opends.server.replication.protocol.ErrorMsg; 061import org.opends.server.replication.protocol.MonitorMsg; 062import org.opends.server.replication.protocol.MonitorRequestMsg; 063import org.opends.server.replication.protocol.ReplicaOfflineMsg; 064import org.opends.server.replication.protocol.ResetGenerationIdMsg; 065import org.opends.server.replication.protocol.RoutableMsg; 066import org.opends.server.replication.protocol.TopologyMsg; 067import org.opends.server.replication.protocol.UpdateMsg; 068import org.opends.server.replication.server.changelog.api.ChangelogException; 069import org.opends.server.replication.server.changelog.api.DBCursor; 070import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 071import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 072import org.opends.server.types.Attribute; 073import org.opends.server.types.Attributes; 074import org.opends.server.types.DN; 075import org.opends.server.types.DirectoryException; 076import org.opends.server.types.HostPort; 077 078import static org.opends.messages.ReplicationMessages.*; 079import static org.opends.server.replication.common.ServerStatus.*; 080import static org.opends.server.replication.common.StatusMachineEvent.*; 081import static org.opends.server.replication.protocol.ProtocolVersion.*; 082import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; 083import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; 084import static org.opends.server.util.CollectionUtils.*; 085import static org.opends.server.util.StaticUtils.*; 086 087/** 088 * This class define an in-memory cache that will be used to store 089 * the messages that have been received from an LDAP server or 090 * from another replication server and that should be forwarded to 091 * other servers. 092 * 093 * The size of the cache is set by configuration. 094 * If the cache becomes bigger than the configured size, the older messages 095 * are removed and should they be needed again must be read from the backing 096 * file 097 * 098 * it runs a thread that is responsible for saving the messages 099 * received to the disk and for trimming them 100 * Decision to trim can be based on disk space or age of the message 101 */ 102public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> 103{ 104 private final DN baseDN; 105 106 /** 107 * Periodically verifies whether the connected DSs are late and publishes any 108 * pending status messages. 109 */ 110 private final StatusAnalyzer statusAnalyzer; 111 112 /** 113 * The monitoring publisher that periodically sends monitoring messages to the 114 * topology. Using an AtomicReference to avoid leaking references to costly 115 * threads. 116 */ 117 private final AtomicReference<MonitoringPublisher> monitoringPublisher = new AtomicReference<>(); 118 /** Maintains monitor data for the current domain. */ 119 private final ReplicationDomainMonitor domainMonitor = new ReplicationDomainMonitor(this); 120 121 /** 122 * The following map contains one balanced tree for each replica ID to which 123 * we are currently publishing the first update in the balanced tree is the 124 * next change that we must push to this particular server. 125 */ 126 private final Map<Integer, DataServerHandler> connectedDSs = new ConcurrentHashMap<>(); 127 128 /** 129 * This map contains one ServerHandler for each replication servers with which 130 * we are connected (so normally all the replication servers) the first update 131 * in the balanced tree is the next change that we must push to this 132 * particular server. 133 */ 134 private final Map<Integer, ReplicationServerHandler> connectedRSs = new ConcurrentHashMap<>(); 135 136 private final ReplicationDomainDB domainDB; 137 /** The ReplicationServer that created the current instance. */ 138 private final ReplicationServer localReplicationServer; 139 140 /** 141 * The generationId of the current replication domain. The generationId is 142 * computed by hashing the first 1000 entries in the DB. 143 */ 144 private volatile long generationId = -1; 145 /** 146 * JNR, this is legacy code, hard to follow logic. I think what this field 147 * tries to say is: "is the generationId in use anywhere?", i.e. is there a 148 * replication topology in place? As soon as an answer to any of these 149 * question comes true, then it is set to true. 150 * <p> 151 * It looks like the only use of this field is to prevent the 152 * {@link #generationId} from being reset by 153 * {@link #resetGenerationIdIfPossible()}. 154 */ 155 private volatile boolean generationIdSavedStatus; 156 157 /** The tracer object for the debug logger. */ 158 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 159 160 /** 161 * The needed info for each received assured update message we are waiting 162 * acks for. 163 * <p> 164 * Key: a CSN matching a received update message which requested 165 * assured mode usage (either safe read or safe data mode) 166 * <p> 167 * Value: The object holding every info needed about the already received acks 168 * as well as the acks to be received. 169 * 170 * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub 171 * classes javadoc. 172 */ 173 private final Map<CSN, ExpectedAcksInfo> waitingAcks = new ConcurrentHashMap<>(); 174 175 /** 176 * The timer used to run the timeout code (timer tasks) for the assured update 177 * messages we are waiting acks for. 178 */ 179 private final Timer assuredTimeoutTimer; 180 /** 181 * Counter used to purge the timer tasks references in assuredTimeoutTimer, 182 * every n number of treated assured messages. 183 */ 184 private int assuredTimeoutTimerPurgeCounter; 185 186 187 188 /** 189 * Stores pending status messages such as DS change time heartbeats for future 190 * forwarding to the rest of the topology. This class is required in order to 191 * decouple inbound IO processing from outbound IO processing and avoid 192 * potential inter-process deadlocks. In particular, the {@code ServerReader} 193 * thread must not send messages. 194 */ 195 private static class PendingStatusMessages 196 { 197 private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats = new HashMap<>(1); 198 private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs = new HashMap<>(1); 199 private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs = new HashMap<>(1); 200 private boolean sendRSTopologyMsg; 201 private boolean sendDSTopologyMsg; 202 private int excludedDSForTopologyMsg = -1; 203 204 /** 205 * Enqueues a TopologyMsg for all the connected directory servers in order 206 * to let them know the topology (every known DSs and RSs). 207 * 208 * @param excludedDS 209 * If not null, the topology message will not be sent to this DS. 210 */ 211 private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS) 212 { 213 int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1; 214 if (sendDSTopologyMsg) 215 { 216 if (excludedServerId != excludedDSForTopologyMsg) 217 { 218 excludedDSForTopologyMsg = -1; 219 } 220 } 221 else 222 { 223 sendDSTopologyMsg = true; 224 excludedDSForTopologyMsg = excludedServerId; 225 } 226 } 227 228 /** 229 * Enqueues a TopologyMsg for all the connected replication servers in order 230 * to let them know our connected LDAP servers. 231 */ 232 private void enqueueTopoInfoToAllRSs() 233 { 234 sendRSTopologyMsg = true; 235 } 236 237 /** 238 * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to 239 * all other RS instances. 240 * 241 * @param msg 242 * The heartbeat message. 243 */ 244 private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg) 245 { 246 pendingHeartbeats.put(msg.getCSN().getServerId(), msg); 247 } 248 249 private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg) 250 { 251 pendingDSMonitorMsgs.put(dsServerId, msg); 252 } 253 254 private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg) 255 { 256 pendingRSMonitorMsgs.put(rsServerId, msg); 257 } 258 259 /** {@inheritDoc} */ 260 @Override 261 public String toString() 262 { 263 return getClass().getSimpleName() 264 + " pendingHeartbeats=" + pendingHeartbeats 265 + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs 266 + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs 267 + ", sendRSTopologyMsg=" + sendRSTopologyMsg 268 + ", sendDSTopologyMsg=" + sendDSTopologyMsg 269 + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg; 270 } 271 } 272 273 private final Object pendingStatusMessagesLock = new Object(); 274 275 /** @GuardedBy("pendingStatusMessagesLock") */ 276 private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages(); 277 278 /** 279 * Creates a new ReplicationServerDomain associated to the baseDN. 280 * 281 * @param baseDN 282 * The baseDN associated to the ReplicationServerDomain. 283 * @param localReplicationServer 284 * the ReplicationServer that created this instance. 285 */ 286 public ReplicationServerDomain(DN baseDN, 287 ReplicationServer localReplicationServer) 288 { 289 this.baseDN = baseDN; 290 this.localReplicationServer = localReplicationServer; 291 this.assuredTimeoutTimer = new Timer("Replication server RS(" 292 + localReplicationServer.getServerId() 293 + ") assured timer for domain \"" + baseDN + "\"", true); 294 this.domainDB = 295 localReplicationServer.getChangelogDB().getReplicationDomainDB(); 296 this.statusAnalyzer = new StatusAnalyzer(this); 297 this.statusAnalyzer.start(); 298 DirectoryServer.registerMonitorProvider(this); 299 } 300 301 /** 302 * Add an update that has been received to the list of 303 * updates that must be forwarded to all other servers. 304 * 305 * @param updateMsg The update that has been received. 306 * @param sourceHandler The ServerHandler for the server from which the 307 * update was received 308 * @throws IOException When an IO exception happens during the update 309 * processing. 310 */ 311 public void put(UpdateMsg updateMsg, ServerHandler sourceHandler) throws IOException 312 { 313 sourceHandler.updateServerState(updateMsg); 314 sourceHandler.incrementInCount(); 315 setGenerationIdIfUnset(sourceHandler.getGenerationId()); 316 317 /** 318 * If this is an assured message (a message requesting ack), we must 319 * construct the ExpectedAcksInfo object with the right number of expected 320 * acks before posting message to the writers. Otherwise some writers may 321 * have time to post, receive the ack and increment received ack counter 322 * (kept in ExpectedAcksInfo object) and we could think the acknowledgment 323 * is fully processed although it may be not (some other acks from other 324 * servers are not yet arrived). So for that purpose we do a pre-loop 325 * to determine to who we will post an assured message. 326 * Whether the assured mode is safe read or safe data, we anyway do not 327 * support the assured replication feature across topologies with different 328 * group ids. The assured feature insures assured replication based on the 329 * same locality (group id). For instance in double data center deployment 330 * (2 group id usage) with assured replication enabled, an assured message 331 * sent from data center 1 (group id = 1) will be sent to servers of both 332 * data centers, but one will request and wait acks only from servers of the 333 * data center 1. 334 */ 335 final PreparedAssuredInfo preparedAssuredInfo = getPreparedAssuredInfo(updateMsg, sourceHandler); 336 337 if (!publishUpdateMsg(updateMsg)) 338 { 339 return; 340 } 341 342 final List<Integer> assuredServers = getAssuredServers(updateMsg, preparedAssuredInfo); 343 344 /** 345 * The update message equivalent to the originally received update message, 346 * but with assured flag disabled. This message is the one that should be 347 * sent to non eligible servers for assured mode. 348 * We need a clone like of the original message with assured flag off, to be 349 * posted to servers we don't want to wait the ack from (not normal status 350 * servers or servers with different group id). This must be done because 351 * the posted message is a reference so each writer queue gets the same 352 * reference, thus, changing the assured flag of an object is done for every 353 * references posted on every writer queues. That is why we need a message 354 * version with assured flag on and another one with assured flag off. 355 */ 356 final NotAssuredUpdateMsg notAssuredUpdateMsg = 357 preparedAssuredInfo != null ? new NotAssuredUpdateMsg(updateMsg) : null; 358 359 // Push the message to the replication servers 360 if (sourceHandler.isDataServer()) 361 { 362 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 363 { 364 /** 365 * Ignore updates to RS with bad gen id 366 * (no system managed status for a RS) 367 */ 368 if (!isDifferentGenerationId(rsHandler, updateMsg)) 369 { 370 addUpdate(rsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); 371 } 372 } 373 } 374 375 // Push the message to the LDAP servers 376 for (DataServerHandler dsHandler : connectedDSs.values()) 377 { 378 // Do not forward the change to the server that just sent it 379 if (dsHandler != sourceHandler 380 && !isUpdateMsgFiltered(updateMsg, dsHandler)) 381 { 382 addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); 383 } 384 } 385 } 386 387 private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler, 388 UpdateMsg updateMsg) 389 { 390 final boolean isDifferent = isDifferentGenerationId(rsHandler.getGenerationId()); 391 if (isDifferent && logger.isTraceEnabled()) 392 { 393 debug("updateMsg " + updateMsg.getCSN() 394 + " will not be sent to replication server " 395 + rsHandler.getServerId() + " with generation id " 396 + rsHandler.getGenerationId() + " different from local " 397 + "generation id " + generationId); 398 } 399 return isDifferent; 400 } 401 402 /** 403 * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS. 404 * <p> 405 * The RSD lock should not be taken here as it is acceptable to have a delay 406 * between the time the server has a wrong status and the fact we detect it: 407 * the updates that succeed to pass during this time will have no impact on 408 * remote server. But it is interesting to not saturate uselessly the network 409 * if the updates are not necessary so this check to stop sending updates is 410 * interesting anyway. Not taking the RSD lock allows to have better 411 * performances in normal mode (most of the time). 412 */ 413 private boolean isUpdateMsgFiltered(UpdateMsg updateMsg, DataServerHandler dsHandler) 414 { 415 final ServerStatus dsStatus = dsHandler.getStatus(); 416 if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) 417 { 418 if (logger.isTraceEnabled()) 419 { 420 debug("updateMsg " + updateMsg.getCSN() 421 + " will not be sent to directory server " 422 + dsHandler.getServerId() + " with generation id " 423 + dsHandler.getGenerationId() + " different from local " 424 + "generation id " + generationId); 425 } 426 return true; 427 } 428 else if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) 429 { 430 if (logger.isTraceEnabled()) 431 { 432 debug("updateMsg " + updateMsg.getCSN() 433 + " will not be sent to directory server " 434 + dsHandler.getServerId() + " as it is in full update"); 435 } 436 return true; 437 } 438 return false; 439 } 440 441 private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg, 442 ServerHandler sourceHandler) throws IOException 443 { 444 // Assured feature is supported starting from replication protocol V2 445 if (!updateMsg.isAssured() 446 || sourceHandler.getProtocolVersion() < REPLICATION_PROTOCOL_V2) 447 { 448 return null; 449 } 450 451 // According to assured sub-mode, prepare structures to keep track of 452 // the acks we are interested in. 453 switch (updateMsg.getAssuredMode()) 454 { 455 case SAFE_DATA_MODE: 456 sourceHandler.incrementAssuredSdReceivedUpdates(); 457 return processSafeDataUpdateMsg(updateMsg, sourceHandler); 458 459 case SAFE_READ_MODE: 460 sourceHandler.incrementAssuredSrReceivedUpdates(); 461 return processSafeReadUpdateMsg(updateMsg, sourceHandler); 462 463 default: 464 // Unknown assured mode: should never happen 465 logger.error(ERR_RS_UNKNOWN_ASSURED_MODE, 466 localReplicationServer.getServerId(), updateMsg.getAssuredMode(), baseDN, updateMsg); 467 return null; 468 } 469 } 470 471 private List<Integer> getAssuredServers(UpdateMsg updateMsg, PreparedAssuredInfo preparedAssuredInfo) 472 { 473 List<Integer> expectedServers = null; 474 if (preparedAssuredInfo != null && preparedAssuredInfo.expectedServers != null) 475 { 476 expectedServers = preparedAssuredInfo.expectedServers; 477 // Store the expected acks info into the global map. 478 // The code for processing reception of acks for this update will update 479 // info kept in this object and if enough acks received, it will send 480 // back the final ack to the requester and remove the object from this map 481 // OR 482 // The following timer will time out and send an timeout ack to the 483 // requester if the acks are not received in time. The timer will also 484 // remove the object from this map. 485 final CSN csn = updateMsg.getCSN(); 486 waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo); 487 488 // Arm timer for this assured update message (wait for acks until it times out) 489 final AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn); 490 assuredTimeoutTimer.schedule(assuredTimeoutTask, localReplicationServer.getAssuredTimeout()); 491 // Purge timer every 100 treated messages 492 assuredTimeoutTimerPurgeCounter++; 493 if ((assuredTimeoutTimerPurgeCounter % 100) == 0) 494 { 495 assuredTimeoutTimer.purge(); 496 } 497 } 498 499 return expectedServers != null ? expectedServers : Collections.<Integer> emptyList(); 500 } 501 502 private boolean publishUpdateMsg(UpdateMsg updateMsg) 503 { 504 try 505 { 506 if (updateMsg instanceof ReplicaOfflineMsg) 507 { 508 final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg; 509 this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN()); 510 return true; 511 } 512 513 if (this.domainDB.publishUpdateMsg(baseDN, updateMsg)) 514 { 515 /* 516 * JNR: Matt and I had a hard time figuring out where to put this 517 * synchronized block. We elected to put it here, but without a strong 518 * conviction. 519 */ 520 synchronized (generationIDLock) 521 { 522 /* 523 * JNR: I think the generationIdSavedStatus is set to true because 524 * method above created a ReplicaDB which assumes the generationId was 525 * communicated to another server. Hence setting true on this field 526 * prevent the generationId from being reset. 527 */ 528 generationIdSavedStatus = true; 529 } 530 } 531 return true; 532 } 533 catch (ChangelogException e) 534 { 535 /* 536 * Because of database problem we can't save any more changes from at 537 * least one LDAP server. This replicationServer therefore can't do it's 538 * job properly anymore and needs to close all its connections and 539 * shutdown itself. 540 */ 541 logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e)); 542 localReplicationServer.shutdown(); 543 return false; 544 } 545 } 546 547 private void addUpdate(ServerHandler sHandler, UpdateMsg updateMsg, 548 NotAssuredUpdateMsg notAssuredUpdateMsg, List<Integer> assuredServers) 549 { 550 // Assured mode: post an assured or not assured matching update message 551 // according to what has been computed for the destination server 552 if (notAssuredUpdateMsg != null 553 && !assuredServers.contains(sHandler.getServerId())) 554 { 555 sHandler.add(notAssuredUpdateMsg); 556 } 557 else 558 { 559 sHandler.add(updateMsg); 560 } 561 } 562 563 /** 564 * Helper class to be the return type of a method that processes a just 565 * received assured update message: 566 * - processSafeReadUpdateMsg 567 * - processSafeDataUpdateMsg 568 * This is a facility to pack many interesting returned object. 569 */ 570 private class PreparedAssuredInfo 571 { 572 /** 573 * The list of servers identified as servers we are interested in 574 * receiving acks from. If this list is not null, then expectedAcksInfo 575 * should be not null. 576 * Servers that are not in this list are servers not eligible for an ack 577 * request. 578 */ 579 public List<Integer> expectedServers; 580 581 /** 582 * The constructed ExpectedAcksInfo object to be used when acks will be 583 * received. Null if expectedServers is null. 584 */ 585 public ExpectedAcksInfo expectedAcksInfo; 586 } 587 588 /** 589 * Process a just received assured update message in Safe Read mode. If the 590 * ack can be sent immediately, it is done here. This will also determine to 591 * which suitable servers an ack should be requested from, and which ones are 592 * not eligible for an ack request. 593 * This method is an helper method for the put method. Have a look at the put 594 * method for a better understanding. 595 * @param update The just received assured update to process. 596 * @param sourceHandler The ServerHandler for the server from which the 597 * update was received 598 * @return A suitable PreparedAssuredInfo object that contains every needed 599 * info to proceed with post to server writers. 600 * @throws IOException When an IO exception happens during the update 601 * processing. 602 */ 603 private PreparedAssuredInfo processSafeReadUpdateMsg( 604 UpdateMsg update, ServerHandler sourceHandler) throws IOException 605 { 606 CSN csn = update.getCSN(); 607 byte groupId = localReplicationServer.getGroupId(); 608 byte sourceGroupId = sourceHandler.getGroupId(); 609 List<Integer> expectedServers = new ArrayList<>(); 610 List<Integer> wrongStatusServers = new ArrayList<>(); 611 612 if (sourceGroupId == groupId) 613 // Assured feature does not cross different group ids 614 { 615 if (sourceHandler.isDataServer()) 616 { 617 collectRSsEligibleForAssuredReplication(groupId, expectedServers); 618 } 619 620 // Look for DS eligible for assured 621 for (DataServerHandler dsHandler : connectedDSs.values()) 622 { 623 // Don't forward the change to the server that just sent it 624 if (dsHandler == sourceHandler) 625 { 626 continue; 627 } 628 if (dsHandler.getGroupId() == groupId) 629 // No ack expected from a DS with different group id 630 { 631 ServerStatus serverStatus = dsHandler.getStatus(); 632 if (serverStatus == ServerStatus.NORMAL_STATUS) 633 { 634 expectedServers.add(dsHandler.getServerId()); 635 } else if (serverStatus == ServerStatus.DEGRADED_STATUS) { 636 // No ack expected from a DS with wrong status 637 wrongStatusServers.add(dsHandler.getServerId()); 638 } 639 /* 640 * else 641 * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: 642 * We do not want this to be reported as an error to the update 643 * maker -> no pollution or potential misunderstanding when 644 * reading logs or monitoring and it was just administration (for 645 * instance new server is being configured in topo: it goes in bad 646 * gen then full update). 647 */ 648 } 649 } 650 } 651 652 // Return computed structures 653 PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); 654 if (!expectedServers.isEmpty()) 655 { 656 // Some other acks to wait for 657 preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn, 658 sourceHandler, expectedServers, wrongStatusServers); 659 preparedAssuredInfo.expectedServers = expectedServers; 660 } 661 662 if (preparedAssuredInfo.expectedServers == null) 663 { 664 // No eligible servers found, send the ack immediately 665 sourceHandler.send(new AckMsg(csn)); 666 } 667 668 return preparedAssuredInfo; 669 } 670 671 /** 672 * Process a just received assured update message in Safe Data mode. If the 673 * ack can be sent immediately, it is done here. This will also determine to 674 * which suitable servers an ack should be requested from, and which ones are 675 * not eligible for an ack request. 676 * This method is an helper method for the put method. Have a look at the put 677 * method for a better understanding. 678 * @param update The just received assured update to process. 679 * @param sourceHandler The ServerHandler for the server from which the 680 * update was received 681 * @return A suitable PreparedAssuredInfo object that contains every needed 682 * info to proceed with post to server writers. 683 * @throws IOException When an IO exception happens during the update 684 * processing. 685 */ 686 private PreparedAssuredInfo processSafeDataUpdateMsg( 687 UpdateMsg update, ServerHandler sourceHandler) throws IOException 688 { 689 CSN csn = update.getCSN(); 690 boolean interestedInAcks = false; 691 byte safeDataLevel = update.getSafeDataLevel(); 692 byte groupId = localReplicationServer.getGroupId(); 693 byte sourceGroupId = sourceHandler.getGroupId(); 694 if (safeDataLevel < (byte) 1) 695 { 696 // Should never happen 697 logger.error(ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL, 698 localReplicationServer.getServerId(), safeDataLevel, baseDN, update); 699 } else if (sourceGroupId == groupId 700 // Assured feature does not cross different group IDS 701 && isSameGenerationId(sourceHandler.getGenerationId())) 702 // Ignore assured updates from wrong generationId servers 703 { 704 if (sourceHandler.isDataServer()) 705 { 706 if (safeDataLevel == (byte) 1) 707 { 708 /** 709 * Immediately return the ack for an assured message in safe data 710 * mode with safe data level 1, coming from a DS. No need to wait 711 * for more acks 712 */ 713 sourceHandler.send(new AckMsg(csn)); 714 } else 715 { 716 /** 717 * level > 1 : We need further acks 718 * The message will be posted in assured mode to eligible 719 * servers. The embedded safe data level is not changed, and his 720 * value will be used by a remote RS to determine if he must send 721 * an ack (level > 1) or not (level = 1) 722 */ 723 interestedInAcks = true; 724 } 725 } else 726 { // A RS sent us the safe data message, for sure no further ack to wait 727 /** 728 * Level 1 has already been reached so no further acks to wait. 729 * Just deal with level > 1 730 */ 731 if (safeDataLevel > (byte) 1) 732 { 733 sourceHandler.send(new AckMsg(csn)); 734 } 735 } 736 } 737 738 List<Integer> expectedServers = new ArrayList<>(); 739 if (interestedInAcks && sourceHandler.isDataServer()) 740 { 741 collectRSsEligibleForAssuredReplication(groupId, expectedServers); 742 } 743 744 // Return computed structures 745 PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); 746 int nExpectedServers = expectedServers.size(); 747 if (interestedInAcks) // interestedInAcks so level > 1 748 { 749 if (nExpectedServers > 0) 750 { 751 // Some other acks to wait for 752 int sdl = update.getSafeDataLevel(); 753 int neededAdditionalServers = sdl - 1; 754 // Change the number of expected acks if not enough available eligible 755 // servers: the level is a best effort thing, we do not want to timeout 756 // at every assured SD update for instance if a RS has had his gen id 757 // reseted 758 byte finalSdl = (nExpectedServers >= neededAdditionalServers) ? 759 (byte)sdl : // Keep level as it was 760 (byte)(nExpectedServers+1); // Change level to match what's available 761 preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn, 762 sourceHandler, finalSdl, expectedServers); 763 preparedAssuredInfo.expectedServers = expectedServers; 764 } else 765 { 766 // level > 1 and source is a DS but no eligible servers found, send the 767 // ack immediately 768 sourceHandler.send(new AckMsg(csn)); 769 } 770 } 771 772 return preparedAssuredInfo; 773 } 774 775 private void collectRSsEligibleForAssuredReplication(byte groupId, 776 List<Integer> expectedServers) 777 { 778 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 779 { 780 if (rsHandler.getGroupId() == groupId 781 // No ack expected from a RS with different group id 782 && isSameGenerationId(rsHandler.getGenerationId()) 783 // No ack expected from a RS with bad gen id 784 ) 785 { 786 expectedServers.add(rsHandler.getServerId()); 787 } 788 } 789 } 790 791 private boolean isSameGenerationId(long generationId) 792 { 793 return this.generationId > 0 && this.generationId == generationId; 794 } 795 796 private boolean isDifferentGenerationId(long generationId) 797 { 798 return this.generationId > 0 && this.generationId != generationId; 799 } 800 801 /** 802 * Process an ack received from a given server. 803 * 804 * @param ack The ack message received. 805 * @param ackingServer The server handler of the server that sent the ack. 806 */ 807 void processAck(AckMsg ack, ServerHandler ackingServer) 808 { 809 // Retrieve the expected acks info for the update matching the original 810 // sent update. 811 CSN csn = ack.getCSN(); 812 ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn); 813 814 if (expectedAcksInfo != null) 815 { 816 // Prevent concurrent access from processAck() or AssuredTimeoutTask.run() 817 synchronized (expectedAcksInfo) 818 { 819 if (expectedAcksInfo.isCompleted()) 820 { 821 // Timeout code is sending a timeout ack, do nothing and let him 822 // remove object from the map 823 return; 824 } 825 /** 826 * 827 * If this is the last ack we were waiting from, immediately create and 828 * send the final ack to the original server 829 */ 830 if (expectedAcksInfo.processReceivedAck(ackingServer, ack)) 831 { 832 // Remove the object from the map as no more needed 833 waitingAcks.remove(csn); 834 AckMsg finalAck = expectedAcksInfo.createAck(false); 835 ServerHandler origServer = expectedAcksInfo.getRequesterServer(); 836 try 837 { 838 origServer.send(finalAck); 839 } catch (IOException e) 840 { 841 /** 842 * An error happened trying the send back an ack to the server. 843 * Log an error and close the connection to this server. 844 */ 845 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 846 mb.append(ERR_RS_ERROR_SENDING_ACK.get( 847 localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN)); 848 mb.append(" "); 849 mb.append(stackTraceToSingleLineString(e)); 850 logger.error(mb.toMessage()); 851 stopServer(origServer, false); 852 } 853 // Mark the ack info object as completed to prevent potential timeout 854 // code parallel run 855 expectedAcksInfo.completed(); 856 } 857 } 858 } 859 /* Else the timeout occurred for the update matching this CSN 860 * and the ack with timeout error has probably already been sent. 861 */ 862 } 863 864 /** 865 * The code run when the timeout occurs while waiting for acks of the 866 * eligible servers. This basically sends a timeout ack (with any additional 867 * error info) to the original server that sent an assured update message. 868 */ 869 private class AssuredTimeoutTask extends TimerTask 870 { 871 private CSN csn; 872 873 /** 874 * Constructor for the timer task. 875 * @param csn The CSN of the assured update we are waiting acks for 876 */ 877 public AssuredTimeoutTask(CSN csn) 878 { 879 this.csn = csn; 880 } 881 882 /** 883 * Run when the assured timeout for an assured update message we are waiting 884 * acks for occurs. 885 */ 886 @Override 887 public void run() 888 { 889 ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn); 890 891 if (expectedAcksInfo != null) 892 { 893 synchronized (expectedAcksInfo) 894 { 895 if (expectedAcksInfo.isCompleted()) 896 { 897 // processAck() code is sending the ack, do nothing and let him 898 // remove object from the map 899 return; 900 } 901 // Remove the object from the map as no more needed 902 waitingAcks.remove(csn); 903 // Create the timeout ack and send him to the server the assured 904 // update message came from 905 AckMsg finalAck = expectedAcksInfo.createAck(true); 906 ServerHandler origServer = expectedAcksInfo.getRequesterServer(); 907 if (logger.isTraceEnabled()) 908 { 909 debug("sending timeout for assured update with CSN " + csn 910 + " to serverId=" + origServer.getServerId()); 911 } 912 try 913 { 914 origServer.send(finalAck); 915 } catch (IOException e) 916 { 917 /** 918 * An error happened trying the send back an ack to the server. 919 * Log an error and close the connection to this server. 920 */ 921 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 922 mb.append(ERR_RS_ERROR_SENDING_ACK.get( 923 localReplicationServer.getServerId(), origServer.getServerId(), csn, baseDN)); 924 mb.append(" "); 925 mb.append(stackTraceToSingleLineString(e)); 926 logger.error(mb.toMessage()); 927 stopServer(origServer, false); 928 } 929 // Increment assured counters 930 boolean safeRead = 931 expectedAcksInfo instanceof SafeReadExpectedAcksInfo; 932 if (safeRead) 933 { 934 origServer.incrementAssuredSrReceivedUpdatesTimeout(); 935 } else 936 { 937 if (origServer.isDataServer()) 938 { 939 origServer.incrementAssuredSdReceivedUpdatesTimeout(); 940 } 941 } 942 // retrieve expected servers in timeout to increment their counter 943 List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers(); 944 for (Integer serverId : serversInTimeout) 945 { 946 ServerHandler expectedDSInTimeout = connectedDSs.get(serverId); 947 ServerHandler expectedRSInTimeout = connectedRSs.get(serverId); 948 if (expectedDSInTimeout != null) 949 { 950 if (safeRead) 951 { 952 expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout(); 953 } // else no SD update sent to a DS (meaningless) 954 } else if (expectedRSInTimeout != null) 955 { 956 if (safeRead) 957 { 958 expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout(); 959 } 960 else 961 { 962 expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout(); 963 } 964 } 965 // else server disappeared ? Let's forget about it. 966 } 967 // Mark the ack info object as completed to prevent potential 968 // processAck() code parallel run 969 expectedAcksInfo.completed(); 970 } 971 } 972 } 973 } 974 975 976 /** 977 * Stop operations with a list of replication servers. 978 * 979 * @param serversToDisconnect 980 * the replication servers addresses for which we want to stop 981 * operations 982 */ 983 public void stopReplicationServers(Collection<HostPort> serversToDisconnect) 984 { 985 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 986 { 987 if (serversToDisconnect.contains( 988 HostPort.valueOf(rsHandler.getServerAddressURL()))) 989 { 990 stopServer(rsHandler, false); 991 } 992 } 993 } 994 995 /** 996 * Stop operations with all servers this domain is connected with (RS and DS). 997 * 998 * @param shutdown A boolean indicating if the stop is due to a 999 * shutdown condition. 1000 */ 1001 public void stopAllServers(boolean shutdown) 1002 { 1003 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1004 { 1005 stopServer(rsHandler, shutdown); 1006 } 1007 1008 for (DataServerHandler dsHandler : connectedDSs.values()) 1009 { 1010 stopServer(dsHandler, shutdown); 1011 } 1012 } 1013 1014 /** 1015 * Checks whether it is already connected to a DS with same id. 1016 * 1017 * @param dsHandler 1018 * the DS we want to check 1019 * @return true if this DS is already connected to the current server 1020 */ 1021 public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler) 1022 { 1023 if (connectedDSs.containsKey(dsHandler.getServerId())) 1024 { 1025 // looks like two connected LDAP servers have the same serverId 1026 logger.error(ERR_DUPLICATE_SERVER_ID, localReplicationServer.getMonitorInstanceName(), 1027 connectedDSs.get(dsHandler.getServerId()), dsHandler, dsHandler.getServerId()); 1028 return true; 1029 } 1030 return false; 1031 } 1032 1033 /** 1034 * Stop operations with a given server. 1035 * 1036 * @param sHandler the server for which we want to stop operations. 1037 * @param shutdown A boolean indicating if the stop is due to a 1038 * shutdown condition. 1039 */ 1040 public void stopServer(ServerHandler sHandler, boolean shutdown) 1041 { 1042 // TODO JNR merge with stopServer(MessageHandler) 1043 if (logger.isTraceEnabled()) 1044 { 1045 debug("stopServer() on the server handler " + sHandler); 1046 } 1047 /* 1048 * We must prevent deadlock on replication server domain lock, when for 1049 * instance this code is called from dying ServerReader but also dying 1050 * ServerWriter at the same time, or from a thread that wants to shut down 1051 * the handler. So use a thread safe flag to know if the job must be done 1052 * or not (is already being processed or not). 1053 */ 1054 if (!sHandler.engageShutdown()) 1055 // Only do this once (prevent other thread to enter here again) 1056 { 1057 if (!shutdown) 1058 { 1059 try 1060 { 1061 // Acquire lock on domain (see more details in comment of start() 1062 // method of ServerHandler) 1063 lock(); 1064 } 1065 catch (InterruptedException ex) 1066 { 1067 // We can't deal with this here, so re-interrupt thread so that it is 1068 // caught during subsequent IO. 1069 Thread.currentThread().interrupt(); 1070 return; 1071 } 1072 } 1073 1074 try 1075 { 1076 // Stop useless monitoring publisher if no more RS or DS in domain 1077 if ( (connectedDSs.size() + connectedRSs.size() )== 1) 1078 { 1079 if (logger.isTraceEnabled()) 1080 { 1081 debug("remote server " + sHandler 1082 + " is the last RS/DS to be stopped:" 1083 + " stopping monitoring publisher"); 1084 } 1085 stopMonitoringPublisher(); 1086 } 1087 1088 if (connectedRSs.containsKey(sHandler.getServerId())) 1089 { 1090 unregisterServerHandler(sHandler, shutdown, false); 1091 } 1092 else if (connectedDSs.containsKey(sHandler.getServerId())) 1093 { 1094 unregisterServerHandler(sHandler, shutdown, true); 1095 } 1096 } 1097 catch(Exception e) 1098 { 1099 logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1100 } 1101 finally 1102 { 1103 if (!shutdown) 1104 { 1105 release(); 1106 } 1107 } 1108 } 1109 } 1110 1111 private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown, 1112 boolean isDirectoryServer) 1113 { 1114 unregisterServerHandler(sHandler); 1115 sHandler.shutdown(); 1116 1117 resetGenerationIdIfPossible(); 1118 if (!shutdown) 1119 { 1120 synchronized (pendingStatusMessagesLock) 1121 { 1122 if (isDirectoryServer) 1123 { 1124 // Update the remote replication servers with our list 1125 // of connected LDAP servers 1126 pendingStatusMessages.enqueueTopoInfoToAllRSs(); 1127 } 1128 // Warn our DSs that a RS or DS has quit (does not use this 1129 // handler as already removed from list) 1130 pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null); 1131 } 1132 statusAnalyzer.notifyPendingStatusMessage(); 1133 } 1134 } 1135 1136 /** 1137 * Unregister this handler from the list of handlers registered to this 1138 * domain. 1139 * @param sHandler the provided handler to unregister. 1140 */ 1141 private void unregisterServerHandler(ServerHandler sHandler) 1142 { 1143 if (sHandler.isReplicationServer()) 1144 { 1145 connectedRSs.remove(sHandler.getServerId()); 1146 } 1147 else 1148 { 1149 connectedDSs.remove(sHandler.getServerId()); 1150 } 1151 } 1152 1153 /** 1154 * This method resets the generationId for this domain if there is no LDAP 1155 * server currently connected in the whole topology on this domain and if the 1156 * generationId has never been saved. 1157 * <ul> 1158 * <li>test emptiness of {@link #connectedDSs} list</li> 1159 * <li>traverse {@link #connectedRSs} list and test for each if DS are 1160 * connected</li> 1161 * </ul> 1162 * So it strongly relies on the {@link #connectedDSs} list 1163 */ 1164 private void resetGenerationIdIfPossible() 1165 { 1166 if (logger.isTraceEnabled()) 1167 { 1168 debug("mayResetGenerationId generationIdSavedStatus=" 1169 + generationIdSavedStatus); 1170 } 1171 1172 // If there is no more any LDAP server connected to this domain in the 1173 // topology and the generationId has never been saved, then we can reset 1174 // it and the next LDAP server to connect will become the new reference. 1175 boolean ldapServersConnectedInTheTopology = false; 1176 if (connectedDSs.isEmpty()) 1177 { 1178 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1179 { 1180 if (generationId != rsHandler.getGenerationId()) 1181 { 1182 if (logger.isTraceEnabled()) 1183 { 1184 debug("mayResetGenerationId skip RS " + rsHandler 1185 + " that has different genId"); 1186 } 1187 } 1188 else if (rsHandler.hasRemoteLDAPServers()) 1189 { 1190 ldapServersConnectedInTheTopology = true; 1191 1192 if (logger.isTraceEnabled()) 1193 { 1194 debug("mayResetGenerationId RS " + rsHandler 1195 + " has ldap servers connected to it" 1196 + " - will not reset generationId"); 1197 } 1198 break; 1199 } 1200 } 1201 } 1202 else 1203 { 1204 ldapServersConnectedInTheTopology = true; 1205 1206 if (logger.isTraceEnabled()) 1207 { 1208 debug("has ldap servers connected to it - will not reset generationId"); 1209 } 1210 } 1211 1212 if (!ldapServersConnectedInTheTopology 1213 && !generationIdSavedStatus 1214 && generationId != -1) 1215 { 1216 changeGenerationId(-1); 1217 } 1218 } 1219 1220 /** 1221 * Checks whether a remote RS is already connected to this hosting RS. 1222 * 1223 * @param rsHandler 1224 * The handler for the remote RS. 1225 * @return flag specifying whether the remote RS is already connected. 1226 * @throws DirectoryException 1227 * when a problem occurs. 1228 */ 1229 public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler) 1230 throws DirectoryException 1231 { 1232 ReplicationServerHandler oldRsHandler = 1233 connectedRSs.get(rsHandler.getServerId()); 1234 if (oldRsHandler == null) 1235 { 1236 return false; 1237 } 1238 1239 if (oldRsHandler.getServerAddressURL().equals( 1240 rsHandler.getServerAddressURL())) 1241 { 1242 // this is the same server, this means that our ServerStart messages 1243 // have been sent at about the same time and 2 connections 1244 // have been established. 1245 // Silently drop this connection. 1246 return true; 1247 } 1248 1249 // looks like two replication servers have the same serverId 1250 // log an error message and drop this connection. 1251 LocalizableMessage message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get( 1252 localReplicationServer.getMonitorInstanceName(), 1253 oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(), 1254 rsHandler.getServerId()); 1255 throw new DirectoryException(ResultCode.OTHER, message); 1256 } 1257 1258 /** 1259 * Creates and returns a cursor across this replication domain. 1260 * <p> 1261 * Client code must call {@link DBCursor#next()} to advance the cursor to the 1262 * next available record. 1263 * <p> 1264 * When the cursor is not used anymore, client code MUST call the 1265 * {@link DBCursor#close()} method to free the resources and locks used by the 1266 * cursor. 1267 * 1268 * @param startAfterServerState 1269 * Starting point for the replicaDB cursors. If null, start from the 1270 * oldest CSN 1271 * @return a non null {@link DBCursor} going from oldest to newest CSN 1272 * @throws ChangelogException 1273 * If a database problem happened 1274 * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, CursorOptions) 1275 */ 1276 public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) 1277 throws ChangelogException 1278 { 1279 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); 1280 return domainDB.getCursorFrom(baseDN, startAfterServerState, options); 1281 } 1282 1283 /** 1284 * Get the baseDN. 1285 * 1286 * @return Returns the baseDN. 1287 */ 1288 public DN getBaseDN() 1289 { 1290 return baseDN; 1291 } 1292 1293 /** 1294 * Retrieves the destination handlers for a routable message. 1295 * 1296 * @param msg The message to route. 1297 * @param senderHandler The handler of the server that published this message. 1298 * @return The list of destination handlers. 1299 */ 1300 private List<ServerHandler> getDestinationServers(RoutableMsg msg, 1301 ServerHandler senderHandler) 1302 { 1303 List<ServerHandler> servers = new ArrayList<>(); 1304 1305 if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER) 1306 { 1307 // TODO Import from the "closest server" to be implemented 1308 } else if (msg.getDestination() == RoutableMsg.ALL_SERVERS) 1309 { 1310 if (!senderHandler.isReplicationServer()) 1311 { 1312 // Send to all replication servers with a least one remote 1313 // server connected 1314 for (ReplicationServerHandler rsh : connectedRSs.values()) 1315 { 1316 if (rsh.hasRemoteLDAPServers()) 1317 { 1318 servers.add(rsh); 1319 } 1320 } 1321 } 1322 1323 // Sends to all connected LDAP servers 1324 for (DataServerHandler destinationHandler : connectedDSs.values()) 1325 { 1326 // Don't loop on the sender 1327 if (destinationHandler == senderHandler) 1328 { 1329 continue; 1330 } 1331 servers.add(destinationHandler); 1332 } 1333 } else 1334 { 1335 // Destination is one server 1336 DataServerHandler destinationHandler = 1337 connectedDSs.get(msg.getDestination()); 1338 if (destinationHandler != null) 1339 { 1340 servers.add(destinationHandler); 1341 } else 1342 { 1343 // the targeted server is NOT connected 1344 // Let's search for the replication server that MAY 1345 // have the targeted server connected. 1346 if (senderHandler.isDataServer()) 1347 { 1348 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1349 { 1350 // Send to all replication servers with a least one remote 1351 // server connected 1352 if (rsHandler.isRemoteLDAPServer(msg.getDestination())) 1353 { 1354 servers.add(rsHandler); 1355 } 1356 } 1357 } 1358 } 1359 } 1360 return servers; 1361 } 1362 1363 1364 1365 /** 1366 * Processes a message coming from one server in the topology and potentially 1367 * forwards it to one or all other servers. 1368 * 1369 * @param msg 1370 * The message received and to be processed. 1371 * @param sender 1372 * The server handler of the server that sent the message. 1373 */ 1374 void process(RoutableMsg msg, ServerHandler sender) 1375 { 1376 if (msg.getDestination() == localReplicationServer.getServerId()) 1377 { 1378 // Handle routable messages targeted at this RS. 1379 if (msg instanceof ErrorMsg) 1380 { 1381 ErrorMsg errorMsg = (ErrorMsg) msg; 1382 logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails()); 1383 } 1384 else 1385 { 1386 replyWithUnroutableMsgType(sender, msg); 1387 } 1388 } 1389 else 1390 { 1391 // Forward message not destined for this RS. 1392 List<ServerHandler> servers = getDestinationServers(msg, sender); 1393 if (!servers.isEmpty()) 1394 { 1395 forwardMsgToAllServers(msg, servers, sender); 1396 } 1397 else 1398 { 1399 replyWithUnreachablePeerMsg(sender, msg); 1400 } 1401 } 1402 } 1403 1404 /** 1405 * Responds to a monitor request message. 1406 * 1407 * @param msg 1408 * The monitor request message. 1409 * @param sender 1410 * The DS/RS which sent the monitor request. 1411 */ 1412 void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender) 1413 { 1414 enqueueMonitorMsg(msg, sender); 1415 } 1416 1417 /** 1418 * Responds to a monitor message. 1419 * 1420 * @param msg 1421 * The monitor message 1422 * @param sender 1423 * The DS/RS which sent the monitor. 1424 */ 1425 void processMonitorMsg(MonitorMsg msg, ServerHandler sender) 1426 { 1427 domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId()); 1428 } 1429 1430 private void replyWithUnroutableMsgType(ServerHandler msgEmitter, 1431 RoutableMsg msg) 1432 { 1433 String msgClassname = msg.getClass().getCanonicalName(); 1434 logger.info(NOTE_ERR_ROUTING_TO_SERVER, msgClassname); 1435 1436 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 1437 mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname)); 1438 mb.append("serverID:").append(msg.getDestination()); 1439 ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage()); 1440 try 1441 { 1442 msgEmitter.send(errMsg); 1443 } 1444 catch (IOException ignored) 1445 { 1446 // an error happened on the sender session trying to recover 1447 // from an error on the receiver session. 1448 // Not much more we can do at this point. 1449 } 1450 } 1451 1452 private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter, 1453 RoutableMsg msg) 1454 { 1455 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 1456 mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination())); 1457 mb.append(" In Replication Server=").append( 1458 this.localReplicationServer.getMonitorInstanceName()); 1459 mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); 1460 mb.append(" Details:routing table is empty"); 1461 final LocalizableMessage message = mb.toMessage(); 1462 logger.error(message); 1463 1464 ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(), 1465 msg.getSenderID(), message); 1466 try 1467 { 1468 msgEmitter.send(errMsg); 1469 } 1470 catch (IOException ignored) 1471 { 1472 // TODO Handle error properly (sender timeout in addition) 1473 /* 1474 * An error happened trying to send an error msg to this server. 1475 * Log an error and close the connection to this server. 1476 */ 1477 logger.error(ERR_CHANGELOG_ERROR_SENDING_ERROR, this, ignored); 1478 stopServer(msgEmitter, false); 1479 } 1480 } 1481 1482 private void forwardMsgToAllServers(RoutableMsg msg, 1483 List<ServerHandler> servers, ServerHandler sender) 1484 { 1485 for (ServerHandler targetHandler : servers) 1486 { 1487 try 1488 { 1489 targetHandler.send(msg); 1490 } catch (IOException ioe) 1491 { 1492 /* 1493 * An error happened trying to send a routable message to its 1494 * destination server. 1495 * Send back an error to the originator of the message. 1496 */ 1497 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 1498 mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN, msg.getDestination())); 1499 mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); 1500 mb.append(" Details: ").append(ioe.getLocalizedMessage()); 1501 final LocalizableMessage message = mb.toMessage(); 1502 logger.error(message); 1503 1504 ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message); 1505 try 1506 { 1507 sender.send(errMsg); 1508 } catch (IOException ioe1) 1509 { 1510 // an error happened on the sender session trying to recover 1511 // from an error on the receiver session. 1512 // We don't have much solution left beside closing the sessions. 1513 stopServer(sender, false); 1514 stopServer(targetHandler, false); 1515 } 1516 // TODO Handle error properly (sender timeout in addition) 1517 } 1518 } 1519 } 1520 1521 /** 1522 * Creates a new monitor message including monitoring information for the 1523 * whole topology. 1524 * 1525 * @param sender 1526 * The sender of this message. 1527 * @param destination 1528 * The destination of this message. 1529 * @return The newly created and filled MonitorMsg. Null if a problem occurred 1530 * during message creation. 1531 * @throws InterruptedException 1532 * if this thread is interrupted while waiting for a response 1533 */ 1534 public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination) 1535 throws InterruptedException 1536 { 1537 return createGlobalTopologyMonitorMsg(sender, destination, 1538 domainMonitor.recomputeMonitorData()); 1539 } 1540 1541 private MonitorMsg createGlobalTopologyMonitorMsg(int sender, 1542 int destination, ReplicationDomainMonitorData monitorData) 1543 { 1544 final MonitorMsg returnMsg = new MonitorMsg(sender, destination); 1545 returnMsg.setReplServerDbState(getLatestServerState()); 1546 1547 // Add the server state for each DS and RS currently in the topology. 1548 for (int replicaId : toIterable(monitorData.ldapIterator())) 1549 { 1550 returnMsg.setServerState(replicaId, 1551 monitorData.getLDAPServerState(replicaId), 1552 monitorData.getApproxFirstMissingDate(replicaId), true); 1553 } 1554 1555 for (int replicaId : toIterable(monitorData.rsIterator())) 1556 { 1557 returnMsg.setServerState(replicaId, 1558 monitorData.getRSStates(replicaId), 1559 monitorData.getRSApproxFirstMissingDate(replicaId), false); 1560 } 1561 1562 return returnMsg; 1563 } 1564 1565 1566 1567 /** 1568 * Creates a new monitor message including monitoring information for the 1569 * topology directly connected to this RS. This includes information for: - 1570 * local RS - all direct DSs - all direct RSs 1571 * 1572 * @param sender 1573 * The sender of this message. 1574 * @param destination 1575 * The destination of this message. 1576 * @return The newly created and filled MonitorMsg. Null if the current thread 1577 * was interrupted while attempting to get the domain lock. 1578 */ 1579 private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination) 1580 { 1581 final MonitorMsg monitorMsg = new MonitorMsg(sender, destination); 1582 monitorMsg.setReplServerDbState(getLatestServerState()); 1583 1584 // Add the server state for each connected DS and RS. 1585 for (DataServerHandler dsHandler : this.connectedDSs.values()) 1586 { 1587 monitorMsg.setServerState(dsHandler.getServerId(), 1588 dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(), 1589 true); 1590 } 1591 1592 for (ReplicationServerHandler rsHandler : this.connectedRSs.values()) 1593 { 1594 monitorMsg.setServerState(rsHandler.getServerId(), 1595 rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(), 1596 false); 1597 } 1598 return monitorMsg; 1599 } 1600 1601 /** 1602 * Shutdown this ReplicationServerDomain. 1603 */ 1604 public void shutdown() 1605 { 1606 DirectoryServer.deregisterMonitorProvider(this); 1607 1608 // Terminate the assured timer 1609 assuredTimeoutTimer.cancel(); 1610 1611 stopAllServers(true); 1612 statusAnalyzer.shutdown(); 1613 } 1614 1615 /** 1616 * Returns the latest most current ServerState describing the newest CSNs for 1617 * each server in this domain. 1618 * 1619 * @return The ServerState describing the newest CSNs for each server in in 1620 * this domain. 1621 */ 1622 public ServerState getLatestServerState() 1623 { 1624 return domainDB.getDomainNewestCSNs(baseDN); 1625 } 1626 1627 /** {@inheritDoc} */ 1628 @Override 1629 public String toString() 1630 { 1631 return "ReplicationServerDomain " + baseDN; 1632 } 1633 1634 1635 1636 /** 1637 * Creates a TopologyMsg filled with information to be sent to a remote RS. 1638 * We send remote RS the info of every DS that are directly connected to us 1639 * plus our own info as RS. 1640 * @return A suitable TopologyMsg PDU to be sent to a peer RS 1641 */ 1642 public TopologyMsg createTopologyMsgForRS() 1643 { 1644 List<DSInfo> dsInfos = new ArrayList<>(); 1645 for (DataServerHandler dsHandler : connectedDSs.values()) 1646 { 1647 dsInfos.add(dsHandler.toDSInfo()); 1648 } 1649 1650 // Create info for the local RS 1651 List<RSInfo> rsInfos = newArrayList(toRSInfo(localReplicationServer, generationId)); 1652 1653 return new TopologyMsg(dsInfos, rsInfos); 1654 } 1655 1656 /** 1657 * Creates a TopologyMsg filled with information to be sent to a DS. 1658 * We send remote DS the info of every known DS and RS in the topology (our 1659 * directly connected DSs plus the DSs connected to other RSs) except himself. 1660 * Also put info related to local RS. 1661 * 1662 * @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and 1663 * that we must not include in the DS list. 1664 * @return A suitable TopologyMsg PDU to be sent to a peer DS 1665 */ 1666 public TopologyMsg createTopologyMsgForDS(int destDsId) 1667 { 1668 // Go through every DSs (except recipient of msg) 1669 List<DSInfo> dsInfos = new ArrayList<>(); 1670 for (DataServerHandler dsHandler : connectedDSs.values()) 1671 { 1672 if (dsHandler.getServerId() == destDsId) 1673 { 1674 continue; 1675 } 1676 dsInfos.add(dsHandler.toDSInfo()); 1677 } 1678 1679 1680 List<RSInfo> rsInfos = new ArrayList<>(); 1681 // Add our own info (local RS) 1682 rsInfos.add(toRSInfo(localReplicationServer, generationId)); 1683 1684 // Go through every peer RSs (and get their connected DSs), also add info 1685 // for RSs 1686 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 1687 { 1688 rsInfos.add(rsHandler.toRSInfo()); 1689 1690 rsHandler.addDSInfos(dsInfos); 1691 } 1692 1693 return new TopologyMsg(dsInfos, rsInfos); 1694 } 1695 1696 private RSInfo toRSInfo(ReplicationServer rs, long generationId) 1697 { 1698 return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId, 1699 rs.getGroupId(), rs.getWeight()); 1700 } 1701 1702 /** 1703 * Get the generationId associated to this domain. 1704 * 1705 * @return The generationId 1706 */ 1707 public long getGenerationId() 1708 { 1709 return generationId; 1710 } 1711 1712 /** 1713 * Initialize the value of the generationID for this ReplicationServerDomain. 1714 * This method is intended to be used for initialization at startup and 1715 * simply stores the new value without any additional processing. 1716 * For example it does not clear the change-log DBs 1717 * 1718 * @param generationId The new value of generationId. 1719 */ 1720 public void initGenerationID(long generationId) 1721 { 1722 synchronized (generationIDLock) 1723 { 1724 this.generationId = generationId; 1725 this.generationIdSavedStatus = true; 1726 } 1727 } 1728 1729 /** 1730 * Sets the provided value as the new in memory generationId. 1731 * Also clear the changelog databases. 1732 * 1733 * @param generationId The new value of generationId. 1734 * @return The old generation id 1735 */ 1736 public long changeGenerationId(long generationId) 1737 { 1738 synchronized (generationIDLock) 1739 { 1740 long oldGenerationId = this.generationId; 1741 1742 if (this.generationId != generationId) 1743 { 1744 clearDbs(); 1745 1746 this.generationId = generationId; 1747 this.generationIdSavedStatus = false; 1748 } 1749 return oldGenerationId; 1750 } 1751 } 1752 1753 /** 1754 * Resets the generationID. 1755 * 1756 * @param senderHandler The handler associated to the server 1757 * that requested to reset the generationId. 1758 * @param genIdMsg The reset generation ID msg received. 1759 */ 1760 public void resetGenerationId(ServerHandler senderHandler, 1761 ResetGenerationIdMsg genIdMsg) 1762 { 1763 if (logger.isTraceEnabled()) 1764 { 1765 debug("Receiving ResetGenerationIdMsg from " 1766 + senderHandler.getServerId() + ":\n" + genIdMsg); 1767 } 1768 1769 try 1770 { 1771 // Acquire lock on domain (see more details in comment of start() method 1772 // of ServerHandler) 1773 lock(); 1774 } 1775 catch (InterruptedException ex) 1776 { 1777 // We can't deal with this here, so re-interrupt thread so that it is 1778 // caught during subsequent IO. 1779 Thread.currentThread().interrupt(); 1780 return; 1781 } 1782 1783 try 1784 { 1785 final long newGenId = genIdMsg.getGenerationId(); 1786 if (newGenId != this.generationId) 1787 { 1788 changeGenerationId(newGenId); 1789 } 1790 else 1791 { 1792 // Order to take a gen id we already have, just ignore 1793 if (logger.isTraceEnabled()) 1794 { 1795 debug("Reset generation id requested but generationId was already " 1796 + this.generationId + ":\n" + genIdMsg); 1797 } 1798 } 1799 1800 // If we are the first replication server warned, 1801 // then forwards the reset message to the remote replication servers 1802 for (ServerHandler rsHandler : connectedRSs.values()) 1803 { 1804 try 1805 { 1806 // After we'll have sent the message , the remote RS will adopt 1807 // the new genId 1808 rsHandler.setGenerationId(newGenId); 1809 if (senderHandler.isDataServer()) 1810 { 1811 rsHandler.send(genIdMsg); 1812 } 1813 } catch (IOException e) 1814 { 1815 logger.error(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID, baseDN, e.getMessage()); 1816 } 1817 } 1818 1819 // Change status of the connected DSs according to the requested new 1820 // reference generation id 1821 for (DataServerHandler dsHandler : connectedDSs.values()) 1822 { 1823 try 1824 { 1825 dsHandler.changeStatusForResetGenId(newGenId); 1826 } catch (IOException e) 1827 { 1828 logger.error(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID, baseDN, 1829 dsHandler.getServerId(), e.getMessage()); 1830 } 1831 } 1832 1833 // Update every peers (RS/DS) with potential topology changes (status 1834 // change). Rather than doing that each time a DS has a status change 1835 // (consecutive to reset gen id message), we prefer advertising once for 1836 // all after changes (less packet sent), here at the end of the reset msg 1837 // treatment. 1838 sendTopoInfoToAll(); 1839 1840 logger.info(NOTE_RESET_GENERATION_ID, baseDN, newGenId); 1841 } 1842 catch(Exception e) 1843 { 1844 logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1845 } 1846 finally 1847 { 1848 release(); 1849 } 1850 } 1851 1852 /** 1853 * Process message of a remote server changing his status. 1854 * @param senderHandler The handler associated to the server 1855 * that changed his status. 1856 * @param csMsg The message containing the new status 1857 */ 1858 public void processNewStatus(DataServerHandler senderHandler, 1859 ChangeStatusMsg csMsg) 1860 { 1861 if (logger.isTraceEnabled()) 1862 { 1863 debug("receiving ChangeStatusMsg from " + senderHandler.getServerId() 1864 + ":\n" + csMsg); 1865 } 1866 1867 try 1868 { 1869 // Acquire lock on domain (see more details in comment of start() method 1870 // of ServerHandler) 1871 lock(); 1872 } 1873 catch (InterruptedException ex) 1874 { 1875 // We can't deal with this here, so re-interrupt thread so that it is 1876 // caught during subsequent IO. 1877 Thread.currentThread().interrupt(); 1878 return; 1879 } 1880 1881 try 1882 { 1883 ServerStatus newStatus = senderHandler.processNewStatus(csMsg); 1884 if (newStatus == ServerStatus.INVALID_STATUS) 1885 { 1886 // Already logged an error in processNewStatus() 1887 // just return not to forward a bad status to topology 1888 return; 1889 } 1890 1891 enqueueTopoInfoToAllExcept(senderHandler); 1892 1893 logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS, 1894 senderHandler.getServerId(), baseDN, newStatus); 1895 } 1896 catch(Exception e) 1897 { 1898 logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1899 } 1900 finally 1901 { 1902 release(); 1903 } 1904 } 1905 1906 /** 1907 * Change the status of a directory server according to the event generated 1908 * from the status analyzer. 1909 * @param dsHandler The handler of the directory server to update 1910 * @param event The event to be used for new status computation 1911 * @return True if we have been interrupted (must stop), false otherwise 1912 */ 1913 private boolean changeStatus(DataServerHandler dsHandler, 1914 StatusMachineEvent event) 1915 { 1916 try 1917 { 1918 // Acquire lock on domain (see ServerHandler#start() for more details) 1919 lock(); 1920 } 1921 catch (InterruptedException ex) 1922 { 1923 // We have been interrupted for dying, from stopStatusAnalyzer 1924 // to prevent deadlock in this situation: 1925 // RS is being shutdown, and stopServer will call stopStatusAnalyzer. 1926 // Domain lock is taken by shutdown thread while status analyzer thread 1927 // is willing to change the status of a server at the same time so is 1928 // waiting for the domain lock at the same time. As shutdown thread is 1929 // waiting for analyzer thread death, a deadlock occurs. So we force 1930 // interruption of the status analyzer thread death after 2 seconds if 1931 // it has not finished (see StatusAnalyzer.waitForShutdown). This allows 1932 // to have the analyzer thread taking the domain lock only when the 1933 // status of a DS has to be changed. See more comments in run method of 1934 // StatusAnalyzer. 1935 if (logger.isTraceEnabled()) 1936 { 1937 logger.trace("Status analyzer for domain " + baseDN 1938 + " has been interrupted when" 1939 + " trying to acquire domain lock for changing the status of DS " 1940 + dsHandler.getServerId()); 1941 } 1942 return true; 1943 } 1944 1945 try 1946 { 1947 ServerStatus newStatus = ServerStatus.INVALID_STATUS; 1948 ServerStatus oldStatus = dsHandler.getStatus(); 1949 try 1950 { 1951 newStatus = dsHandler.changeStatus(event); 1952 } 1953 catch (IOException e) 1954 { 1955 logger.error(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER, 1956 baseDN, dsHandler.getServerId(), e.getMessage()); 1957 } 1958 1959 if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus) 1960 { 1961 // Change was impossible or already occurred (see StatusAnalyzer 1962 // comments) 1963 return false; 1964 } 1965 1966 enqueueTopoInfoToAllExcept(dsHandler); 1967 } 1968 catch (Exception e) 1969 { 1970 logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1971 } 1972 finally 1973 { 1974 release(); 1975 } 1976 1977 return false; 1978 } 1979 1980 /** 1981 * Update every peers (RS/DS) with topology changes. 1982 */ 1983 public void sendTopoInfoToAll() 1984 { 1985 enqueueTopoInfoToAllExcept(null); 1986 } 1987 1988 /** 1989 * Update every peers (RS/DS) with topology changes but one DS. 1990 * 1991 * @param dsHandler 1992 * if not null, the topology message will not be sent to this DS 1993 */ 1994 private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler) 1995 { 1996 synchronized (pendingStatusMessagesLock) 1997 { 1998 pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler); 1999 pendingStatusMessages.enqueueTopoInfoToAllRSs(); 2000 } 2001 statusAnalyzer.notifyPendingStatusMessage(); 2002 } 2003 2004 /** 2005 * Clears the Db associated with that domain. 2006 */ 2007 private void clearDbs() 2008 { 2009 try 2010 { 2011 domainDB.removeDomain(baseDN); 2012 } 2013 catch (ChangelogException e) 2014 { 2015 logger.error(ERR_ERROR_CLEARING_DB, baseDN, e.getMessage(), e); 2016 } 2017 } 2018 2019 /** 2020 * Returns whether the provided server is in degraded 2021 * state due to the fact that the peer server has an invalid 2022 * generationId for this domain. 2023 * 2024 * @param serverId The serverId for which we want to know the 2025 * the state. 2026 * @return Whether it is degraded or not. 2027 */ 2028 public boolean isDegradedDueToGenerationId(int serverId) 2029 { 2030 if (logger.isTraceEnabled()) 2031 { 2032 debug("isDegraded serverId=" + serverId + " given local generation Id=" 2033 + this.generationId); 2034 } 2035 2036 ServerHandler sHandler = connectedRSs.get(serverId); 2037 if (sHandler == null) 2038 { 2039 sHandler = connectedDSs.get(serverId); 2040 if (sHandler == null) 2041 { 2042 return false; 2043 } 2044 } 2045 2046 if (logger.isTraceEnabled()) 2047 { 2048 debug("Compute degradation of serverId=" + serverId 2049 + " LS server generation Id=" + sHandler.getGenerationId()); 2050 } 2051 return sHandler.getGenerationId() != this.generationId; 2052 } 2053 2054 /** 2055 * Process topology information received from a peer RS. 2056 * @param topoMsg The just received topo message from remote RS 2057 * @param rsHandler The handler that received the message. 2058 * @param allowResetGenId True for allowing to reset the generation id ( 2059 * when called after initial handshake) 2060 * @throws IOException If an error occurred. 2061 * @throws DirectoryException If an error occurred. 2062 */ 2063 public void receiveTopoInfoFromRS(TopologyMsg topoMsg, 2064 ReplicationServerHandler rsHandler, boolean allowResetGenId) 2065 throws IOException, DirectoryException 2066 { 2067 if (logger.isTraceEnabled()) 2068 { 2069 debug("receiving TopologyMsg from serverId=" + rsHandler.getServerId() 2070 + ":\n" + topoMsg); 2071 } 2072 2073 try 2074 { 2075 // Acquire lock on domain (see more details in comment of start() method 2076 // of ServerHandler) 2077 lock(); 2078 } 2079 catch (InterruptedException ex) 2080 { 2081 // We can't deal with this here, so re-interrupt thread so that it is 2082 // caught during subsequent IO. 2083 Thread.currentThread().interrupt(); 2084 return; 2085 } 2086 2087 try 2088 { 2089 // Store DS connected to remote RS & update information about the peer RS 2090 rsHandler.processTopoInfoFromRS(topoMsg); 2091 2092 // Handle generation id 2093 if (allowResetGenId) 2094 { 2095 resetGenerationIdIfPossible(); 2096 setGenerationIdIfUnset(rsHandler.getGenerationId()); 2097 } 2098 2099 if (isDifferentGenerationId(rsHandler.getGenerationId())) 2100 { 2101 LocalizableMessage message = WARN_BAD_GENERATION_ID_FROM_RS.get(rsHandler.getServerId(), 2102 rsHandler.session.getReadableRemoteAddress(), rsHandler.getGenerationId(), 2103 baseDN, getLocalRSServerId(), generationId); 2104 logger.warn(message); 2105 2106 ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(), 2107 rsHandler.getServerId(), message); 2108 rsHandler.send(errorMsg); 2109 } 2110 2111 /* 2112 * Sends the currently known topology information to every connected 2113 * DS we have. 2114 */ 2115 synchronized (pendingStatusMessagesLock) 2116 { 2117 pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null); 2118 } 2119 statusAnalyzer.notifyPendingStatusMessage(); 2120 } 2121 catch(Exception e) 2122 { 2123 logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); 2124 } 2125 finally 2126 { 2127 release(); 2128 } 2129 } 2130 2131 private void setGenerationIdIfUnset(long generationId) 2132 { 2133 if (this.generationId < 0) 2134 { 2135 this.generationId = generationId; 2136 } 2137 } 2138 2139 /** 2140 * Returns the latest monitor data available for this replication server 2141 * domain. 2142 * 2143 * @return The latest monitor data available for this replication server 2144 * domain, which is never {@code null}. 2145 */ 2146 ReplicationDomainMonitorData getDomainMonitorData() 2147 { 2148 return domainMonitor.getMonitorData(); 2149 } 2150 2151 /** 2152 * Get the map of connected DSs. 2153 * @return The map of connected DSs 2154 */ 2155 public Map<Integer, DataServerHandler> getConnectedDSs() 2156 { 2157 return Collections.unmodifiableMap(connectedDSs); 2158 } 2159 2160 /** 2161 * Get the map of connected RSs. 2162 * @return The map of connected RSs 2163 */ 2164 public Map<Integer, ReplicationServerHandler> getConnectedRSs() 2165 { 2166 return Collections.unmodifiableMap(connectedRSs); 2167 } 2168 2169 2170 /** 2171 * A synchronization mechanism is created to insure exclusive access to the 2172 * domain. The goal is to have a consistent view of the topology by locking 2173 * the structures holding the topology view of the domain: 2174 * {@link #connectedDSs} and {@link #connectedRSs}. When a connection is 2175 * established with a peer DS or RS, the lock should be taken before updating 2176 * these structures, then released. The same mechanism should be used when 2177 * updating any data related to the view of the topology: for instance if the 2178 * status of a DS is changed, the lock should be taken before updating the 2179 * matching server handler and sending the topology messages to peers and 2180 * released after.... This allows every member of the topology to have a 2181 * consistent view of the topology and to be sure it will not miss some 2182 * information. 2183 * <p> 2184 * So the locking system must be called (not exhaustive list): 2185 * <ul> 2186 * <li>when connection established with a DS or RS</li> 2187 * <li>when connection ended with a DS or RS</li> 2188 * <li>when receiving a TopologyMsg and updating structures</li> 2189 * <li>when creating and sending a TopologyMsg</li> 2190 * <li>when a DS status is changing (ChangeStatusMsg received or sent)...</li> 2191 * </ul> 2192 */ 2193 private final ReentrantLock lock = new ReentrantLock(); 2194 2195 /** 2196 * This lock is used to protect the generationId variable. 2197 */ 2198 private final Object generationIDLock = new Object(); 2199 2200 /** 2201 * Tests if the current thread has the lock on this domain. 2202 * @return True if the current thread has the lock. 2203 */ 2204 public boolean hasLock() 2205 { 2206 return lock.getHoldCount() > 0; 2207 } 2208 2209 /** 2210 * Takes the lock on this domain (blocking until lock can be acquired) or 2211 * calling thread is interrupted. 2212 * @throws java.lang.InterruptedException If interrupted. 2213 */ 2214 public void lock() throws InterruptedException 2215 { 2216 lock.lockInterruptibly(); 2217 } 2218 2219 /** 2220 * Releases the lock on this domain. 2221 */ 2222 public void release() 2223 { 2224 lock.unlock(); 2225 } 2226 2227 /** 2228 * Tries to acquire the lock on the domain within a given amount of time. 2229 * @param timeout The amount of milliseconds to wait for acquiring the lock. 2230 * @return True if the lock was acquired, false if timeout occurred. 2231 * @throws java.lang.InterruptedException When call was interrupted. 2232 */ 2233 public boolean tryLock(long timeout) throws InterruptedException 2234 { 2235 return lock.tryLock(timeout, TimeUnit.MILLISECONDS); 2236 } 2237 2238 /** 2239 * Starts the monitoring publisher for the domain if not already started. 2240 */ 2241 private void startMonitoringPublisher() 2242 { 2243 long period = localReplicationServer.getMonitoringPublisherPeriod(); 2244 if (period > 0) // 0 means no monitoring publisher 2245 { 2246 final MonitoringPublisher thread = new MonitoringPublisher(this, period); 2247 if (monitoringPublisher.compareAndSet(null, thread)) 2248 { 2249 thread.start(); 2250 } 2251 } 2252 } 2253 2254 /** 2255 * Stops the monitoring publisher for the domain. 2256 */ 2257 private void stopMonitoringPublisher() 2258 { 2259 final MonitoringPublisher thread = monitoringPublisher.get(); 2260 if (thread != null && monitoringPublisher.compareAndSet(thread, null)) 2261 { 2262 thread.shutdown(); 2263 thread.waitForShutdown(); 2264 } 2265 } 2266 2267 /** {@inheritDoc} */ 2268 @Override 2269 public void initializeMonitorProvider(MonitorProviderCfg configuraiton) 2270 { 2271 // Nothing to do for now 2272 } 2273 2274 /** {@inheritDoc} */ 2275 @Override 2276 public String getMonitorInstanceName() 2277 { 2278 return "Replication server RS(" + localReplicationServer.getServerId() 2279 + ") " + localReplicationServer.getServerURL() + ",cn=" 2280 + baseDN.toString().replace(',', '_').replace('=', '_') 2281 + ",cn=Replication"; 2282 } 2283 2284 /** {@inheritDoc} */ 2285 @Override 2286 public List<Attribute> getMonitorData() 2287 { 2288 // publish the server id and the port number. 2289 List<Attribute> attributes = new ArrayList<>(); 2290 attributes.add(Attributes.create("replication-server-id", 2291 String.valueOf(localReplicationServer.getServerId()))); 2292 attributes.add(Attributes.create("replication-server-port", 2293 String.valueOf(localReplicationServer.getReplicationPort()))); 2294 attributes.add(Attributes.create("domain-name", 2295 baseDN.toString())); 2296 attributes.add(Attributes.create("generation-id", 2297 baseDN + " " + generationId)); 2298 2299 // Missing changes 2300 long missingChanges = getDomainMonitorData().getMissingChangesRS( 2301 localReplicationServer.getServerId()); 2302 attributes.add(Attributes.create("missing-changes", 2303 String.valueOf(missingChanges))); 2304 2305 return attributes; 2306 } 2307 2308 /** 2309 * Returns the oldest known state for the domain, made of the oldest CSN 2310 * stored for each serverId. 2311 * <p> 2312 * Note: Because the replication changelogDB trimming always keep one change 2313 * whatever its date, the CSN contained in the returned state can be very old. 2314 * 2315 * @return the start state of the domain. 2316 */ 2317 public ServerState getOldestState() 2318 { 2319 return domainDB.getDomainOldestCSNs(baseDN); 2320 } 2321 2322 private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg) 2323 { 2324 for (int i = 1; i <= 2; i++) 2325 { 2326 if (!handler.shuttingDown() 2327 && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) 2328 { 2329 try 2330 { 2331 handler.sendTopoInfo(msg); 2332 break; 2333 } 2334 catch (IOException e) 2335 { 2336 if (i == 2) 2337 { 2338 logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, 2339 baseDN, type, handler.getServerId(), e.getMessage()); 2340 } 2341 } 2342 } 2343 sleep(100); 2344 } 2345 } 2346 2347 2348 2349 /** 2350 * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp) 2351 * value received, and forwarding the message to the other RSes. 2352 * @param senderHandler The handler for the server that sent the heartbeat. 2353 * @param msg The message to process. 2354 * @throws DirectoryException 2355 * if a problem occurs 2356 */ 2357 void processChangeTimeHeartbeatMsg(ServerHandler senderHandler, 2358 ChangeTimeHeartbeatMsg msg) throws DirectoryException 2359 { 2360 try 2361 { 2362 domainDB.replicaHeartbeat(baseDN, msg.getCSN()); 2363 } 2364 catch (ChangelogException e) 2365 { 2366 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e 2367 .getMessageObject(), e); 2368 } 2369 2370 if (senderHandler.isDataServer()) 2371 { 2372 /* 2373 * If we are the first replication server warned, then forward the message 2374 * to the remote replication servers. 2375 */ 2376 synchronized (pendingStatusMessagesLock) 2377 { 2378 pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg); 2379 } 2380 statusAnalyzer.notifyPendingStatusMessage(); 2381 } 2382 } 2383 2384 /** 2385 * Return the monitor instance name of the ReplicationServer that created the 2386 * current instance. 2387 * 2388 * @return the monitor instance name of the ReplicationServer that created the 2389 * current instance. 2390 */ 2391 String getLocalRSMonitorInstanceName() 2392 { 2393 return this.localReplicationServer.getMonitorInstanceName(); 2394 } 2395 2396 /** 2397 * Return the serverId of the ReplicationServer that created the current 2398 * instance. 2399 * 2400 * @return the serverId of the ReplicationServer that created the current 2401 * instance. 2402 */ 2403 int getLocalRSServerId() 2404 { 2405 return this.localReplicationServer.getServerId(); 2406 } 2407 2408 /** 2409 * Update the monitoring publisher with the new period value. 2410 * 2411 * @param period 2412 * The new period value. 2413 */ 2414 void updateMonitoringPeriod(long period) 2415 { 2416 if (period == 0) 2417 { 2418 // Requested to stop monitoring publishers 2419 stopMonitoringPublisher(); 2420 return; 2421 } 2422 2423 final MonitoringPublisher mpThread = monitoringPublisher.get(); 2424 if (mpThread != null) // it is running 2425 { 2426 mpThread.setPeriod(period); 2427 } 2428 else if (!connectedDSs.isEmpty() || !connectedRSs.isEmpty()) 2429 { 2430 // Requested to start monitoring publishers with provided period value 2431 startMonitoringPublisher(); 2432 } 2433 } 2434 2435 /** 2436 * Registers a DS handler into this domain and notifies the domain about the 2437 * new DS. 2438 * 2439 * @param dsHandler 2440 * The Directory Server Handler to register 2441 */ 2442 public void register(DataServerHandler dsHandler) 2443 { 2444 startMonitoringPublisher(); 2445 2446 // connected with new DS: store handler. 2447 connectedDSs.put(dsHandler.getServerId(), dsHandler); 2448 2449 // Tell peer RSs and DSs a new DS just connected to us 2450 // No need to re-send TopologyMsg to this just new DS 2451 enqueueTopoInfoToAllExcept(dsHandler); 2452 } 2453 2454 /** 2455 * Registers the RS handler into this domain and notifies the domain. 2456 * 2457 * @param rsHandler 2458 * The Replication Server Handler to register 2459 */ 2460 public void register(ReplicationServerHandler rsHandler) 2461 { 2462 startMonitoringPublisher(); 2463 2464 // connected with new RS (either outgoing or incoming 2465 // connection): store handler. 2466 connectedRSs.put(rsHandler.getServerId(), rsHandler); 2467 } 2468 2469 private void debug(String message) 2470 { 2471 logger.trace("In ReplicationServerDomain serverId=" 2472 + localReplicationServer.getServerId() + " for baseDN=" + baseDN 2473 + " and port=" + localReplicationServer.getReplicationPort() 2474 + ": " + message); 2475 } 2476 2477 2478 2479 /** 2480 * Go through each connected DS, get the number of pending changes we have for 2481 * it and change status accordingly if threshold value is crossed/uncrossed. 2482 */ 2483 void checkDSDegradedStatus() 2484 { 2485 final int degradedStatusThreshold = localReplicationServer 2486 .getDegradedStatusThreshold(); 2487 // Threshold value = 0 means no status analyzer (no degrading system) 2488 // we should not have that as the status analyzer thread should not be 2489 // created if this is the case, but for sanity purpose, we add this 2490 // test 2491 if (degradedStatusThreshold > 0) 2492 { 2493 for (DataServerHandler serverHandler : connectedDSs.values()) 2494 { 2495 // Get number of pending changes for this server 2496 final int nChanges = serverHandler.getRcvMsgQueueSize(); 2497 if (logger.isTraceEnabled()) 2498 { 2499 logger.trace("In RS " + getLocalRSServerId() + ", for baseDN=" 2500 + getBaseDN() + ": " + "Status analyzer: DS " 2501 + serverHandler.getServerId() + " has " + nChanges 2502 + " message(s) in writer queue."); 2503 } 2504 2505 // Check status to know if it is relevant to change the status. Do not 2506 // take RSD lock to test. If we attempt to change the status whereas 2507 // the current status does allow it, this will be noticed by 2508 // the changeStatusFromStatusAnalyzer() method. This allows to take the 2509 // lock roughly only when needed versus every sleep time timeout. 2510 if (nChanges >= degradedStatusThreshold) 2511 { 2512 if (serverHandler.getStatus() == NORMAL_STATUS 2513 && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT)) 2514 { 2515 break; // Interrupted. 2516 } 2517 } 2518 else 2519 { 2520 if (serverHandler.getStatus() == DEGRADED_STATUS 2521 && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT)) 2522 { 2523 break; // Interrupted. 2524 } 2525 } 2526 } 2527 } 2528 } 2529 2530 2531 2532 /** 2533 * Sends any enqueued status messages to the rest of the topology. 2534 */ 2535 void sendPendingStatusMessages() 2536 { 2537 /* 2538 * Take a snapshot of pending status notifications in order to avoid holding 2539 * the broadcast lock for too long. In addition, clear the notifications so 2540 * that they are not resent the next time. 2541 */ 2542 final PendingStatusMessages savedState; 2543 synchronized (pendingStatusMessagesLock) 2544 { 2545 savedState = pendingStatusMessages; 2546 pendingStatusMessages = new PendingStatusMessages(); 2547 } 2548 sendPendingChangeTimeHeartbeatMsgs(savedState); 2549 sendPendingTopologyMsgs(savedState); 2550 sendPendingMonitorMsgs(savedState); 2551 } 2552 2553 2554 2555 private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs) 2556 { 2557 for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs 2558 .entrySet()) 2559 { 2560 ServerHandler ds = connectedDSs.get(msg.getKey()); 2561 if (ds != null) 2562 { 2563 try 2564 { 2565 ds.send(msg.getValue()); 2566 } 2567 catch (IOException e) 2568 { 2569 // Ignore: connection closed. 2570 } 2571 } 2572 } 2573 for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs 2574 .entrySet()) 2575 { 2576 ServerHandler rs = connectedRSs.get(msg.getKey()); 2577 if (rs != null) 2578 { 2579 try 2580 { 2581 rs.send(msg.getValue()); 2582 } 2583 catch (IOException e) 2584 { 2585 // We log the error. The requestor will detect a timeout or 2586 // any other failure on the connection. 2587 2588 // FIXME: why do we log for RSs but not DSs? 2589 logger.traceException(e); 2590 logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue().getDestination()); 2591 } 2592 } 2593 } 2594 } 2595 2596 2597 2598 private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs) 2599 { 2600 for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats.values()) 2601 { 2602 for (ReplicationServerHandler rsHandler : connectedRSs.values()) 2603 { 2604 try 2605 { 2606 if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3) 2607 { 2608 rsHandler.send(pendingHeartbeat); 2609 } 2610 } 2611 catch (IOException e) 2612 { 2613 logger.traceException(e); 2614 logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server " 2615 + localReplicationServer.getReplicationPort() + " " + baseDN 2616 + " " + localReplicationServer.getServerId()); 2617 stopServer(rsHandler, false); 2618 } 2619 } 2620 } 2621 } 2622 2623 2624 2625 private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs) 2626 { 2627 if (pendingMsgs.sendDSTopologyMsg) 2628 { 2629 for (ServerHandler handler : connectedDSs.values()) 2630 { 2631 if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg) 2632 { 2633 final TopologyMsg topoMsg = createTopologyMsgForDS(handler 2634 .getServerId()); 2635 sendTopologyMsg("directory", handler, topoMsg); 2636 } 2637 } 2638 } 2639 2640 if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty()) 2641 { 2642 final TopologyMsg topoMsg = createTopologyMsgForRS(); 2643 for (ServerHandler handler : connectedRSs.values()) 2644 { 2645 sendTopologyMsg("replication", handler, topoMsg); 2646 } 2647 } 2648 } 2649 2650 2651 2652 private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender) 2653 { 2654 /* 2655 * If the request comes from a Directory Server we need to build the full 2656 * list of all servers in the topology and send back a MonitorMsg with the 2657 * full list of all the servers in the topology. 2658 */ 2659 if (sender.isDataServer()) 2660 { 2661 MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( 2662 msg.getDestination(), msg.getSenderID(), 2663 domainMonitor.getMonitorData()); 2664 synchronized (pendingStatusMessagesLock) 2665 { 2666 pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(), 2667 monitorMsg); 2668 } 2669 } 2670 else 2671 { 2672 MonitorMsg monitorMsg = createLocalTopologyMonitorMsg( 2673 msg.getDestination(), msg.getSenderID()); 2674 synchronized (pendingStatusMessagesLock) 2675 { 2676 pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(), 2677 monitorMsg); 2678 } 2679 } 2680 statusAnalyzer.notifyPendingStatusMessage(); 2681 } 2682}