001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.plugin; 028 029import static org.forgerock.opendj.ldap.ResultCode.*; 030import static org.opends.messages.ReplicationMessages.*; 031import static org.opends.messages.ToolMessages.*; 032import static org.opends.server.protocols.internal.InternalClientConnection.*; 033import static org.opends.server.protocols.internal.Requests.*; 034import static org.opends.server.replication.plugin.EntryHistorical.*; 035import static org.opends.server.replication.protocol.OperationContext.*; 036import static org.opends.server.replication.service.ReplicationMonitor.*; 037import static org.opends.server.util.CollectionUtils.*; 038import static org.opends.server.util.ServerConstants.*; 039import static org.opends.server.util.StaticUtils.*; 040 041import java.io.File; 042import java.io.InputStream; 043import java.io.OutputStream; 044import java.io.StringReader; 045import java.util.*; 046import java.util.concurrent.BlockingQueue; 047import java.util.concurrent.TimeUnit; 048import java.util.concurrent.TimeoutException; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicInteger; 051import java.util.concurrent.atomic.AtomicReference; 052import java.util.zip.DataFormatException; 053 054import org.forgerock.i18n.LocalizableMessage; 055import org.forgerock.i18n.slf4j.LocalizedLogger; 056import org.forgerock.opendj.config.server.ConfigChangeResult; 057import org.forgerock.opendj.config.server.ConfigException; 058import org.forgerock.opendj.ldap.ByteString; 059import org.forgerock.opendj.ldap.DecodeException; 060import org.forgerock.opendj.ldap.ModificationType; 061import org.forgerock.opendj.ldap.ResultCode; 062import org.forgerock.opendj.ldap.SearchScope; 063import org.opends.server.admin.server.ConfigurationChangeListener; 064import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; 065import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; 066import org.opends.server.admin.std.server.ReplicationDomainCfg; 067import org.opends.server.api.AlertGenerator; 068import org.opends.server.api.Backend; 069import org.opends.server.api.Backend.BackendOperation; 070import org.opends.server.api.DirectoryThread; 071import org.opends.server.api.SynchronizationProvider; 072import org.opends.server.backends.task.Task; 073import org.opends.server.core.*; 074import org.opends.server.protocols.internal.InternalClientConnection; 075import org.opends.server.protocols.internal.InternalSearchListener; 076import org.opends.server.protocols.internal.InternalSearchOperation; 077import org.opends.server.protocols.internal.Requests; 078import org.opends.server.protocols.internal.SearchRequest; 079import org.opends.server.protocols.ldap.LDAPAttribute; 080import org.opends.server.protocols.ldap.LDAPControl; 081import org.opends.server.protocols.ldap.LDAPFilter; 082import org.opends.server.protocols.ldap.LDAPModification; 083import org.opends.server.replication.common.CSN; 084import org.opends.server.replication.common.ServerState; 085import org.opends.server.replication.common.ServerStatus; 086import org.opends.server.replication.common.StatusMachineEvent; 087import org.opends.server.replication.protocol.*; 088import org.opends.server.replication.service.DSRSShutdownSync; 089import org.opends.server.replication.service.ReplicationBroker; 090import org.opends.server.replication.service.ReplicationDomain; 091import org.opends.server.tasks.PurgeConflictsHistoricalTask; 092import org.opends.server.tasks.TaskUtils; 093import org.opends.server.types.*; 094import org.opends.server.types.operation.*; 095import org.opends.server.util.LDIFReader; 096import org.opends.server.util.TimeThread; 097import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation; 098 099/** 100 * This class implements the bulk part of the Directory Server side 101 * of the replication code. 102 * It contains the root method for publishing a change, 103 * processing a change received from the replicationServer service, 104 * handle conflict resolution, 105 * handle protocol messages from the replicationServer. 106 * <p> 107 * FIXME Move this class to org.opends.server.replication.service 108 * or the equivalent package once this code is moved to a maven module. 109 */ 110public final class LDAPReplicationDomain extends ReplicationDomain 111 implements ConfigurationChangeListener<ReplicationDomainCfg>, 112 AlertGenerator 113{ 114 /** 115 * Set of attributes that will return all the user attributes and the 116 * replication related operational attributes when used in a search operation. 117 */ 118 private static final Set<String> USER_AND_REPL_OPERATIONAL_ATTRS = 119 newHashSet(HISTORICAL_ATTRIBUTE_NAME, ENTRYUUID_ATTRIBUTE_NAME, "*"); 120 121 /** 122 * This class is used in the session establishment phase 123 * when no Replication Server with all the local changes has been found 124 * and we therefore need to recover them. 125 * A search is then performed on the database using this 126 * internalSearchListener. 127 */ 128 private class ScanSearchListener implements InternalSearchListener 129 { 130 private final CSN startCSN; 131 private final CSN endCSN; 132 133 public ScanSearchListener(CSN startCSN, CSN endCSN) 134 { 135 this.startCSN = startCSN; 136 this.endCSN = endCSN; 137 } 138 139 @Override 140 public void handleInternalSearchEntry( 141 InternalSearchOperation searchOperation, SearchResultEntry searchEntry) 142 throws DirectoryException 143 { 144 // Build the list of Operations that happened on this entry after startCSN 145 // and before endCSN and add them to the replayOperations list 146 Iterable<FakeOperation> updates = 147 EntryHistorical.generateFakeOperations(searchEntry); 148 149 for (FakeOperation op : updates) 150 { 151 CSN csn = op.getCSN(); 152 if (csn.isNewerThan(startCSN) && csn.isOlderThan(endCSN)) 153 { 154 synchronized (replayOperations) 155 { 156 replayOperations.put(csn, op); 157 } 158 } 159 } 160 } 161 162 @Override 163 public void handleInternalSearchReference( 164 InternalSearchOperation searchOperation, 165 SearchResultReference searchReference) throws DirectoryException 166 { 167 // Nothing to do. 168 } 169 } 170 171 /** The fully-qualified name of this class. */ 172 private static final String CLASS_NAME = LDAPReplicationDomain.class.getName(); 173 174 /** 175 * The attribute used to mark conflicting entries. 176 * The value of this attribute should be the dn that this entry was 177 * supposed to have when it was marked as conflicting. 178 */ 179 public static final String DS_SYNC_CONFLICT = "ds-sync-conflict"; 180 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 181 182 private final DSRSShutdownSync dsrsShutdownSync; 183 /** 184 * The update to replay message queue where the listener thread is going to 185 * push incoming update messages. 186 */ 187 private final BlockingQueue<UpdateToReplay> updateToReplayQueue; 188 /** The number of naming conflicts successfully resolved. */ 189 private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger(); 190 /** The number of modify conflicts successfully resolved. */ 191 private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger(); 192 /** The number of unresolved naming conflicts. */ 193 private final AtomicInteger numUnresolvedNamingConflicts = 194 new AtomicInteger(); 195 /** The number of updates replayed successfully by the replication. */ 196 private final AtomicInteger numReplayedPostOpCalled = new AtomicInteger(); 197 198 private final PersistentServerState state; 199 private volatile boolean generationIdSavedStatus; 200 201 /** 202 * This object is used to store the list of update currently being 203 * done on the local database. 204 * Is is useful to make sure that the local operations are sent in a 205 * correct order to the replication server and that the ServerState 206 * is not updated too early. 207 */ 208 private final PendingChanges pendingChanges; 209 private final AtomicReference<RSUpdater> rsUpdater = new AtomicReference<>(null); 210 211 /** 212 * It contain the updates that were done on other servers, transmitted by the 213 * replication server and that are currently replayed. 214 * <p> 215 * It is useful to make sure that dependencies between operations are 216 * correctly fulfilled and to make sure that the ServerState is not updated 217 * too early. 218 */ 219 private final RemotePendingChanges remotePendingChanges; 220 private boolean solveConflictFlag = true; 221 222 private final InternalClientConnection conn = getRootConnection(); 223 private final AtomicBoolean shutdown = new AtomicBoolean(); 224 private volatile boolean disabled; 225 private volatile boolean stateSavingDisabled; 226 227 /** 228 * This list is used to temporary store operations that needs to be replayed 229 * at session establishment time. 230 */ 231 private final SortedMap<CSN, FakeOperation> replayOperations = new TreeMap<>(); 232 233 private ExternalChangelogDomain eclDomain; 234 235 /** A boolean indicating if the thread used to save the persistentServerState is terminated. */ 236 private volatile boolean done = true; 237 238 private final ServerStateFlush flushThread; 239 240 /** The attribute name used to store the generation id in the backend. */ 241 private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id"; 242 /** The attribute name used to store the fractional include configuration in the backend. */ 243 static final String REPLICATION_FRACTIONAL_INCLUDE = "ds-sync-fractional-include"; 244 /** The attribute name used to store the fractional exclude configuration in the backend. */ 245 static final String REPLICATION_FRACTIONAL_EXCLUDE = "ds-sync-fractional-exclude"; 246 247 /** 248 * Fractional replication variables. 249 */ 250 251 /** Holds the fractional configuration for this domain, if any. */ 252 private final FractionalConfig fractionalConfig; 253 254 /** The list of attributes that cannot be used in fractional replication configuration. */ 255 private static final String[] FRACTIONAL_PROHIBITED_ATTRIBUTES = new String[] 256 { 257 "objectClass", 258 "2.5.4.0" // objectClass OID 259 }; 260 261 /** 262 * When true, this flag is used to force the domain status to be put in bad 263 * data set just after the connection to the replication server. 264 * This must be used when fractional replication is enabled with a 265 * configuration different from the previous one (or at the very first 266 * fractional usage time) : after connection, a ChangeStatusMsg is sent 267 * requesting the bad data set status. Then none of the update messages 268 * received from the replication server are taken into account until the 269 * backend is synchronized with brand new data set compliant with the new 270 * fractional configuration (i.e with compliant fractional configuration in 271 * domain root entry). 272 */ 273 private boolean forceBadDataSet; 274 275 /** 276 * The message id to be used when an import is stopped with error by 277 * the fractional replication ldif import plugin. 278 */ 279 private int importErrorMessageId = -1; 280 /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE. */ 281 static final int IMPORT_ERROR_MESSAGE_BAD_REMOTE = 1; 282 /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL. */ 283 static final int IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL = 2; 284 285 /* 286 * Definitions for the return codes of the 287 * fractionalFilterOperation(PreOperationModifyOperation 288 * modifyOperation, boolean performFiltering) method 289 */ 290 /** 291 * The operation contains attributes subject to fractional filtering according 292 * to the fractional configuration. 293 */ 294 private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1; 295 /** 296 * The operation contains no attributes subject to fractional filtering 297 * according to the fractional configuration. 298 */ 299 private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2; 300 /** The operation should become a no-op. */ 301 private static final int FRACTIONAL_BECOME_NO_OP = 3; 302 303 /** 304 * The last CSN purged in this domain. Allows to have a continuous purging 305 * process from one purge processing (task run) to the next one. Values 0 when 306 * the server starts. 307 */ 308 private CSN lastCSNPurgedFromHist = new CSN(0,0,0); 309 310 /** 311 * The thread that periodically saves the ServerState of this 312 * LDAPReplicationDomain in the database. 313 */ 314 private class ServerStateFlush extends DirectoryThread 315 { 316 protected ServerStateFlush() 317 { 318 super("Replica DS(" + getServerId() + ") state checkpointer for domain \"" + getBaseDN() + "\""); 319 } 320 321 @Override 322 public void run() 323 { 324 done = false; 325 326 while (!isShutdownInitiated()) 327 { 328 try 329 { 330 synchronized (this) 331 { 332 wait(1000); 333 if (!disabled && !stateSavingDisabled) 334 { 335 // save the ServerState 336 state.save(); 337 } 338 } 339 } 340 catch (InterruptedException e) 341 { 342 // Thread interrupted: check for shutdown. 343 Thread.currentThread().interrupt(); 344 } 345 } 346 state.save(); 347 348 done = true; 349 } 350 } 351 352 /** 353 * The thread that is responsible to update the RS to which this domain is 354 * connected in case it is late and there is no RS which is up to date. 355 */ 356 private class RSUpdater extends DirectoryThread 357 { 358 private final CSN startCSN; 359 360 protected RSUpdater(CSN replServerMaxCSN) 361 { 362 super("Replica DS(" + getServerId() + ") missing change publisher for domain \"" + getBaseDN() + "\""); 363 this.startCSN = replServerMaxCSN; 364 } 365 366 @Override 367 public void run() 368 { 369 // Replication server is missing some of our changes: 370 // let's send them to him. 371 logger.trace(DEBUG_GOING_TO_SEARCH_FOR_CHANGES); 372 373 /* 374 * Get all the changes that have not been seen by this 375 * replication server and publish them. 376 */ 377 try 378 { 379 if (buildAndPublishMissingChanges(startCSN, broker)) 380 { 381 logger.trace(DEBUG_CHANGES_SENT); 382 synchronized(replayOperations) 383 { 384 replayOperations.clear(); 385 } 386 } 387 else 388 { 389 /* 390 * An error happened trying to search for the updates 391 * This server will start accepting again new updates but 392 * some inconsistencies will stay between servers. 393 * Log an error for the repair tool 394 * that will need to re-synchronize the servers. 395 */ 396 logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN()); 397 } 398 } 399 catch (Exception e) 400 { 401 /* 402 * An error happened trying to search for the updates 403 * This server will start accepting again new updates but 404 * some inconsistencies will stay between servers. 405 * Log an error for the repair tool 406 * that will need to re-synchronize the servers. 407 */ 408 logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN()); 409 } 410 finally 411 { 412 broker.setRecoveryRequired(false); 413 // RSUpdater thread has finished its work, let's remove it from memory 414 // so another RSUpdater thread can be started if needed. 415 rsUpdater.compareAndSet(this, null); 416 } 417 } 418 } 419 420 /** 421 * Creates a new ReplicationDomain using configuration from configEntry. 422 * 423 * @param configuration The configuration of this ReplicationDomain. 424 * @param updateToReplayQueue The queue for update messages to replay. 425 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. 426 * @throws ConfigException In case of invalid configuration. 427 */ 428 LDAPReplicationDomain(ReplicationDomainCfg configuration, 429 BlockingQueue<UpdateToReplay> updateToReplayQueue, 430 DSRSShutdownSync dsrsShutdownSync) throws ConfigException 431 { 432 super(configuration, -1); 433 434 this.updateToReplayQueue = updateToReplayQueue; 435 this.dsrsShutdownSync = dsrsShutdownSync; 436 437 // Get assured configuration 438 readAssuredConfig(configuration, false); 439 440 // Get fractional configuration 441 fractionalConfig = new FractionalConfig(getBaseDN()); 442 readFractionalConfig(configuration, false); 443 storeECLConfiguration(configuration); 444 solveConflictFlag = isSolveConflict(configuration); 445 446 Backend<?> backend = getBackend(); 447 if (backend == null) 448 { 449 throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(getBaseDN())); 450 } 451 452 try 453 { 454 generationId = loadGenerationId(); 455 } 456 catch (DirectoryException e) 457 { 458 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 459 } 460 461 /* 462 * Create a new Persistent Server State that will be used to store 463 * the last CSN seen from all LDAP servers in the topology. 464 */ 465 state = new PersistentServerState(getBaseDN(), getServerId(), 466 getServerState()); 467 flushThread = new ServerStateFlush(); 468 469 /* 470 * CSNGenerator is used to create new unique CSNs for each operation done on 471 * this replication domain. 472 * 473 * The generator time is adjusted to the time of the last CSN received from 474 * remote other servers. 475 */ 476 pendingChanges = new PendingChanges(getGenerator(), this); 477 remotePendingChanges = new RemotePendingChanges(getServerState()); 478 479 // listen for changes on the configuration 480 configuration.addChangeListener(this); 481 482 // register as an AlertGenerator 483 DirectoryServer.registerAlertGenerator(this); 484 485 startPublishService(); 486 } 487 488 /** 489 * Modify conflicts are solved for all suffixes but the schema suffix because 490 * we don't want to store extra information in the schema ldif files. This has 491 * no negative impact because the changes on schema should not produce 492 * conflicts. 493 */ 494 private boolean isSolveConflict(ReplicationDomainCfg cfg) 495 { 496 return !getBaseDN().equals(DirectoryServer.getSchemaDN()) 497 && cfg.isSolveConflicts(); 498 } 499 500 /** 501 * Sets the error message id to be used when online import is stopped with 502 * error by the fractional replication ldif import plugin. 503 * @param importErrorMessageId The message to use. 504 */ 505 void setImportErrorMessageId(int importErrorMessageId) 506 { 507 this.importErrorMessageId = importErrorMessageId; 508 } 509 510 /** 511 * This flag is used by the fractional replication ldif import plugin to stop 512 * the (online) import process if a fractional configuration inconsistency is 513 * detected by it. 514 * 515 * @return true if the online import currently in progress should continue, 516 * false otherwise. 517 */ 518 private boolean isFollowImport() 519 { 520 return importErrorMessageId == -1; 521 } 522 523 /** 524 * Gets and stores the fractional replication configuration parameters. 525 * @param configuration The configuration object 526 * @param allowReconnection Tells if one must reconnect if significant changes 527 * occurred 528 */ 529 private void readFractionalConfig(ReplicationDomainCfg configuration, 530 boolean allowReconnection) 531 { 532 // Read the configuration entry 533 FractionalConfig newFractionalConfig; 534 try 535 { 536 newFractionalConfig = FractionalConfig.toFractionalConfig(configuration); 537 } 538 catch(ConfigException e) 539 { 540 // Should not happen as normally already called without problem in 541 // isConfigurationChangeAcceptable or isConfigurationAcceptable 542 // if we come up to this method 543 logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e)); 544 return; 545 } 546 547 /** 548 * Is there any change in fractional configuration ? 549 */ 550 551 // Compute current configuration 552 boolean needReconnection; 553 try 554 { 555 needReconnection = !FractionalConfig. 556 isFractionalConfigEquivalent(fractionalConfig, newFractionalConfig); 557 } 558 catch (ConfigException e) 559 { 560 // Should not happen 561 logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e)); 562 return; 563 } 564 565 // Disable service if configuration changed 566 final boolean needRestart = needReconnection && allowReconnection; 567 if (needRestart) 568 { 569 disableService(); 570 } 571 // Set new configuration 572 int newFractionalMode = newFractionalConfig.fractionalConfigToInt(); 573 fractionalConfig.setFractional(newFractionalMode != 574 FractionalConfig.NOT_FRACTIONAL); 575 if (fractionalConfig.isFractional()) 576 { 577 // Set new fractional configuration values 578 fractionalConfig.setFractionalExclusive( 579 newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); 580 fractionalConfig.setFractionalSpecificClassesAttributes( 581 newFractionalConfig.getFractionalSpecificClassesAttributes()); 582 fractionalConfig.setFractionalAllClassesAttributes( 583 newFractionalConfig.fractionalAllClassesAttributes); 584 } else 585 { 586 // Reset default values 587 fractionalConfig.setFractionalExclusive(true); 588 fractionalConfig.setFractionalSpecificClassesAttributes( 589 new HashMap<String, Set<String>>()); 590 fractionalConfig.setFractionalAllClassesAttributes(new HashSet<String>()); 591 } 592 593 // Reconnect if required 594 if (needRestart) 595 { 596 enableService(); 597 } 598 } 599 600 /** 601 * Return true if the fractional configuration stored in the domain root 602 * entry of the backend is equivalent to the fractional configuration stored 603 * in the local variables. 604 */ 605 private boolean isBackendFractionalConfigConsistent() 606 { 607 // Read config stored in domain root entry 608 if (logger.isTraceEnabled()) 609 { 610 logger.trace("Attempt to read the potential fractional config in domain root entry " + getBaseDN()); 611 } 612 613 // Search the domain root entry that is used to save the generation id 614 SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT) 615 .addAttribute(REPLICATION_GENERATION_ID, REPLICATION_FRACTIONAL_EXCLUDE, REPLICATION_FRACTIONAL_INCLUDE); 616 InternalSearchOperation search = conn.processSearch(request); 617 618 if (search.getResultCode() != ResultCode.SUCCESS 619 && search.getResultCode() != ResultCode.NO_SUCH_OBJECT) 620 { 621 String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage(); 622 logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN()); 623 return false; 624 } 625 626 SearchResultEntry resultEntry = findReplicationSearchResultEntry(search); 627 if (resultEntry == null) 628 { 629 /* 630 * The backend is probably empty: if there is some fractional 631 * configuration in memory, we do not let the domain being connected, 632 * otherwise, it's ok 633 */ 634 return !fractionalConfig.isFractional(); 635 } 636 637 // Now extract fractional configuration if any 638 Iterator<String> exclIt = 639 getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_EXCLUDE); 640 Iterator<String> inclIt = 641 getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_INCLUDE); 642 643 // Compare backend and local fractional configuration 644 return isFractionalConfigConsistent(fractionalConfig, exclIt, inclIt); 645 } 646 647 private SearchResultEntry findReplicationSearchResultEntry( 648 InternalSearchOperation searchOperation) 649 { 650 final SearchResultEntry resultEntry = getFirstResult(searchOperation); 651 if (resultEntry != null) 652 { 653 AttributeType synchronizationGenIDType = 654 DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); 655 List<Attribute> attrs = 656 resultEntry.getAttribute(synchronizationGenIDType); 657 if (attrs != null) 658 { 659 Attribute attr = attrs.get(0); 660 if (attr.size() == 1) 661 { 662 return resultEntry; 663 } 664 if (attr.size() > 1) 665 { 666 String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString(); 667 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg); 668 } 669 } 670 } 671 return null; 672 } 673 674 private Iterator<String> getAttributeValueIterator( 675 SearchResultEntry resultEntry, String attrName) 676 { 677 AttributeType attrType = DirectoryServer.getAttributeType(attrName); 678 List<Attribute> exclAttrs = resultEntry.getAttribute(attrType); 679 if (exclAttrs != null) 680 { 681 Attribute exclAttr = exclAttrs.get(0); 682 if (exclAttr != null) 683 { 684 return new AttributeValueStringIterator(exclAttr.iterator()); 685 } 686 } 687 return null; 688 } 689 690 /** 691 * Return true if the fractional configuration passed as fractional 692 * configuration attribute values is equivalent to the fractional 693 * configuration stored in the local variables. 694 * @param fractionalConfig The local fractional configuration 695 * @param exclIt Fractional exclude mode configuration attribute values to 696 * analyze. 697 * @param inclIt Fractional include mode configuration attribute values to 698 * analyze. 699 * @return True if the fractional configuration passed as fractional 700 * configuration attribute values is equivalent to the fractional 701 * configuration stored in the local variables. 702 */ 703 static boolean isFractionalConfigConsistent( 704 FractionalConfig fractionalConfig, Iterator<String> exclIt, 705 Iterator<String> inclIt) 706 { 707 /* 708 * Parse fractional configuration stored in passed fractional configuration 709 * attributes values 710 */ 711 712 Map<String, Set<String>> storedFractionalSpecificClassesAttributes = new HashMap<>(); 713 Set<String> storedFractionalAllClassesAttributes = new HashSet<>(); 714 715 int storedFractionalMode; 716 try 717 { 718 storedFractionalMode = FractionalConfig.parseFractionalConfig(exclIt, 719 inclIt, storedFractionalSpecificClassesAttributes, 720 storedFractionalAllClassesAttributes); 721 } catch (ConfigException e) 722 { 723 // Should not happen as configuration in domain root entry is flushed 724 // from valid configuration in local variables 725 logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e)); 726 return false; 727 } 728 729 FractionalConfig storedFractionalConfig = new FractionalConfig( 730 fractionalConfig.getBaseDn()); 731 storedFractionalConfig.setFractional(storedFractionalMode != 732 FractionalConfig.NOT_FRACTIONAL); 733 // Set stored fractional configuration values 734 if (storedFractionalConfig.isFractional()) 735 { 736 storedFractionalConfig.setFractionalExclusive( 737 storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); 738 } 739 storedFractionalConfig.setFractionalSpecificClassesAttributes( 740 storedFractionalSpecificClassesAttributes); 741 storedFractionalConfig.setFractionalAllClassesAttributes( 742 storedFractionalAllClassesAttributes); 743 744 /* 745 * Compare configuration stored in passed fractional configuration 746 * attributes with local variable one 747 */ 748 try 749 { 750 return FractionalConfig. 751 isFractionalConfigEquivalent(fractionalConfig, storedFractionalConfig); 752 } catch (ConfigException e) 753 { 754 // Should not happen as configuration in domain root entry is flushed 755 // from valid configuration in local variables so both should have already 756 // been checked 757 logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e)); 758 return false; 759 } 760 } 761 762 /** 763 * Utility class to have get a string iterator from an AtributeValue iterator. 764 * Assuming the attribute values are strings. 765 */ 766 static class AttributeValueStringIterator implements Iterator<String> 767 { 768 private final Iterator<ByteString> attrValIt; 769 770 /** 771 * Creates a new AttributeValueStringIterator object. 772 * @param attrValIt The underlying attribute iterator to use, assuming 773 * internal values are strings. 774 */ 775 AttributeValueStringIterator(Iterator<ByteString> attrValIt) 776 { 777 this.attrValIt = attrValIt; 778 } 779 780 @Override 781 public boolean hasNext() 782 { 783 return attrValIt.hasNext(); 784 } 785 786 @Override 787 public String next() 788 { 789 return attrValIt.next().toString(); 790 } 791 792 // Should not be needed anyway 793 @Override 794 public void remove() 795 { 796 attrValIt.remove(); 797 } 798 } 799 800 /** 801 * Compare 2 attribute collections and returns true if they are equivalent. 802 * 803 * @param attributes1 804 * First attribute collection to compare. 805 * @param attributes2 806 * Second attribute collection to compare. 807 * @return True if both attribute collection are equivalent. 808 * @throws ConfigException 809 * If some attributes could not be retrieved from the schema. 810 */ 811 private static boolean areAttributesEquivalent( 812 Collection<String> attributes1, Collection<String> attributes2) 813 throws ConfigException 814 { 815 // Compare all classes attributes 816 if (attributes1.size() != attributes2.size()) 817 { 818 return false; 819 } 820 821 // Check consistency of all classes attributes 822 Schema schema = DirectoryServer.getSchema(); 823 /* 824 * For each attribute in attributes1, check there is the matching 825 * one in attributes2. 826 */ 827 for (String attrName1 : attributes1) 828 { 829 // Get attribute from attributes1 830 AttributeType attributeType1 = schema.getAttributeType(attrName1); 831 if (attributeType1 == null) 832 { 833 throw new ConfigException( 834 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName1)); 835 } 836 // Look for matching one in attributes2 837 boolean foundAttribute = false; 838 for (String attrName2 : attributes2) 839 { 840 AttributeType attributeType2 = schema.getAttributeType(attrName2); 841 if (attributeType2 == null) 842 { 843 throw new ConfigException( 844 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName2)); 845 } 846 if (attributeType1.equals(attributeType2)) 847 { 848 foundAttribute = true; 849 break; 850 } 851 } 852 // Found matching attribute ? 853 if (!foundAttribute) 854 { 855 return false; 856 } 857 } 858 859 return true; 860 } 861 862 /** 863 * Check that the passed fractional configuration is acceptable 864 * regarding configuration syntax, schema constraints... 865 * Throws an exception if the configuration is not acceptable. 866 * @param configuration The configuration to analyze. 867 * @throws org.opends.server.config.ConfigException if the configuration is 868 * not acceptable. 869 */ 870 private static void isFractionalConfigAcceptable( 871 ReplicationDomainCfg configuration) throws ConfigException 872 { 873 /* 874 * Parse fractional configuration 875 */ 876 877 // Read the configuration entry 878 FractionalConfig newFractionalConfig = FractionalConfig.toFractionalConfig( 879 configuration); 880 881 if (!newFractionalConfig.isFractional()) 882 { 883 // Nothing to check 884 return; 885 } 886 887 // Prepare variables to be filled with config 888 Map<String, Set<String>> newFractionalSpecificClassesAttributes = 889 newFractionalConfig.getFractionalSpecificClassesAttributes(); 890 Set<String> newFractionalAllClassesAttributes = 891 newFractionalConfig.getFractionalAllClassesAttributes(); 892 893 /* 894 * Check attributes consistency : we only allow to filter MAY (optional) 895 * attributes of a class : to be compliant with the schema, no MUST 896 * (mandatory) attribute can be filtered by fractional replication. 897 */ 898 899 // Check consistency of specific classes attributes 900 Schema schema = DirectoryServer.getSchema(); 901 int fractionalMode = newFractionalConfig.fractionalConfigToInt(); 902 for (String className : newFractionalSpecificClassesAttributes.keySet()) 903 { 904 // Does the class exist ? 905 ObjectClass fractionalClass = schema.getObjectClass( 906 className.toLowerCase()); 907 if (fractionalClass == null) 908 { 909 throw new ConfigException( 910 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className)); 911 } 912 913 boolean isExtensibleObjectClass = 914 "extensibleObject".equalsIgnoreCase(className); 915 916 Set<String> attributes = 917 newFractionalSpecificClassesAttributes.get(className); 918 919 for (String attrName : attributes) 920 { 921 // Not a prohibited attribute ? 922 if (isFractionalProhibitedAttr(attrName)) 923 { 924 throw new ConfigException( 925 NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName)); 926 } 927 928 // Does the attribute exist ? 929 AttributeType attributeType = schema.getAttributeType(attrName); 930 if (attributeType != null) 931 { 932 // No more checking for the extensibleObject class 933 if (!isExtensibleObjectClass 934 && fractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL 935 // Exclusive mode : the attribute must be optional 936 && !fractionalClass.isOptional(attributeType)) 937 { 938 throw new ConfigException( 939 NOTE_ERR_FRACTIONAL_CONFIG_NOT_OPTIONAL_ATTRIBUTE.get(attrName, 940 className)); 941 } 942 } 943 else 944 { 945 throw new ConfigException( 946 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName)); 947 } 948 } 949 } 950 951 // Check consistency of all classes attributes 952 for (String attrName : newFractionalAllClassesAttributes) 953 { 954 // Not a prohibited attribute ? 955 if (isFractionalProhibitedAttr(attrName)) 956 { 957 throw new ConfigException( 958 NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName)); 959 } 960 961 // Does the attribute exist ? 962 if (schema.getAttributeType(attrName) == null) 963 { 964 throw new ConfigException( 965 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName)); 966 } 967 } 968 } 969 970 /** 971 * Test if the passed attribute is not allowed to be used in configuration of 972 * fractional replication. 973 * @param attr Attribute to test. 974 * @return true if the attribute is prohibited. 975 */ 976 private static boolean isFractionalProhibitedAttr(String attr) 977 { 978 for (String forbiddenAttr : FRACTIONAL_PROHIBITED_ATTRIBUTES) 979 { 980 if (forbiddenAttr.equalsIgnoreCase(attr)) 981 { 982 return true; 983 } 984 } 985 return false; 986 } 987 988 /** 989 * If fractional replication is enabled, this analyzes the operation and 990 * suppresses the forbidden attributes in it so that they are not added in 991 * the local backend. 992 * 993 * @param addOperation The operation to modify based on fractional 994 * replication configuration 995 * @param performFiltering Tells if the effective attribute filtering should 996 * be performed or if the call is just to analyze if there are some 997 * attributes filtered by fractional configuration 998 * @return true if the operation contains some attributes subject to filtering 999 * by the fractional configuration 1000 */ 1001 private boolean fractionalFilterOperation( 1002 PreOperationAddOperation addOperation, boolean performFiltering) 1003 { 1004 return fractionalRemoveAttributesFromEntry(fractionalConfig, 1005 addOperation.getEntryDN().rdn(), addOperation.getObjectClasses(), 1006 addOperation.getUserAttributes(), performFiltering); 1007 } 1008 1009 /** 1010 * If fractional replication is enabled, this analyzes the operation and 1011 * suppresses the forbidden attributes in it so that they are not added in 1012 * the local backend. 1013 * 1014 * @param modifyDNOperation The operation to modify based on fractional 1015 * replication configuration 1016 * @param performFiltering Tells if the effective modifications should 1017 * be performed or if the call is just to analyze if there are some 1018 * inconsistency with fractional configuration 1019 * @return true if the operation is inconsistent with fractional 1020 * configuration 1021 */ 1022 private boolean fractionalFilterOperation( 1023 PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering) 1024 { 1025 // Quick exit if not called for analyze and 1026 if (performFiltering && modifyDNOperation.deleteOldRDN()) 1027 { 1028 // The core will remove any occurrence of attribute that was part of the 1029 // old RDN, nothing more to do. 1030 return true; // Will not be used as analyze was not requested 1031 } 1032 1033 // Create a list of filtered attributes for this entry 1034 Entry concernedEntry = modifyDNOperation.getOriginalEntry(); 1035 Set<String> fractionalConcernedAttributes = 1036 createFractionalConcernedAttrList(fractionalConfig, 1037 concernedEntry.getObjectClasses().keySet()); 1038 1039 boolean fractionalExclusive = fractionalConfig.isFractionalExclusive(); 1040 if (fractionalExclusive && fractionalConcernedAttributes.isEmpty()) 1041 { 1042 // No attributes to filter 1043 return false; 1044 } 1045 1046 /* 1047 * Analyze the old and new rdn to see if they are some attributes to be 1048 * removed: if the oldRDN contains some forbidden attributes (for instance 1049 * it is possible if the entry was created with an add operation and the 1050 * RDN used contains a forbidden attribute: in this case the attribute value 1051 * has been kept to be consistent with the dn of the entry.) that are no 1052 * more part of the new RDN, we must remove any attribute of this type by 1053 * putting a modification to delete the attribute. 1054 */ 1055 1056 boolean inconsistentOperation = false; 1057 RDN rdn = modifyDNOperation.getEntryDN().rdn(); 1058 RDN newRdn = modifyDNOperation.getNewRDN(); 1059 1060 // Go through each attribute of the old RDN 1061 for (int i=0 ; i<rdn.getNumValues() ; i++) 1062 { 1063 AttributeType attributeType = rdn.getAttributeType(i); 1064 // Is it present in the fractional attributes established list ? 1065 boolean foundAttribute = 1066 exists(fractionalConcernedAttributes, attributeType); 1067 if (canRemoveAttribute(fractionalExclusive, foundAttribute) 1068 && !newRdn.hasAttributeType(attributeType) 1069 && !modifyDNOperation.deleteOldRDN()) 1070 { 1071 /* 1072 * A forbidden attribute is in the old RDN and no more in the new RDN, 1073 * and it has not been requested to remove attributes from old RDN: 1074 * let's remove the attribute from the entry to stay consistent with 1075 * fractional configuration 1076 */ 1077 Modification modification = new Modification(ModificationType.DELETE, 1078 Attributes.empty(attributeType)); 1079 modifyDNOperation.addModification(modification); 1080 inconsistentOperation = true; 1081 } 1082 } 1083 1084 return inconsistentOperation; 1085 } 1086 1087 private boolean exists(Set<String> attrNames, AttributeType attrTypeToFind) 1088 { 1089 for (String attrName : attrNames) 1090 { 1091 if (DirectoryServer.getAttributeType(attrName).equals(attrTypeToFind)) 1092 { 1093 return true; 1094 } 1095 } 1096 return false; 1097 } 1098 1099 /** 1100 * Remove attributes from an entry, according to the passed fractional 1101 * configuration. The entry is represented by the 2 passed parameters. 1102 * The attributes to be removed are removed using the remove method on the 1103 * passed iterator for the attributes in the entry. 1104 * @param fractionalConfig The fractional configuration to use 1105 * @param entryRdn The rdn of the entry to add 1106 * @param classes The object classes representing the entry to modify 1107 * @param attributesMap The map of attributes/values to be potentially removed 1108 * from the entry. 1109 * @param performFiltering Tells if the effective attribute filtering should 1110 * be performed or if the call is just an analyze to see if there are some 1111 * attributes filtered by fractional configuration 1112 * @return true if the operation contains some attributes subject to filtering 1113 * by the fractional configuration 1114 */ 1115 static boolean fractionalRemoveAttributesFromEntry( 1116 FractionalConfig fractionalConfig, RDN entryRdn, 1117 Map<ObjectClass,String> classes, Map<AttributeType, 1118 List<Attribute>> attributesMap, boolean performFiltering) 1119 { 1120 boolean hasSomeAttributesToFilter = false; 1121 /* 1122 * Prepare a list of attributes to be included/excluded according to the 1123 * fractional replication configuration 1124 */ 1125 1126 Set<String> fractionalConcernedAttributes = 1127 createFractionalConcernedAttrList(fractionalConfig, classes.keySet()); 1128 boolean fractionalExclusive = fractionalConfig.isFractionalExclusive(); 1129 if (fractionalExclusive && fractionalConcernedAttributes.isEmpty()) 1130 { 1131 return false; // No attributes to filter 1132 } 1133 1134 // Prepare list of object classes of the added entry 1135 Set<ObjectClass> entryClasses = classes.keySet(); 1136 1137 /* 1138 * Go through the user attributes and remove those that match filtered one 1139 * - exclude mode : remove only attributes that are in 1140 * fractionalConcernedAttributes 1141 * - include mode : remove any attribute that is not in 1142 * fractionalConcernedAttributes 1143 */ 1144 List<List<Attribute>> newRdnAttrLists = new ArrayList<>(); 1145 List<AttributeType> rdnAttrTypes = new ArrayList<>(); 1146 final Set<AttributeType> attrTypes = attributesMap.keySet(); 1147 for (Iterator<AttributeType> iter = attrTypes.iterator(); iter.hasNext();) 1148 { 1149 AttributeType attributeType = iter.next(); 1150 1151 // Only optional attributes may be removed 1152 if (isMandatoryAttribute(entryClasses, attributeType) 1153 // Do not remove an attribute if it is a prohibited one 1154 || isFractionalProhibited(attributeType) 1155 || !canRemoveAttribute(attributeType, fractionalExclusive, 1156 fractionalConcernedAttributes)) 1157 { 1158 continue; 1159 } 1160 1161 if (!performFiltering) 1162 { 1163 // The call was just to check : at least one attribute to filter 1164 // found, return immediately the answer; 1165 return true; 1166 } 1167 1168 // Do not remove an attribute/value that is part of the RDN of the 1169 // entry as it is forbidden 1170 if (entryRdn.hasAttributeType(attributeType)) 1171 { 1172 /* 1173 We must remove all values of the attributes map for this 1174 attribute type but the one that has the value which is in the RDN 1175 of the entry. In fact the (underlying )attribute list does not 1176 support remove so we have to create a new list, keeping only the 1177 attribute value which is the same as in the RDN 1178 */ 1179 ByteString rdnAttributeValue = 1180 entryRdn.getAttributeValue(attributeType); 1181 List<Attribute> attrList = attributesMap.get(attributeType); 1182 ByteString sameAttrValue = null; 1183 // Locate the attribute value identical to the one in the RDN 1184 for (Attribute attr : attrList) 1185 { 1186 if (attr.contains(rdnAttributeValue)) 1187 { 1188 for (ByteString attrValue : attr) { 1189 if (rdnAttributeValue.equals(attrValue)) { 1190 // Keep the value we want 1191 sameAttrValue = attrValue; 1192 } else { 1193 hasSomeAttributesToFilter = true; 1194 } 1195 } 1196 } 1197 else 1198 { 1199 hasSomeAttributesToFilter = true; 1200 } 1201 } 1202 // Recreate the attribute list with only the RDN attribute value 1203 if (sameAttrValue != null) 1204 // Paranoia check: should never be the case as we should always 1205 // find the attribute/value pair matching the pair in the RDN 1206 { 1207 // Construct and store new attribute list 1208 newRdnAttrLists.add(Attributes.createAsList(attributeType, sameAttrValue)); 1209 /* 1210 Store matching attribute type 1211 The mapping will be done using object from rdnAttrTypes as key 1212 and object from newRdnAttrLists (at same index) as value in 1213 the user attribute map to be modified 1214 */ 1215 rdnAttrTypes.add(attributeType); 1216 } 1217 } 1218 else 1219 { 1220 // Found an attribute to remove, remove it from the list. 1221 iter.remove(); 1222 hasSomeAttributesToFilter = true; 1223 } 1224 } 1225 // Now overwrite the attribute values for the attribute types present in the 1226 // RDN, if there are some filtered attributes in the RDN 1227 for (int index = 0 ; index < rdnAttrTypes.size() ; index++) 1228 { 1229 attributesMap.put(rdnAttrTypes.get(index), newRdnAttrLists.get(index)); 1230 } 1231 return hasSomeAttributesToFilter; 1232 } 1233 1234 private static boolean isMandatoryAttribute(Set<ObjectClass> entryClasses, AttributeType attributeType) 1235 { 1236 for (ObjectClass objectClass : entryClasses) 1237 { 1238 if (objectClass.isRequired(attributeType)) 1239 { 1240 return true; 1241 } 1242 } 1243 return false; 1244 } 1245 1246 private static boolean isFractionalProhibited(AttributeType attrType) 1247 { 1248 String attributeName = attrType.getPrimaryName(); 1249 return (attributeName != null && isFractionalProhibitedAttr(attributeName)) 1250 || isFractionalProhibitedAttr(attrType.getOID()); 1251 } 1252 1253 private static boolean canRemoveAttribute(AttributeType attributeType, 1254 boolean fractionalExclusive, Set<String> fractionalConcernedAttributes) 1255 { 1256 String attributeName = attributeType.getPrimaryName(); 1257 String attributeOid = attributeType.getOID(); 1258 1259 // Is the current attribute part of the established list ? 1260 boolean foundAttribute = 1261 contains(fractionalConcernedAttributes, attributeName, attributeOid); 1262 // Now remove the attribute or modification if: 1263 // - exclusive mode and attribute is in configuration 1264 // - inclusive mode and attribute is not in configuration 1265 return canRemoveAttribute(fractionalExclusive, foundAttribute); 1266 } 1267 1268 private static boolean canRemoveAttribute(boolean fractionalExclusive, 1269 boolean foundAttribute) 1270 { 1271 return (foundAttribute && fractionalExclusive) 1272 || (!foundAttribute && !fractionalExclusive); 1273 } 1274 1275 private static boolean contains(Set<String> attrNames, String attrName, 1276 String attrOID) 1277 { 1278 return attrNames.contains(attrOID) 1279 || (attrName != null && attrNames.contains(attrName.toLowerCase())); 1280 } 1281 1282 /** 1283 * Prepares a list of attributes of interest for the fractional feature. 1284 * @param fractionalConfig The fractional configuration to use 1285 * @param entryObjectClasses The object classes of an entry on which an 1286 * operation is going to be performed. 1287 * @return The list of attributes of the entry to be excluded/included 1288 * when the operation will be performed. 1289 */ 1290 private static Set<String> createFractionalConcernedAttrList( 1291 FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses) 1292 { 1293 /* 1294 * Is the concerned entry of a type concerned by fractional replication 1295 * configuration ? If yes, add the matching attribute names to a set of 1296 * attributes to take into account for filtering 1297 * (inclusive or exclusive mode). 1298 * Using a Set to avoid duplicate attributes (from 2 inheriting classes for 1299 * instance) 1300 */ 1301 Set<String> fractionalConcernedAttributes = new HashSet<>(); 1302 1303 // Get object classes the entry matches 1304 Set<String> fractionalAllClassesAttributes = 1305 fractionalConfig.getFractionalAllClassesAttributes(); 1306 Map<String, Set<String>> fractionalSpecificClassesAttributes = 1307 fractionalConfig.getFractionalSpecificClassesAttributes(); 1308 1309 Set<String> fractionalClasses = 1310 fractionalSpecificClassesAttributes.keySet(); 1311 for (ObjectClass entryObjectClass : entryObjectClasses) 1312 { 1313 for(String fractionalClass : fractionalClasses) 1314 { 1315 if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase())) 1316 { 1317 fractionalConcernedAttributes.addAll( 1318 fractionalSpecificClassesAttributes.get(fractionalClass)); 1319 } 1320 } 1321 } 1322 1323 // Add to the set any attribute which is class independent 1324 fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes); 1325 1326 return fractionalConcernedAttributes; 1327 } 1328 1329 /** 1330 * If fractional replication is enabled, this analyzes the operation and 1331 * suppresses the forbidden attributes in it so that they are not added/ 1332 * deleted/modified in the local backend. 1333 * 1334 * @param modifyOperation The operation to modify based on fractional 1335 * replication configuration 1336 * @param performFiltering Tells if the effective attribute filtering should 1337 * be performed or if the call is just to analyze if there are some 1338 * attributes filtered by fractional configuration 1339 * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES, 1340 * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP 1341 */ 1342 private int fractionalFilterOperation(PreOperationModifyOperation 1343 modifyOperation, boolean performFiltering) 1344 { 1345 /* 1346 * Prepare a list of attributes to be included/excluded according to the 1347 * fractional replication configuration 1348 */ 1349 1350 Entry modifiedEntry = modifyOperation.getCurrentEntry(); 1351 Set<String> fractionalConcernedAttributes = 1352 createFractionalConcernedAttrList(fractionalConfig, 1353 modifiedEntry.getObjectClasses().keySet()); 1354 boolean fractionalExclusive = fractionalConfig.isFractionalExclusive(); 1355 if (fractionalExclusive && fractionalConcernedAttributes.isEmpty()) 1356 { 1357 // No attributes to filter 1358 return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; 1359 } 1360 1361 // Prepare list of object classes of the modified entry 1362 DN entryToModifyDn = modifyOperation.getEntryDN(); 1363 Entry entryToModify; 1364 try 1365 { 1366 entryToModify = DirectoryServer.getEntry(entryToModifyDn); 1367 } 1368 catch(DirectoryException e) 1369 { 1370 logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e)); 1371 return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; 1372 } 1373 Set<ObjectClass> entryClasses = entryToModify.getObjectClasses().keySet(); 1374 1375 /* 1376 * Now go through the attribute modifications and filter the mods according 1377 * to the fractional configuration (using the just established concerned 1378 * attributes list): 1379 * - delete attributes: remove them if regarding a filtered attribute 1380 * - add attributes: remove them if regarding a filtered attribute 1381 * - modify attributes: remove them if regarding a filtered attribute 1382 */ 1383 1384 int result = FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; 1385 List<Modification> mods = modifyOperation.getModifications(); 1386 Iterator<Modification> modsIt = mods.iterator(); 1387 while (modsIt.hasNext()) 1388 { 1389 Modification mod = modsIt.next(); 1390 Attribute attr = mod.getAttribute(); 1391 AttributeType attrType = attr.getAttributeType(); 1392 // Fractional replication ignores operational attributes 1393 if (attrType.isOperational() 1394 || isMandatoryAttribute(entryClasses, attrType) 1395 || isFractionalProhibited(attrType) 1396 || !canRemoveAttribute(attrType, fractionalExclusive, 1397 fractionalConcernedAttributes)) 1398 { 1399 continue; 1400 } 1401 1402 if (!performFiltering) 1403 { 1404 // The call was just to check : at least one attribute to filter 1405 // found, return immediately the answer; 1406 return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES; 1407 } 1408 1409 // Found a modification to remove, remove it from the list. 1410 modsIt.remove(); 1411 result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES; 1412 if (mods.isEmpty()) 1413 { 1414 // This operation must become a no-op as no more modification in it 1415 return FRACTIONAL_BECOME_NO_OP; 1416 } 1417 } 1418 1419 return result; 1420 } 1421 1422 /** 1423 * This is overwritten to allow stopping the (online) import process by the 1424 * fractional ldif import plugin when it detects that the (imported) remote 1425 * data set is not consistent with the local fractional configuration. 1426 * {@inheritDoc} 1427 */ 1428 @Override 1429 protected byte[] receiveEntryBytes() 1430 { 1431 if (isFollowImport()) 1432 { 1433 // Ok, next entry is allowed to be received 1434 return super.receiveEntryBytes(); 1435 } 1436 1437 // Fractional ldif import plugin detected inconsistency between local and 1438 // remote server fractional configuration and is stopping the import 1439 // process: 1440 // This is an error termination during the import 1441 // The error is stored and the import is ended by returning null 1442 final ImportExportContext ieCtx = getImportExportContext(); 1443 LocalizableMessage msg = null; 1444 switch (importErrorMessageId) 1445 { 1446 case IMPORT_ERROR_MESSAGE_BAD_REMOTE: 1447 msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(getBaseDN(), ieCtx.getImportSource()); 1448 break; 1449 case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL: 1450 msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(getBaseDN(), ieCtx.getImportSource()); 1451 break; 1452 } 1453 ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg)); 1454 return null; 1455 } 1456 1457 /** 1458 * This is overwritten to allow stopping the (online) export process if the 1459 * local domain is fractional and the destination is all other servers: 1460 * This make no sense to have only fractional servers in a replicated 1461 * topology. This prevents from administrator manipulation error that would 1462 * lead to whole topology data corruption. 1463 * {@inheritDoc} 1464 */ 1465 @Override 1466 protected void initializeRemote(int target, int requestorID, 1467 Task initTask, int initWindow) throws DirectoryException 1468 { 1469 if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional()) 1470 { 1471 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get(getBaseDN(), getServerId()); 1472 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg); 1473 } 1474 1475 super.initializeRemote(target, requestorID, initTask, initWindow); 1476 } 1477 1478 /** 1479 * Implement the handleConflictResolution phase of the deleteOperation. 1480 * 1481 * @param deleteOperation The deleteOperation. 1482 * @return A SynchronizationProviderResult indicating if the operation 1483 * can continue. 1484 */ 1485 SynchronizationProviderResult handleConflictResolution( 1486 PreOperationDeleteOperation deleteOperation) 1487 { 1488 if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected()) 1489 { 1490 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1491 return new SynchronizationProviderResult.StopProcessing( 1492 ResultCode.UNWILLING_TO_PERFORM, msg); 1493 } 1494 1495 DeleteContext ctx = 1496 (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); 1497 Entry deletedEntry = deleteOperation.getEntryToDelete(); 1498 1499 if (ctx != null) 1500 { 1501 /* 1502 * This is a replication operation 1503 * Check that the modified entry has the same entryuuid 1504 * as it was in the original message. 1505 */ 1506 String operationEntryUUID = ctx.getEntryUUID(); 1507 String deletedEntryUUID = getEntryUUID(deletedEntry); 1508 if (!operationEntryUUID.equals(deletedEntryUUID)) 1509 { 1510 /* 1511 * The changes entry is not the same entry as the one on 1512 * the original change was performed. 1513 * Probably the original entry was renamed and replaced with 1514 * another entry. 1515 * We must not let the change proceed, return a negative 1516 * result and set the result code to NO_SUCH_OBJECT. 1517 * When the operation will return, the thread that started the operation 1518 * will try to find the correct entry and restart a new operation. 1519 */ 1520 return new SynchronizationProviderResult.StopProcessing( 1521 ResultCode.NO_SUCH_OBJECT, null); 1522 } 1523 } 1524 else 1525 { 1526 // There is no replication context attached to the operation 1527 // so this is not a replication operation. 1528 CSN csn = generateCSN(deleteOperation); 1529 String modifiedEntryUUID = getEntryUUID(deletedEntry); 1530 ctx = new DeleteContext(csn, modifiedEntryUUID); 1531 deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); 1532 1533 synchronized (replayOperations) 1534 { 1535 int size = replayOperations.size(); 1536 if (size >= 10000) 1537 { 1538 replayOperations.remove(replayOperations.firstKey()); 1539 } 1540 FakeOperation op = new FakeDelOperation( 1541 deleteOperation.getEntryDN(), csn, modifiedEntryUUID); 1542 replayOperations.put(csn, op); 1543 } 1544 } 1545 1546 return new SynchronizationProviderResult.ContinueProcessing(); 1547 } 1548 1549 /** 1550 * Implement the handleConflictResolution phase of the addOperation. 1551 * 1552 * @param addOperation The AddOperation. 1553 * @return A SynchronizationProviderResult indicating if the operation 1554 * can continue. 1555 */ 1556 SynchronizationProviderResult handleConflictResolution( 1557 PreOperationAddOperation addOperation) 1558 { 1559 if (!addOperation.isSynchronizationOperation() && !brokerIsConnected()) 1560 { 1561 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1562 return new SynchronizationProviderResult.StopProcessing( 1563 ResultCode.UNWILLING_TO_PERFORM, msg); 1564 } 1565 1566 if (fractionalConfig.isFractional()) 1567 { 1568 if (addOperation.isSynchronizationOperation()) 1569 { 1570 /* 1571 * Filter attributes here for fractional replication. If fractional 1572 * replication is enabled, we analyze the operation to suppress the 1573 * forbidden attributes in it so that they are not added in the local 1574 * backend. This must be called before any other plugin is called, to 1575 * keep coherency across plugin calls. 1576 */ 1577 fractionalFilterOperation(addOperation, true); 1578 } 1579 else 1580 { 1581 /* 1582 * Direct access from an LDAP client : if some attributes are to be 1583 * removed according to the fractional configuration, simply forbid 1584 * the operation 1585 */ 1586 if (fractionalFilterOperation(addOperation, false)) 1587 { 1588 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), addOperation); 1589 return new SynchronizationProviderResult.StopProcessing( 1590 ResultCode.UNWILLING_TO_PERFORM, msg); 1591 } 1592 } 1593 } 1594 1595 if (addOperation.isSynchronizationOperation()) 1596 { 1597 AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); 1598 /* 1599 * If an entry with the same entry uniqueID already exist then 1600 * this operation has already been replayed in the past. 1601 */ 1602 String uuid = ctx.getEntryUUID(); 1603 if (findEntryDN(uuid) != null) 1604 { 1605 return new SynchronizationProviderResult.StopProcessing( 1606 ResultCode.NO_OPERATION, null); 1607 } 1608 1609 /* The parent entry may have been renamed here since the change was done 1610 * on the first server, and another entry have taken the former dn 1611 * of the parent entry 1612 */ 1613 1614 String parentEntryUUID = ctx.getParentEntryUUID(); 1615 // root entry have no parent, there is no need to check for it. 1616 if (parentEntryUUID != null) 1617 { 1618 // There is a potential of perfs improvement here 1619 // if we could avoid the following parent entry retrieval 1620 DN parentDnFromCtx = findEntryDN(ctx.getParentEntryUUID()); 1621 if (parentDnFromCtx == null) 1622 { 1623 // The parent does not exist with the specified unique id 1624 // stop the operation with NO_SUCH_OBJECT and let the 1625 // conflict resolution or the dependency resolution solve this. 1626 return new SynchronizationProviderResult.StopProcessing( 1627 ResultCode.NO_SUCH_OBJECT, null); 1628 } 1629 1630 DN entryDN = addOperation.getEntryDN(); 1631 DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); 1632 if (parentDnFromEntryDn != null 1633 && !parentDnFromCtx.equals(parentDnFromEntryDn)) 1634 { 1635 // parentEntry has been renamed 1636 // replication name conflict resolution is expected to fix that 1637 // later in the flow 1638 return new SynchronizationProviderResult.StopProcessing( 1639 ResultCode.NO_SUCH_OBJECT, null); 1640 } 1641 } 1642 } 1643 return new SynchronizationProviderResult.ContinueProcessing(); 1644 } 1645 1646 /** 1647 * Check that the broker associated to this ReplicationDomain has found 1648 * a Replication Server and that this LDAP server is therefore able to 1649 * process operations. 1650 * If not, set the ResultCode, the response message, 1651 * interrupt the operation, and return false 1652 * 1653 * @return true when it OK to process the Operation, false otherwise. 1654 * When false is returned the resultCode and the response message 1655 * is also set in the Operation. 1656 */ 1657 private boolean brokerIsConnected() 1658 { 1659 final IsolationPolicy isolationPolicy = config.getIsolationPolicy(); 1660 if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) 1661 { 1662 // this policy imply that we always accept updates. 1663 return true; 1664 } 1665 if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) 1666 { 1667 // this isolation policy specifies that the updates are denied 1668 // when the broker had problems during the connection phase 1669 // Updates are still accepted if the broker is currently connecting.. 1670 return !hasConnectionError(); 1671 } 1672 // we should never get there as the only possible policies are 1673 // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES 1674 return true; 1675 } 1676 1677 /** 1678 * Implement the handleConflictResolution phase of the ModifyDNOperation. 1679 * 1680 * @param modifyDNOperation The ModifyDNOperation. 1681 * @return A SynchronizationProviderResult indicating if the operation 1682 * can continue. 1683 */ 1684 SynchronizationProviderResult handleConflictResolution( 1685 PreOperationModifyDNOperation modifyDNOperation) 1686 { 1687 if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected()) 1688 { 1689 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1690 return new SynchronizationProviderResult.StopProcessing( 1691 ResultCode.UNWILLING_TO_PERFORM, msg); 1692 } 1693 1694 if (fractionalConfig.isFractional()) 1695 { 1696 if (modifyDNOperation.isSynchronizationOperation()) 1697 { 1698 /* 1699 * Filter operation here for fractional replication. If fractional 1700 * replication is enabled, we analyze the operation and modify it if 1701 * necessary to stay consistent with what is defined in fractional 1702 * configuration. 1703 */ 1704 fractionalFilterOperation(modifyDNOperation, true); 1705 } 1706 else 1707 { 1708 /* 1709 * Direct access from an LDAP client : something is inconsistent with 1710 * the fractional configuration, forbid the operation. 1711 */ 1712 if (fractionalFilterOperation(modifyDNOperation, false)) 1713 { 1714 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyDNOperation); 1715 return new SynchronizationProviderResult.StopProcessing( 1716 ResultCode.UNWILLING_TO_PERFORM, msg); 1717 } 1718 } 1719 } 1720 1721 ModifyDnContext ctx = 1722 (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); 1723 if (ctx != null) 1724 { 1725 /* 1726 * This is a replication operation 1727 * Check that the modified entry has the same entryuuid 1728 * as was in the original message. 1729 */ 1730 final String modifiedEntryUUID = 1731 getEntryUUID(modifyDNOperation.getOriginalEntry()); 1732 if (!modifiedEntryUUID.equals(ctx.getEntryUUID())) 1733 { 1734 /* 1735 * The modified entry is not the same entry as the one on 1736 * the original change was performed. 1737 * Probably the original entry was renamed and replaced with 1738 * another entry. 1739 * We must not let the change proceed, return a negative 1740 * result and set the result code to NO_SUCH_OBJECT. 1741 * When the operation will return, the thread that started the operation 1742 * will try to find the correct entry and restart a new operation. 1743 */ 1744 return new SynchronizationProviderResult.StopProcessing( 1745 ResultCode.NO_SUCH_OBJECT, null); 1746 } 1747 1748 if (modifyDNOperation.getNewSuperior() != null) 1749 { 1750 /* 1751 * Also check that the current id of the 1752 * parent is the same as when the operation was performed. 1753 */ 1754 String newParentId = findEntryUUID(modifyDNOperation.getNewSuperior()); 1755 if (newParentId != null && ctx.getNewSuperiorEntryUUID() != null 1756 && !newParentId.equals(ctx.getNewSuperiorEntryUUID())) 1757 { 1758 return new SynchronizationProviderResult.StopProcessing( 1759 ResultCode.NO_SUCH_OBJECT, null); 1760 } 1761 } 1762 1763 /* 1764 * If the object has been renamed more recently than this 1765 * operation, cancel the operation. 1766 */ 1767 EntryHistorical hist = EntryHistorical.newInstanceFromEntry( 1768 modifyDNOperation.getOriginalEntry()); 1769 if (hist.addedOrRenamedAfter(ctx.getCSN())) 1770 { 1771 return new SynchronizationProviderResult.StopProcessing( 1772 ResultCode.NO_OPERATION, null); 1773 } 1774 } 1775 else 1776 { 1777 // There is no replication context attached to the operation 1778 // so this is not a replication operation. 1779 CSN csn = generateCSN(modifyDNOperation); 1780 String newParentId = null; 1781 if (modifyDNOperation.getNewSuperior() != null) 1782 { 1783 newParentId = findEntryUUID(modifyDNOperation.getNewSuperior()); 1784 } 1785 1786 Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); 1787 String modifiedEntryUUID = getEntryUUID(modifiedEntry); 1788 ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId); 1789 modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); 1790 } 1791 return new SynchronizationProviderResult.ContinueProcessing(); 1792 } 1793 1794 /** 1795 * Handle the conflict resolution. 1796 * Called by the core server after locking the entry and before 1797 * starting the actual modification. 1798 * @param modifyOperation the operation 1799 * @return code indicating is operation must proceed 1800 */ 1801 SynchronizationProviderResult handleConflictResolution( 1802 PreOperationModifyOperation modifyOperation) 1803 { 1804 if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected()) 1805 { 1806 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1807 return new SynchronizationProviderResult.StopProcessing( 1808 ResultCode.UNWILLING_TO_PERFORM, msg); 1809 } 1810 1811 if (fractionalConfig.isFractional()) 1812 { 1813 if (modifyOperation.isSynchronizationOperation()) 1814 { 1815 /* 1816 * Filter attributes here for fractional replication. If fractional 1817 * replication is enabled, we analyze the operation and modify it so 1818 * that no forbidden attribute is added/modified/deleted in the local 1819 * backend. This must be called before any other plugin is called, to 1820 * keep coherency across plugin calls. 1821 */ 1822 if (fractionalFilterOperation(modifyOperation, true) == 1823 FRACTIONAL_BECOME_NO_OP) 1824 { 1825 // Every modifications filtered in this operation: the operation 1826 // becomes a no-op 1827 return new SynchronizationProviderResult.StopProcessing( 1828 ResultCode.NO_OPERATION, null); 1829 } 1830 } 1831 else 1832 { 1833 /* 1834 * Direct access from an LDAP client : if some attributes are to be 1835 * removed according to the fractional configuration, simply forbid 1836 * the operation 1837 */ 1838 switch(fractionalFilterOperation(modifyOperation, false)) 1839 { 1840 case FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES: 1841 // Ok, let the operation happen 1842 break; 1843 case FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES: 1844 // Some attributes not compliant with fractional configuration : 1845 // forbid the operation 1846 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyOperation); 1847 return new SynchronizationProviderResult.StopProcessing( 1848 ResultCode.UNWILLING_TO_PERFORM, msg); 1849 } 1850 } 1851 } 1852 1853 ModifyContext ctx = 1854 (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); 1855 1856 Entry modifiedEntry = modifyOperation.getModifiedEntry(); 1857 if (ctx == null) 1858 { 1859 // No replication ctx attached => not a replicated operation 1860 // - create a ctx with : CSN, entryUUID 1861 // - attach the context to the op 1862 1863 CSN csn = generateCSN(modifyOperation); 1864 ctx = new ModifyContext(csn, getEntryUUID(modifiedEntry)); 1865 1866 modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); 1867 } 1868 else 1869 { 1870 // Replication ctx attached => this is a replicated operation being 1871 // replayed here, it is necessary to 1872 // - check if the entry has been renamed 1873 // - check for conflicts 1874 String modifiedEntryUUID = ctx.getEntryUUID(); 1875 String currentEntryUUID = getEntryUUID(modifiedEntry); 1876 if (currentEntryUUID != null 1877 && !currentEntryUUID.equals(modifiedEntryUUID)) 1878 { 1879 /* 1880 * The current modified entry is not the same entry as the one on 1881 * the original modification was performed. 1882 * Probably the original entry was renamed and replaced with 1883 * another entry. 1884 * We must not let the modification proceed, return a negative 1885 * result and set the result code to NO_SUCH_OBJECT. 1886 * When the operation will return, the thread that started the 1887 * operation will try to find the correct entry and restart a new 1888 * operation. 1889 */ 1890 return new SynchronizationProviderResult.StopProcessing( 1891 ResultCode.NO_SUCH_OBJECT, null); 1892 } 1893 1894 // Solve the conflicts between modify operations 1895 EntryHistorical historicalInformation = 1896 EntryHistorical.newInstanceFromEntry(modifiedEntry); 1897 modifyOperation.setAttachment(EntryHistorical.HISTORICAL, 1898 historicalInformation); 1899 1900 if (historicalInformation.replayOperation(modifyOperation, modifiedEntry)) 1901 { 1902 numResolvedModifyConflicts.incrementAndGet(); 1903 } 1904 } 1905 return new SynchronizationProviderResult.ContinueProcessing(); 1906 } 1907 1908 /** 1909 * The preOperation phase for the add Operation. 1910 * Its job is to generate the replication context associated to the 1911 * operation. It is necessary to do it in this phase because contrary to 1912 * the other operations, the entry UUID is not set when the handleConflict 1913 * phase is called. 1914 * 1915 * @param addOperation The Add Operation. 1916 */ 1917 void doPreOperation(PreOperationAddOperation addOperation) 1918 { 1919 final CSN csn = generateCSN(addOperation); 1920 final String entryUUID = getEntryUUID(addOperation); 1921 final AddContext ctx = new AddContext(csn, entryUUID, 1922 findEntryUUID(addOperation.getEntryDN().getParentDNInSuffix())); 1923 addOperation.setAttachment(SYNCHROCONTEXT, ctx); 1924 } 1925 1926 @Override 1927 public void publishReplicaOfflineMsg() 1928 { 1929 pendingChanges.putReplicaOfflineMsg(); 1930 dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN()); 1931 } 1932 1933 /** 1934 * Check if an operation must be synchronized. 1935 * Also update the list of pending changes and the server RUV 1936 * @param op the operation 1937 */ 1938 void synchronize(PostOperationOperation op) 1939 { 1940 ResultCode result = op.getResultCode(); 1941 // Note that a failed non-replication operation might not have a change 1942 // number. 1943 CSN curCSN = OperationContext.getCSN(op); 1944 if (curCSN != null && config.isLogChangenumber()) 1945 { 1946 op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(), 1947 "replicationCSN", curCSN)); 1948 } 1949 1950 if (result == ResultCode.SUCCESS) 1951 { 1952 if (op.isSynchronizationOperation()) 1953 { // Replaying a sync operation 1954 numReplayedPostOpCalled.incrementAndGet(); 1955 try 1956 { 1957 remotePendingChanges.commit(curCSN); 1958 } 1959 catch (NoSuchElementException e) 1960 { 1961 logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN); 1962 return; 1963 } 1964 } 1965 else 1966 { 1967 // Generate a replication message for a successful non-replication 1968 // operation. 1969 LDAPUpdateMsg msg = LDAPUpdateMsg.generateMsg(op); 1970 1971 if (msg == null) 1972 { 1973 /* 1974 * This is an operation type that we do not know about 1975 * It should never happen. 1976 */ 1977 pendingChanges.remove(curCSN); 1978 logger.error(ERR_UNKNOWN_TYPE, op.getOperationType()); 1979 return; 1980 } 1981 1982 addEntryAttributesForCL(msg,op); 1983 1984 // If assured replication is configured, this will prepare blocking 1985 // mechanism. If assured replication is disabled, this returns 1986 // immediately 1987 prepareWaitForAckIfAssuredEnabled(msg); 1988 try 1989 { 1990 msg.encode(); 1991 pendingChanges.commitAndPushCommittedChanges(curCSN, msg); 1992 } 1993 catch (NoSuchElementException e) 1994 { 1995 logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN); 1996 return; 1997 } 1998 // If assured replication is enabled, this will wait for the matching 1999 // ack or time out. If assured replication is disabled, this returns 2000 // immediately 2001 try 2002 { 2003 waitForAckIfAssuredEnabled(msg); 2004 } catch (TimeoutException ex) 2005 { 2006 // This exception may only be raised if assured replication is enabled 2007 logger.info(NOTE_DS_ACK_TIMEOUT, getBaseDN(), getAssuredTimeout(), msg); 2008 } 2009 } 2010 2011 /* 2012 * If the operation is a DELETE on the base entry of the suffix 2013 * that is replicated, the generation is now lost because the 2014 * DB is empty. We need to save it again the next time we add an entry. 2015 */ 2016 if (OperationType.DELETE.equals(op.getOperationType()) 2017 && ((PostOperationDeleteOperation) op) 2018 .getEntryDN().equals(getBaseDN())) 2019 { 2020 generationIdSavedStatus = false; 2021 } 2022 2023 if (!generationIdSavedStatus) 2024 { 2025 saveGenerationId(generationId); 2026 } 2027 } 2028 else if (!op.isSynchronizationOperation() && curCSN != null) 2029 { 2030 // Remove an unsuccessful non-replication operation from the pending 2031 // changes list. 2032 pendingChanges.remove(curCSN); 2033 pendingChanges.pushCommittedChanges(); 2034 } 2035 2036 checkForClearedConflict(op); 2037 } 2038 2039 /** 2040 * Check if the operation that just happened has cleared a conflict : 2041 * Clearing a conflict happens if the operation has free a DN that 2042 * for which an other entry was in conflict. 2043 * Steps: 2044 * - get the DN freed by a DELETE or MODRDN op 2045 * - search for entries put in the conflict space (dn=entryUUID'+'....) 2046 * because the expected DN was not available (ds-sync-conflict=expected DN) 2047 * - retain the entry with the oldest conflict 2048 * - rename this entry with the freedDN as it was expected originally 2049 */ 2050 private void checkForClearedConflict(PostOperationOperation op) 2051 { 2052 OperationType type = op.getOperationType(); 2053 if (op.getResultCode() != ResultCode.SUCCESS) 2054 { 2055 // those operations cannot have cleared a conflict 2056 return; 2057 } 2058 2059 DN freedDN; 2060 if (type == OperationType.DELETE) 2061 { 2062 freedDN = ((PostOperationDeleteOperation) op).getEntryDN(); 2063 } 2064 else if (type == OperationType.MODIFY_DN) 2065 { 2066 freedDN = ((PostOperationModifyDNOperation) op).getEntryDN(); 2067 } 2068 else 2069 { 2070 return; 2071 } 2072 2073 SearchFilter filter; 2074 try 2075 { 2076 filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT, 2077 ByteString.valueOf(freedDN.toString())).toSearchFilter(); 2078 } 2079 catch (DirectoryException e) 2080 { 2081 // can not happen? 2082 logger.traceException(e); 2083 return; 2084 } 2085 2086 SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter) 2087 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 2088 InternalSearchOperation searchOp = conn.processSearch(request); 2089 2090 Entry entryToRename = null; 2091 CSN entryToRenameCSN = null; 2092 for (SearchResultEntry entry : searchOp.getSearchEntries()) 2093 { 2094 EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry); 2095 if (entryToRename == null) 2096 { 2097 entryToRename = entry; 2098 entryToRenameCSN = history.getDNDate(); 2099 } 2100 else if (!history.addedOrRenamedAfter(entryToRenameCSN)) 2101 { 2102 // this conflict is older than the previous, keep it. 2103 entryToRename = entry; 2104 entryToRenameCSN = history.getDNDate(); 2105 } 2106 } 2107 2108 if (entryToRename != null) 2109 { 2110 DN entryDN = entryToRename.getName(); 2111 ModifyDNOperation newOp = renameEntry( 2112 entryDN, freedDN.rdn(), freedDN.parent(), false); 2113 2114 ResultCode res = newOp.getResultCode(); 2115 if (res != ResultCode.SUCCESS) 2116 { 2117 logger.error(ERR_COULD_NOT_SOLVE_CONFLICT, entryDN, res); 2118 } 2119 } 2120 } 2121 2122 /** 2123 * Rename an Entry Using a synchronization, non-replicated operation. 2124 * This method should be used instead of the InternalConnection methods 2125 * when the operation that need to be run must be local only and therefore 2126 * not replicated to the RS. 2127 * 2128 * @param targetDN The DN of the entry to rename. 2129 * @param newRDN The new RDN to be used. 2130 * @param parentDN The parentDN to be used. 2131 * @param markConflict A boolean indicating is this entry should be marked 2132 * as a conflicting entry. In such case the 2133 * DS_SYNC_CONFLICT attribute will be added to the entry 2134 * with the value of its original DN. 2135 * If false, the DS_SYNC_CONFLICT attribute will be 2136 * cleared. 2137 * 2138 * @return The operation that was run to rename the entry. 2139 */ 2140 private ModifyDNOperation renameEntry(DN targetDN, RDN newRDN, DN parentDN, 2141 boolean markConflict) 2142 { 2143 ModifyDNOperation newOp = new ModifyDNOperationBasis( 2144 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 2145 targetDN, newRDN, false, parentDN); 2146 2147 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(DS_SYNC_CONFLICT); 2148 if (markConflict) 2149 { 2150 Attribute attr = 2151 Attributes.create(attrType, targetDN.toString()); 2152 newOp.addModification(new Modification(ModificationType.REPLACE, attr)); 2153 } 2154 else 2155 { 2156 Attribute attr = Attributes.empty(attrType); 2157 newOp.addModification(new Modification(ModificationType.DELETE, attr)); 2158 } 2159 2160 runAsSynchronizedOperation(newOp); 2161 return newOp; 2162 } 2163 2164 private void runAsSynchronizedOperation(Operation op) 2165 { 2166 op.setInternalOperation(true); 2167 op.setSynchronizationOperation(true); 2168 op.setDontSynchronize(true); 2169 op.run(); 2170 } 2171 2172 /** Delete this ReplicationDomain. */ 2173 void delete() 2174 { 2175 shutdown(); 2176 removeECLDomainCfg(); 2177 } 2178 2179 /** Shutdown this ReplicationDomain. */ 2180 public void shutdown() 2181 { 2182 if (shutdown.compareAndSet(false, true)) 2183 { 2184 final RSUpdater rsUpdater = this.rsUpdater.get(); 2185 if (rsUpdater != null) 2186 { 2187 rsUpdater.initiateShutdown(); 2188 } 2189 2190 // stop the thread in charge of flushing the ServerState. 2191 if (flushThread != null) 2192 { 2193 flushThread.initiateShutdown(); 2194 synchronized (flushThread) 2195 { 2196 flushThread.notify(); 2197 } 2198 } 2199 2200 DirectoryServer.deregisterAlertGenerator(this); 2201 2202 // stop the ReplicationDomain 2203 disableService(); 2204 } 2205 2206 // wait for completion of the ServerStateFlush thread. 2207 try 2208 { 2209 while (!done) 2210 { 2211 Thread.sleep(50); 2212 } 2213 } catch (InterruptedException e) 2214 { 2215 Thread.currentThread().interrupt(); 2216 } 2217 } 2218 2219 /** 2220 * Create and replay a synchronized Operation from an UpdateMsg. 2221 * 2222 * @param msg 2223 * The UpdateMsg to be replayed. 2224 * @param shutdown 2225 * whether the server initiated shutdown 2226 */ 2227 void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown) 2228 { 2229 // Try replay the operation, then flush (replaying) any pending operation 2230 // whose dependency has been replayed until no more left. 2231 do 2232 { 2233 Operation op = null; // the last operation on which replay was attempted 2234 boolean dependency = false; 2235 String replayErrorMsg = null; 2236 CSN csn = null; 2237 try 2238 { 2239 // The next operation for which to attempt replay. 2240 // This local variable allow to keep error messages in the "op" local 2241 // variable until the next loop iteration starts. 2242 // "op" is already initialized to the next Operation because of the 2243 // error handling paths. 2244 Operation nextOp = op = msg.createOperation(conn); 2245 dependency = remotePendingChanges.checkDependencies(op, msg); 2246 2247 boolean replayDone = false; 2248 int retryCount = 10; 2249 while (!dependency && !replayDone && retryCount-- > 0) 2250 { 2251 if (shutdown.get()) 2252 { 2253 // shutdown initiated, let's leave 2254 return; 2255 } 2256 // Try replay the operation 2257 op = nextOp; 2258 op.setInternalOperation(true); 2259 op.setSynchronizationOperation(true); 2260 2261 // Always add the ManageDSAIT control so that updates to referrals 2262 // are processed locally. 2263 op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL)); 2264 2265 csn = OperationContext.getCSN(op); 2266 op.run(); 2267 2268 ResultCode result = op.getResultCode(); 2269 2270 if (result != ResultCode.SUCCESS) 2271 { 2272 if (result == ResultCode.NO_OPERATION) 2273 { 2274 // Pre-operation conflict resolution detected that the operation 2275 // was a no-op. For example, an add which has already been 2276 // replayed, or a modify DN operation on an entry which has been 2277 // renamed by a more recent modify DN. 2278 replayDone = true; 2279 } 2280 else if (result == ResultCode.BUSY) 2281 { 2282 /* 2283 * We probably could not get a lock (OPENDJ-885). Give the server 2284 * another chance to process this operation immediately. 2285 */ 2286 Thread.yield(); 2287 continue; 2288 } 2289 else if (result == ResultCode.UNAVAILABLE) 2290 { 2291 /* 2292 * It can happen when a rebuild is performed or the backend is 2293 * offline (OPENDJ-49). Give the server another chance to process 2294 * this operation after some time. 2295 */ 2296 Thread.sleep(50); 2297 continue; 2298 } 2299 else if (op instanceof ModifyOperation) 2300 { 2301 ModifyOperation castOp = (ModifyOperation) op; 2302 dependency = remotePendingChanges.checkDependencies(castOp); 2303 ModifyMsg modifyMsg = (ModifyMsg) msg; 2304 replayDone = solveNamingConflict(castOp, modifyMsg); 2305 } 2306 else if (op instanceof DeleteOperation) 2307 { 2308 DeleteOperation castOp = (DeleteOperation) op; 2309 dependency = remotePendingChanges.checkDependencies(castOp); 2310 replayDone = solveNamingConflict(castOp, msg); 2311 } 2312 else if (op instanceof AddOperation) 2313 { 2314 AddOperation castOp = (AddOperation) op; 2315 AddMsg addMsg = (AddMsg) msg; 2316 dependency = remotePendingChanges.checkDependencies(castOp); 2317 replayDone = solveNamingConflict(castOp, addMsg); 2318 } 2319 else if (op instanceof ModifyDNOperation) 2320 { 2321 ModifyDNOperation castOp = (ModifyDNOperation) op; 2322 replayDone = solveNamingConflict(castOp, msg); 2323 } 2324 else 2325 { 2326 replayDone = true; // unknown type of operation ?! 2327 } 2328 2329 if (replayDone) 2330 { 2331 // the update became a dummy update and the result 2332 // of the conflict resolution phase is to do nothing. 2333 // however we still need to push this change to the serverState 2334 updateError(csn); 2335 } 2336 else 2337 { 2338 /* 2339 * Create a new operation reflecting the new state of the 2340 * UpdateMsg after conflict resolution modified it. 2341 * Note: When msg is a DeleteMsg, the DeleteOperation is properly 2342 * created with subtreeDelete request control when needed. 2343 */ 2344 nextOp = msg.createOperation(conn); 2345 } 2346 } 2347 else 2348 { 2349 replayDone = true; 2350 } 2351 } 2352 2353 if (!replayDone && !dependency) 2354 { 2355 // Continue with the next change but the servers could now become 2356 // inconsistent. 2357 // Let the repair tool know about this. 2358 final LocalizableMessage message = ERR_LOOP_REPLAYING_OPERATION.get( 2359 op, op.getErrorMessage()); 2360 logger.error(message); 2361 numUnresolvedNamingConflicts.incrementAndGet(); 2362 replayErrorMsg = message.toString(); 2363 updateError(csn); 2364 } 2365 } catch (DecodeException | LDAPException | DataFormatException e) 2366 { 2367 replayErrorMsg = logDecodingOperationError(msg, e); 2368 } catch (Exception e) 2369 { 2370 if (csn != null) 2371 { 2372 /* 2373 * An Exception happened during the replay process. 2374 * Continue with the next change but the servers will now start 2375 * to be inconsistent. 2376 * Let the repair tool know about this. 2377 */ 2378 LocalizableMessage message = 2379 ERR_EXCEPTION_REPLAYING_OPERATION.get( 2380 stackTraceToSingleLineString(e), op); 2381 logger.error(message); 2382 replayErrorMsg = message.toString(); 2383 updateError(csn); 2384 } else 2385 { 2386 replayErrorMsg = logDecodingOperationError(msg, e); 2387 } 2388 } finally 2389 { 2390 if (!dependency) 2391 { 2392 processUpdateDone(msg, replayErrorMsg); 2393 } 2394 } 2395 2396 // Now replay any pending update that had a dependency and whose 2397 // dependency has been replayed, do that until no more updates of that 2398 // type left... 2399 msg = remotePendingChanges.getNextUpdate(); 2400 } while (msg != null); 2401 } 2402 2403 private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e) 2404 { 2405 LocalizableMessage message = 2406 ERR_EXCEPTION_DECODING_OPERATION.get(msg + " " + stackTraceToSingleLineString(e)); 2407 logger.error(message); 2408 return message.toString(); 2409 } 2410 2411 /** 2412 * This method is called when an error happens while replaying 2413 * an operation. 2414 * It is necessary because the postOperation does not always get 2415 * called when error or Exceptions happen during the operation replay. 2416 * 2417 * @param csn the CSN of the operation with error. 2418 */ 2419 private void updateError(CSN csn) 2420 { 2421 try 2422 { 2423 remotePendingChanges.commit(csn); 2424 } 2425 catch (NoSuchElementException e) 2426 { 2427 // A failure occurred after the change had been removed from the pending 2428 // changes table. 2429 if (logger.isTraceEnabled()) 2430 { 2431 logger.trace( 2432 "LDAPReplicationDomain.updateError: Unable to find remote " 2433 + "pending change for CSN %s", csn); 2434 } 2435 } 2436 } 2437 2438 /** 2439 * Generate a new CSN and insert it in the pending list. 2440 * 2441 * @param operation 2442 * The operation for which the CSN must be generated. 2443 * @return The new CSN. 2444 */ 2445 private CSN generateCSN(PluginOperation operation) 2446 { 2447 return pendingChanges.putLocalOperation(operation); 2448 } 2449 2450 /** 2451 * Find the Unique Id of the entry with the provided DN by doing a 2452 * search of the entry and extracting its entryUUID from its attributes. 2453 * 2454 * @param dn The dn of the entry for which the unique Id is searched. 2455 * 2456 * @return The unique Id of the entry with the provided DN. 2457 */ 2458 static String findEntryUUID(DN dn) 2459 { 2460 if (dn == null) 2461 { 2462 return null; 2463 } 2464 final SearchRequest request = newSearchRequest(dn, SearchScope.BASE_OBJECT) 2465 .addAttribute(ENTRYUUID_ATTRIBUTE_NAME); 2466 final InternalSearchOperation search = getRootConnection().processSearch(request); 2467 final SearchResultEntry resultEntry = getFirstResult(search); 2468 if (resultEntry != null) 2469 { 2470 return getEntryUUID(resultEntry); 2471 } 2472 return null; 2473 } 2474 2475 private static SearchResultEntry getFirstResult(InternalSearchOperation search) 2476 { 2477 if (search.getResultCode() == ResultCode.SUCCESS) 2478 { 2479 final LinkedList<SearchResultEntry> results = search.getSearchEntries(); 2480 if (!results.isEmpty()) 2481 { 2482 return results.getFirst(); 2483 } 2484 } 2485 return null; 2486 } 2487 2488 /** 2489 * Find the current DN of an entry from its entry UUID. 2490 * 2491 * @param uuid the Entry Unique ID. 2492 * @return The current DN of the entry or null if there is no entry with 2493 * the specified UUID. 2494 */ 2495 private DN findEntryDN(String uuid) 2496 { 2497 try 2498 { 2499 final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, "entryuuid=" + uuid); 2500 InternalSearchOperation search = conn.processSearch(request); 2501 final SearchResultEntry resultEntry = getFirstResult(search); 2502 if (resultEntry != null) 2503 { 2504 return resultEntry.getName(); 2505 } 2506 } 2507 catch (DirectoryException e) 2508 { 2509 // never happens because the filter is always valid. 2510 } 2511 return null; 2512 } 2513 2514 /** 2515 * Solve a conflict detected when replaying a modify operation. 2516 * 2517 * @param op The operation that triggered the conflict detection. 2518 * @param msg The operation that triggered the conflict detection. 2519 * @return true if the process is completed, false if it must continue.. 2520 */ 2521 private boolean solveNamingConflict(ModifyOperation op, ModifyMsg msg) 2522 { 2523 ResultCode result = op.getResultCode(); 2524 ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); 2525 String entryUUID = ctx.getEntryUUID(); 2526 2527 if (result == ResultCode.NO_SUCH_OBJECT) 2528 { 2529 /* 2530 * The operation is a modification but 2531 * the entry has been renamed on a different master in the same time. 2532 * search if the entry has been renamed, and return the new dn 2533 * of the entry. 2534 */ 2535 DN newDN = findEntryDN(entryUUID); 2536 if (newDN != null) 2537 { 2538 // There is an entry with the same unique id as this modify operation 2539 // replay the modify using the current dn of this entry. 2540 msg.setDN(newDN); 2541 numResolvedNamingConflicts.incrementAndGet(); 2542 return false; 2543 } 2544 else 2545 { 2546 // This entry does not exist anymore. 2547 // It has probably been deleted, stop the processing of this operation 2548 numResolvedNamingConflicts.incrementAndGet(); 2549 return true; 2550 } 2551 } 2552 else if (result == ResultCode.NOT_ALLOWED_ON_RDN) 2553 { 2554 DN currentDN = findEntryDN(entryUUID); 2555 RDN currentRDN; 2556 if (currentDN != null) 2557 { 2558 currentRDN = currentDN.rdn(); 2559 } 2560 else 2561 { 2562 // The entry does not exist anymore. 2563 numResolvedNamingConflicts.incrementAndGet(); 2564 return true; 2565 } 2566 2567 // The modify operation is trying to delete the value that is 2568 // currently used in the RDN. We need to alter the modify so that it does 2569 // not remove the current RDN value(s). 2570 2571 List<Modification> mods = op.getModifications(); 2572 for (Modification mod : mods) 2573 { 2574 AttributeType modAttrType = mod.getAttribute().getAttributeType(); 2575 if ((mod.getModificationType() == ModificationType.DELETE 2576 || mod.getModificationType() == ModificationType.REPLACE) 2577 && currentRDN.hasAttributeType(modAttrType)) 2578 { 2579 // the attribute can't be deleted because it is used in the RDN, 2580 // turn this operation is a replace with the current RDN value(s); 2581 mod.setModificationType(ModificationType.REPLACE); 2582 Attribute newAttribute = mod.getAttribute(); 2583 AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute); 2584 attrBuilder.add(currentRDN.getAttributeValue(modAttrType)); 2585 mod.setAttribute(attrBuilder.toAttribute()); 2586 } 2587 } 2588 msg.setMods(mods); 2589 numResolvedNamingConflicts.incrementAndGet(); 2590 return false; 2591 } 2592 else 2593 { 2594 // The other type of errors can not be caused by naming conflicts. 2595 // Log a message for the repair tool. 2596 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2597 op, ctx.getCSN(), result, op.getErrorMessage()); 2598 return true; 2599 } 2600 } 2601 2602 /** 2603 * Solve a conflict detected when replaying a delete operation. 2604 * 2605 * @param op The operation that triggered the conflict detection. 2606 * @param msg The operation that triggered the conflict detection. 2607 * @return true if the process is completed, false if it must continue.. 2608 */ 2609 private boolean solveNamingConflict(DeleteOperation op, LDAPUpdateMsg msg) 2610 { 2611 ResultCode result = op.getResultCode(); 2612 DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); 2613 String entryUUID = ctx.getEntryUUID(); 2614 2615 if (result == ResultCode.NO_SUCH_OBJECT) 2616 { 2617 /* Find if the entry is still in the database. */ 2618 DN currentDN = findEntryDN(entryUUID); 2619 if (currentDN == null) 2620 { 2621 /* 2622 * The entry has already been deleted, either because this delete 2623 * has already been replayed or because another concurrent delete 2624 * has already done the job. 2625 * In any case, there is nothing more to do. 2626 */ 2627 numResolvedNamingConflicts.incrementAndGet(); 2628 return true; 2629 } 2630 else 2631 { 2632 // This entry has been renamed, replay the delete using its new DN. 2633 msg.setDN(currentDN); 2634 numResolvedNamingConflicts.incrementAndGet(); 2635 return false; 2636 } 2637 } 2638 else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF) 2639 { 2640 /* 2641 * This may happen when we replay a DELETE done on a master 2642 * but children of this entry have been added on another master. 2643 * 2644 * Rename all the children by adding entryuuid in dn and delete this entry. 2645 * 2646 * The action taken here must be consistent with the actions 2647 * done in the solveNamingConflict(AddOperation) method 2648 * when we are adding an entry whose parent entry has already been deleted. 2649 */ 2650 if (findAndRenameChild(op.getEntryDN(), op)) 2651 { 2652 numUnresolvedNamingConflicts.incrementAndGet(); 2653 } 2654 2655 return false; 2656 } 2657 else 2658 { 2659 // The other type of errors can not be caused by naming conflicts. 2660 // Log a message for the repair tool. 2661 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2662 op, ctx.getCSN(), result, op.getErrorMessage()); 2663 return true; 2664 } 2665 } 2666 2667/** 2668 * Solve a conflict detected when replaying a Modify DN operation. 2669 * 2670 * @param op The operation that triggered the conflict detection. 2671 * @param msg The operation that triggered the conflict detection. 2672 * @return true if the process is completed, false if it must continue. 2673 * @throws Exception When the operation is not valid. 2674 */ 2675private boolean solveNamingConflict(ModifyDNOperation op, LDAPUpdateMsg msg) 2676 throws Exception 2677{ 2678 ResultCode result = op.getResultCode(); 2679 ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); 2680 String entryUUID = ctx.getEntryUUID(); 2681 String newSuperiorID = ctx.getNewSuperiorEntryUUID(); 2682 2683 /* 2684 * four possible cases : 2685 * - the modified entry has been renamed 2686 * - the new parent has been renamed 2687 * - the operation is replayed for the second time. 2688 * - the entry has been deleted 2689 * action : 2690 * - change the target dn and the new parent dn and 2691 * restart the operation, 2692 * - don't do anything if the operation is replayed. 2693 */ 2694 2695 // get the current DN of this entry in the database. 2696 DN currentDN = findEntryDN(entryUUID); 2697 2698 // Construct the new DN to use for the entry. 2699 DN entryDN = op.getEntryDN(); 2700 DN newSuperior; 2701 RDN newRDN = op.getNewRDN(); 2702 2703 if (newSuperiorID != null) 2704 { 2705 newSuperior = findEntryDN(newSuperiorID); 2706 } 2707 else 2708 { 2709 newSuperior = entryDN.parent(); 2710 } 2711 2712 //If we could not find the new parent entry, we missed this entry 2713 // earlier or it has disappeared from the database 2714 // Log this information for the repair tool and mark the entry 2715 // as conflicting. 2716 // stop the processing. 2717 if (newSuperior == null) 2718 { 2719 markConflictEntry(op, currentDN, currentDN.parent().child(newRDN)); 2720 numUnresolvedNamingConflicts.incrementAndGet(); 2721 return true; 2722 } 2723 2724 DN newDN = newSuperior.child(newRDN); 2725 2726 if (currentDN == null) 2727 { 2728 // The entry targeted by the Modify DN is not in the database 2729 // anymore. 2730 // This is a conflict between a delete and this modify DN. 2731 // The entry has been deleted, we can safely assume 2732 // that the operation is completed. 2733 numResolvedNamingConflicts.incrementAndGet(); 2734 return true; 2735 } 2736 2737 // if the newDN and the current DN match then the operation 2738 // is a no-op (this was probably a second replay) 2739 // don't do anything. 2740 if (newDN.equals(currentDN)) 2741 { 2742 numResolvedNamingConflicts.incrementAndGet(); 2743 return true; 2744 } 2745 2746 if (result == ResultCode.NO_SUCH_OBJECT 2747 || result == ResultCode.UNWILLING_TO_PERFORM 2748 || result == ResultCode.OBJECTCLASS_VIOLATION) 2749 { 2750 /* 2751 * The entry or it's new parent has not been found 2752 * reconstruct the operation with the DN we just built 2753 */ 2754 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; 2755 modifyDnMsg.setDN(currentDN); 2756 modifyDnMsg.setNewSuperior(newSuperior.toString()); 2757 numResolvedNamingConflicts.incrementAndGet(); 2758 return false; 2759 } 2760 else if (result == ResultCode.ENTRY_ALREADY_EXISTS) 2761 { 2762 /* 2763 * This may happen when two modifyDn operation 2764 * are done on different servers but with the same target DN 2765 * add the conflict object class to the entry 2766 * and rename it using its entryuuid. 2767 */ 2768 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; 2769 markConflictEntry(op, op.getEntryDN(), newDN); 2770 modifyDnMsg.setNewRDN(generateConflictRDN(entryUUID, 2771 modifyDnMsg.getNewRDN())); 2772 modifyDnMsg.setNewSuperior(newSuperior.toString()); 2773 numUnresolvedNamingConflicts.incrementAndGet(); 2774 return false; 2775 } 2776 else 2777 { 2778 // The other type of errors can not be caused by naming conflicts. 2779 // Log a message for the repair tool. 2780 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2781 op, ctx.getCSN(), result, op.getErrorMessage()); 2782 return true; 2783 } 2784} 2785 2786 /** 2787 * Solve a conflict detected when replaying a ADD operation. 2788 * 2789 * @param op The operation that triggered the conflict detection. 2790 * @param msg The message that triggered the conflict detection. 2791 * @return true if the process is completed, false if it must continue. 2792 * @throws Exception When the operation is not valid. 2793 */ 2794 private boolean solveNamingConflict(AddOperation op, AddMsg msg) 2795 throws Exception 2796 { 2797 ResultCode result = op.getResultCode(); 2798 AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); 2799 String entryUUID = ctx.getEntryUUID(); 2800 String parentUniqueId = ctx.getParentEntryUUID(); 2801 2802 if (result == ResultCode.NO_SUCH_OBJECT) 2803 { 2804 /* 2805 * This can happen if the parent has been renamed or deleted 2806 * find the parent dn and calculate a new dn for the entry 2807 */ 2808 if (parentUniqueId == null) 2809 { 2810 /* 2811 * This entry is the base dn of the backend. 2812 * It is quite surprising that the operation result be NO_SUCH_OBJECT. 2813 * There is nothing more we can do except log a 2814 * message for the repair tool to look at this problem. 2815 * TODO : Log the message 2816 */ 2817 return true; 2818 } 2819 DN parentDn = findEntryDN(parentUniqueId); 2820 if (parentDn == null) 2821 { 2822 /* 2823 * The parent has been deleted 2824 * rename the entry as a conflicting entry. 2825 * The action taken here must be consistent with the actions 2826 * done when in the solveNamingConflict(DeleteOperation) method 2827 * when we are deleting an entry that have some child entries. 2828 */ 2829 addConflict(msg); 2830 2831 String conflictRDN = 2832 generateConflictRDN(entryUUID, op.getEntryDN().rdn().toString()); 2833 msg.setDN(DN.valueOf(conflictRDN + "," + getBaseDN())); 2834 // reset the parent entryUUID so that the check done is the 2835 // handleConflict phase does not fail. 2836 msg.setParentEntryUUID(null); 2837 numUnresolvedNamingConflicts.incrementAndGet(); 2838 } 2839 else 2840 { 2841 msg.setDN(DN.valueOf(msg.getDN().rdn() + "," + parentDn)); 2842 numResolvedNamingConflicts.incrementAndGet(); 2843 } 2844 return false; 2845 } 2846 else if (result == ResultCode.ENTRY_ALREADY_EXISTS) 2847 { 2848 /* 2849 * This can happen if 2850 * - two adds are done on different servers but with the 2851 * same target DN. 2852 * - the same ADD is being replayed for the second time on this server. 2853 * if the entryUUID already exist, assume this is a replay and 2854 * don't do anything 2855 * if the entry unique id do not exist, generate conflict. 2856 */ 2857 if (findEntryDN(entryUUID) != null) 2858 { 2859 // entry already exist : this is a replay 2860 return true; 2861 } 2862 else 2863 { 2864 addConflict(msg); 2865 String conflictRDN = 2866 generateConflictRDN(entryUUID, msg.getDN().toString()); 2867 msg.setDN(DN.valueOf(conflictRDN)); 2868 numUnresolvedNamingConflicts.incrementAndGet(); 2869 return false; 2870 } 2871 } 2872 else 2873 { 2874 // The other type of errors can not be caused by naming conflicts. 2875 // log a message for the repair tool. 2876 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2877 op, ctx.getCSN(), result, op.getErrorMessage()); 2878 return true; 2879 } 2880 } 2881 2882 /** 2883 * Find all the entries below the provided DN and rename them 2884 * so that they stay below the baseDN of this replicationDomain and 2885 * use the conflicting name and attribute. 2886 * 2887 * @param entryDN The DN of the entry whose child must be renamed. 2888 * @param conflictOp The Operation that generated the conflict. 2889 */ 2890 private boolean findAndRenameChild(DN entryDN, Operation conflictOp) 2891 { 2892 /* 2893 * TODO JNR Ludo thinks that: "Ideally, the operation should verify that the 2894 * entryUUID has not changed or try to use the entryUUID rather than the 2895 * DN.". entryUUID can be obtained from the caller of the current method. 2896 */ 2897 boolean conflict = false; 2898 2899 // Find and rename child entries. 2900 final SearchRequest request = newSearchRequest(entryDN, SearchScope.SINGLE_LEVEL) 2901 .addAttribute(ENTRYUUID_ATTRIBUTE_NAME, HISTORICAL_ATTRIBUTE_NAME); 2902 InternalSearchOperation op = conn.processSearch(request); 2903 if (op.getResultCode() == ResultCode.SUCCESS) 2904 { 2905 for (SearchResultEntry entry : op.getSearchEntries()) 2906 { 2907 /* 2908 * Check the ADD and ModRDN date of the child entry 2909 * (All of them, not only the one that are newer than the DEL op) 2910 * and keep the entry as a conflicting entry. 2911 */ 2912 conflict = true; 2913 renameConflictEntry(conflictOp, entry.getName(), getEntryUUID(entry)); 2914 } 2915 } 2916 else 2917 { 2918 // log error and information for the REPAIR tool. 2919 logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, entryDN, conflictOp, op.getResultCode()); 2920 } 2921 2922 return conflict; 2923 } 2924 2925 /** 2926 * Rename an entry that was conflicting so that it stays below the 2927 * baseDN of the replicationDomain. 2928 * 2929 * @param conflictOp The Operation that caused the conflict. 2930 * @param dn The DN of the entry to be renamed. 2931 * @param entryUUID The uniqueID of the entry to be renamed. 2932 */ 2933 private void renameConflictEntry(Operation conflictOp, DN dn, 2934 String entryUUID) 2935 { 2936 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(dn); 2937 DirectoryServer.sendAlertNotification(this, 2938 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 2939 2940 RDN newRDN = generateDeleteConflictDn(entryUUID, dn); 2941 ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true); 2942 2943 if (newOp.getResultCode() != ResultCode.SUCCESS) 2944 { 2945 // log information for the repair tool. 2946 logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, 2947 dn, conflictOp, newOp.getResultCode()); 2948 } 2949 } 2950 2951 /** 2952 * Generate a modification to add the conflict attribute to an entry 2953 * whose Dn is now conflicting with another entry. 2954 * 2955 * @param op The operation causing the conflict. 2956 * @param currentDN The current DN of the operation to mark as conflicting. 2957 * @param conflictDN The newDn on which the conflict happened. 2958 */ 2959 private void markConflictEntry(Operation op, DN currentDN, DN conflictDN) 2960 { 2961 // create new internal modify operation and run it. 2962 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(DS_SYNC_CONFLICT); 2963 Attribute attr = Attributes.create(attrType, conflictDN.toString()); 2964 List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr)); 2965 2966 ModifyOperation newOp = new ModifyOperationBasis( 2967 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 2968 currentDN, mods); 2969 runAsSynchronizedOperation(newOp); 2970 2971 if (newOp.getResultCode() != ResultCode.SUCCESS) 2972 { 2973 // Log information for the repair tool. 2974 logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, op, newOp.getResultCode()); 2975 } 2976 2977 // Generate an alert to let the administration know that some 2978 // conflict could not be solved. 2979 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN); 2980 DirectoryServer.sendAlertNotification(this, 2981 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 2982 } 2983 2984 /** 2985 * Add the conflict attribute to an entry that could 2986 * not be added because it is conflicting with another entry. 2987 * 2988 * @param msg The conflicting Add Operation. 2989 * 2990 * @throws DecodeException When an encoding error happened manipulating the 2991 * msg. 2992 */ 2993 private void addConflict(AddMsg msg) throws DecodeException 2994 { 2995 String normalizedDN = msg.getDN().toString(); 2996 2997 // Generate an alert to let the administrator know that some 2998 // conflict could not be solved. 2999 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN); 3000 DirectoryServer.sendAlertNotification(this, 3001 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 3002 3003 // Add the conflict attribute 3004 msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN); 3005 } 3006 3007 /** 3008 * Generate the Dn to use for a conflicting entry. 3009 * 3010 * @param entryUUID The unique identifier of the entry involved in the 3011 * conflict. 3012 * @param rdn Original rdn. 3013 * @return The generated RDN for a conflicting entry. 3014 */ 3015 private String generateConflictRDN(String entryUUID, String rdn) 3016 { 3017 return "entryuuid=" + entryUUID + "+" + rdn; 3018 } 3019 3020 /** 3021 * Generate the RDN to use for a conflicting entry whose father was deleted. 3022 * 3023 * @param entryUUID The unique identifier of the entry involved in the 3024 * conflict. 3025 * @param dn The original DN of the entry. 3026 * 3027 * @return The generated RDN for a conflicting entry. 3028 */ 3029 private RDN generateDeleteConflictDn(String entryUUID, DN dn) 3030 { 3031 String newRDN = "entryuuid=" + entryUUID + "+" + dn.rdn(); 3032 try 3033 { 3034 return RDN.decode(newRDN); 3035 } catch (DirectoryException e) 3036 { 3037 // cannot happen 3038 return null; 3039 } 3040 } 3041 3042 /** 3043 * Check if the domain solve conflicts. 3044 * 3045 * @return a boolean indicating if the domain should solve conflicts. 3046 */ 3047 boolean solveConflict() 3048 { 3049 return solveConflictFlag; 3050 } 3051 3052 /** 3053 * Disable the replication on this domain. 3054 * The session to the replication server will be stopped. 3055 * The domain will not be destroyed but call to the pre-operation 3056 * methods will result in failure. 3057 * The listener thread will be destroyed. 3058 * The monitor informations will still be accessible. 3059 */ 3060 public void disable() 3061 { 3062 state.save(); 3063 state.clearInMemory(); 3064 disabled = true; 3065 disableService(); // This will cut the session and wake up the listener 3066 } 3067 3068 /** 3069 * Do what necessary when the data have changed : load state, load 3070 * generation Id. 3071 * If there is no such information check if there is a 3072 * ReplicaUpdateVector entry and translate it into a state 3073 * and generationId. 3074 * @exception DirectoryException Thrown when an error occurs. 3075 */ 3076 private void loadDataState() throws DirectoryException 3077 { 3078 state.clearInMemory(); 3079 state.loadState(); 3080 getGenerator().adjust(state.getMaxCSN(getServerId())); 3081 3082 // Retrieves the generation ID associated with the data imported 3083 generationId = loadGenerationId(); 3084 } 3085 3086 /** 3087 * Enable back the domain after a previous disable. 3088 * The domain will connect back to a replication Server and 3089 * will recreate threads to listen for messages from the Synchronization 3090 * server. 3091 * The generationId will be retrieved or computed if necessary. 3092 * The ServerState will also be read again from the local database. 3093 */ 3094 public void enable() 3095 { 3096 try 3097 { 3098 loadDataState(); 3099 } 3100 catch (Exception e) 3101 { 3102 /* TODO should mark that replicationServer service is 3103 * not available, log an error and retry upon timeout 3104 * should we stop the modifications ? 3105 */ 3106 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 3107 return; 3108 } 3109 3110 enableService(); 3111 3112 disabled = false; 3113 } 3114 3115 /** 3116 * Compute the data generationId associated with the current data present 3117 * in the backend for this domain. 3118 * @return The computed generationId. 3119 * @throws DirectoryException When an error occurs. 3120 */ 3121 private long computeGenerationId() throws DirectoryException 3122 { 3123 final long genId = exportBackend(null, true); 3124 if (logger.isTraceEnabled()) 3125 { 3126 logger.trace("Computed generationId: generationId=" + genId); 3127 } 3128 return genId; 3129 } 3130 3131 /** 3132 * Run a modify operation to update the entry whose DN is given as 3133 * a parameter with the generationID information. 3134 * 3135 * @param entryDN The DN of the entry to be updated. 3136 * @param generationId The value of the generationID to be saved. 3137 * 3138 * @return A ResultCode indicating if the operation was successful. 3139 */ 3140 private ResultCode runSaveGenerationId(DN entryDN, long generationId) 3141 { 3142 // The generationId is stored in the root entry of the domain. 3143 final ByteString asn1BaseDn = ByteString.valueOf(entryDN.toString()); 3144 3145 LDAPAttribute attr = new LDAPAttribute(REPLICATION_GENERATION_ID, Long.toString(generationId)); 3146 List<RawModification> mods = new ArrayList<>(1); 3147 mods.add(new LDAPModification(ModificationType.REPLACE, attr)); 3148 3149 ModifyOperation op = new ModifyOperationBasis( 3150 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 3151 asn1BaseDn, mods); 3152 runAsSynchronizedOperation(op); 3153 return op.getResultCode(); 3154 } 3155 3156 /** 3157 * Stores the value of the generationId. 3158 * @param generationId The value of the generationId. 3159 * @return a ResultCode indicating if the method was successful. 3160 */ 3161 private ResultCode saveGenerationId(long generationId) 3162 { 3163 ResultCode result = runSaveGenerationId(getBaseDN(), generationId); 3164 if (result != ResultCode.SUCCESS) 3165 { 3166 generationIdSavedStatus = false; 3167 if (result == ResultCode.NO_SUCH_OBJECT) 3168 { 3169 // If the base entry does not exist, save the generation 3170 // ID in the config entry 3171 result = runSaveGenerationId(config.dn(), generationId); 3172 } 3173 3174 if (result != ResultCode.SUCCESS) 3175 { 3176 logger.error(ERR_UPDATING_GENERATION_ID, result.getName(), getBaseDN()); 3177 } 3178 } 3179 else 3180 { 3181 generationIdSavedStatus = true; 3182 } 3183 return result; 3184 } 3185 3186 /** 3187 * Load the GenerationId from the root entry of the domain 3188 * from the REPLICATION_GENERATION_ID attribute in database 3189 * to memory, or compute it if not found. 3190 * 3191 * @return generationId The retrieved value of generationId 3192 * @throws DirectoryException When an error occurs. 3193 */ 3194 private long loadGenerationId() throws DirectoryException 3195 { 3196 if (logger.isTraceEnabled()) 3197 { 3198 logger.trace("Attempt to read generation ID from DB " + getBaseDN()); 3199 } 3200 3201 // Search the database entry that is used to periodically save the generation id 3202 final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT) 3203 .addAttribute(REPLICATION_GENERATION_ID); 3204 InternalSearchOperation search = conn.processSearch(request); 3205 if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT) 3206 { 3207 // if the base entry does not exist look for the generationID 3208 // in the config entry. 3209 request.setName(config.dn()); 3210 search = conn.processSearch(request); 3211 } 3212 3213 boolean found = false; 3214 long aGenerationId = -1; 3215 if (search.getResultCode() != ResultCode.SUCCESS) 3216 { 3217 if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT) 3218 { 3219 String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage(); 3220 logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN()); 3221 } 3222 } 3223 else 3224 { 3225 List<SearchResultEntry> result = search.getSearchEntries(); 3226 SearchResultEntry resultEntry = result.get(0); 3227 if (resultEntry != null) 3228 { 3229 AttributeType synchronizationGenIDType = 3230 DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); 3231 List<Attribute> attrs = 3232 resultEntry.getAttribute(synchronizationGenIDType); 3233 if (attrs != null) 3234 { 3235 Attribute attr = attrs.get(0); 3236 if (attr.size()>1) 3237 { 3238 String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString(); 3239 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg); 3240 } 3241 else if (attr.size() == 1) 3242 { 3243 found = true; 3244 try 3245 { 3246 aGenerationId = Long.decode(attr.iterator().next().toString()); 3247 } 3248 catch(Exception e) 3249 { 3250 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 3251 } 3252 } 3253 } 3254 } 3255 } 3256 3257 if (!found) 3258 { 3259 aGenerationId = computeGenerationId(); 3260 saveGenerationId(aGenerationId); 3261 3262 if (logger.isTraceEnabled()) 3263 { 3264 logger.trace("Generation ID created for domain baseDN=" + getBaseDN() + " generationId=" + aGenerationId); 3265 } 3266 } 3267 else 3268 { 3269 generationIdSavedStatus = true; 3270 if (logger.isTraceEnabled()) 3271 { 3272 logger.trace("Generation ID successfully read from domain baseDN=" + getBaseDN() 3273 + " generationId=" + aGenerationId); 3274 } 3275 } 3276 return aGenerationId; 3277 } 3278 3279 /** 3280 * Do whatever is needed when a backup is started. 3281 * We need to make sure that the serverState is correctly save. 3282 */ 3283 void backupStart() 3284 { 3285 state.save(); 3286 } 3287 3288 /** Do whatever is needed when a backup is finished. */ 3289 void backupEnd() 3290 { 3291 // Nothing is needed at the moment 3292 } 3293 3294 /* 3295 * Total Update >> 3296 */ 3297 3298 /** 3299 * This method trigger an export of the replicated data. 3300 * 3301 * @param output The OutputStream where the export should 3302 * be produced. 3303 * @throws DirectoryException When needed. 3304 */ 3305 @Override 3306 protected void exportBackend(OutputStream output) throws DirectoryException 3307 { 3308 exportBackend(output, false); 3309 } 3310 3311 /** 3312 * Export the entries from the backend and/or compute the generation ID. 3313 * The ieContext must have been set before calling. 3314 * 3315 * @param output The OutputStream where the export should 3316 * be produced. 3317 * @param checksumOutput A boolean indicating if this export is 3318 * invoked to perform a checksum only 3319 * 3320 * @return The computed GenerationID. 3321 * 3322 * @throws DirectoryException when an error occurred 3323 */ 3324 private long exportBackend(OutputStream output, boolean checksumOutput) 3325 throws DirectoryException 3326 { 3327 Backend<?> backend = getBackend(); 3328 3329 // Acquire a shared lock for the backend. 3330 try 3331 { 3332 String lockFile = LockFileManager.getBackendLockFileName(backend); 3333 StringBuilder failureReason = new StringBuilder(); 3334 if (! LockFileManager.acquireSharedLock(lockFile, failureReason)) 3335 { 3336 LocalizableMessage message = 3337 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason); 3338 logger.error(message); 3339 throw new DirectoryException(ResultCode.OTHER, message); 3340 } 3341 } 3342 catch (Exception e) 3343 { 3344 LocalizableMessage message = 3345 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), 3346 stackTraceToSingleLineString(e)); 3347 logger.error(message); 3348 throw new DirectoryException(ResultCode.OTHER, message); 3349 } 3350 3351 long numberOfEntries = backend.getNumberOfEntriesInBaseDN(getBaseDN()); 3352 long entryCount = Math.min(numberOfEntries, 1000); 3353 OutputStream os; 3354 ReplLDIFOutputStream ros = null; 3355 if (checksumOutput) 3356 { 3357 ros = new ReplLDIFOutputStream(entryCount); 3358 os = ros; 3359 try 3360 { 3361 os.write(Long.toString(numberOfEntries).getBytes()); 3362 } 3363 catch(Exception e) 3364 { 3365 // Should never happen 3366 } 3367 } 3368 else 3369 { 3370 os = output; 3371 } 3372 3373 // baseDN branch is the only one included in the export 3374 LDIFExportConfig exportConfig = new LDIFExportConfig(os); 3375 exportConfig.setIncludeBranches(newArrayList(getBaseDN())); 3376 3377 // For the checksum computing mode, only consider the 'stable' attributes 3378 if (checksumOutput) 3379 { 3380 String includeAttributeStrings[] = { "objectclass", "sn", "cn", "entryuuid" }; 3381 Set<AttributeType> includeAttributes = new HashSet<>(); 3382 for (String attrName : includeAttributeStrings) 3383 { 3384 includeAttributes.add(DirectoryServer.getAttributeTypeOrDefault(attrName)); 3385 } 3386 exportConfig.setIncludeAttributes(includeAttributes); 3387 } 3388 3389 // Launch the export. 3390 long genID = 0; 3391 try 3392 { 3393 backend.exportLDIF(exportConfig); 3394 } 3395 catch (DirectoryException de) 3396 { 3397 if (ros == null || ros.getNumExportedEntries() < entryCount) 3398 { 3399 LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); 3400 logger.error(message); 3401 throw new DirectoryException(ResultCode.OTHER, message); 3402 } 3403 } 3404 catch (Exception e) 3405 { 3406 LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(stackTraceToSingleLineString(e)); 3407 logger.error(message); 3408 throw new DirectoryException(ResultCode.OTHER, message); 3409 } 3410 finally 3411 { 3412 // Clean up after the export by closing the export config. 3413 // Will also flush the export and export the remaining entries. 3414 exportConfig.close(); 3415 3416 if (checksumOutput) 3417 { 3418 genID = ros.getChecksumValue(); 3419 } 3420 3421 // Release the shared lock on the backend. 3422 try 3423 { 3424 String lockFile = LockFileManager.getBackendLockFileName(backend); 3425 StringBuilder failureReason = new StringBuilder(); 3426 if (! LockFileManager.releaseLock(lockFile, failureReason)) 3427 { 3428 LocalizableMessage message = 3429 WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason); 3430 logger.warn(message); 3431 throw new DirectoryException(ResultCode.OTHER, message); 3432 } 3433 } 3434 catch (Exception e) 3435 { 3436 LocalizableMessage message = 3437 WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), 3438 stackTraceToSingleLineString(e)); 3439 logger.warn(message); 3440 throw new DirectoryException(ResultCode.OTHER, message); 3441 } 3442 } 3443 return genID; 3444 } 3445 3446 /** 3447 * Process backend before import. 3448 * 3449 * @param backend 3450 * The backend. 3451 * @throws DirectoryException 3452 * If the backend could not be disabled or locked exclusively. 3453 */ 3454 private void preBackendImport(Backend<?> backend) throws DirectoryException 3455 { 3456 // Stop saving state 3457 stateSavingDisabled = true; 3458 3459 // FIXME setBackendEnabled should be part of TaskUtils ? 3460 TaskUtils.disableBackend(backend.getBackendID()); 3461 3462 // Acquire an exclusive lock for the backend. 3463 String lockFile = LockFileManager.getBackendLockFileName(backend); 3464 StringBuilder failureReason = new StringBuilder(); 3465 if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) 3466 { 3467 LocalizableMessage message = ERR_INIT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason); 3468 logger.error(message); 3469 throw new DirectoryException(ResultCode.OTHER, message); 3470 } 3471 } 3472 3473 /** 3474 * This method triggers an import of the replicated data. 3475 * 3476 * @param input The InputStream from which the data are read. 3477 * @throws DirectoryException When needed. 3478 */ 3479 @Override 3480 protected void importBackend(InputStream input) throws DirectoryException 3481 { 3482 Backend<?> backend = getBackend(); 3483 3484 LDIFImportConfig importConfig = null; 3485 ImportExportContext ieCtx = getImportExportContext(); 3486 try 3487 { 3488 if (!backend.supports(BackendOperation.LDIF_IMPORT)) 3489 { 3490 ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER, 3491 ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID()))); 3492 return; 3493 } 3494 3495 importConfig = new LDIFImportConfig(input); 3496 importConfig.setIncludeBranches(newLinkedHashSet(getBaseDN())); 3497 importConfig.setAppendToExistingData(false); 3498 importConfig.setSkipDNValidation(true); 3499 // We should not validate schema for replication 3500 importConfig.setValidateSchema(false); 3501 // Allow fractional replication ldif import plugin to be called 3502 importConfig.setInvokeImportPlugins(true); 3503 // Reset the follow import flag and message before starting the import 3504 importErrorMessageId = -1; 3505 3506 // TODO How to deal with rejected entries during the import 3507 File rejectsFile = 3508 getFileForPath("logs" + File.separator + "replInitRejectedEntries"); 3509 importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(), 3510 ExistingFileBehavior.OVERWRITE); 3511 3512 // Process import 3513 preBackendImport(backend); 3514 backend.importLDIF(importConfig, DirectoryServer.getInstance().getServerContext()); 3515 3516 stateSavingDisabled = false; 3517 } 3518 catch(Exception e) 3519 { 3520 ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, 3521 ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e)))); 3522 } 3523 finally 3524 { 3525 try 3526 { 3527 // Cleanup 3528 if (importConfig != null) 3529 { 3530 importConfig.close(); 3531 closeBackendImport(backend); // Re-enable backend 3532 backend = getBackend(); 3533 } 3534 3535 loadDataState(); 3536 3537 if (ieCtx.getException() != null) 3538 { 3539 // When an error occurred during an import, most of times 3540 // the generationId coming in the root entry of the imported data, 3541 // is not valid anymore (partial data in the backend). 3542 generationId = computeGenerationId(); 3543 saveGenerationId(generationId); 3544 } 3545 } 3546 catch (DirectoryException fe) 3547 { 3548 // If we already catch an Exception it's quite possible 3549 // that the loadDataState() and setGenerationId() fail 3550 // so we don't bother about the new Exception. 3551 // However if there was no Exception before we want 3552 // to return this Exception to the task creator. 3553 ieCtx.setExceptionIfNoneSet(new DirectoryException( 3554 ResultCode.OTHER, 3555 ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe)))); 3556 } 3557 } 3558 3559 if (ieCtx.getException() != null) 3560 { 3561 throw ieCtx.getException(); 3562 } 3563 } 3564 3565 /** 3566 * Make post import operations. 3567 * @param backend The backend implied in the import. 3568 * @exception DirectoryException Thrown when an error occurs. 3569 */ 3570 private void closeBackendImport(Backend<?> backend) throws DirectoryException 3571 { 3572 String lockFile = LockFileManager.getBackendLockFileName(backend); 3573 StringBuilder failureReason = new StringBuilder(); 3574 3575 // Release lock 3576 if (!LockFileManager.releaseLock(lockFile, failureReason)) 3577 { 3578 LocalizableMessage message = 3579 WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason); 3580 logger.warn(message); 3581 throw new DirectoryException(ResultCode.OTHER, message); 3582 } 3583 3584 TaskUtils.enableBackend(backend.getBackendID()); 3585 } 3586 3587 /** 3588 * Retrieves a replication domain based on the baseDN. 3589 * 3590 * @param baseDN The baseDN of the domain to retrieve 3591 * @return The domain retrieved 3592 * @throws DirectoryException When an error occurred or no domain 3593 * match the provided baseDN. 3594 */ 3595 public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDN) 3596 throws DirectoryException 3597 { 3598 LDAPReplicationDomain replicationDomain = null; 3599 3600 // Retrieves the domain 3601 for (SynchronizationProvider<?> provider : 3602 DirectoryServer.getSynchronizationProviders()) 3603 { 3604 if (!(provider instanceof MultimasterReplication)) 3605 { 3606 LocalizableMessage message = ERR_INVALID_PROVIDER.get(); 3607 throw new DirectoryException(ResultCode.OTHER, message); 3608 } 3609 3610 // From the domainDN retrieves the replication domain 3611 LDAPReplicationDomain domain = 3612 MultimasterReplication.findDomain(baseDN, null); 3613 if (domain == null) 3614 { 3615 break; 3616 } 3617 if (replicationDomain != null) 3618 { 3619 // Should never happen 3620 LocalizableMessage message = ERR_MULTIPLE_MATCHING_DOMAIN.get(); 3621 throw new DirectoryException(ResultCode.OTHER, message); 3622 } 3623 replicationDomain = domain; 3624 } 3625 3626 if (replicationDomain == null) 3627 { 3628 throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(baseDN)); 3629 } 3630 return replicationDomain; 3631 } 3632 3633 /** 3634 * Returns the backend associated to this domain. 3635 * @return The associated backend. 3636 */ 3637 private Backend<?> getBackend() 3638 { 3639 return DirectoryServer.getBackend(getBaseDN()); 3640 } 3641 3642 /* 3643 * <<Total Update 3644 */ 3645 3646 /** 3647 * Push the schema modifications contained in the given parameter as a 3648 * modification that would happen on a local server. The modifications are not 3649 * applied to the local schema backend and historical information is not 3650 * updated; but a CSN is generated and the ServerState associated to the 3651 * schema domain is updated. 3652 * 3653 * @param modifications 3654 * The schema modifications to push 3655 */ 3656 void synchronizeSchemaModifications(List<Modification> modifications) 3657 { 3658 ModifyOperation op = new ModifyOperationBasis( 3659 conn, nextOperationID(), nextMessageID(), null, 3660 DirectoryServer.getSchemaDN(), modifications); 3661 3662 final Entry schema; 3663 try 3664 { 3665 schema = DirectoryServer.getEntry(DirectoryServer.getSchemaDN()); 3666 } 3667 catch (DirectoryException e) 3668 { 3669 logger.traceException(e); 3670 logger.error(ERR_BACKEND_SEARCH_ENTRY.get(DirectoryServer.getSchemaDN().toString(), 3671 stackTraceToSingleLineString(e))); 3672 return; 3673 } 3674 3675 LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op); 3676 CSN csn = generateCSN(localOp); 3677 OperationContext ctx = new ModifyContext(csn, getEntryUUID(schema)); 3678 localOp.setAttachment(SYNCHROCONTEXT, ctx); 3679 localOp.setResultCode(ResultCode.SUCCESS); 3680 synchronize(localOp); 3681 } 3682 3683 /** 3684 * Check if the provided configuration is acceptable for add. 3685 * 3686 * @param configuration The configuration to check. 3687 * @param unacceptableReasons When the configuration is not acceptable, this 3688 * table is use to return the reasons why this 3689 * configuration is not acceptable. 3690 * 3691 * @return true if the configuration is acceptable, false other wise. 3692 */ 3693 static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration, 3694 List<LocalizableMessage> unacceptableReasons) 3695 { 3696 // Check that there is not already a domain with the same DN 3697 final DN dn = configuration.getBaseDN(); 3698 LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null); 3699 if (domain != null && domain.getBaseDN().equals(dn)) 3700 { 3701 unacceptableReasons.add(ERR_SYNC_INVALID_DN.get()); 3702 return false; 3703 } 3704 3705 // Check that the base DN is configured as a base-dn of the directory server 3706 if (DirectoryServer.getBackend(dn) == null) 3707 { 3708 unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn)); 3709 return false; 3710 } 3711 3712 // Check fractional configuration 3713 try 3714 { 3715 isFractionalConfigAcceptable(configuration); 3716 } catch (ConfigException e) 3717 { 3718 unacceptableReasons.add(e.getMessageObject()); 3719 return false; 3720 } 3721 3722 return true; 3723 } 3724 3725 @Override 3726 public ConfigChangeResult applyConfigurationChange( 3727 ReplicationDomainCfg configuration) 3728 { 3729 this.config = configuration; 3730 changeConfig(configuration); 3731 3732 // Read assured + fractional configuration and each time reconnect if needed 3733 readAssuredConfig(configuration, true); 3734 readFractionalConfig(configuration, true); 3735 3736 solveConflictFlag = isSolveConflict(configuration); 3737 3738 final ConfigChangeResult ccr = new ConfigChangeResult(); 3739 try 3740 { 3741 storeECLConfiguration(configuration); 3742 } 3743 catch(Exception e) 3744 { 3745 ccr.setResultCode(ResultCode.OTHER); 3746 } 3747 return ccr; 3748 } 3749 3750 @Override 3751 public boolean isConfigurationChangeAcceptable( 3752 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 3753 { 3754 // Check that a import/export is not in progress 3755 if (ieRunning()) 3756 { 3757 unacceptableReasons.add( 3758 NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get()); 3759 return false; 3760 } 3761 3762 // Check fractional configuration 3763 try 3764 { 3765 isFractionalConfigAcceptable(configuration); 3766 return true; 3767 } 3768 catch (ConfigException e) 3769 { 3770 unacceptableReasons.add(e.getMessageObject()); 3771 return false; 3772 } 3773 } 3774 3775 @Override 3776 public Map<String, String> getAlerts() 3777 { 3778 Map<String, String> alerts = new LinkedHashMap<>(); 3779 3780 alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, 3781 ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT); 3782 return alerts; 3783 } 3784 3785 @Override 3786 public String getClassName() 3787 { 3788 return CLASS_NAME; 3789 } 3790 3791 @Override 3792 public DN getComponentEntryDN() 3793 { 3794 return config.dn(); 3795 } 3796 3797 /** Starts the Replication Domain. */ 3798 public void start() 3799 { 3800 // Create the ServerStateFlush thread 3801 flushThread.start(); 3802 3803 startListenService(); 3804 } 3805 3806 /** Remove the configuration of the external changelog from this domain configuration. */ 3807 private void removeECLDomainCfg() 3808 { 3809 try 3810 { 3811 DN eclConfigEntryDN = DN.valueOf("cn=external changeLog," + config.dn()); 3812 if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) 3813 { 3814 DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null); 3815 } 3816 } 3817 catch(Exception e) 3818 { 3819 logger.traceException(e); 3820 logger.error(ERR_CHECK_CREATE_REPL_BACKEND_FAILED, stackTraceToSingleLineString(e)); 3821 } 3822 } 3823 3824 /** 3825 * Store the provided ECL configuration for the domain. 3826 * @param domCfg The provided configuration. 3827 * @throws ConfigException When an error occurred. 3828 */ 3829 private void storeECLConfiguration(ReplicationDomainCfg domCfg) 3830 throws ConfigException 3831 { 3832 ExternalChangelogDomainCfg eclDomCfg = null; 3833 // create the ecl config if it does not exist 3834 // There may not be any config entry related to this domain in some 3835 // unit test cases 3836 try 3837 { 3838 DN configDn = config.dn(); 3839 if (DirectoryServer.getConfigHandler().entryExists(configDn)) 3840 { 3841 try 3842 { eclDomCfg = domCfg.getExternalChangelogDomain(); 3843 } catch(Exception e) { /* do nothing */ } 3844 // domain with no config entry only when running unit tests 3845 if (eclDomCfg == null) 3846 { 3847 // no ECL config provided hence create a default one 3848 // create the default one 3849 DN eclConfigEntryDN = DN.valueOf("cn=external changelog," + configDn); 3850 if (!DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) 3851 { 3852 // no entry exist yet for the ECL config for this domain 3853 // create it 3854 String ldif = makeLdif( 3855 "dn: cn=external changelog," + configDn, 3856 "objectClass: top", 3857 "objectClass: ds-cfg-external-changelog-domain", 3858 "cn: external changelog", 3859 "ds-cfg-enabled: " + !getBackend().isPrivateBackend()); 3860 LDIFImportConfig ldifImportConfig = new LDIFImportConfig( 3861 new StringReader(ldif)); 3862 // No need to validate schema in replication 3863 ldifImportConfig.setValidateSchema(false); 3864 LDIFReader reader = new LDIFReader(ldifImportConfig); 3865 Entry eclEntry = reader.readEntry(); 3866 DirectoryServer.getConfigHandler().addEntry(eclEntry, null); 3867 ldifImportConfig.close(); 3868 } 3869 } 3870 } 3871 eclDomCfg = domCfg.getExternalChangelogDomain(); 3872 if (eclDomain != null) 3873 { 3874 eclDomain.applyConfigurationChange(eclDomCfg); 3875 } 3876 else 3877 { 3878 // Create the ECL domain object 3879 eclDomain = new ExternalChangelogDomain(this, eclDomCfg); 3880 } 3881 } 3882 catch (Exception e) 3883 { 3884 throw new ConfigException(NOTE_ERR_UNABLE_TO_ENABLE_ECL.get( 3885 "Replication Domain on " + getBaseDN(), stackTraceToSingleLineString(e)), e); 3886 } 3887 } 3888 3889 private static String makeLdif(String... lines) 3890 { 3891 final StringBuilder buffer = new StringBuilder(); 3892 for (String line : lines) { 3893 buffer.append(line).append(EOL); 3894 } 3895 // Append an extra line so we can append LDIF Strings. 3896 buffer.append(EOL); 3897 return buffer.toString(); 3898 } 3899 3900 @Override 3901 public void sessionInitiated(ServerStatus initStatus, ServerState rsState) 3902 { 3903 // Check domain fractional configuration consistency with local 3904 // configuration variables 3905 forceBadDataSet = !isBackendFractionalConfigConsistent(); 3906 3907 super.sessionInitiated(initStatus, rsState); 3908 3909 // Now for bad data set status if needed 3910 if (forceBadDataSet) 3911 { 3912 signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT); 3913 logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDN()); 3914 return; // Do not send changes to the replication server 3915 } 3916 3917 try 3918 { 3919 /* 3920 * We must not publish changes to a replicationServer that has 3921 * not seen all our previous changes because this could cause 3922 * some other ldap servers to miss those changes. 3923 * Check that the ReplicationServer has seen all our previous 3924 * changes. 3925 */ 3926 CSN replServerMaxCSN = rsState.getCSN(getServerId()); 3927 3928 // we don't want to update from here (a DS) an empty RS because 3929 // normally the RS should have been updated by other RSes except for 3930 // very last changes lost if the local connection was broken 3931 // ... hence the RS we are connected to should not be empty 3932 // ... or if it is empty, it is due to a voluntary reset 3933 // and we don't want to update it with our changes that could be huge. 3934 if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0) 3935 { 3936 CSN ourMaxCSN = state.getMaxCSN(getServerId()); 3937 if (ourMaxCSN != null 3938 && !ourMaxCSN.isOlderThanOrEqualTo(replServerMaxCSN)) 3939 { 3940 pendingChanges.setRecovering(true); 3941 broker.setRecoveryRequired(true); 3942 final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN); 3943 if (this.rsUpdater.compareAndSet(null, rsUpdater)) 3944 { 3945 rsUpdater.start(); 3946 } 3947 } 3948 } 3949 } catch (Exception e) 3950 { 3951 logger.error(ERR_PUBLISHING_FAKE_OPS, getBaseDN(), stackTraceToSingleLineString(e)); 3952 } 3953 } 3954 3955 /** 3956 * Build the list of changes that have been processed by this server after the 3957 * CSN given as a parameter and publish them using the given session. 3958 * 3959 * @param startCSN 3960 * The CSN where we need to start the search 3961 * @param session 3962 * The session to use to publish the changes 3963 * @return A boolean indicating he success of the operation. 3964 * @throws Exception 3965 * if an Exception happens during the search. 3966 */ 3967 boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session) 3968 throws Exception 3969 { 3970 // Trim the changes in replayOperations that are older than the startCSN. 3971 synchronized (replayOperations) 3972 { 3973 Iterator<CSN> it = replayOperations.keySet().iterator(); 3974 while (it.hasNext()) 3975 { 3976 if (shutdown.get()) 3977 { 3978 return false; 3979 } 3980 if (it.next().isNewerThan(startCSN)) 3981 { 3982 break; 3983 } 3984 it.remove(); 3985 } 3986 } 3987 3988 CSN lastRetrievedChange; 3989 InternalSearchOperation op; 3990 CSN currentStartCSN = startCSN; 3991 do 3992 { 3993 if (shutdown.get()) 3994 { 3995 return false; 3996 } 3997 3998 lastRetrievedChange = null; 3999 // We can't do the search in one go because we need to store the results 4000 // so that we are sure we send the operations in order and because the 4001 // list might be large. 4002 // So we search by interval of 10 seconds and store the results in the 4003 // replayOperations list so that they are sorted before sending them. 4004 long missingChangesDelta = currentStartCSN.getTime() + 10000; 4005 CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, getServerId()); 4006 4007 ScanSearchListener listener = 4008 new ScanSearchListener(currentStartCSN, endCSN); 4009 op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN, 4010 listener); 4011 4012 // Publish and remove all the changes from the replayOperations list 4013 // that are older than the endCSN. 4014 final List<FakeOperation> opsToSend = new LinkedList<>(); 4015 synchronized (replayOperations) 4016 { 4017 Iterator<FakeOperation> itOp = replayOperations.values().iterator(); 4018 while (itOp.hasNext()) 4019 { 4020 if (shutdown.get()) 4021 { 4022 return false; 4023 } 4024 FakeOperation fakeOp = itOp.next(); 4025 if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check 4026 || !state.cover(fakeOp.getCSN()) 4027 // do not look for replay operations in the future 4028 || currentStartCSN.isNewerThan(now())) 4029 { 4030 break; 4031 } 4032 4033 lastRetrievedChange = fakeOp.getCSN(); 4034 opsToSend.add(fakeOp); 4035 itOp.remove(); 4036 } 4037 } 4038 4039 for (FakeOperation opToSend : opsToSend) 4040 { 4041 if (shutdown.get()) 4042 { 4043 return false; 4044 } 4045 session.publishRecovery(opToSend.generateMessage()); 4046 } 4047 4048 if (lastRetrievedChange != null) 4049 { 4050 if (logger.isInfoEnabled()) 4051 { 4052 logger.info(LocalizableMessage.raw("publish loop" 4053 + " >=" + currentStartCSN + " <=" + endCSN 4054 + " nentries=" + op.getEntriesSent() 4055 + " result=" + op.getResultCode() 4056 + " lastRetrievedChange=" + lastRetrievedChange)); 4057 } 4058 currentStartCSN = lastRetrievedChange; 4059 } 4060 else 4061 { 4062 if (logger.isInfoEnabled()) 4063 { 4064 logger.info(LocalizableMessage.raw("publish loop" 4065 + " >=" + currentStartCSN + " <=" + endCSN 4066 + " nentries=" + op.getEntriesSent() 4067 + " result=" + op.getResultCode() 4068 + " no changes")); 4069 } 4070 currentStartCSN = endCSN; 4071 } 4072 } while (pendingChanges.recoveryUntil(currentStartCSN) 4073 && op.getResultCode().equals(ResultCode.SUCCESS)); 4074 4075 return op.getResultCode().equals(ResultCode.SUCCESS); 4076 } 4077 4078 private static CSN now() 4079 { 4080 // ensure now() will always come last with isNewerThan() test, 4081 // even though the timestamp, or the timestamp and seqnum would be the same 4082 return new CSN(TimeThread.getTime(), Integer.MAX_VALUE, Integer.MAX_VALUE); 4083 } 4084 4085 /** 4086 * Search for the changes that happened since fromCSN based on the historical 4087 * attribute. The only changes that will be send will be the one generated on 4088 * the serverId provided in fromCSN. 4089 * 4090 * @param baseDN 4091 * the base DN 4092 * @param fromCSN 4093 * The CSN from which we want the changes 4094 * @param lastCSN 4095 * The max CSN that the search should return 4096 * @param resultListener 4097 * The listener that will process the entries returned 4098 * @return the internal search operation 4099 * @throws Exception 4100 * when raised. 4101 */ 4102 private static InternalSearchOperation searchForChangedEntries(DN baseDN, 4103 CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener) 4104 throws Exception 4105 { 4106 String maxValueForId; 4107 if (lastCSN == null) 4108 { 4109 final Integer serverId = fromCSN.getServerId(); 4110 maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId) 4111 + "ffffffff"; 4112 } 4113 else 4114 { 4115 maxValueForId = lastCSN.toString(); 4116 } 4117 4118 String filter = 4119 "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" + 4120 "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))"; 4121 SearchRequest request = Requests.newSearchRequest(baseDN, SearchScope.WHOLE_SUBTREE, filter) 4122 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 4123 return getRootConnection().processSearch(request, resultListener); 4124 } 4125 4126 /** 4127 * Search for the changes that happened since fromCSN based on the historical 4128 * attribute. The only changes that will be send will be the one generated on 4129 * the serverId provided in fromCSN. 4130 * 4131 * @param baseDN 4132 * the base DN 4133 * @param fromCSN 4134 * The CSN from which we want the changes 4135 * @param resultListener 4136 * that will process the entries returned. 4137 * @return the internal search operation 4138 * @throws Exception 4139 * when raised. 4140 */ 4141 static InternalSearchOperation searchForChangedEntries(DN baseDN, 4142 CSN fromCSN, InternalSearchListener resultListener) throws Exception 4143 { 4144 return searchForChangedEntries(baseDN, fromCSN, null, resultListener); 4145 } 4146 4147 /** 4148 * This method should return the total number of objects in the 4149 * replicated domain. 4150 * This count will be used for reporting. 4151 * 4152 * @throws DirectoryException when needed. 4153 * 4154 * @return The number of objects in the replication domain. 4155 */ 4156 @Override 4157 public long countEntries() throws DirectoryException 4158 { 4159 Backend<?> backend = getBackend(); 4160 if (!backend.supports(BackendOperation.LDIF_EXPORT)) 4161 { 4162 LocalizableMessage msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID()); 4163 logger.error(msg); 4164 throw new DirectoryException(ResultCode.OTHER, msg); 4165 } 4166 4167 return backend.getNumberOfEntriesInBaseDN(getBaseDN()); 4168 } 4169 4170 @Override 4171 public boolean processUpdate(UpdateMsg updateMsg) 4172 { 4173 // Ignore message if fractional configuration is inconsistent and 4174 // we have been passed into bad data set status 4175 if (forceBadDataSet) 4176 { 4177 return false; 4178 } 4179 4180 if (updateMsg instanceof LDAPUpdateMsg) 4181 { 4182 LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg; 4183 4184 // Put the UpdateMsg in the RemotePendingChanges list. 4185 if (!remotePendingChanges.putRemoteUpdate(msg)) 4186 { 4187 /* 4188 * Already received this change so ignore it. This may happen if there 4189 * are uncommitted changes in the queue and session failover occurs 4190 * causing a recovery of all changes since the current committed server 4191 * state. See OPENDJ-1115. 4192 */ 4193 if (logger.isTraceEnabled()) 4194 { 4195 logger.trace( 4196 "LDAPReplicationDomain.processUpdate: ignoring " 4197 + "duplicate change %s", msg.getCSN()); 4198 } 4199 return true; 4200 } 4201 4202 // Put update message into the replay queue 4203 // (block until some place in the queue is available) 4204 final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); 4205 while (!isListenerShuttingDown()) 4206 { 4207 // loop until we can offer to the queue or shutdown was initiated 4208 try 4209 { 4210 if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS)) 4211 { 4212 // successful offer to the queue, let's exit the loop 4213 break; 4214 } 4215 } 4216 catch (InterruptedException e) 4217 { 4218 // Thread interrupted: check for shutdown. 4219 Thread.currentThread().interrupt(); 4220 } 4221 } 4222 4223 return false; 4224 } 4225 4226 // unknown message type, this should not happen, just ignore it. 4227 return true; 4228 } 4229 4230 /** 4231 * Monitoring information for the LDAPReplicationDomain. 4232 * 4233 * @return Monitoring attributes specific to the LDAPReplicationDomain. 4234 */ 4235 @Override 4236 public Collection<Attribute> getAdditionalMonitoring() 4237 { 4238 List<Attribute> attributes = new ArrayList<>(); 4239 4240 // number of updates in the pending list 4241 addMonitorData(attributes, "pending-updates", pendingChanges.size()); 4242 4243 addMonitorData(attributes, "replayed-updates-ok", 4244 numReplayedPostOpCalled.get()); 4245 addMonitorData(attributes, "resolved-modify-conflicts", 4246 numResolvedModifyConflicts.get()); 4247 addMonitorData(attributes, "resolved-naming-conflicts", 4248 numResolvedNamingConflicts.get()); 4249 addMonitorData(attributes, "unresolved-naming-conflicts", 4250 numUnresolvedNamingConflicts.get()); 4251 addMonitorData(attributes, "remote-pending-changes-size", 4252 remotePendingChanges.getQueueSize()); 4253 4254 return attributes; 4255 } 4256 4257 /** 4258 * Verifies that the given string represents a valid source 4259 * from which this server can be initialized. 4260 * @param sourceString The string representing the source 4261 * @return The source as a integer value 4262 * @throws DirectoryException if the string is not valid 4263 */ 4264 public int decodeSource(String sourceString) throws DirectoryException 4265 { 4266 int source = 0; 4267 try 4268 { 4269 source = Integer.decode(sourceString); 4270 if (source >= -1 && source != getServerId()) 4271 { 4272 // TODO Verifies serverID is in the domain 4273 // We should check here that this is a server implied 4274 // in the current domain. 4275 return source; 4276 } 4277 } 4278 catch (Exception e) 4279 { 4280 LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get( 4281 getBaseDN(), getServerId(), sourceString, stackTraceToSingleLineString(e)); 4282 throw new DirectoryException(ResultCode.OTHER, message, e); 4283 } 4284 4285 LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDN(), getServerId(), source, ""); 4286 throw new DirectoryException(ResultCode.OTHER, message); 4287 } 4288 4289 /** 4290 * Called by synchronize post op plugin in order to add the entry historical 4291 * attributes to the UpdateMsg. 4292 * @param msg an replication update message 4293 * @param op the operation in progress 4294 */ 4295 private void addEntryAttributesForCL(UpdateMsg msg, 4296 PostOperationOperation op) 4297 { 4298 if (op instanceof PostOperationDeleteOperation) 4299 { 4300 PostOperationDeleteOperation delOp = (PostOperationDeleteOperation) op; 4301 final Set<String> names = getEclIncludesForDeletes(); 4302 Entry entry = delOp.getEntryToDelete(); 4303 final DeleteMsg deleteMsg = (DeleteMsg) msg; 4304 deleteMsg.setEclIncludes(getIncludedAttributes(entry, names)); 4305 4306 // For delete only, add the Authorized DN since it's required in the 4307 // ECL entry but is not part of rest of the message. 4308 DN deleterDN = delOp.getAuthorizationDN(); 4309 if (deleterDN != null) 4310 { 4311 deleteMsg.setInitiatorsName(deleterDN.toString()); 4312 } 4313 } 4314 else if (op instanceof PostOperationModifyOperation) 4315 { 4316 PostOperationModifyOperation modOp = (PostOperationModifyOperation) op; 4317 Set<String> names = getEclIncludes(); 4318 Entry entry = modOp.getCurrentEntry(); 4319 ((ModifyMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4320 } 4321 else if (op instanceof PostOperationModifyDNOperation) 4322 { 4323 PostOperationModifyDNOperation modDNOp = 4324 (PostOperationModifyDNOperation) op; 4325 Set<String> names = getEclIncludes(); 4326 Entry entry = modDNOp.getOriginalEntry(); 4327 ((ModifyDNMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4328 } 4329 else if (op instanceof PostOperationAddOperation) 4330 { 4331 PostOperationAddOperation addOp = (PostOperationAddOperation) op; 4332 Set<String> names = getEclIncludes(); 4333 Entry entry = addOp.getEntryToAdd(); 4334 ((AddMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4335 } 4336 } 4337 4338 private Collection<Attribute> getIncludedAttributes(Entry entry, 4339 Set<String> names) 4340 { 4341 if (names.isEmpty()) 4342 { 4343 // Fast-path. 4344 return Collections.emptySet(); 4345 } 4346 else if (names.size() == 1 && names.contains("*")) 4347 { 4348 // Potential fast-path for delete operations. 4349 List<Attribute> attributes = new LinkedList<>(); 4350 for (List<Attribute> attributeList : entry.getUserAttributes().values()) 4351 { 4352 attributes.addAll(attributeList); 4353 } 4354 Attribute objectClassAttribute = entry.getObjectClassAttribute(); 4355 if (objectClassAttribute != null) 4356 { 4357 attributes.add(objectClassAttribute); 4358 } 4359 return attributes; 4360 } 4361 else 4362 { 4363 // Expand @objectclass references in attribute list if needed. 4364 // We do this now in order to take into account dynamic schema changes. 4365 final Set<String> expandedNames = getExpandedNames(names); 4366 final Entry filteredEntry = 4367 entry.filterEntry(expandedNames, false, false, false); 4368 return filteredEntry.getAttributes(); 4369 } 4370 } 4371 4372 private Set<String> getExpandedNames(Set<String> names) 4373 { 4374 // Only rebuild the attribute set if necessary. 4375 if (!needsExpanding(names)) 4376 { 4377 return names; 4378 } 4379 4380 final Set<String> expandedNames = new HashSet<>(names.size()); 4381 for (String name : names) 4382 { 4383 if (name.startsWith("@")) 4384 { 4385 String ocName = name.substring(1); 4386 ObjectClass objectClass = 4387 DirectoryServer.getObjectClass(toLowerCase(ocName)); 4388 if (objectClass != null) 4389 { 4390 for (AttributeType at : objectClass.getRequiredAttributeChain()) 4391 { 4392 expandedNames.add(at.getNameOrOID()); 4393 } 4394 for (AttributeType at : objectClass.getOptionalAttributeChain()) 4395 { 4396 expandedNames.add(at.getNameOrOID()); 4397 } 4398 } 4399 } 4400 else 4401 { 4402 expandedNames.add(name); 4403 } 4404 } 4405 return expandedNames; 4406 } 4407 4408 private boolean needsExpanding(Set<String> names) 4409 { 4410 for (String name : names) 4411 { 4412 if (name.startsWith("@")) 4413 { 4414 return true; 4415 } 4416 } 4417 return false; 4418 } 4419 4420 /** 4421 * Gets the fractional configuration of this domain. 4422 * @return The fractional configuration of this domain. 4423 */ 4424 FractionalConfig getFractionalConfig() 4425 { 4426 return fractionalConfig; 4427 } 4428 4429 /** 4430 * This bean is a utility class used for holding the parsing 4431 * result of a fractional configuration. It also contains some facility 4432 * methods like fractional configuration comparison... 4433 */ 4434 static class FractionalConfig 4435 { 4436 /** 4437 * Tells if fractional replication is enabled or not (some fractional 4438 * constraints have been put in place). If this is true then 4439 * fractionalExclusive explains the configuration mode and either 4440 * fractionalSpecificClassesAttributes or fractionalAllClassesAttributes or 4441 * both should be filled with something. 4442 */ 4443 private boolean fractional; 4444 4445 /** 4446 * - If true, tells that the configured fractional replication is exclusive: 4447 * Every attributes contained in fractionalSpecificClassesAttributes and 4448 * fractionalAllClassesAttributes should be ignored when replaying operation 4449 * in local backend. 4450 * - If false, tells that the configured fractional replication is 4451 * inclusive: 4452 * Only attributes contained in fractionalSpecificClassesAttributes and 4453 * fractionalAllClassesAttributes should be taken into account in local 4454 * backend. 4455 */ 4456 private boolean fractionalExclusive = true; 4457 4458 /** 4459 * Used in fractional replication: holds attributes of a specific object class. 4460 * - key = object class (name or OID of the class) 4461 * - value = the attributes of that class that should be taken into account 4462 * (inclusive or exclusive fractional replication) (name or OID of the 4463 * attribute) 4464 * When an operation coming from the network is to be locally replayed, if 4465 * the concerned entry has an objectClass attribute equals to 'key': 4466 * - inclusive mode: only the attributes in 'value' will be added/deleted/modified 4467 * - exclusive mode: the attributes in 'value' will not be added/deleted/modified 4468 */ 4469 private Map<String, Set<String>> fractionalSpecificClassesAttributes = new HashMap<>(); 4470 4471 /** 4472 * Used in fractional replication: holds attributes of any object class. 4473 * When an operation coming from the network is to be locally replayed: 4474 * - inclusive mode: only attributes of the matching entry not present in 4475 * fractionalAllClassesAttributes will be added/deleted/modified 4476 * - exclusive mode: attributes of the matching entry present in 4477 * fractionalAllClassesAttributes will not be added/deleted/modified 4478 * The attributes may be in human readable form of OID form. 4479 */ 4480 private Set<String> fractionalAllClassesAttributes = new HashSet<>(); 4481 4482 /** Base DN the fractional configuration is for. */ 4483 private final DN baseDN; 4484 4485 /** 4486 * Constructs a new fractional configuration object. 4487 * @param baseDN The base DN the object is for. 4488 */ 4489 private FractionalConfig(DN baseDN) 4490 { 4491 this.baseDN = baseDN; 4492 } 4493 4494 /** 4495 * Getter for fractional. 4496 * @return True if the configuration has fractional enabled 4497 */ 4498 boolean isFractional() 4499 { 4500 return fractional; 4501 } 4502 4503 /** 4504 * Set the fractional parameter. 4505 * @param fractional The fractional parameter 4506 */ 4507 private void setFractional(boolean fractional) 4508 { 4509 this.fractional = fractional; 4510 } 4511 4512 /** 4513 * Getter for fractionalExclusive. 4514 * @return True if the configuration has fractional exclusive enabled 4515 */ 4516 boolean isFractionalExclusive() 4517 { 4518 return fractionalExclusive; 4519 } 4520 4521 /** 4522 * Set the fractionalExclusive parameter. 4523 * @param fractionalExclusive The fractionalExclusive parameter 4524 */ 4525 private void setFractionalExclusive(boolean fractionalExclusive) 4526 { 4527 this.fractionalExclusive = fractionalExclusive; 4528 } 4529 4530 /** 4531 * Getter for fractionalSpecificClassesAttributes attribute. 4532 * @return The fractionalSpecificClassesAttributes attribute. 4533 */ 4534 Map<String, Set<String>> getFractionalSpecificClassesAttributes() 4535 { 4536 return fractionalSpecificClassesAttributes; 4537 } 4538 4539 /** 4540 * Set the fractionalSpecificClassesAttributes parameter. 4541 * @param fractionalSpecificClassesAttributes The 4542 * fractionalSpecificClassesAttributes parameter to set. 4543 */ 4544 private void setFractionalSpecificClassesAttributes( 4545 Map<String, Set<String>> fractionalSpecificClassesAttributes) 4546 { 4547 this.fractionalSpecificClassesAttributes = 4548 fractionalSpecificClassesAttributes; 4549 } 4550 4551 /** 4552 * Getter for fractionalSpecificClassesAttributes attribute. 4553 * @return The fractionalSpecificClassesAttributes attribute. 4554 */ 4555 Set<String> getFractionalAllClassesAttributes() 4556 { 4557 return fractionalAllClassesAttributes; 4558 } 4559 4560 /** 4561 * Set the fractionalAllClassesAttributes parameter. 4562 * @param fractionalAllClassesAttributes The 4563 * fractionalSpecificClassesAttributes parameter to set. 4564 */ 4565 private void setFractionalAllClassesAttributes( 4566 Set<String> fractionalAllClassesAttributes) 4567 { 4568 this.fractionalAllClassesAttributes = fractionalAllClassesAttributes; 4569 } 4570 4571 /** 4572 * Getter for the base baseDN. 4573 * @return The baseDN attribute. 4574 */ 4575 DN getBaseDn() 4576 { 4577 return baseDN; 4578 } 4579 4580 /** 4581 * Extract the fractional configuration from the passed domain configuration 4582 * entry. 4583 * @param configuration The configuration object 4584 * @return The fractional replication configuration. 4585 * @throws ConfigException If an error occurred. 4586 */ 4587 static FractionalConfig toFractionalConfig( 4588 ReplicationDomainCfg configuration) throws ConfigException 4589 { 4590 // Prepare fractional configuration variables to parse 4591 Iterator<String> exclIt = configuration.getFractionalExclude().iterator(); 4592 Iterator<String> inclIt = configuration.getFractionalInclude().iterator(); 4593 4594 // Get potentially new fractional configuration 4595 Map<String, Set<String>> newFractionalSpecificClassesAttributes = new HashMap<>(); 4596 Set<String> newFractionalAllClassesAttributes = new HashSet<>(); 4597 4598 int newFractionalMode = parseFractionalConfig(exclIt, inclIt, 4599 newFractionalSpecificClassesAttributes, 4600 newFractionalAllClassesAttributes); 4601 4602 // Create matching parsed config object 4603 FractionalConfig result = new FractionalConfig(configuration.getBaseDN()); 4604 switch (newFractionalMode) 4605 { 4606 case NOT_FRACTIONAL: 4607 result.setFractional(false); 4608 result.setFractionalExclusive(true); 4609 break; 4610 case EXCLUSIVE_FRACTIONAL: 4611 case INCLUSIVE_FRACTIONAL: 4612 result.setFractional(true); 4613 result.setFractionalExclusive( 4614 newFractionalMode == EXCLUSIVE_FRACTIONAL); 4615 break; 4616 } 4617 result.setFractionalSpecificClassesAttributes( 4618 newFractionalSpecificClassesAttributes); 4619 result.setFractionalAllClassesAttributes( 4620 newFractionalAllClassesAttributes); 4621 return result; 4622 } 4623 4624 /** 4625 * Parses a fractional replication configuration, filling the empty passed 4626 * variables and returning the used fractional mode. The 2 passed variables 4627 * to fill should be initialized (not null) and empty. 4628 * @param exclIt The list of fractional exclude configuration values (may be 4629 * null) 4630 * @param inclIt The list of fractional include configuration values (may be 4631 * null) 4632 * @param fractionalSpecificClassesAttributes An empty map to be filled with 4633 * what is read from the fractional configuration properties. 4634 * @param fractionalAllClassesAttributes An empty list to be filled with 4635 * what is read from the fractional configuration properties. 4636 * @return the fractional mode deduced from the passed configuration: 4637 * not fractional, exclusive fractional or inclusive fractional 4638 * modes 4639 */ 4640 private static int parseFractionalConfig ( 4641 Iterator<String> exclIt, Iterator<String> inclIt, 4642 Map<String, Set<String>> fractionalSpecificClassesAttributes, 4643 Set<String> fractionalAllClassesAttributes) throws ConfigException 4644 { 4645 // Determine if fractional-exclude or fractional-include property is used: 4646 // only one of them is allowed 4647 int fractionalMode; 4648 Iterator<String> iterator; 4649 if (exclIt != null && exclIt.hasNext()) 4650 { 4651 if (inclIt != null && inclIt.hasNext()) 4652 { 4653 throw new ConfigException( 4654 NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get()); 4655 } 4656 4657 fractionalMode = EXCLUSIVE_FRACTIONAL; 4658 iterator = exclIt; 4659 } 4660 else 4661 { 4662 if (inclIt != null && inclIt.hasNext()) 4663 { 4664 fractionalMode = INCLUSIVE_FRACTIONAL; 4665 iterator = inclIt; 4666 } 4667 else 4668 { 4669 return NOT_FRACTIONAL; 4670 } 4671 } 4672 4673 while (iterator.hasNext()) 4674 { 4675 // Parse a value with the form class:attr1,attr2... 4676 // or *:attr1,attr2... 4677 String fractCfgStr = iterator.next(); 4678 StringTokenizer st = new StringTokenizer(fractCfgStr, ":"); 4679 int nTokens = st.countTokens(); 4680 if (nTokens < 2) 4681 { 4682 throw new ConfigException(NOTE_ERR_FRACTIONAL_CONFIG_WRONG_FORMAT. 4683 get(fractCfgStr)); 4684 } 4685 // Get the class name 4686 String classNameLower = st.nextToken().toLowerCase(); 4687 boolean allClasses = "*".equals(classNameLower); 4688 // Get the attributes 4689 String attributes = st.nextToken(); 4690 st = new StringTokenizer(attributes, ","); 4691 while (st.hasMoreTokens()) 4692 { 4693 String attrNameLower = st.nextToken().toLowerCase(); 4694 // Store attribute in the appropriate variable 4695 if (allClasses) 4696 { 4697 fractionalAllClassesAttributes.add(attrNameLower); 4698 } 4699 else 4700 { 4701 Set<String> attrList = fractionalSpecificClassesAttributes.get(classNameLower); 4702 if (attrList == null) 4703 { 4704 attrList = new LinkedHashSet<>(); 4705 fractionalSpecificClassesAttributes.put(classNameLower, attrList); 4706 } 4707 attrList.add(attrNameLower); 4708 } 4709 } 4710 } 4711 return fractionalMode; 4712 } 4713 4714 /** Return type of the parseFractionalConfig method. */ 4715 private static final int NOT_FRACTIONAL = 0; 4716 private static final int EXCLUSIVE_FRACTIONAL = 1; 4717 private static final int INCLUSIVE_FRACTIONAL = 2; 4718 4719 /** 4720 * Get an integer representation of the domain fractional configuration. 4721 * @return An integer representation of the domain fractional configuration. 4722 */ 4723 private int fractionalConfigToInt() 4724 { 4725 if (!fractional) 4726 { 4727 return NOT_FRACTIONAL; 4728 } 4729 else if (fractionalExclusive) 4730 { 4731 return EXCLUSIVE_FRACTIONAL; 4732 } 4733 return INCLUSIVE_FRACTIONAL; 4734 } 4735 4736 /** 4737 * Compare 2 fractional replication configurations and returns true if they 4738 * are equivalent. 4739 * @param cfg1 First fractional configuration 4740 * @param cfg2 Second fractional configuration 4741 * @return True if both configurations are equivalent. 4742 * @throws ConfigException If some classes or attributes could not be 4743 * retrieved from the schema. 4744 */ 4745 private static boolean isFractionalConfigEquivalent(FractionalConfig cfg1, 4746 FractionalConfig cfg2) throws ConfigException 4747 { 4748 // Compare base DNs just to be consistent 4749 if (!cfg1.getBaseDn().equals(cfg2.getBaseDn())) 4750 { 4751 return false; 4752 } 4753 4754 // Compare modes 4755 if (cfg1.isFractional() != cfg2.isFractional() 4756 || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()) 4757 { 4758 return false; 4759 } 4760 4761 // Compare all classes attributes 4762 Set<String> allClassesAttrs1 = cfg1.getFractionalAllClassesAttributes(); 4763 Set<String> allClassesAttrs2 = cfg2.getFractionalAllClassesAttributes(); 4764 if (!areAttributesEquivalent(allClassesAttrs1, allClassesAttrs2)) 4765 { 4766 return false; 4767 } 4768 4769 // Compare specific classes attributes 4770 Map<String, Set<String>> specificClassesAttrs1 = 4771 cfg1.getFractionalSpecificClassesAttributes(); 4772 Map<String, Set<String>> specificClassesAttrs2 = 4773 cfg2.getFractionalSpecificClassesAttributes(); 4774 if (specificClassesAttrs1.size() != specificClassesAttrs2.size()) 4775 { 4776 return false; 4777 } 4778 4779 /* 4780 * Check consistency of specific classes attributes 4781 * 4782 * For each class in specificClassesAttributes1, check that the attribute 4783 * list is equivalent to specificClassesAttributes2 attribute list 4784 */ 4785 Schema schema = DirectoryServer.getSchema(); 4786 for (String className1 : specificClassesAttrs1.keySet()) 4787 { 4788 // Get class from specificClassesAttributes1 4789 ObjectClass objectClass1 = schema.getObjectClass(className1); 4790 if (objectClass1 == null) 4791 { 4792 throw new ConfigException( 4793 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className1)); 4794 } 4795 4796 // Look for matching one in specificClassesAttributes2 4797 boolean foundClass = false; 4798 for (String className2 : specificClassesAttrs2.keySet()) 4799 { 4800 ObjectClass objectClass2 = schema.getObjectClass(className2); 4801 if (objectClass2 == null) 4802 { 4803 throw new ConfigException( 4804 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className2)); 4805 } 4806 if (objectClass1.equals(objectClass2)) 4807 { 4808 foundClass = true; 4809 // Now compare the 2 attribute lists 4810 Set<String> attributes1 = specificClassesAttrs1.get(className1); 4811 Set<String> attributes2 = specificClassesAttrs2.get(className2); 4812 if (!areAttributesEquivalent(attributes1, attributes2)) 4813 { 4814 return false; 4815 } 4816 break; 4817 } 4818 } 4819 // Found matching class ? 4820 if (!foundClass) 4821 { 4822 return false; 4823 } 4824 } 4825 4826 return true; 4827 } 4828 } 4829 4830 /** 4831 * Specifies whether this domain is enabled/disabled regarding the ECL. 4832 * @return enabled/disabled for the ECL. 4833 */ 4834 boolean isECLEnabled() 4835 { 4836 return this.eclDomain.isEnabled(); 4837 } 4838 4839 /** 4840 * Return the minimum time (in ms) that the domain keeps the historical 4841 * information necessary to solve conflicts. 4842 * 4843 * @return the purge delay. 4844 */ 4845 long getHistoricalPurgeDelay() 4846 { 4847 return config.getConflictsHistoricalPurgeDelay() * 60 * 1000; 4848 } 4849 4850 /** 4851 * Check if the operation that just happened has cleared a conflict : Clearing 4852 * a conflict happens if the operation has freed a DN for which another entry 4853 * was in conflict. 4854 * <p> 4855 * Steps: 4856 * <ul> 4857 * <li>get the DN freed by a DELETE or MODRDN op</li> 4858 * <li>search for entries put in the conflict space (dn=entryUUID'+'....) 4859 * because the expected DN was not available (ds-sync-conflict=expected DN) 4860 * </li> 4861 * <li>retain the entry with the oldest conflict</li> 4862 * <li>rename this entry with the freedDN as it was expected originally</li> 4863 * </ul> 4864 * 4865 * @param task 4866 * the task raising this purge. 4867 * @param endDate 4868 * the date to stop this task whether the job is done or not. 4869 * @throws DirectoryException 4870 * when an exception happens. 4871 */ 4872 public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task, 4873 long endDate) throws DirectoryException 4874 { 4875 logger.trace("[PURGE] purgeConflictsHistorical " 4876 + "on domain: " + getBaseDN() 4877 + "endDate:" + new Date(endDate) 4878 + "lastCSNPurgedFromHist: " 4879 + lastCSNPurgedFromHist.toStringUI()); 4880 4881 String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")"; 4882 SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter) 4883 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 4884 InternalSearchOperation searchOp = conn.processSearch(request); 4885 4886 int count = 0; 4887 if (task != null) 4888 { 4889 task.setProgressStats(lastCSNPurgedFromHist, count); 4890 } 4891 4892 for (SearchResultEntry entry : searchOp.getSearchEntries()) 4893 { 4894 long maxTimeToRun = endDate - TimeThread.getTime(); 4895 if (maxTimeToRun < 0) 4896 { 4897 throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED, 4898 LocalizableMessage.raw(" end date reached")); 4899 } 4900 4901 EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); 4902 lastCSNPurgedFromHist = entryHist.getOldestCSN(); 4903 entryHist.setPurgeDelay(getHistoricalPurgeDelay()); 4904 Attribute attr = entryHist.encodeAndPurge(); 4905 count += entryHist.getLastPurgedValuesCount(); 4906 List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr)); 4907 4908 ModifyOperation newOp = new ModifyOperationBasis( 4909 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 4910 entry.getName(), mods); 4911 runAsSynchronizedOperation(newOp); 4912 4913 if (newOp.getResultCode() != ResultCode.SUCCESS) 4914 { 4915 // Log information for the repair tool. 4916 logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode()); 4917 } 4918 else if (task != null) 4919 { 4920 task.setProgressStats(lastCSNPurgedFromHist, count); 4921 } 4922 } 4923 } 4924}