001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.plugin; 028 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; 031import static org.opends.server.util.ServerConstants.*; 032import static org.opends.server.util.StaticUtils.*; 033 034import java.util.ArrayList; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.atomic.AtomicReference; 044 045import org.forgerock.i18n.LocalizableMessage; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import org.forgerock.opendj.config.server.ConfigChangeResult; 048import org.forgerock.opendj.config.server.ConfigException; 049import org.forgerock.opendj.ldap.ResultCode; 050import org.opends.server.admin.server.ConfigurationAddListener; 051import org.opends.server.admin.server.ConfigurationChangeListener; 052import org.opends.server.admin.server.ConfigurationDeleteListener; 053import org.opends.server.admin.std.server.ReplicationDomainCfg; 054import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; 055import org.opends.server.api.Backend; 056import org.opends.server.api.BackupTaskListener; 057import org.opends.server.api.ExportTaskListener; 058import org.opends.server.api.ImportTaskListener; 059import org.opends.server.api.RestoreTaskListener; 060import org.opends.server.api.SynchronizationProvider; 061import org.opends.server.core.DirectoryServer; 062import org.opends.server.replication.service.DSRSShutdownSync; 063import org.opends.server.types.BackupConfig; 064import org.opends.server.types.Control; 065import org.opends.server.types.DN; 066import org.opends.server.types.DirectoryException; 067import org.opends.server.types.Entry; 068import org.opends.server.types.LDIFExportConfig; 069import org.opends.server.types.LDIFImportConfig; 070import org.opends.server.types.Modification; 071import org.opends.server.types.Operation; 072import org.opends.server.types.RestoreConfig; 073import org.opends.server.types.SynchronizationProviderResult; 074import org.opends.server.types.operation.PluginOperation; 075import org.opends.server.types.operation.PostOperationAddOperation; 076import org.opends.server.types.operation.PostOperationDeleteOperation; 077import org.opends.server.types.operation.PostOperationModifyDNOperation; 078import org.opends.server.types.operation.PostOperationModifyOperation; 079import org.opends.server.types.operation.PostOperationOperation; 080import org.opends.server.types.operation.PreOperationAddOperation; 081import org.opends.server.types.operation.PreOperationDeleteOperation; 082import org.opends.server.types.operation.PreOperationModifyDNOperation; 083import org.opends.server.types.operation.PreOperationModifyOperation; 084 085/** 086 * This class is used to load the Replication code inside the JVM 087 * and to trigger initialization of the replication. 088 * 089 * It also extends the SynchronizationProvider class in order to have some 090 * replication code running during the operation process 091 * as pre-op, conflictResolution, and post-op. 092 */ 093public class MultimasterReplication 094 extends SynchronizationProvider<ReplicationSynchronizationProviderCfg> 095 implements ConfigurationAddListener<ReplicationDomainCfg>, 096 ConfigurationDeleteListener<ReplicationDomainCfg>, 097 ConfigurationChangeListener 098 <ReplicationSynchronizationProviderCfg>, 099 BackupTaskListener, RestoreTaskListener, ImportTaskListener, 100 ExportTaskListener 101{ 102 103 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 104 105 private ReplicationServerListener replicationServerListener; 106 private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<>(4); 107 private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync(); 108 /** The queue of received update messages, to be treated by the ReplayThread threads. */ 109 private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<>(10000); 110 /** The list of ReplayThread threads. */ 111 private static final List<ReplayThread> replayThreads = new ArrayList<>(); 112 /** The configurable number of replay threads. */ 113 private static int replayThreadNumber = 10; 114 115 /** Enum that symbolizes the state of the multimaster replication. */ 116 private static enum State 117 { 118 STARTING, RUNNING, STOPPING 119 } 120 121 private static final AtomicReference<State> state = new AtomicReference<>(State.STARTING); 122 123 /** The configurable connection/handshake timeout. */ 124 private static volatile int connectionTimeoutMS = 5000; 125 126 /** 127 * Finds the domain for a given DN. 128 * 129 * @param dn The DN for which the domain must be returned. 130 * @param pluginOp An optional operation for which the check is done. 131 * Can be null is the request has no associated operation. 132 * @return The domain for this DN. 133 */ 134 public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp) 135 { 136 /* 137 * Don't run the special replication code on Operation that are 138 * specifically marked as don't synchronize. 139 */ 140 if (pluginOp instanceof Operation) 141 { 142 final Operation op = (Operation) pluginOp; 143 if (op.dontSynchronize()) 144 { 145 return null; 146 } 147 148 /* 149 * Check if the provided operation is a repair operation and set the 150 * synchronization flags if necessary. 151 * The repair operations are tagged as synchronization operations so 152 * that the core server let the operation modify the entryuuid and 153 * ds-sync-hist attributes. 154 * They are also tagged as dontSynchronize so that the replication code 155 * running later do not generate CSN, solve conflicts and forward the 156 * operation to the replication server. 157 */ 158 final List<Control> controls = op.getRequestControls(); 159 for (Iterator<Control> iter = controls.iterator(); iter.hasNext();) 160 { 161 Control c = iter.next(); 162 if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID())) 163 { 164 op.setSynchronizationOperation(true); 165 op.setDontSynchronize(true); 166 /* 167 remove this control from the list of controls since it has now been 168 processed and the local backend will fail if it finds a control that 169 it does not know about and that is marked as critical. 170 */ 171 iter.remove(); 172 return null; 173 } 174 } 175 } 176 177 178 LDAPReplicationDomain domain = null; 179 DN temp = dn; 180 while (domain == null && temp != null) 181 { 182 domain = domains.get(temp); 183 temp = temp.getParentDNInSuffix(); 184 } 185 186 return domain; 187 } 188 189 /** 190 * Creates a new domain from its configEntry, do the 191 * necessary initialization and starts it so that it is 192 * fully operational when this method returns. 193 * @param configuration The entry with the configuration of this domain. 194 * @return The domain created. 195 * @throws ConfigException When the configuration is not valid. 196 */ 197 public static LDAPReplicationDomain createNewDomain( 198 ReplicationDomainCfg configuration) 199 throws ConfigException 200 { 201 try 202 { 203 final LDAPReplicationDomain domain = new LDAPReplicationDomain( 204 configuration, updateToReplayQueue, dsrsShutdownSync); 205 if (domains.isEmpty()) 206 { 207 // Create the threads that will process incoming update messages 208 createReplayThreads(); 209 } 210 211 domains.put(domain.getBaseDN(), domain); 212 return domain; 213 } 214 catch (ConfigException e) 215 { 216 logger.error(ERR_COULD_NOT_START_REPLICATION, configuration.dn(), 217 e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); 218 } 219 return null; 220 } 221 222 /** 223 * Creates a new domain from its configEntry, do the necessary initialization 224 * and starts it so that it is fully operational when this method returns. It 225 * is only used for tests so far. 226 * 227 * @param configuration The entry with the configuration of this domain. 228 * @param queue The BlockingQueue that this domain will use. 229 * 230 * @return The domain created. 231 * 232 * @throws ConfigException When the configuration is not valid. 233 */ 234 static LDAPReplicationDomain createNewDomain( 235 ReplicationDomainCfg configuration, 236 BlockingQueue<UpdateToReplay> queue) 237 throws ConfigException 238 { 239 final LDAPReplicationDomain domain = 240 new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync); 241 domains.put(domain.getBaseDN(), domain); 242 return domain; 243 } 244 245 /** 246 * Deletes a domain. 247 * @param dn : the base DN of the domain to delete. 248 */ 249 public static void deleteDomain(DN dn) 250 { 251 LDAPReplicationDomain domain = domains.remove(dn); 252 if (domain != null) 253 { 254 domain.delete(); 255 } 256 257 // No replay threads running if no replication need 258 if (domains.isEmpty()) { 259 stopReplayThreads(); 260 } 261 } 262 263 /** {@inheritDoc} */ 264 @Override 265 public void initializeSynchronizationProvider( 266 ReplicationSynchronizationProviderCfg cfg) throws ConfigException 267 { 268 domains.clear(); 269 replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync); 270 271 // Register as an add and delete listener with the root configuration so we 272 // can be notified if Multimaster domain entries are added or removed. 273 cfg.addReplicationDomainAddListener(this); 274 cfg.addReplicationDomainDeleteListener(this); 275 276 // Register as a root configuration listener so that we can be notified if 277 // number of replay threads is changed and apply changes. 278 cfg.addReplicationChangeListener(this); 279 280 replayThreadNumber = cfg.getNumUpdateReplayThreads(); 281 connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE); 282 283 // Create the list of domains that are already defined. 284 for (String name : cfg.listReplicationDomains()) 285 { 286 createNewDomain(cfg.getReplicationDomain(name)); 287 } 288 289 // If any schema changes were made with the server offline, then handle them now. 290 List<Modification> offlineSchemaChanges = 291 DirectoryServer.getOfflineSchemaChanges(); 292 if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty()) 293 { 294 processSchemaChange(offlineSchemaChanges); 295 } 296 297 DirectoryServer.registerBackupTaskListener(this); 298 DirectoryServer.registerRestoreTaskListener(this); 299 DirectoryServer.registerExportTaskListener(this); 300 DirectoryServer.registerImportTaskListener(this); 301 302 DirectoryServer.registerSupportedControl( 303 ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL); 304 } 305 306 /** 307 * Create the threads that will wait for incoming update messages. 308 */ 309 private static synchronized void createReplayThreads() 310 { 311 replayThreads.clear(); 312 313 for (int i = 0; i < replayThreadNumber; i++) 314 { 315 ReplayThread replayThread = new ReplayThread(updateToReplayQueue); 316 replayThread.start(); 317 replayThreads.add(replayThread); 318 } 319 } 320 321 /** 322 * Stop the threads that are waiting for incoming update messages. 323 */ 324 private static synchronized void stopReplayThreads() 325 { 326 // stop the replay threads 327 for (ReplayThread replayThread : replayThreads) 328 { 329 replayThread.shutdown(); 330 } 331 332 for (ReplayThread replayThread : replayThreads) 333 { 334 try 335 { 336 replayThread.join(); 337 } 338 catch(InterruptedException e) 339 { 340 Thread.currentThread().interrupt(); 341 } 342 } 343 replayThreads.clear(); 344 } 345 346 /** {@inheritDoc} */ 347 @Override 348 public boolean isConfigurationAddAcceptable( 349 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 350 { 351 return LDAPReplicationDomain.isConfigurationAcceptable( 352 configuration, unacceptableReasons); 353 } 354 355 /** {@inheritDoc} */ 356 @Override 357 public ConfigChangeResult applyConfigurationAdd( 358 ReplicationDomainCfg configuration) 359 { 360 ConfigChangeResult ccr = new ConfigChangeResult(); 361 try 362 { 363 LDAPReplicationDomain rd = createNewDomain(configuration); 364 if (State.RUNNING.equals(state.get())) 365 { 366 rd.start(); 367 if (State.STOPPING.equals(state.get())) { 368 rd.shutdown(); 369 } 370 } 371 } catch (ConfigException e) 372 { 373 // we should never get to this point because the configEntry has 374 // already been validated in isConfigurationAddAcceptable() 375 ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION); 376 } 377 return ccr; 378 } 379 380 /** {@inheritDoc} */ 381 @Override 382 public void doPostOperation(PostOperationAddOperation addOperation) 383 { 384 DN dn = addOperation.getEntryDN(); 385 genericPostOperation(addOperation, dn); 386 } 387 388 389 /** {@inheritDoc} */ 390 @Override 391 public void doPostOperation(PostOperationDeleteOperation deleteOperation) 392 { 393 DN dn = deleteOperation.getEntryDN(); 394 genericPostOperation(deleteOperation, dn); 395 } 396 397 /** {@inheritDoc} */ 398 @Override 399 public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation) 400 { 401 DN dn = modifyDNOperation.getEntryDN(); 402 genericPostOperation(modifyDNOperation, dn); 403 } 404 405 /** {@inheritDoc} */ 406 @Override 407 public void doPostOperation(PostOperationModifyOperation modifyOperation) 408 { 409 DN dn = modifyOperation.getEntryDN(); 410 genericPostOperation(modifyOperation, dn); 411 } 412 413 /** {@inheritDoc} */ 414 @Override 415 public SynchronizationProviderResult handleConflictResolution( 416 PreOperationModifyOperation modifyOperation) 417 { 418 LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation); 419 if (domain != null) 420 { 421 return domain.handleConflictResolution(modifyOperation); 422 } 423 return new SynchronizationProviderResult.ContinueProcessing(); 424 } 425 426 /** {@inheritDoc} */ 427 @Override 428 public SynchronizationProviderResult handleConflictResolution( 429 PreOperationAddOperation addOperation) throws DirectoryException 430 { 431 LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); 432 if (domain != null) 433 { 434 return domain.handleConflictResolution(addOperation); 435 } 436 return new SynchronizationProviderResult.ContinueProcessing(); 437 } 438 439 /** {@inheritDoc} */ 440 @Override 441 public SynchronizationProviderResult handleConflictResolution( 442 PreOperationDeleteOperation deleteOperation) throws DirectoryException 443 { 444 LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation); 445 if (domain != null) 446 { 447 return domain.handleConflictResolution(deleteOperation); 448 } 449 return new SynchronizationProviderResult.ContinueProcessing(); 450 } 451 452 /** {@inheritDoc} */ 453 @Override 454 public SynchronizationProviderResult handleConflictResolution( 455 PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException 456 { 457 LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation); 458 if (domain != null) 459 { 460 return domain.handleConflictResolution(modifyDNOperation); 461 } 462 return new SynchronizationProviderResult.ContinueProcessing(); 463 } 464 465 /** {@inheritDoc} */ 466 @Override 467 public SynchronizationProviderResult 468 doPreOperation(PreOperationModifyOperation modifyOperation) 469 { 470 DN operationDN = modifyOperation.getEntryDN(); 471 LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation); 472 473 if (domain == null || !domain.solveConflict()) 474 { 475 return new SynchronizationProviderResult.ContinueProcessing(); 476 } 477 478 EntryHistorical historicalInformation = (EntryHistorical) 479 modifyOperation.getAttachment(EntryHistorical.HISTORICAL); 480 if (historicalInformation == null) 481 { 482 Entry entry = modifyOperation.getModifiedEntry(); 483 historicalInformation = EntryHistorical.newInstanceFromEntry(entry); 484 modifyOperation.setAttachment(EntryHistorical.HISTORICAL, 485 historicalInformation); 486 } 487 historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); 488 historicalInformation.setHistoricalAttrToOperation(modifyOperation); 489 490 if (modifyOperation.getModifications().isEmpty()) 491 { 492 /* 493 * This operation becomes a no-op due to conflict resolution 494 * stop the processing and send an OK result 495 */ 496 return new SynchronizationProviderResult.StopProcessing( 497 ResultCode.SUCCESS, null); 498 } 499 500 return new SynchronizationProviderResult.ContinueProcessing(); 501 } 502 503 /** {@inheritDoc} */ 504 @Override 505 public SynchronizationProviderResult doPreOperation( 506 PreOperationDeleteOperation deleteOperation) throws DirectoryException 507 { 508 return new SynchronizationProviderResult.ContinueProcessing(); 509 } 510 511 /** {@inheritDoc} */ 512 @Override 513 public SynchronizationProviderResult doPreOperation( 514 PreOperationModifyDNOperation modifyDNOperation) 515 throws DirectoryException 516 { 517 DN operationDN = modifyDNOperation.getEntryDN(); 518 LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation); 519 520 if (domain == null || !domain.solveConflict()) 521 { 522 return new SynchronizationProviderResult.ContinueProcessing(); 523 } 524 525 // The historical object is retrieved from the attachment created 526 // in the HandleConflictResolution phase. 527 EntryHistorical historicalInformation = (EntryHistorical) 528 modifyDNOperation.getAttachment(EntryHistorical.HISTORICAL); 529 if (historicalInformation == null) 530 { 531 // When no Historical attached, create once by loading from the entry 532 // and attach it to the operation 533 Entry entry = modifyDNOperation.getUpdatedEntry(); 534 historicalInformation = EntryHistorical.newInstanceFromEntry(entry); 535 modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL, 536 historicalInformation); 537 } 538 historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); 539 540 // Add to the operation the historical attribute : "dn:changeNumber:moddn" 541 historicalInformation.setHistoricalAttrToOperation(modifyDNOperation); 542 543 return new SynchronizationProviderResult.ContinueProcessing(); 544 } 545 546 /** {@inheritDoc} */ 547 @Override 548 public SynchronizationProviderResult doPreOperation( 549 PreOperationAddOperation addOperation) 550 { 551 // Check replication domain 552 LDAPReplicationDomain domain = 553 findDomain(addOperation.getEntryDN(), addOperation); 554 if (domain == null) 555 { 556 return new SynchronizationProviderResult.ContinueProcessing(); 557 } 558 559 // For LOCAL op only, generate CSN and attach Context 560 if (!addOperation.isSynchronizationOperation()) 561 { 562 domain.doPreOperation(addOperation); 563 } 564 565 // Add to the operation the historical attribute : "dn:changeNumber:add" 566 EntryHistorical.setHistoricalAttrToOperation(addOperation); 567 568 return new SynchronizationProviderResult.ContinueProcessing(); 569 } 570 571 /** {@inheritDoc} */ 572 @Override 573 public void finalizeSynchronizationProvider() 574 { 575 setState(State.STOPPING); 576 577 for (LDAPReplicationDomain domain : domains.values()) 578 { 579 domain.shutdown(); 580 } 581 domains.clear(); 582 583 stopReplayThreads(); 584 585 if (replicationServerListener != null) 586 { 587 replicationServerListener.shutdown(); 588 } 589 590 DirectoryServer.deregisterBackupTaskListener(this); 591 DirectoryServer.deregisterRestoreTaskListener(this); 592 DirectoryServer.deregisterExportTaskListener(this); 593 DirectoryServer.deregisterImportTaskListener(this); 594 } 595 596 /** 597 * This method is called whenever the server detects a modification 598 * of the schema done by directly modifying the backing files 599 * of the schema backend. 600 * Call the schema Domain if it exists. 601 * 602 * @param modifications The list of modifications that was 603 * applied to the schema. 604 * 605 */ 606 @Override 607 public void processSchemaChange(List<Modification> modifications) 608 { 609 LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null); 610 if (domain != null) 611 { 612 domain.synchronizeSchemaModifications(modifications); 613 } 614 } 615 616 /** {@inheritDoc} */ 617 @Override 618 public void processBackupBegin(Backend backend, BackupConfig config) 619 { 620 for (DN dn : backend.getBaseDNs()) 621 { 622 LDAPReplicationDomain domain = findDomain(dn, null); 623 if (domain != null) 624 { 625 domain.backupStart(); 626 } 627 } 628 } 629 630 /** {@inheritDoc} */ 631 @Override 632 public void processBackupEnd(Backend backend, BackupConfig config, 633 boolean successful) 634 { 635 for (DN dn : backend.getBaseDNs()) 636 { 637 LDAPReplicationDomain domain = findDomain(dn, null); 638 if (domain != null) 639 { 640 domain.backupEnd(); 641 } 642 } 643 } 644 645 /** {@inheritDoc} */ 646 @Override 647 public void processRestoreBegin(Backend backend, RestoreConfig config) 648 { 649 for (DN dn : backend.getBaseDNs()) 650 { 651 LDAPReplicationDomain domain = findDomain(dn, null); 652 if (domain != null) 653 { 654 domain.disable(); 655 } 656 } 657 } 658 659 /** {@inheritDoc} */ 660 @Override 661 public void processRestoreEnd(Backend backend, RestoreConfig config, 662 boolean successful) 663 { 664 for (DN dn : backend.getBaseDNs()) 665 { 666 LDAPReplicationDomain domain = findDomain(dn, null); 667 if (domain != null) 668 { 669 domain.enable(); 670 } 671 } 672 } 673 674 /** {@inheritDoc} */ 675 @Override 676 public void processImportBegin(Backend backend, LDIFImportConfig config) 677 { 678 for (DN dn : backend.getBaseDNs()) 679 { 680 LDAPReplicationDomain domain = findDomain(dn, null); 681 if (domain != null) 682 { 683 domain.disable(); 684 } 685 } 686 } 687 688 /** {@inheritDoc} */ 689 @Override 690 public void processImportEnd(Backend backend, LDIFImportConfig config, 691 boolean successful) 692 { 693 for (DN dn : backend.getBaseDNs()) 694 { 695 LDAPReplicationDomain domain = findDomain(dn, null); 696 if (domain != null) 697 { 698 domain.enable(); 699 } 700 } 701 } 702 703 /** {@inheritDoc} */ 704 @Override 705 public void processExportBegin(Backend backend, LDIFExportConfig config) 706 { 707 for (DN dn : backend.getBaseDNs()) 708 { 709 LDAPReplicationDomain domain = findDomain(dn, null); 710 if (domain != null) 711 { 712 domain.backupStart(); 713 } 714 } 715 } 716 717 /** {@inheritDoc} */ 718 @Override 719 public void processExportEnd(Backend backend, LDIFExportConfig config, 720 boolean successful) 721 { 722 for (DN dn : backend.getBaseDNs()) 723 { 724 LDAPReplicationDomain domain = findDomain(dn, null); 725 if (domain != null) 726 { 727 domain.backupEnd(); 728 } 729 } 730 } 731 732 /** {@inheritDoc} */ 733 @Override 734 public ConfigChangeResult applyConfigurationDelete( 735 ReplicationDomainCfg configuration) 736 { 737 deleteDomain(configuration.getBaseDN()); 738 739 return new ConfigChangeResult(); 740 } 741 742 /** {@inheritDoc} */ 743 @Override 744 public boolean isConfigurationDeleteAcceptable( 745 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 746 { 747 return true; 748 } 749 750 /** 751 * Generic code for all the postOperation entry point. 752 * 753 * @param operation The Operation for which the post-operation is called. 754 * @param dn The Dn for which the post-operation is called. 755 */ 756 private void genericPostOperation(PostOperationOperation operation, DN dn) 757 { 758 LDAPReplicationDomain domain = findDomain(dn, operation); 759 if (domain != null) { 760 domain.synchronize(operation); 761 } 762 } 763 764 /** 765 * Returns the replication server listener associated to that Multimaster 766 * Replication. 767 * @return the listener. 768 */ 769 public ReplicationServerListener getReplicationServerListener() 770 { 771 return replicationServerListener; 772 } 773 774 /** {@inheritDoc} */ 775 @Override 776 public boolean isConfigurationChangeAcceptable( 777 ReplicationSynchronizationProviderCfg configuration, 778 List<LocalizableMessage> unacceptableReasons) 779 { 780 return true; 781 } 782 783 /** {@inheritDoc} */ 784 @Override 785 public ConfigChangeResult applyConfigurationChange( 786 ReplicationSynchronizationProviderCfg configuration) 787 { 788 int numUpdateRepayThread = configuration.getNumUpdateReplayThreads(); 789 790 // Stop threads then restart new number of threads 791 stopReplayThreads(); 792 replayThreadNumber = numUpdateRepayThread; 793 if (!domains.isEmpty()) 794 { 795 createReplayThreads(); 796 } 797 798 connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(), 799 Integer.MAX_VALUE); 800 801 return new ConfigChangeResult(); 802 } 803 804 /** {@inheritDoc} */ 805 @Override 806 public void completeSynchronizationProvider() 807 { 808 for (LDAPReplicationDomain domain : domains.values()) 809 { 810 domain.start(); 811 } 812 setState(State.RUNNING); 813 } 814 815 private void setState(State newState) 816 { 817 state.set(newState); 818 synchronized (state) 819 { 820 state.notifyAll(); 821 } 822 } 823 824 /** 825 * Gets the number of handled domain objects. 826 * @return The number of handled domain objects 827 */ 828 public static int getNumberOfDomains() 829 { 830 return domains.size(); 831 } 832 833 /** 834 * Gets the Set of domain baseDN which are disabled for the external changelog. 835 * 836 * @return The Set of domain baseDNs which are disabled for the external changelog. 837 * @throws DirectoryException 838 * if a problem occurs 839 */ 840 public static Set<DN> getExcludedChangelogDomains() throws DirectoryException 841 { 842 final Set<DN> disabledBaseDNs = new HashSet<>(domains.size() + 1); 843 disabledBaseDNs.add(DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT)); 844 for (LDAPReplicationDomain domain : domains.values()) 845 { 846 if (!domain.isECLEnabled()) 847 { 848 disabledBaseDNs.add(domain.getBaseDN()); 849 } 850 } 851 return disabledBaseDNs; 852 } 853 854 /** 855 * Returns whether the provided baseDN represents a replication domain enabled 856 * for the external changelog. 857 * 858 * @param baseDN 859 * the replication domain to check 860 * @return true if the provided baseDN is enabled for the external changelog, 861 * false if the provided baseDN is disabled for the external changelog 862 * or unknown to multimaster replication. 863 */ 864 public static boolean isECLEnabledDomain(DN baseDN) 865 { 866 if (State.STARTING.equals(state.get())) 867 { 868 synchronized (state) 869 { 870 while (State.STARTING.equals(state.get())) 871 { 872 try 873 { 874 state.wait(); 875 } 876 catch (InterruptedException ignored) 877 { 878 // loop and check state again 879 } 880 } 881 } 882 } 883 // if state is STOPPING, then we need to return from this method 884 final LDAPReplicationDomain domain = domains.get(baseDN); 885 return domain != null && domain.isECLEnabled(); 886 } 887 888 /** 889 * Returns the connection timeout in milli-seconds. 890 * 891 * @return The connection timeout in milli-seconds. 892 */ 893 public static int getConnectionTimeoutMS() 894 { 895 return connectionTimeoutMS; 896 } 897 898}