001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.plugin;
028
029import static org.forgerock.opendj.ldap.ResultCode.*;
030import static org.opends.messages.ReplicationMessages.*;
031import static org.opends.messages.ToolMessages.*;
032import static org.opends.server.protocols.internal.InternalClientConnection.*;
033import static org.opends.server.protocols.internal.Requests.*;
034import static org.opends.server.replication.plugin.EntryHistorical.*;
035import static org.opends.server.replication.protocol.OperationContext.*;
036import static org.opends.server.replication.service.ReplicationMonitor.*;
037import static org.opends.server.util.CollectionUtils.*;
038import static org.opends.server.util.ServerConstants.*;
039import static org.opends.server.util.StaticUtils.*;
040
041import java.io.File;
042import java.io.InputStream;
043import java.io.OutputStream;
044import java.io.StringReader;
045import java.util.*;
046import java.util.concurrent.BlockingQueue;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.TimeoutException;
049import java.util.concurrent.atomic.AtomicBoolean;
050import java.util.concurrent.atomic.AtomicInteger;
051import java.util.concurrent.atomic.AtomicReference;
052import java.util.zip.DataFormatException;
053
054import org.forgerock.i18n.LocalizableMessage;
055import org.forgerock.i18n.slf4j.LocalizedLogger;
056import org.forgerock.opendj.config.server.ConfigChangeResult;
057import org.forgerock.opendj.config.server.ConfigException;
058import org.forgerock.opendj.ldap.ByteString;
059import org.forgerock.opendj.ldap.DecodeException;
060import org.forgerock.opendj.ldap.ModificationType;
061import org.forgerock.opendj.ldap.ResultCode;
062import org.forgerock.opendj.ldap.SearchScope;
063import org.opends.server.admin.server.ConfigurationChangeListener;
064import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
065import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
066import org.opends.server.admin.std.server.ReplicationDomainCfg;
067import org.opends.server.api.AlertGenerator;
068import org.opends.server.api.Backend;
069import org.opends.server.api.Backend.BackendOperation;
070import org.opends.server.api.DirectoryThread;
071import org.opends.server.api.SynchronizationProvider;
072import org.opends.server.backends.task.Task;
073import org.opends.server.core.*;
074import org.opends.server.protocols.internal.InternalClientConnection;
075import org.opends.server.protocols.internal.InternalSearchListener;
076import org.opends.server.protocols.internal.InternalSearchOperation;
077import org.opends.server.protocols.internal.Requests;
078import org.opends.server.protocols.internal.SearchRequest;
079import org.opends.server.protocols.ldap.LDAPAttribute;
080import org.opends.server.protocols.ldap.LDAPControl;
081import org.opends.server.protocols.ldap.LDAPFilter;
082import org.opends.server.protocols.ldap.LDAPModification;
083import org.opends.server.replication.common.CSN;
084import org.opends.server.replication.common.ServerState;
085import org.opends.server.replication.common.ServerStatus;
086import org.opends.server.replication.common.StatusMachineEvent;
087import org.opends.server.replication.protocol.*;
088import org.opends.server.replication.service.DSRSShutdownSync;
089import org.opends.server.replication.service.ReplicationBroker;
090import org.opends.server.replication.service.ReplicationDomain;
091import org.opends.server.tasks.PurgeConflictsHistoricalTask;
092import org.opends.server.tasks.TaskUtils;
093import org.opends.server.types.*;
094import org.opends.server.types.operation.*;
095import org.opends.server.util.LDIFReader;
096import org.opends.server.util.TimeThread;
097import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
098
099/**
100 *  This class implements the bulk part of the Directory Server side
101 *  of the replication code.
102 *  It contains the root method for publishing a change,
103 *  processing a change received from the replicationServer service,
104 *  handle conflict resolution,
105 *  handle protocol messages from the replicationServer.
106 * <p>
107 * FIXME Move this class to org.opends.server.replication.service
108 * or the equivalent package once this code is moved to a maven module.
109 */
110public final class LDAPReplicationDomain extends ReplicationDomain
111       implements ConfigurationChangeListener<ReplicationDomainCfg>,
112                  AlertGenerator
113{
114  /**
115   * Set of attributes that will return all the user attributes and the
116   * replication related operational attributes when used in a search operation.
117   */
118  private static final Set<String> USER_AND_REPL_OPERATIONAL_ATTRS =
119      newHashSet(HISTORICAL_ATTRIBUTE_NAME, ENTRYUUID_ATTRIBUTE_NAME, "*");
120
121  /**
122   * This class is used in the session establishment phase
123   * when no Replication Server with all the local changes has been found
124   * and we therefore need to recover them.
125   * A search is then performed on the database using this
126   * internalSearchListener.
127   */
128  private class ScanSearchListener implements InternalSearchListener
129  {
130    private final CSN startCSN;
131    private final CSN endCSN;
132
133    public ScanSearchListener(CSN startCSN, CSN endCSN)
134    {
135      this.startCSN = startCSN;
136      this.endCSN = endCSN;
137    }
138
139    @Override
140    public void handleInternalSearchEntry(
141        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
142        throws DirectoryException
143    {
144      // Build the list of Operations that happened on this entry after startCSN
145      // and before endCSN and add them to the replayOperations list
146      Iterable<FakeOperation> updates =
147        EntryHistorical.generateFakeOperations(searchEntry);
148
149      for (FakeOperation op : updates)
150      {
151        CSN csn = op.getCSN();
152        if (csn.isNewerThan(startCSN) && csn.isOlderThan(endCSN))
153        {
154          synchronized (replayOperations)
155          {
156            replayOperations.put(csn, op);
157          }
158        }
159      }
160    }
161
162    @Override
163    public void handleInternalSearchReference(
164        InternalSearchOperation searchOperation,
165        SearchResultReference searchReference) throws DirectoryException
166    {
167       // Nothing to do.
168    }
169  }
170
171  /** The fully-qualified name of this class. */
172  private static final String CLASS_NAME = LDAPReplicationDomain.class.getName();
173
174  /**
175   * The attribute used to mark conflicting entries.
176   * The value of this attribute should be the dn that this entry was
177   * supposed to have when it was marked as conflicting.
178   */
179  public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
180  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
181
182  private final DSRSShutdownSync dsrsShutdownSync;
183  /**
184   * The update to replay message queue where the listener thread is going to
185   * push incoming update messages.
186   */
187  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
188  /** The number of naming conflicts successfully resolved. */
189  private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
190  /** The number of modify conflicts successfully resolved. */
191  private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
192  /** The number of unresolved naming conflicts. */
193  private final AtomicInteger numUnresolvedNamingConflicts =
194      new AtomicInteger();
195  /** The number of updates replayed successfully by the replication. */
196  private final AtomicInteger numReplayedPostOpCalled = new AtomicInteger();
197
198  private final PersistentServerState state;
199  private volatile boolean generationIdSavedStatus;
200
  /**
   * This object is used to store the list of updates currently being
   * done on the local database.
   * It is useful to make sure that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
208  private final PendingChanges pendingChanges;
209  private final AtomicReference<RSUpdater> rsUpdater = new AtomicReference<>(null);
210
  /**
   * It contains the updates that were done on other servers, transmitted by
   * the replication server and that are currently being replayed.
   * <p>
   * It is useful to make sure that dependencies between operations are
   * correctly fulfilled and to make sure that the ServerState is not updated
   * too early.
   */
219  private final RemotePendingChanges remotePendingChanges;
220  private boolean solveConflictFlag = true;
221
222  private final InternalClientConnection conn = getRootConnection();
223  private final AtomicBoolean shutdown = new AtomicBoolean();
224  private volatile boolean disabled;
225  private volatile boolean stateSavingDisabled;
226
  /**
   * This map is used to temporarily store the operations that need to be
   * replayed at session establishment time.
   */
231  private final SortedMap<CSN, FakeOperation> replayOperations = new TreeMap<>();
232
233  private ExternalChangelogDomain eclDomain;
234
235  /** A boolean indicating if the thread used to save the persistentServerState is terminated. */
236  private volatile boolean done = true;
237
238  private final ServerStateFlush flushThread;
239
240  /** The attribute name used to store the generation id in the backend. */
241  private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id";
242  /** The attribute name used to store the fractional include configuration in the backend. */
243  static final String REPLICATION_FRACTIONAL_INCLUDE = "ds-sync-fractional-include";
244  /** The attribute name used to store the fractional exclude configuration in the backend. */
245  static final String REPLICATION_FRACTIONAL_EXCLUDE = "ds-sync-fractional-exclude";
246
247  /**
248   * Fractional replication variables.
249   */
250
251  /** Holds the fractional configuration for this domain, if any. */
252  private final FractionalConfig fractionalConfig;
253
254  /** The list of attributes that cannot be used in fractional replication configuration. */
255  private static final String[] FRACTIONAL_PROHIBITED_ATTRIBUTES = new String[]
256  {
257    "objectClass",
258    "2.5.4.0" // objectClass OID
259  };
260
261  /**
262   * When true, this flag is used to force the domain status to be put in bad
263   * data set just after the connection to the replication server.
264   * This must be used when fractional replication is enabled with a
265   * configuration different from the previous one (or at the very first
266   * fractional usage time) : after connection, a ChangeStatusMsg is sent
267   * requesting the bad data set status. Then none of the update messages
268   * received from the replication server are taken into account until the
269   * backend is synchronized with brand new data set compliant with the new
270   * fractional configuration (i.e with compliant fractional configuration in
271   * domain root entry).
272   */
273  private boolean forceBadDataSet;
274
275  /**
276   * The message id to be used when an import is stopped with error by
277   * the fractional replication ldif import plugin.
278   */
279  private int importErrorMessageId = -1;
280  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE. */
281  static final int IMPORT_ERROR_MESSAGE_BAD_REMOTE = 1;
282  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL. */
283  static final int IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL = 2;
284
285  /*
286   * Definitions for the return codes of the
287   * fractionalFilterOperation(PreOperationModifyOperation
288   *  modifyOperation, boolean performFiltering) method
289   */
290  /**
291   * The operation contains attributes subject to fractional filtering according
292   * to the fractional configuration.
293   */
294  private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
295  /**
296   * The operation contains no attributes subject to fractional filtering
297   * according to the fractional configuration.
298   */
299  private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
300  /** The operation should become a no-op. */
301  private static final int FRACTIONAL_BECOME_NO_OP = 3;
302
  /**
   * The last CSN purged in this domain. It allows the purging process to
   * continue from one purge processing (task run) to the next one. Its value
   * is 0 when the server starts.
   */
308  private CSN lastCSNPurgedFromHist = new CSN(0,0,0);
309
310  /**
311   * The thread that periodically saves the ServerState of this
312   * LDAPReplicationDomain in the database.
313   */
314  private class ServerStateFlush extends DirectoryThread
315  {
316    protected ServerStateFlush()
317    {
318      super("Replica DS(" + getServerId() + ") state checkpointer for domain \"" + getBaseDN() + "\"");
319    }
320
321    @Override
322    public void run()
323    {
324      done = false;
325
326      while (!isShutdownInitiated())
327      {
328        try
329        {
330          synchronized (this)
331          {
332            wait(1000);
333            if (!disabled && !stateSavingDisabled)
334            {
335              // save the ServerState
336              state.save();
337            }
338          }
339        }
340        catch (InterruptedException e)
341        {
342          // Thread interrupted: check for shutdown.
343          Thread.currentThread().interrupt();
344        }
345      }
346      state.save();
347
348      done = true;
349    }
350  }
351
352  /**
353   * The thread that is responsible to update the RS to which this domain is
354   * connected in case it is late and there is no RS which is up to date.
355   */
356  private class RSUpdater extends DirectoryThread
357  {
358    private final CSN startCSN;
359
360    protected RSUpdater(CSN replServerMaxCSN)
361    {
362      super("Replica DS(" + getServerId() + ") missing change publisher for domain \"" + getBaseDN() + "\"");
363      this.startCSN = replServerMaxCSN;
364    }
365
366    @Override
367    public void run()
368    {
369      // Replication server is missing some of our changes:
370      // let's send them to him.
371      logger.trace(DEBUG_GOING_TO_SEARCH_FOR_CHANGES);
372
373      /*
374       * Get all the changes that have not been seen by this
375       * replication server and publish them.
376       */
377      try
378      {
379        if (buildAndPublishMissingChanges(startCSN, broker))
380        {
381          logger.trace(DEBUG_CHANGES_SENT);
382          synchronized(replayOperations)
383          {
384            replayOperations.clear();
385          }
386        }
387        else
388        {
389          /*
390           * An error happened trying to search for the updates
391           * This server will start accepting again new updates but
392           * some inconsistencies will stay between servers.
393           * Log an error for the repair tool
394           * that will need to re-synchronize the servers.
395           */
396          logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
397        }
398      }
399      catch (Exception e)
400      {
401        /*
402         * An error happened trying to search for the updates
403         * This server will start accepting again new updates but
404         * some inconsistencies will stay between servers.
405         * Log an error for the repair tool
406         * that will need to re-synchronize the servers.
407         */
408        logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
409      }
410      finally
411      {
412        broker.setRecoveryRequired(false);
413        // RSUpdater thread has finished its work, let's remove it from memory
414        // so another RSUpdater thread can be started if needed.
415        rsUpdater.compareAndSet(this, null);
416      }
417    }
418  }
419
420  /**
421   * Creates a new ReplicationDomain using configuration from configEntry.
422   *
423   * @param configuration    The configuration of this ReplicationDomain.
424   * @param updateToReplayQueue The queue for update messages to replay.
425   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
426   * @throws ConfigException In case of invalid configuration.
427   */
428  LDAPReplicationDomain(ReplicationDomainCfg configuration,
429      BlockingQueue<UpdateToReplay> updateToReplayQueue,
430      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
431  {
432    super(configuration, -1);
433
434    this.updateToReplayQueue = updateToReplayQueue;
435    this.dsrsShutdownSync = dsrsShutdownSync;
436
437    // Get assured configuration
438    readAssuredConfig(configuration, false);
439
440    // Get fractional configuration
441    fractionalConfig = new FractionalConfig(getBaseDN());
442    readFractionalConfig(configuration, false);
443    storeECLConfiguration(configuration);
444    solveConflictFlag = isSolveConflict(configuration);
445
446    Backend<?> backend = getBackend();
447    if (backend == null)
448    {
449      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(getBaseDN()));
450    }
451
452    try
453    {
454      generationId = loadGenerationId();
455    }
456    catch (DirectoryException e)
457    {
458      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
459    }
460
461    /*
462     * Create a new Persistent Server State that will be used to store
463     * the last CSN seen from all LDAP servers in the topology.
464     */
465    state = new PersistentServerState(getBaseDN(), getServerId(),
466        getServerState());
467    flushThread = new ServerStateFlush();
468
469    /*
470     * CSNGenerator is used to create new unique CSNs for each operation done on
471     * this replication domain.
472     *
473     * The generator time is adjusted to the time of the last CSN received from
474     * remote other servers.
475     */
476    pendingChanges = new PendingChanges(getGenerator(), this);
477    remotePendingChanges = new RemotePendingChanges(getServerState());
478
479    // listen for changes on the configuration
480    configuration.addChangeListener(this);
481
482    // register as an AlertGenerator
483    DirectoryServer.registerAlertGenerator(this);
484
485    startPublishService();
486  }
487
488  /**
489   * Modify conflicts are solved for all suffixes but the schema suffix because
490   * we don't want to store extra information in the schema ldif files. This has
491   * no negative impact because the changes on schema should not produce
492   * conflicts.
493   */
494  private boolean isSolveConflict(ReplicationDomainCfg cfg)
495  {
496    return !getBaseDN().equals(DirectoryServer.getSchemaDN())
497        && cfg.isSolveConflicts();
498  }
499
500  /**
501   * Sets the error message id to be used when online import is stopped with
502   * error by the fractional replication ldif import plugin.
503   * @param importErrorMessageId The message to use.
504   */
505  void setImportErrorMessageId(int importErrorMessageId)
506  {
507    this.importErrorMessageId = importErrorMessageId;
508  }
509
510  /**
511   * This flag is used by the fractional replication ldif import plugin to stop
512   * the (online) import process if a fractional configuration inconsistency is
513   * detected by it.
514   *
515   * @return true if the online import currently in progress should continue,
516   *         false otherwise.
517   */
518  private boolean isFollowImport()
519  {
520    return importErrorMessageId == -1;
521  }
522
523  /**
524   * Gets and stores the fractional replication configuration parameters.
525   * @param configuration The configuration object
526   * @param allowReconnection Tells if one must reconnect if significant changes
527   *        occurred
528   */
529  private void readFractionalConfig(ReplicationDomainCfg configuration,
530    boolean allowReconnection)
531  {
532    // Read the configuration entry
533    FractionalConfig newFractionalConfig;
534    try
535    {
536      newFractionalConfig = FractionalConfig.toFractionalConfig(configuration);
537    }
538    catch(ConfigException e)
539    {
540      // Should not happen as normally already called without problem in
541      // isConfigurationChangeAcceptable or isConfigurationAcceptable
542      // if we come up to this method
543      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
544      return;
545    }
546
547    /**
548     * Is there any change in fractional configuration ?
549     */
550
551    // Compute current configuration
552    boolean needReconnection;
553     try
554    {
555      needReconnection = !FractionalConfig.
556        isFractionalConfigEquivalent(fractionalConfig, newFractionalConfig);
557    }
558    catch  (ConfigException e)
559    {
560      // Should not happen
561      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
562      return;
563    }
564
565    // Disable service if configuration changed
566    final boolean needRestart = needReconnection && allowReconnection;
567    if (needRestart)
568    {
569      disableService();
570    }
571    // Set new configuration
572    int newFractionalMode = newFractionalConfig.fractionalConfigToInt();
573    fractionalConfig.setFractional(newFractionalMode !=
574      FractionalConfig.NOT_FRACTIONAL);
575    if (fractionalConfig.isFractional())
576    {
577      // Set new fractional configuration values
578      fractionalConfig.setFractionalExclusive(
579          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
580      fractionalConfig.setFractionalSpecificClassesAttributes(
581        newFractionalConfig.getFractionalSpecificClassesAttributes());
582      fractionalConfig.setFractionalAllClassesAttributes(
583        newFractionalConfig.fractionalAllClassesAttributes);
584    } else
585    {
586      // Reset default values
587      fractionalConfig.setFractionalExclusive(true);
588      fractionalConfig.setFractionalSpecificClassesAttributes(
589        new HashMap<String, Set<String>>());
590      fractionalConfig.setFractionalAllClassesAttributes(new HashSet<String>());
591    }
592
593    // Reconnect if required
594    if (needRestart)
595    {
596      enableService();
597    }
598  }
599
600  /**
601   * Return true if the fractional configuration stored in the domain root
602   * entry of the backend is equivalent to the fractional configuration stored
603   * in the local variables.
604   */
605  private boolean isBackendFractionalConfigConsistent()
606  {
607    // Read config stored in domain root entry
608    if (logger.isTraceEnabled())
609    {
610      logger.trace("Attempt to read the potential fractional config in domain root entry " + getBaseDN());
611    }
612
613    // Search the domain root entry that is used to save the generation id
614    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
615        .addAttribute(REPLICATION_GENERATION_ID, REPLICATION_FRACTIONAL_EXCLUDE, REPLICATION_FRACTIONAL_INCLUDE);
616    InternalSearchOperation search = conn.processSearch(request);
617
618    if (search.getResultCode() != ResultCode.SUCCESS
619        && search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
620    {
621      String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
622      logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN());
623      return false;
624    }
625
626    SearchResultEntry resultEntry = findReplicationSearchResultEntry(search);
627    if (resultEntry == null)
628    {
629      /*
630       * The backend is probably empty: if there is some fractional
631       * configuration in memory, we do not let the domain being connected,
632       * otherwise, it's ok
633       */
634      return !fractionalConfig.isFractional();
635    }
636
637    // Now extract fractional configuration if any
638    Iterator<String> exclIt =
639        getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_EXCLUDE);
640    Iterator<String> inclIt =
641        getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_INCLUDE);
642
643    // Compare backend and local fractional configuration
644    return isFractionalConfigConsistent(fractionalConfig, exclIt, inclIt);
645  }
646
647  private SearchResultEntry findReplicationSearchResultEntry(
648      InternalSearchOperation searchOperation)
649  {
650    final SearchResultEntry resultEntry = getFirstResult(searchOperation);
651    if (resultEntry != null)
652    {
653      AttributeType synchronizationGenIDType =
654          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
655      List<Attribute> attrs =
656          resultEntry.getAttribute(synchronizationGenIDType);
657      if (attrs != null)
658      {
659        Attribute attr = attrs.get(0);
660        if (attr.size() == 1)
661        {
662          return resultEntry;
663        }
664        if (attr.size() > 1)
665        {
666          String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
667          logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
668        }
669      }
670    }
671    return null;
672  }
673
674  private Iterator<String> getAttributeValueIterator(
675      SearchResultEntry resultEntry, String attrName)
676  {
677    AttributeType attrType = DirectoryServer.getAttributeType(attrName);
678    List<Attribute> exclAttrs = resultEntry.getAttribute(attrType);
679    if (exclAttrs != null)
680    {
681      Attribute exclAttr = exclAttrs.get(0);
682      if (exclAttr != null)
683      {
684        return new AttributeValueStringIterator(exclAttr.iterator());
685      }
686    }
687    return null;
688  }
689
690  /**
691   * Return true if the fractional configuration passed as fractional
692   * configuration attribute values is equivalent to the fractional
693   * configuration stored in the local variables.
694   * @param fractionalConfig The local fractional configuration
695   * @param exclIt Fractional exclude mode configuration attribute values to
696   * analyze.
697   * @param inclIt Fractional include mode configuration attribute values to
698   * analyze.
699   * @return True if the fractional configuration passed as fractional
700   * configuration attribute values is equivalent to the fractional
701   * configuration stored in the local variables.
702   */
703  static boolean isFractionalConfigConsistent(
704      FractionalConfig fractionalConfig, Iterator<String> exclIt,
705      Iterator<String> inclIt)
706  {
707    /*
708     * Parse fractional configuration stored in passed fractional configuration
709     * attributes values
710     */
711
712    Map<String, Set<String>> storedFractionalSpecificClassesAttributes = new HashMap<>();
713    Set<String> storedFractionalAllClassesAttributes = new HashSet<>();
714
715    int storedFractionalMode;
716    try
717    {
718      storedFractionalMode = FractionalConfig.parseFractionalConfig(exclIt,
719        inclIt, storedFractionalSpecificClassesAttributes,
720        storedFractionalAllClassesAttributes);
721    } catch (ConfigException e)
722    {
723      // Should not happen as configuration in domain root entry is flushed
724      // from valid configuration in local variables
725      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
726      return false;
727    }
728
729    FractionalConfig storedFractionalConfig = new FractionalConfig(
730      fractionalConfig.getBaseDn());
731    storedFractionalConfig.setFractional(storedFractionalMode !=
732      FractionalConfig.NOT_FRACTIONAL);
733    // Set stored fractional configuration values
734    if (storedFractionalConfig.isFractional())
735    {
736      storedFractionalConfig.setFractionalExclusive(
737          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
738    }
739    storedFractionalConfig.setFractionalSpecificClassesAttributes(
740      storedFractionalSpecificClassesAttributes);
741    storedFractionalConfig.setFractionalAllClassesAttributes(
742      storedFractionalAllClassesAttributes);
743
744    /*
745     * Compare configuration stored in passed fractional configuration
746     * attributes with local variable one
747     */
748    try
749    {
750      return FractionalConfig.
751        isFractionalConfigEquivalent(fractionalConfig, storedFractionalConfig);
752    } catch (ConfigException e)
753    {
754      // Should not happen as configuration in domain root entry is flushed
755      // from valid configuration in local variables so both should have already
756      // been checked
757      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
758      return false;
759    }
760  }
761
762  /**
763   * Utility class to have get a string iterator from an AtributeValue iterator.
764   * Assuming the attribute values are strings.
765   */
766  static class AttributeValueStringIterator implements Iterator<String>
767  {
768    private final Iterator<ByteString> attrValIt;
769
770    /**
771     * Creates a new AttributeValueStringIterator object.
772     * @param attrValIt The underlying attribute iterator to use, assuming
773     * internal values are strings.
774     */
775    AttributeValueStringIterator(Iterator<ByteString> attrValIt)
776    {
777      this.attrValIt = attrValIt;
778    }
779
780    @Override
781    public boolean hasNext()
782    {
783      return attrValIt.hasNext();
784    }
785
786    @Override
787    public String next()
788    {
789      return attrValIt.next().toString();
790    }
791
792    // Should not be needed anyway
793    @Override
794    public void remove()
795    {
796      attrValIt.remove();
797    }
798  }
799
800  /**
801   * Compare 2 attribute collections and returns true if they are equivalent.
802   *
803   * @param attributes1
804   *          First attribute collection to compare.
805   * @param attributes2
806   *          Second attribute collection to compare.
807   * @return True if both attribute collection are equivalent.
808   * @throws ConfigException
809   *           If some attributes could not be retrieved from the schema.
810   */
811  private static boolean areAttributesEquivalent(
812      Collection<String> attributes1, Collection<String> attributes2)
813      throws ConfigException
814  {
815    // Compare all classes attributes
816    if (attributes1.size() != attributes2.size())
817    {
818      return false;
819    }
820
821    // Check consistency of all classes attributes
822    Schema schema = DirectoryServer.getSchema();
823    /*
824     * For each attribute in attributes1, check there is the matching
825     * one in attributes2.
826     */
827    for (String attrName1 : attributes1)
828    {
829      // Get attribute from attributes1
830      AttributeType attributeType1 = schema.getAttributeType(attrName1);
831      if (attributeType1 == null)
832      {
833        throw new ConfigException(
834          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName1));
835      }
836      // Look for matching one in attributes2
837      boolean foundAttribute = false;
838      for (String attrName2 : attributes2)
839      {
840        AttributeType attributeType2 = schema.getAttributeType(attrName2);
841        if (attributeType2 == null)
842        {
843          throw new ConfigException(
844            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName2));
845        }
846        if (attributeType1.equals(attributeType2))
847        {
848          foundAttribute = true;
849          break;
850        }
851      }
852      // Found matching attribute ?
853      if (!foundAttribute)
854      {
855        return false;
856      }
857    }
858
859    return true;
860  }
861
862  /**
863   * Check that the passed fractional configuration is acceptable
864   * regarding configuration syntax, schema constraints...
865   * Throws an exception if the configuration is not acceptable.
866   * @param configuration The configuration to analyze.
867   * @throws org.opends.server.config.ConfigException if the configuration is
868   * not acceptable.
869   */
870  private static void isFractionalConfigAcceptable(
871    ReplicationDomainCfg configuration) throws ConfigException
872  {
873    /*
874     * Parse fractional configuration
875     */
876
877    // Read the configuration entry
878    FractionalConfig newFractionalConfig = FractionalConfig.toFractionalConfig(
879        configuration);
880
881    if (!newFractionalConfig.isFractional())
882    {
883      // Nothing to check
884      return;
885    }
886
887    // Prepare variables to be filled with config
888    Map<String, Set<String>> newFractionalSpecificClassesAttributes =
889      newFractionalConfig.getFractionalSpecificClassesAttributes();
890    Set<String> newFractionalAllClassesAttributes =
891      newFractionalConfig.getFractionalAllClassesAttributes();
892
893    /*
894     * Check attributes consistency : we only allow to filter MAY (optional)
895     * attributes of a class : to be compliant with the schema, no MUST
896     * (mandatory) attribute can be filtered by fractional replication.
897     */
898
899    // Check consistency of specific classes attributes
900    Schema schema = DirectoryServer.getSchema();
901    int fractionalMode = newFractionalConfig.fractionalConfigToInt();
902    for (String className : newFractionalSpecificClassesAttributes.keySet())
903    {
904      // Does the class exist ?
905      ObjectClass fractionalClass = schema.getObjectClass(
906        className.toLowerCase());
907      if (fractionalClass == null)
908      {
909        throw new ConfigException(
910          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className));
911      }
912
913      boolean isExtensibleObjectClass =
914          "extensibleObject".equalsIgnoreCase(className);
915
916      Set<String> attributes =
917        newFractionalSpecificClassesAttributes.get(className);
918
919      for (String attrName : attributes)
920      {
921        // Not a prohibited attribute ?
922        if (isFractionalProhibitedAttr(attrName))
923        {
924          throw new ConfigException(
925            NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
926        }
927
928        // Does the attribute exist ?
929        AttributeType attributeType = schema.getAttributeType(attrName);
930        if (attributeType != null)
931        {
932          // No more checking for the extensibleObject class
933          if (!isExtensibleObjectClass
934              && fractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL
935              // Exclusive mode : the attribute must be optional
936              && !fractionalClass.isOptional(attributeType))
937          {
938            throw new ConfigException(
939                NOTE_ERR_FRACTIONAL_CONFIG_NOT_OPTIONAL_ATTRIBUTE.get(attrName,
940                    className));
941          }
942        }
943        else
944        {
945          throw new ConfigException(
946            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
947        }
948      }
949    }
950
951    // Check consistency of all classes attributes
952    for (String attrName : newFractionalAllClassesAttributes)
953    {
954      // Not a prohibited attribute ?
955      if (isFractionalProhibitedAttr(attrName))
956      {
957        throw new ConfigException(
958          NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
959      }
960
961      // Does the attribute exist ?
962      if (schema.getAttributeType(attrName) == null)
963      {
964        throw new ConfigException(
965          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
966      }
967    }
968  }
969
970  /**
971   * Test if the passed attribute is not allowed to be used in configuration of
972   * fractional replication.
973   * @param attr Attribute to test.
974   * @return true if the attribute is prohibited.
975   */
976  private static boolean isFractionalProhibitedAttr(String attr)
977  {
978    for (String forbiddenAttr : FRACTIONAL_PROHIBITED_ATTRIBUTES)
979    {
980      if (forbiddenAttr.equalsIgnoreCase(attr))
981      {
982        return true;
983      }
984    }
985    return false;
986  }
987
988  /**
989   * If fractional replication is enabled, this analyzes the operation and
990   * suppresses the forbidden attributes in it so that they are not added in
991   * the local backend.
992   *
993   * @param addOperation The operation to modify based on fractional
994   * replication configuration
995   * @param performFiltering Tells if the effective attribute filtering should
996   * be performed or if the call is just to analyze if there are some
997   * attributes filtered by fractional configuration
998   * @return true if the operation contains some attributes subject to filtering
999   * by the fractional configuration
1000   */
1001  private boolean fractionalFilterOperation(
1002    PreOperationAddOperation addOperation, boolean performFiltering)
1003  {
1004    return fractionalRemoveAttributesFromEntry(fractionalConfig,
1005      addOperation.getEntryDN().rdn(), addOperation.getObjectClasses(),
1006      addOperation.getUserAttributes(), performFiltering);
1007  }
1008
1009  /**
1010   * If fractional replication is enabled, this analyzes the operation and
1011   * suppresses the forbidden attributes in it so that they are not added in
1012   * the local backend.
1013   *
1014   * @param modifyDNOperation The operation to modify based on fractional
1015   * replication configuration
1016   * @param performFiltering Tells if the effective modifications should
1017   * be performed or if the call is just to analyze if there are some
1018   * inconsistency with fractional configuration
1019   * @return true if the operation is inconsistent with fractional
1020   * configuration
1021   */
1022  private boolean fractionalFilterOperation(
1023    PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
1024  {
1025    // Quick exit if not called for analyze and
1026    if (performFiltering && modifyDNOperation.deleteOldRDN())
1027    {
1028      // The core will remove any occurrence of attribute that was part of the
1029      // old RDN, nothing more to do.
1030      return true; // Will not be used as analyze was not requested
1031    }
1032
1033    // Create a list of filtered attributes for this entry
1034    Entry concernedEntry = modifyDNOperation.getOriginalEntry();
1035    Set<String> fractionalConcernedAttributes =
1036      createFractionalConcernedAttrList(fractionalConfig,
1037      concernedEntry.getObjectClasses().keySet());
1038
1039    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1040    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1041    {
1042      // No attributes to filter
1043      return false;
1044    }
1045
1046    /*
1047     * Analyze the old and new rdn to see if they are some attributes to be
1048     * removed: if the oldRDN contains some forbidden attributes (for instance
1049     * it is possible if the entry was created with an add operation and the
1050     * RDN used contains a forbidden attribute: in this case the attribute value
1051     * has been kept to be consistent with the dn of the entry.) that are no
1052     * more part of the new RDN, we must remove any attribute of this type by
1053     * putting a modification to delete the attribute.
1054     */
1055
1056    boolean inconsistentOperation = false;
1057    RDN rdn = modifyDNOperation.getEntryDN().rdn();
1058    RDN newRdn = modifyDNOperation.getNewRDN();
1059
1060    // Go through each attribute of the old RDN
1061    for (int i=0 ; i<rdn.getNumValues() ; i++)
1062    {
1063      AttributeType attributeType = rdn.getAttributeType(i);
1064      // Is it present in the fractional attributes established list ?
1065      boolean foundAttribute =
1066          exists(fractionalConcernedAttributes, attributeType);
1067      if (canRemoveAttribute(fractionalExclusive, foundAttribute)
1068          && !newRdn.hasAttributeType(attributeType)
1069          && !modifyDNOperation.deleteOldRDN())
1070      {
1071        /*
1072         * A forbidden attribute is in the old RDN and no more in the new RDN,
1073         * and it has not been requested to remove attributes from old RDN:
1074         * let's remove the attribute from the entry to stay consistent with
1075         * fractional configuration
1076         */
1077        Modification modification = new Modification(ModificationType.DELETE,
1078          Attributes.empty(attributeType));
1079        modifyDNOperation.addModification(modification);
1080        inconsistentOperation = true;
1081      }
1082    }
1083
1084    return inconsistentOperation;
1085  }
1086
1087  private boolean exists(Set<String> attrNames, AttributeType attrTypeToFind)
1088  {
1089    for (String attrName : attrNames)
1090    {
1091      if (DirectoryServer.getAttributeType(attrName).equals(attrTypeToFind))
1092      {
1093        return true;
1094      }
1095    }
1096    return false;
1097  }
1098
1099  /**
1100   * Remove attributes from an entry, according to the passed fractional
1101   * configuration. The entry is represented by the 2 passed parameters.
1102   * The attributes to be removed are removed using the remove method on the
1103   * passed iterator for the attributes in the entry.
1104   * @param fractionalConfig The fractional configuration to use
1105   * @param entryRdn The rdn of the entry to add
1106   * @param classes The object classes representing the entry to modify
1107   * @param attributesMap The map of attributes/values to be potentially removed
1108   * from the entry.
1109   * @param performFiltering Tells if the effective attribute filtering should
1110   * be performed or if the call is just an analyze to see if there are some
1111   * attributes filtered by fractional configuration
1112   * @return true if the operation contains some attributes subject to filtering
1113   * by the fractional configuration
1114   */
1115   static boolean fractionalRemoveAttributesFromEntry(
1116    FractionalConfig fractionalConfig, RDN entryRdn,
1117    Map<ObjectClass,String> classes, Map<AttributeType,
1118    List<Attribute>> attributesMap, boolean performFiltering)
1119  {
1120    boolean hasSomeAttributesToFilter = false;
1121    /*
1122     * Prepare a list of attributes to be included/excluded according to the
1123     * fractional replication configuration
1124     */
1125
1126    Set<String> fractionalConcernedAttributes =
1127      createFractionalConcernedAttrList(fractionalConfig, classes.keySet());
1128    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1129    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1130    {
1131      return false; // No attributes to filter
1132    }
1133
1134    // Prepare list of object classes of the added entry
1135    Set<ObjectClass> entryClasses = classes.keySet();
1136
1137    /*
1138     * Go through the user attributes and remove those that match filtered one
1139     * - exclude mode : remove only attributes that are in
1140     * fractionalConcernedAttributes
1141     * - include mode : remove any attribute that is not in
1142     * fractionalConcernedAttributes
1143     */
1144    List<List<Attribute>> newRdnAttrLists = new ArrayList<>();
1145    List<AttributeType> rdnAttrTypes = new ArrayList<>();
1146    final Set<AttributeType> attrTypes = attributesMap.keySet();
1147    for (Iterator<AttributeType> iter = attrTypes.iterator(); iter.hasNext();)
1148    {
1149      AttributeType attributeType = iter.next();
1150
1151      // Only optional attributes may be removed
1152      if (isMandatoryAttribute(entryClasses, attributeType)
1153      // Do not remove an attribute if it is a prohibited one
1154          || isFractionalProhibited(attributeType)
1155          || !canRemoveAttribute(attributeType, fractionalExclusive,
1156              fractionalConcernedAttributes))
1157      {
1158        continue;
1159      }
1160
1161      if (!performFiltering)
1162      {
1163        // The call was just to check : at least one attribute to filter
1164        // found, return immediately the answer;
1165        return true;
1166      }
1167
1168      // Do not remove an attribute/value that is part of the RDN of the
1169      // entry as it is forbidden
1170      if (entryRdn.hasAttributeType(attributeType))
1171      {
1172        /*
1173        We must remove all values of the attributes map for this
1174        attribute type but the one that has the value which is in the RDN
1175        of the entry. In fact the (underlying )attribute list does not
1176        support remove so we have to create a new list, keeping only the
1177        attribute value which is the same as in the RDN
1178        */
1179        ByteString rdnAttributeValue =
1180          entryRdn.getAttributeValue(attributeType);
1181        List<Attribute> attrList = attributesMap.get(attributeType);
1182        ByteString sameAttrValue = null;
1183        // Locate the attribute value identical to the one in the RDN
1184        for (Attribute attr : attrList)
1185        {
1186          if (attr.contains(rdnAttributeValue))
1187          {
1188            for (ByteString attrValue : attr) {
1189              if (rdnAttributeValue.equals(attrValue)) {
1190                // Keep the value we want
1191                sameAttrValue = attrValue;
1192              } else {
1193                hasSomeAttributesToFilter = true;
1194              }
1195            }
1196          }
1197          else
1198          {
1199            hasSomeAttributesToFilter = true;
1200          }
1201        }
1202        //    Recreate the attribute list with only the RDN attribute value
1203        if (sameAttrValue != null)
1204          // Paranoia check: should never be the case as we should always
1205          // find the attribute/value pair matching the pair in the RDN
1206        {
1207          // Construct and store new attribute list
1208          newRdnAttrLists.add(Attributes.createAsList(attributeType, sameAttrValue));
1209          /*
1210          Store matching attribute type
1211          The mapping will be done using object from rdnAttrTypes as key
1212          and object from newRdnAttrLists (at same index) as value in
1213          the user attribute map to be modified
1214          */
1215          rdnAttrTypes.add(attributeType);
1216        }
1217      }
1218      else
1219      {
1220        // Found an attribute to remove, remove it from the list.
1221        iter.remove();
1222        hasSomeAttributesToFilter = true;
1223      }
1224    }
1225    // Now overwrite the attribute values for the attribute types present in the
1226    // RDN, if there are some filtered attributes in the RDN
1227    for (int index = 0 ; index < rdnAttrTypes.size() ; index++)
1228    {
1229      attributesMap.put(rdnAttrTypes.get(index), newRdnAttrLists.get(index));
1230    }
1231    return hasSomeAttributesToFilter;
1232  }
1233
1234   private static boolean isMandatoryAttribute(Set<ObjectClass> entryClasses, AttributeType attributeType)
1235   {
1236     for (ObjectClass objectClass : entryClasses)
1237     {
1238       if (objectClass.isRequired(attributeType))
1239       {
1240         return true;
1241       }
1242     }
1243     return false;
1244   }
1245
1246   private static boolean isFractionalProhibited(AttributeType attrType)
1247   {
1248     String attributeName = attrType.getPrimaryName();
1249     return (attributeName != null && isFractionalProhibitedAttr(attributeName))
1250         || isFractionalProhibitedAttr(attrType.getOID());
1251   }
1252
1253  private static boolean canRemoveAttribute(AttributeType attributeType,
1254      boolean fractionalExclusive, Set<String> fractionalConcernedAttributes)
1255  {
1256    String attributeName = attributeType.getPrimaryName();
1257    String attributeOid = attributeType.getOID();
1258
1259    // Is the current attribute part of the established list ?
1260    boolean foundAttribute =
1261        contains(fractionalConcernedAttributes, attributeName, attributeOid);
1262    // Now remove the attribute or modification if:
1263    // - exclusive mode and attribute is in configuration
1264    // - inclusive mode and attribute is not in configuration
1265    return canRemoveAttribute(fractionalExclusive, foundAttribute);
1266  }
1267
1268  private static boolean canRemoveAttribute(boolean fractionalExclusive,
1269      boolean foundAttribute)
1270  {
1271    return (foundAttribute && fractionalExclusive)
1272        || (!foundAttribute && !fractionalExclusive);
1273  }
1274
1275  private static boolean contains(Set<String> attrNames, String attrName,
1276      String attrOID)
1277  {
1278    return attrNames.contains(attrOID)
1279        || (attrName != null && attrNames.contains(attrName.toLowerCase()));
1280  }
1281
1282  /**
1283   * Prepares a list of attributes of interest for the fractional feature.
1284   * @param fractionalConfig The fractional configuration to use
1285   * @param entryObjectClasses The object classes of an entry on which an
1286   * operation is going to be performed.
1287   * @return The list of attributes of the entry to be excluded/included
1288   * when the operation will be performed.
1289   */
1290  private static Set<String> createFractionalConcernedAttrList(
1291    FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses)
1292  {
1293    /*
1294     * Is the concerned entry of a type concerned by fractional replication
1295     * configuration ? If yes, add the matching attribute names to a set of
1296     * attributes to take into account for filtering
1297     * (inclusive or exclusive mode).
1298     * Using a Set to avoid duplicate attributes (from 2 inheriting classes for
1299     * instance)
1300     */
1301    Set<String> fractionalConcernedAttributes = new HashSet<>();
1302
1303    // Get object classes the entry matches
1304    Set<String> fractionalAllClassesAttributes =
1305      fractionalConfig.getFractionalAllClassesAttributes();
1306    Map<String, Set<String>> fractionalSpecificClassesAttributes =
1307      fractionalConfig.getFractionalSpecificClassesAttributes();
1308
1309    Set<String> fractionalClasses =
1310        fractionalSpecificClassesAttributes.keySet();
1311    for (ObjectClass entryObjectClass : entryObjectClasses)
1312    {
1313      for(String fractionalClass : fractionalClasses)
1314      {
1315        if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase()))
1316        {
1317          fractionalConcernedAttributes.addAll(
1318              fractionalSpecificClassesAttributes.get(fractionalClass));
1319        }
1320      }
1321    }
1322
1323    // Add to the set any attribute which is class independent
1324    fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes);
1325
1326    return fractionalConcernedAttributes;
1327  }
1328
1329  /**
1330   * If fractional replication is enabled, this analyzes the operation and
1331   * suppresses the forbidden attributes in it so that they are not added/
1332   * deleted/modified in the local backend.
1333   *
1334   * @param modifyOperation The operation to modify based on fractional
1335   * replication configuration
1336   * @param performFiltering Tells if the effective attribute filtering should
1337   * be performed or if the call is just to analyze if there are some
1338   * attributes filtered by fractional configuration
1339   * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES,
1340   * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP
1341   */
1342  private int fractionalFilterOperation(PreOperationModifyOperation
1343    modifyOperation, boolean performFiltering)
1344  {
1345    /*
1346     * Prepare a list of attributes to be included/excluded according to the
1347     * fractional replication configuration
1348     */
1349
1350    Entry modifiedEntry = modifyOperation.getCurrentEntry();
1351    Set<String> fractionalConcernedAttributes =
1352      createFractionalConcernedAttrList(fractionalConfig,
1353      modifiedEntry.getObjectClasses().keySet());
1354    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1355    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1356    {
1357      // No attributes to filter
1358      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1359    }
1360
1361    // Prepare list of object classes of the modified entry
1362    DN entryToModifyDn = modifyOperation.getEntryDN();
1363    Entry entryToModify;
1364    try
1365    {
1366      entryToModify = DirectoryServer.getEntry(entryToModifyDn);
1367    }
1368    catch(DirectoryException e)
1369    {
1370      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
1371      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1372    }
1373    Set<ObjectClass> entryClasses = entryToModify.getObjectClasses().keySet();
1374
1375    /*
1376     * Now go through the attribute modifications and filter the mods according
1377     * to the fractional configuration (using the just established concerned
1378     * attributes list):
1379     * - delete attributes: remove them if regarding a filtered attribute
1380     * - add attributes: remove them if regarding a filtered attribute
1381     * - modify attributes: remove them if regarding a filtered attribute
1382     */
1383
1384    int result = FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1385    List<Modification> mods = modifyOperation.getModifications();
1386    Iterator<Modification> modsIt = mods.iterator();
1387    while (modsIt.hasNext())
1388    {
1389      Modification mod = modsIt.next();
1390      Attribute attr = mod.getAttribute();
1391      AttributeType attrType = attr.getAttributeType();
1392      // Fractional replication ignores operational attributes
1393      if (attrType.isOperational()
1394          || isMandatoryAttribute(entryClasses, attrType)
1395          || isFractionalProhibited(attrType)
1396          || !canRemoveAttribute(attrType, fractionalExclusive,
1397              fractionalConcernedAttributes))
1398      {
1399        continue;
1400      }
1401
1402      if (!performFiltering)
1403      {
1404        // The call was just to check : at least one attribute to filter
1405        // found, return immediately the answer;
1406        return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
1407      }
1408
1409      // Found a modification to remove, remove it from the list.
1410      modsIt.remove();
1411      result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
1412      if (mods.isEmpty())
1413      {
1414        // This operation must become a no-op as no more modification in it
1415        return FRACTIONAL_BECOME_NO_OP;
1416      }
1417    }
1418
1419    return result;
1420  }
1421
1422  /**
1423   * This is overwritten to allow stopping the (online) import process by the
1424   * fractional ldif import plugin when it detects that the (imported) remote
1425   * data set is not consistent with the local fractional configuration.
1426   * {@inheritDoc}
1427   */
1428  @Override
1429  protected byte[] receiveEntryBytes()
1430  {
1431    if (isFollowImport())
1432    {
1433      // Ok, next entry is allowed to be received
1434      return super.receiveEntryBytes();
1435    }
1436
1437    // Fractional ldif import plugin detected inconsistency between local and
1438    // remote server fractional configuration and is stopping the import
1439    // process:
1440    // This is an error termination during the import
1441    // The error is stored and the import is ended by returning null
1442    final ImportExportContext ieCtx = getImportExportContext();
1443    LocalizableMessage msg = null;
1444    switch (importErrorMessageId)
1445    {
1446    case IMPORT_ERROR_MESSAGE_BAD_REMOTE:
1447      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(getBaseDN(), ieCtx.getImportSource());
1448      break;
1449    case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL:
1450      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(getBaseDN(), ieCtx.getImportSource());
1451      break;
1452    }
1453    ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
1454    return null;
1455  }
1456
1457  /**
1458   * This is overwritten to allow stopping the (online) export process if the
1459   * local domain is fractional and the destination is all other servers:
1460   * This make no sense to have only fractional servers in a replicated
1461   * topology. This prevents from administrator manipulation error that would
1462   * lead to whole topology data corruption.
1463   * {@inheritDoc}
1464   */
1465  @Override
1466  protected void initializeRemote(int target, int requestorID,
1467    Task initTask, int initWindow) throws DirectoryException
1468  {
1469    if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional())
1470    {
1471      LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get(getBaseDN(), getServerId());
1472      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
1473    }
1474
1475    super.initializeRemote(target, requestorID, initTask, initWindow);
1476  }
1477
1478  /**
1479   * Implement the  handleConflictResolution phase of the deleteOperation.
1480   *
1481   * @param deleteOperation The deleteOperation.
1482   * @return A SynchronizationProviderResult indicating if the operation
1483   *         can continue.
1484   */
1485  SynchronizationProviderResult handleConflictResolution(
1486         PreOperationDeleteOperation deleteOperation)
1487  {
1488    if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected())
1489    {
1490      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1491      return new SynchronizationProviderResult.StopProcessing(
1492          ResultCode.UNWILLING_TO_PERFORM, msg);
1493    }
1494
1495    DeleteContext ctx =
1496      (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
1497    Entry deletedEntry = deleteOperation.getEntryToDelete();
1498
1499    if (ctx != null)
1500    {
1501      /*
1502       * This is a replication operation
1503       * Check that the modified entry has the same entryuuid
1504       * as it was in the original message.
1505       */
1506      String operationEntryUUID = ctx.getEntryUUID();
1507      String deletedEntryUUID = getEntryUUID(deletedEntry);
1508      if (!operationEntryUUID.equals(deletedEntryUUID))
1509      {
1510        /*
1511         * The changes entry is not the same entry as the one on
1512         * the original change was performed.
1513         * Probably the original entry was renamed and replaced with
1514         * another entry.
1515         * We must not let the change proceed, return a negative
1516         * result and set the result code to NO_SUCH_OBJECT.
1517         * When the operation will return, the thread that started the operation
1518         * will try to find the correct entry and restart a new operation.
1519         */
1520        return new SynchronizationProviderResult.StopProcessing(
1521            ResultCode.NO_SUCH_OBJECT, null);
1522      }
1523    }
1524    else
1525    {
1526      // There is no replication context attached to the operation
1527      // so this is not a replication operation.
1528      CSN csn = generateCSN(deleteOperation);
1529      String modifiedEntryUUID = getEntryUUID(deletedEntry);
1530      ctx = new DeleteContext(csn, modifiedEntryUUID);
1531      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
1532
1533      synchronized (replayOperations)
1534      {
1535        int size = replayOperations.size();
1536        if (size >= 10000)
1537        {
1538          replayOperations.remove(replayOperations.firstKey());
1539        }
1540        FakeOperation op = new FakeDelOperation(
1541            deleteOperation.getEntryDN(), csn, modifiedEntryUUID);
1542        replayOperations.put(csn, op);
1543      }
1544    }
1545
1546    return new SynchronizationProviderResult.ContinueProcessing();
1547  }
1548
1549  /**
1550   * Implement the  handleConflictResolution phase of the addOperation.
1551   *
1552   * @param addOperation The AddOperation.
1553   * @return A SynchronizationProviderResult indicating if the operation
1554   *         can continue.
1555   */
1556  SynchronizationProviderResult handleConflictResolution(
1557      PreOperationAddOperation addOperation)
1558  {
1559    if (!addOperation.isSynchronizationOperation() && !brokerIsConnected())
1560    {
1561      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1562      return new SynchronizationProviderResult.StopProcessing(
1563          ResultCode.UNWILLING_TO_PERFORM, msg);
1564    }
1565
1566    if (fractionalConfig.isFractional())
1567    {
1568      if (addOperation.isSynchronizationOperation())
1569      {
1570        /*
1571         * Filter attributes here for fractional replication. If fractional
1572         * replication is enabled, we analyze the operation to suppress the
1573         * forbidden attributes in it so that they are not added in the local
1574         * backend. This must be called before any other plugin is called, to
1575         * keep coherency across plugin calls.
1576         */
1577        fractionalFilterOperation(addOperation, true);
1578      }
1579      else
1580      {
1581        /*
1582         * Direct access from an LDAP client : if some attributes are to be
1583         * removed according to the fractional configuration, simply forbid
1584         * the operation
1585         */
1586        if (fractionalFilterOperation(addOperation, false))
1587        {
1588          LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), addOperation);
1589          return new SynchronizationProviderResult.StopProcessing(
1590            ResultCode.UNWILLING_TO_PERFORM, msg);
1591        }
1592      }
1593    }
1594
1595    if (addOperation.isSynchronizationOperation())
1596    {
1597      AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
1598      /*
1599       * If an entry with the same entry uniqueID already exist then
1600       * this operation has already been replayed in the past.
1601       */
1602      String uuid = ctx.getEntryUUID();
1603      if (findEntryDN(uuid) != null)
1604      {
1605        return new SynchronizationProviderResult.StopProcessing(
1606            ResultCode.NO_OPERATION, null);
1607      }
1608
1609      /* The parent entry may have been renamed here since the change was done
1610       * on the first server, and another entry have taken the former dn
1611       * of the parent entry
1612       */
1613
1614      String parentEntryUUID = ctx.getParentEntryUUID();
1615      // root entry have no parent, there is no need to check for it.
1616      if (parentEntryUUID != null)
1617      {
1618        // There is a potential of perfs improvement here
1619        // if we could avoid the following parent entry retrieval
1620        DN parentDnFromCtx = findEntryDN(ctx.getParentEntryUUID());
1621        if (parentDnFromCtx == null)
1622        {
1623          // The parent does not exist with the specified unique id
1624          // stop the operation with NO_SUCH_OBJECT and let the
1625          // conflict resolution or the dependency resolution solve this.
1626          return new SynchronizationProviderResult.StopProcessing(
1627              ResultCode.NO_SUCH_OBJECT, null);
1628        }
1629
1630        DN entryDN = addOperation.getEntryDN();
1631        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
1632        if (parentDnFromEntryDn != null
1633            && !parentDnFromCtx.equals(parentDnFromEntryDn))
1634        {
1635          // parentEntry has been renamed
1636          // replication name conflict resolution is expected to fix that
1637          // later in the flow
1638          return new SynchronizationProviderResult.StopProcessing(
1639              ResultCode.NO_SUCH_OBJECT, null);
1640        }
1641      }
1642    }
1643    return new SynchronizationProviderResult.ContinueProcessing();
1644  }
1645
1646  /**
1647   * Check that the broker associated to this ReplicationDomain has found
1648   * a Replication Server and that this LDAP server is therefore able to
1649   * process operations.
1650   * If not, set the ResultCode, the response message,
1651   * interrupt the operation, and return false
1652   *
1653   * @return  true when it OK to process the Operation, false otherwise.
1654   *          When false is returned the resultCode and the response message
1655   *          is also set in the Operation.
1656   */
1657  private boolean brokerIsConnected()
1658  {
1659    final IsolationPolicy isolationPolicy = config.getIsolationPolicy();
1660    if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
1661    {
1662      // this policy imply that we always accept updates.
1663      return true;
1664    }
1665    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
1666    {
1667      // this isolation policy specifies that the updates are denied
1668      // when the broker had problems during the connection phase
1669      // Updates are still accepted if the broker is currently connecting..
1670      return !hasConnectionError();
1671    }
1672    // we should never get there as the only possible policies are
1673    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
1674    return true;
1675  }
1676
1677  /**
1678   * Implement the  handleConflictResolution phase of the ModifyDNOperation.
1679   *
1680   * @param modifyDNOperation The ModifyDNOperation.
1681   * @return A SynchronizationProviderResult indicating if the operation
1682   *         can continue.
1683   */
1684  SynchronizationProviderResult handleConflictResolution(
1685      PreOperationModifyDNOperation modifyDNOperation)
1686  {
1687    if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected())
1688    {
1689      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1690      return new SynchronizationProviderResult.StopProcessing(
1691          ResultCode.UNWILLING_TO_PERFORM, msg);
1692    }
1693
1694    if (fractionalConfig.isFractional())
1695    {
1696      if (modifyDNOperation.isSynchronizationOperation())
1697      {
1698        /*
1699         * Filter operation here for fractional replication. If fractional
1700         * replication is enabled, we analyze the operation and modify it if
1701         * necessary to stay consistent with what is defined in fractional
1702         * configuration.
1703         */
1704        fractionalFilterOperation(modifyDNOperation, true);
1705      }
1706      else
1707      {
1708        /*
1709         * Direct access from an LDAP client : something is inconsistent with
1710         * the fractional configuration, forbid the operation.
1711         */
1712        if (fractionalFilterOperation(modifyDNOperation, false))
1713        {
1714          LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyDNOperation);
1715          return new SynchronizationProviderResult.StopProcessing(
1716            ResultCode.UNWILLING_TO_PERFORM, msg);
1717        }
1718      }
1719    }
1720
1721    ModifyDnContext ctx =
1722      (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
1723    if (ctx != null)
1724    {
1725      /*
1726       * This is a replication operation
1727       * Check that the modified entry has the same entryuuid
1728       * as was in the original message.
1729       */
1730      final String modifiedEntryUUID =
1731          getEntryUUID(modifyDNOperation.getOriginalEntry());
1732      if (!modifiedEntryUUID.equals(ctx.getEntryUUID()))
1733      {
1734        /*
1735         * The modified entry is not the same entry as the one on
1736         * the original change was performed.
1737         * Probably the original entry was renamed and replaced with
1738         * another entry.
1739         * We must not let the change proceed, return a negative
1740         * result and set the result code to NO_SUCH_OBJECT.
1741         * When the operation will return, the thread that started the operation
1742         * will try to find the correct entry and restart a new operation.
1743         */
1744        return new SynchronizationProviderResult.StopProcessing(
1745            ResultCode.NO_SUCH_OBJECT, null);
1746      }
1747
1748      if (modifyDNOperation.getNewSuperior() != null)
1749      {
1750        /*
1751         * Also check that the current id of the
1752         * parent is the same as when the operation was performed.
1753         */
1754        String newParentId = findEntryUUID(modifyDNOperation.getNewSuperior());
1755        if (newParentId != null && ctx.getNewSuperiorEntryUUID() != null
1756            && !newParentId.equals(ctx.getNewSuperiorEntryUUID()))
1757        {
1758        return new SynchronizationProviderResult.StopProcessing(
1759            ResultCode.NO_SUCH_OBJECT, null);
1760        }
1761      }
1762
1763      /*
1764       * If the object has been renamed more recently than this
1765       * operation, cancel the operation.
1766       */
1767      EntryHistorical hist = EntryHistorical.newInstanceFromEntry(
1768          modifyDNOperation.getOriginalEntry());
1769      if (hist.addedOrRenamedAfter(ctx.getCSN()))
1770      {
1771        return new SynchronizationProviderResult.StopProcessing(
1772            ResultCode.NO_OPERATION, null);
1773      }
1774    }
1775    else
1776    {
1777      // There is no replication context attached to the operation
1778      // so this is not a replication operation.
1779      CSN csn = generateCSN(modifyDNOperation);
1780      String newParentId = null;
1781      if (modifyDNOperation.getNewSuperior() != null)
1782      {
1783        newParentId = findEntryUUID(modifyDNOperation.getNewSuperior());
1784      }
1785
1786      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
1787      String modifiedEntryUUID = getEntryUUID(modifiedEntry);
1788      ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId);
1789      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
1790    }
1791    return new SynchronizationProviderResult.ContinueProcessing();
1792  }
1793
1794  /**
1795   * Handle the conflict resolution.
1796   * Called by the core server after locking the entry and before
1797   * starting the actual modification.
1798   * @param modifyOperation the operation
1799   * @return code indicating is operation must proceed
1800   */
1801  SynchronizationProviderResult handleConflictResolution(
1802         PreOperationModifyOperation modifyOperation)
1803  {
1804    if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected())
1805    {
1806      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1807      return new SynchronizationProviderResult.StopProcessing(
1808          ResultCode.UNWILLING_TO_PERFORM, msg);
1809    }
1810
1811    if (fractionalConfig.isFractional())
1812    {
1813      if  (modifyOperation.isSynchronizationOperation())
1814      {
1815        /*
1816         * Filter attributes here for fractional replication. If fractional
1817         * replication is enabled, we analyze the operation and modify it so
1818         * that no forbidden attribute is added/modified/deleted in the local
1819         * backend. This must be called before any other plugin is called, to
1820         * keep coherency across plugin calls.
1821         */
1822        if (fractionalFilterOperation(modifyOperation, true) ==
1823          FRACTIONAL_BECOME_NO_OP)
1824        {
1825          // Every modifications filtered in this operation: the operation
1826          // becomes a no-op
1827          return new SynchronizationProviderResult.StopProcessing(
1828            ResultCode.NO_OPERATION, null);
1829        }
1830      }
1831      else
1832      {
1833        /*
1834         * Direct access from an LDAP client : if some attributes are to be
1835         * removed according to the fractional configuration, simply forbid
1836         * the operation
1837         */
1838        switch(fractionalFilterOperation(modifyOperation, false))
1839        {
1840          case FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES:
1841            // Ok, let the operation happen
1842            break;
1843          case FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES:
1844            // Some attributes not compliant with fractional configuration :
1845            // forbid the operation
1846            LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyOperation);
1847            return new SynchronizationProviderResult.StopProcessing(
1848              ResultCode.UNWILLING_TO_PERFORM, msg);
1849        }
1850      }
1851    }
1852
1853    ModifyContext ctx =
1854      (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
1855
1856    Entry modifiedEntry = modifyOperation.getModifiedEntry();
1857    if (ctx == null)
1858    {
1859      // No replication ctx attached => not a replicated operation
1860      // - create a ctx with : CSN, entryUUID
1861      // - attach the context to the op
1862
1863      CSN csn = generateCSN(modifyOperation);
1864      ctx = new ModifyContext(csn, getEntryUUID(modifiedEntry));
1865
1866      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
1867    }
1868    else
1869    {
1870      // Replication ctx attached => this is a replicated operation being
1871      // replayed here, it is necessary to
1872      // - check if the entry has been renamed
1873      // - check for conflicts
1874      String modifiedEntryUUID = ctx.getEntryUUID();
1875      String currentEntryUUID = getEntryUUID(modifiedEntry);
1876      if (currentEntryUUID != null
1877          && !currentEntryUUID.equals(modifiedEntryUUID))
1878      {
1879        /*
1880         * The current modified entry is not the same entry as the one on
1881         * the original modification was performed.
1882         * Probably the original entry was renamed and replaced with
1883         * another entry.
1884         * We must not let the modification proceed, return a negative
1885         * result and set the result code to NO_SUCH_OBJECT.
1886         * When the operation will return, the thread that started the
1887         * operation will try to find the correct entry and restart a new
1888         * operation.
1889         */
1890         return new SynchronizationProviderResult.StopProcessing(
1891              ResultCode.NO_SUCH_OBJECT, null);
1892      }
1893
1894      // Solve the conflicts between modify operations
1895      EntryHistorical historicalInformation =
1896        EntryHistorical.newInstanceFromEntry(modifiedEntry);
1897      modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
1898                                    historicalInformation);
1899
1900      if (historicalInformation.replayOperation(modifyOperation, modifiedEntry))
1901      {
1902        numResolvedModifyConflicts.incrementAndGet();
1903      }
1904    }
1905    return new SynchronizationProviderResult.ContinueProcessing();
1906  }
1907
1908  /**
1909   * The preOperation phase for the add Operation.
1910   * Its job is to generate the replication context associated to the
1911   * operation. It is necessary to do it in this phase because contrary to
1912   * the other operations, the entry UUID is not set when the handleConflict
1913   * phase is called.
1914   *
1915   * @param addOperation The Add Operation.
1916   */
1917  void doPreOperation(PreOperationAddOperation addOperation)
1918  {
1919    final CSN csn = generateCSN(addOperation);
1920    final String entryUUID = getEntryUUID(addOperation);
1921    final AddContext ctx = new AddContext(csn, entryUUID,
1922        findEntryUUID(addOperation.getEntryDN().getParentDNInSuffix()));
1923    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
1924  }
1925
1926  @Override
1927  public void publishReplicaOfflineMsg()
1928  {
1929    pendingChanges.putReplicaOfflineMsg();
1930    dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
1931  }
1932
1933  /**
1934   * Check if an operation must be synchronized.
1935   * Also update the list of pending changes and the server RUV
1936   * @param op the operation
1937   */
1938  void synchronize(PostOperationOperation op)
1939  {
1940    ResultCode result = op.getResultCode();
1941    // Note that a failed non-replication operation might not have a change
1942    // number.
1943    CSN curCSN = OperationContext.getCSN(op);
1944    if (curCSN != null && config.isLogChangenumber())
1945    {
1946      op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(),
1947          "replicationCSN", curCSN));
1948    }
1949
1950    if (result == ResultCode.SUCCESS)
1951    {
1952      if (op.isSynchronizationOperation())
1953      { // Replaying a sync operation
1954        numReplayedPostOpCalled.incrementAndGet();
1955        try
1956        {
1957          remotePendingChanges.commit(curCSN);
1958        }
1959        catch (NoSuchElementException e)
1960        {
1961          logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN);
1962          return;
1963        }
1964      }
1965      else
1966      {
1967        // Generate a replication message for a successful non-replication
1968        // operation.
1969        LDAPUpdateMsg msg = LDAPUpdateMsg.generateMsg(op);
1970
1971        if (msg == null)
1972        {
1973          /*
1974          * This is an operation type that we do not know about
1975          * It should never happen.
1976          */
1977          pendingChanges.remove(curCSN);
1978          logger.error(ERR_UNKNOWN_TYPE, op.getOperationType());
1979          return;
1980        }
1981
1982        addEntryAttributesForCL(msg,op);
1983
1984        // If assured replication is configured, this will prepare blocking
1985        // mechanism. If assured replication is disabled, this returns
1986        // immediately
1987        prepareWaitForAckIfAssuredEnabled(msg);
1988        try
1989        {
1990          msg.encode();
1991          pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
1992        }
1993        catch (NoSuchElementException e)
1994        {
1995          logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN);
1996          return;
1997        }
1998        // If assured replication is enabled, this will wait for the matching
1999        // ack or time out. If assured replication is disabled, this returns
2000        // immediately
2001        try
2002        {
2003          waitForAckIfAssuredEnabled(msg);
2004        } catch (TimeoutException ex)
2005        {
2006          // This exception may only be raised if assured replication is enabled
2007          logger.info(NOTE_DS_ACK_TIMEOUT, getBaseDN(), getAssuredTimeout(), msg);
2008        }
2009      }
2010
2011      /*
2012       * If the operation is a DELETE on the base entry of the suffix
2013       * that is replicated, the generation is now lost because the
2014       * DB is empty. We need to save it again the next time we add an entry.
2015       */
2016      if (OperationType.DELETE.equals(op.getOperationType())
2017          && ((PostOperationDeleteOperation) op)
2018                .getEntryDN().equals(getBaseDN()))
2019      {
2020        generationIdSavedStatus = false;
2021      }
2022
2023      if (!generationIdSavedStatus)
2024      {
2025        saveGenerationId(generationId);
2026      }
2027    }
2028    else if (!op.isSynchronizationOperation() && curCSN != null)
2029    {
2030      // Remove an unsuccessful non-replication operation from the pending
2031      // changes list.
2032      pendingChanges.remove(curCSN);
2033      pendingChanges.pushCommittedChanges();
2034    }
2035
2036    checkForClearedConflict(op);
2037  }
2038
2039  /**
2040   * Check if the operation that just happened has cleared a conflict :
2041   * Clearing a conflict happens if the operation has free a DN that
2042   * for which an other entry was in conflict.
2043   * Steps:
2044   * - get the DN freed by a DELETE or MODRDN op
2045   * - search for entries put in the conflict space (dn=entryUUID'+'....)
2046   *   because the expected DN was not available (ds-sync-conflict=expected DN)
2047   * - retain the entry with the oldest conflict
2048   * - rename this entry with the freedDN as it was expected originally
2049   */
2050   private void checkForClearedConflict(PostOperationOperation op)
2051   {
2052     OperationType type = op.getOperationType();
2053     if (op.getResultCode() != ResultCode.SUCCESS)
2054     {
2055       // those operations cannot have cleared a conflict
2056       return;
2057     }
2058
2059     DN freedDN;
2060     if (type == OperationType.DELETE)
2061     {
2062       freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
2063     }
2064     else if (type == OperationType.MODIFY_DN)
2065     {
2066       freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
2067     }
2068     else
2069     {
2070       return;
2071     }
2072
2073    SearchFilter filter;
2074    try
2075    {
2076      filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT,
2077          ByteString.valueOf(freedDN.toString())).toSearchFilter();
2078    }
2079    catch (DirectoryException e)
2080    {
2081      // can not happen?
2082      logger.traceException(e);
2083      return;
2084    }
2085
2086    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
2087        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
2088    InternalSearchOperation searchOp =  conn.processSearch(request);
2089
2090     Entry entryToRename = null;
2091     CSN entryToRenameCSN = null;
2092     for (SearchResultEntry entry : searchOp.getSearchEntries())
2093     {
2094       EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry);
2095       if (entryToRename == null)
2096       {
2097         entryToRename = entry;
2098         entryToRenameCSN = history.getDNDate();
2099       }
2100       else if (!history.addedOrRenamedAfter(entryToRenameCSN))
2101       {
2102         // this conflict is older than the previous, keep it.
2103         entryToRename = entry;
2104         entryToRenameCSN = history.getDNDate();
2105       }
2106     }
2107
2108     if (entryToRename != null)
2109     {
2110       DN entryDN = entryToRename.getName();
2111       ModifyDNOperation newOp = renameEntry(
2112           entryDN, freedDN.rdn(), freedDN.parent(), false);
2113
2114       ResultCode res = newOp.getResultCode();
2115       if (res != ResultCode.SUCCESS)
2116       {
2117        logger.error(ERR_COULD_NOT_SOLVE_CONFLICT, entryDN, res);
2118       }
2119     }
2120   }
2121
2122  /**
2123   * Rename an Entry Using a synchronization, non-replicated operation.
2124   * This method should be used instead of the InternalConnection methods
2125   * when the operation that need to be run must be local only and therefore
2126   * not replicated to the RS.
2127   *
2128   * @param targetDN     The DN of the entry to rename.
2129   * @param newRDN       The new RDN to be used.
2130   * @param parentDN     The parentDN to be used.
2131   * @param markConflict A boolean indicating is this entry should be marked
2132   *                     as a conflicting entry. In such case the
2133   *                     DS_SYNC_CONFLICT attribute will be added to the entry
2134   *                     with the value of its original DN.
2135   *                     If false, the DS_SYNC_CONFLICT attribute will be
2136   *                     cleared.
2137   *
2138   * @return The operation that was run to rename the entry.
2139   */
2140  private ModifyDNOperation renameEntry(DN targetDN, RDN newRDN, DN parentDN,
2141      boolean markConflict)
2142  {
2143    ModifyDNOperation newOp = new ModifyDNOperationBasis(
2144        conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
2145        targetDN, newRDN, false, parentDN);
2146
2147    AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(DS_SYNC_CONFLICT);
2148    if (markConflict)
2149    {
2150      Attribute attr =
2151          Attributes.create(attrType, targetDN.toString());
2152      newOp.addModification(new Modification(ModificationType.REPLACE, attr));
2153    }
2154    else
2155    {
2156      Attribute attr = Attributes.empty(attrType);
2157      newOp.addModification(new Modification(ModificationType.DELETE, attr));
2158    }
2159
2160    runAsSynchronizedOperation(newOp);
2161    return newOp;
2162  }
2163
2164  private void runAsSynchronizedOperation(Operation op)
2165  {
2166    op.setInternalOperation(true);
2167    op.setSynchronizationOperation(true);
2168    op.setDontSynchronize(true);
2169    op.run();
2170  }
2171
2172  /** Delete this ReplicationDomain. */
2173  void delete()
2174  {
2175    shutdown();
2176    removeECLDomainCfg();
2177  }
2178
2179  /** Shutdown this ReplicationDomain. */
2180  public void shutdown()
2181  {
2182    if (shutdown.compareAndSet(false, true))
2183    {
2184      final RSUpdater rsUpdater = this.rsUpdater.get();
2185      if (rsUpdater != null)
2186      {
2187        rsUpdater.initiateShutdown();
2188      }
2189
2190      // stop the thread in charge of flushing the ServerState.
2191      if (flushThread != null)
2192      {
2193        flushThread.initiateShutdown();
2194        synchronized (flushThread)
2195        {
2196          flushThread.notify();
2197        }
2198      }
2199
2200      DirectoryServer.deregisterAlertGenerator(this);
2201
2202      // stop the ReplicationDomain
2203      disableService();
2204    }
2205
2206    // wait for completion of the ServerStateFlush thread.
2207    try
2208    {
2209      while (!done)
2210      {
2211        Thread.sleep(50);
2212      }
2213    } catch (InterruptedException e)
2214    {
2215      Thread.currentThread().interrupt();
2216    }
2217  }
2218
2219  /**
2220   * Create and replay a synchronized Operation from an UpdateMsg.
2221   *
2222   * @param msg
2223   *          The UpdateMsg to be replayed.
2224   * @param shutdown
2225   *          whether the server initiated shutdown
2226   */
2227  void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
2228  {
2229    // Try replay the operation, then flush (replaying) any pending operation
2230    // whose dependency has been replayed until no more left.
2231    do
2232    {
2233      Operation op = null; // the last operation on which replay was attempted
2234      boolean dependency = false;
2235      String replayErrorMsg = null;
2236      CSN csn = null;
2237      try
2238      {
2239        // The next operation for which to attempt replay.
2240        // This local variable allow to keep error messages in the "op" local
2241        // variable until the next loop iteration starts.
2242        // "op" is already initialized to the next Operation because of the
2243        // error handling paths.
2244        Operation nextOp = op = msg.createOperation(conn);
2245        dependency = remotePendingChanges.checkDependencies(op, msg);
2246
2247        boolean replayDone = false;
2248        int retryCount = 10;
2249        while (!dependency && !replayDone && retryCount-- > 0)
2250        {
2251          if (shutdown.get())
2252          {
2253            // shutdown initiated, let's leave
2254            return;
2255          }
2256          // Try replay the operation
2257          op = nextOp;
2258          op.setInternalOperation(true);
2259          op.setSynchronizationOperation(true);
2260
2261          // Always add the ManageDSAIT control so that updates to referrals
2262          // are processed locally.
2263          op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL));
2264
2265          csn = OperationContext.getCSN(op);
2266          op.run();
2267
2268          ResultCode result = op.getResultCode();
2269
2270          if (result != ResultCode.SUCCESS)
2271          {
2272            if (result == ResultCode.NO_OPERATION)
2273            {
2274              // Pre-operation conflict resolution detected that the operation
2275              // was a no-op. For example, an add which has already been
2276              // replayed, or a modify DN operation on an entry which has been
2277              // renamed by a more recent modify DN.
2278              replayDone = true;
2279            }
2280            else if (result == ResultCode.BUSY)
2281            {
2282              /*
2283               * We probably could not get a lock (OPENDJ-885). Give the server
2284               * another chance to process this operation immediately.
2285               */
2286              Thread.yield();
2287              continue;
2288            }
2289            else if (result == ResultCode.UNAVAILABLE)
2290            {
2291              /*
2292               * It can happen when a rebuild is performed or the backend is
2293               * offline (OPENDJ-49). Give the server another chance to process
2294               * this operation after some time.
2295               */
2296              Thread.sleep(50);
2297              continue;
2298            }
2299            else if (op instanceof ModifyOperation)
2300            {
2301              ModifyOperation castOp = (ModifyOperation) op;
2302              dependency = remotePendingChanges.checkDependencies(castOp);
2303              ModifyMsg modifyMsg = (ModifyMsg) msg;
2304              replayDone = solveNamingConflict(castOp, modifyMsg);
2305            }
2306            else if (op instanceof DeleteOperation)
2307            {
2308              DeleteOperation castOp = (DeleteOperation) op;
2309              dependency = remotePendingChanges.checkDependencies(castOp);
2310              replayDone = solveNamingConflict(castOp, msg);
2311            }
2312            else if (op instanceof AddOperation)
2313            {
2314              AddOperation castOp = (AddOperation) op;
2315              AddMsg addMsg = (AddMsg) msg;
2316              dependency = remotePendingChanges.checkDependencies(castOp);
2317              replayDone = solveNamingConflict(castOp, addMsg);
2318            }
2319            else if (op instanceof ModifyDNOperation)
2320            {
2321              ModifyDNOperation castOp = (ModifyDNOperation) op;
2322              replayDone = solveNamingConflict(castOp, msg);
2323            }
2324            else
2325            {
2326              replayDone = true; // unknown type of operation ?!
2327            }
2328
2329            if (replayDone)
2330            {
2331              // the update became a dummy update and the result
2332              // of the conflict resolution phase is to do nothing.
2333              // however we still need to push this change to the serverState
2334              updateError(csn);
2335            }
2336            else
2337            {
2338              /*
2339               * Create a new operation reflecting the new state of the
2340               * UpdateMsg after conflict resolution modified it.
2341               *  Note: When msg is a DeleteMsg, the DeleteOperation is properly
2342               *  created with subtreeDelete request control when needed.
2343               */
2344              nextOp = msg.createOperation(conn);
2345            }
2346          }
2347          else
2348          {
2349            replayDone = true;
2350          }
2351        }
2352
2353        if (!replayDone && !dependency)
2354        {
2355          // Continue with the next change but the servers could now become
2356          // inconsistent.
2357          // Let the repair tool know about this.
2358          final LocalizableMessage message = ERR_LOOP_REPLAYING_OPERATION.get(
2359              op, op.getErrorMessage());
2360          logger.error(message);
2361          numUnresolvedNamingConflicts.incrementAndGet();
2362          replayErrorMsg = message.toString();
2363          updateError(csn);
2364        }
2365      } catch (DecodeException | LDAPException | DataFormatException e)
2366      {
2367        replayErrorMsg = logDecodingOperationError(msg, e);
2368      } catch (Exception e)
2369      {
2370        if (csn != null)
2371        {
2372          /*
2373           * An Exception happened during the replay process.
2374           * Continue with the next change but the servers will now start
2375           * to be inconsistent.
2376           * Let the repair tool know about this.
2377           */
2378          LocalizableMessage message =
2379              ERR_EXCEPTION_REPLAYING_OPERATION.get(
2380                  stackTraceToSingleLineString(e), op);
2381          logger.error(message);
2382          replayErrorMsg = message.toString();
2383          updateError(csn);
2384        } else
2385        {
2386          replayErrorMsg = logDecodingOperationError(msg, e);
2387        }
2388      } finally
2389      {
2390        if (!dependency)
2391        {
2392          processUpdateDone(msg, replayErrorMsg);
2393        }
2394      }
2395
2396      // Now replay any pending update that had a dependency and whose
2397      // dependency has been replayed, do that until no more updates of that
2398      // type left...
2399      msg = remotePendingChanges.getNextUpdate();
2400    } while (msg != null);
2401  }
2402
2403  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
2404  {
2405    LocalizableMessage message =
2406        ERR_EXCEPTION_DECODING_OPERATION.get(msg + " " + stackTraceToSingleLineString(e));
2407    logger.error(message);
2408    return message.toString();
2409  }
2410
2411  /**
2412   * This method is called when an error happens while replaying
2413   * an operation.
2414   * It is necessary because the postOperation does not always get
2415   * called when error or Exceptions happen during the operation replay.
2416   *
2417   * @param csn the CSN of the operation with error.
2418   */
2419  private void updateError(CSN csn)
2420  {
2421    try
2422    {
2423      remotePendingChanges.commit(csn);
2424    }
2425    catch (NoSuchElementException e)
2426    {
2427      // A failure occurred after the change had been removed from the pending
2428      // changes table.
2429      if (logger.isTraceEnabled())
2430      {
2431        logger.trace(
2432            "LDAPReplicationDomain.updateError: Unable to find remote "
2433                + "pending change for CSN %s", csn);
2434      }
2435    }
2436  }
2437
2438  /**
2439   * Generate a new CSN and insert it in the pending list.
2440   *
2441   * @param operation
2442   *          The operation for which the CSN must be generated.
2443   * @return The new CSN.
2444   */
2445  private CSN generateCSN(PluginOperation operation)
2446  {
2447    return pendingChanges.putLocalOperation(operation);
2448  }
2449
2450  /**
2451   * Find the Unique Id of the entry with the provided DN by doing a
2452   * search of the entry and extracting its entryUUID from its attributes.
2453   *
2454   * @param dn The dn of the entry for which the unique Id is searched.
2455   *
2456   * @return The unique Id of the entry with the provided DN.
2457   */
2458  static String findEntryUUID(DN dn)
2459  {
2460    if (dn == null)
2461    {
2462      return null;
2463    }
2464    final SearchRequest request = newSearchRequest(dn, SearchScope.BASE_OBJECT)
2465        .addAttribute(ENTRYUUID_ATTRIBUTE_NAME);
2466    final InternalSearchOperation search = getRootConnection().processSearch(request);
2467    final SearchResultEntry resultEntry = getFirstResult(search);
2468    if (resultEntry != null)
2469    {
2470      return getEntryUUID(resultEntry);
2471    }
2472    return null;
2473  }
2474
2475  private static SearchResultEntry getFirstResult(InternalSearchOperation search)
2476  {
2477    if (search.getResultCode() == ResultCode.SUCCESS)
2478    {
2479      final LinkedList<SearchResultEntry> results = search.getSearchEntries();
2480      if (!results.isEmpty())
2481      {
2482        return results.getFirst();
2483      }
2484    }
2485    return null;
2486  }
2487
2488  /**
2489   * Find the current DN of an entry from its entry UUID.
2490   *
2491   * @param uuid the Entry Unique ID.
2492   * @return The current DN of the entry or null if there is no entry with
2493   *         the specified UUID.
2494   */
2495  private DN findEntryDN(String uuid)
2496  {
2497    try
2498    {
2499      final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, "entryuuid=" + uuid);
2500      InternalSearchOperation search = conn.processSearch(request);
2501      final SearchResultEntry resultEntry = getFirstResult(search);
2502      if (resultEntry != null)
2503      {
2504        return resultEntry.getName();
2505      }
2506    }
2507    catch (DirectoryException e)
2508    {
2509      // never happens because the filter is always valid.
2510    }
2511    return null;
2512  }
2513
2514  /**
2515   * Solve a conflict detected when replaying a modify operation.
2516   *
2517   * @param op The operation that triggered the conflict detection.
2518   * @param msg The operation that triggered the conflict detection.
2519   * @return true if the process is completed, false if it must continue..
2520   */
2521  private boolean solveNamingConflict(ModifyOperation op, ModifyMsg msg)
2522  {
2523    ResultCode result = op.getResultCode();
2524    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
2525    String entryUUID = ctx.getEntryUUID();
2526
2527    if (result == ResultCode.NO_SUCH_OBJECT)
2528    {
2529      /*
2530       * The operation is a modification but
2531       * the entry has been renamed on a different master in the same time.
2532       * search if the entry has been renamed, and return the new dn
2533       * of the entry.
2534       */
2535      DN newDN = findEntryDN(entryUUID);
2536      if (newDN != null)
2537      {
2538        // There is an entry with the same unique id as this modify operation
2539        // replay the modify using the current dn of this entry.
2540        msg.setDN(newDN);
2541        numResolvedNamingConflicts.incrementAndGet();
2542        return false;
2543      }
2544      else
2545      {
2546        // This entry does not exist anymore.
2547        // It has probably been deleted, stop the processing of this operation
2548        numResolvedNamingConflicts.incrementAndGet();
2549        return true;
2550      }
2551    }
2552    else if (result == ResultCode.NOT_ALLOWED_ON_RDN)
2553    {
2554      DN currentDN = findEntryDN(entryUUID);
2555      RDN currentRDN;
2556      if (currentDN != null)
2557      {
2558        currentRDN = currentDN.rdn();
2559      }
2560      else
2561      {
2562        // The entry does not exist anymore.
2563        numResolvedNamingConflicts.incrementAndGet();
2564        return true;
2565      }
2566
2567      // The modify operation is trying to delete the value that is
2568      // currently used in the RDN. We need to alter the modify so that it does
2569      // not remove the current RDN value(s).
2570
2571      List<Modification> mods = op.getModifications();
2572      for (Modification mod : mods)
2573      {
2574        AttributeType modAttrType = mod.getAttribute().getAttributeType();
2575        if ((mod.getModificationType() == ModificationType.DELETE
2576              || mod.getModificationType() == ModificationType.REPLACE)
2577            && currentRDN.hasAttributeType(modAttrType))
2578        {
2579          // the attribute can't be deleted because it is used in the RDN,
2580          // turn this operation is a replace with the current RDN value(s);
2581          mod.setModificationType(ModificationType.REPLACE);
2582          Attribute newAttribute = mod.getAttribute();
2583          AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute);
2584          attrBuilder.add(currentRDN.getAttributeValue(modAttrType));
2585          mod.setAttribute(attrBuilder.toAttribute());
2586        }
2587      }
2588      msg.setMods(mods);
2589      numResolvedNamingConflicts.incrementAndGet();
2590      return false;
2591    }
2592    else
2593    {
2594      // The other type of errors can not be caused by naming conflicts.
2595      // Log a message for the repair tool.
2596      logger.error(ERR_ERROR_REPLAYING_OPERATION,
2597          op, ctx.getCSN(), result, op.getErrorMessage());
2598      return true;
2599    }
2600  }
2601
2602 /**
2603  * Solve a conflict detected when replaying a delete operation.
2604  *
2605  * @param op The operation that triggered the conflict detection.
2606  * @param msg The operation that triggered the conflict detection.
2607  * @return true if the process is completed, false if it must continue..
2608  */
2609 private boolean solveNamingConflict(DeleteOperation op, LDAPUpdateMsg msg)
2610 {
2611   ResultCode result = op.getResultCode();
2612   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
2613   String entryUUID = ctx.getEntryUUID();
2614
2615   if (result == ResultCode.NO_SUCH_OBJECT)
2616   {
2617     /* Find if the entry is still in the database. */
2618     DN currentDN = findEntryDN(entryUUID);
2619     if (currentDN == null)
2620     {
2621       /*
2622        * The entry has already been deleted, either because this delete
2623        * has already been replayed or because another concurrent delete
2624        * has already done the job.
2625        * In any case, there is nothing more to do.
2626        */
2627       numResolvedNamingConflicts.incrementAndGet();
2628       return true;
2629     }
2630     else
2631     {
2632       // This entry has been renamed, replay the delete using its new DN.
2633       msg.setDN(currentDN);
2634       numResolvedNamingConflicts.incrementAndGet();
2635       return false;
2636     }
2637   }
2638   else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
2639   {
2640     /*
2641      * This may happen when we replay a DELETE done on a master
2642      * but children of this entry have been added on another master.
2643      *
2644      * Rename all the children by adding entryuuid in dn and delete this entry.
2645      *
2646      * The action taken here must be consistent with the actions
2647      * done in the solveNamingConflict(AddOperation) method
2648      * when we are adding an entry whose parent entry has already been deleted.
2649      */
2650     if (findAndRenameChild(op.getEntryDN(), op))
2651     {
2652       numUnresolvedNamingConflicts.incrementAndGet();
2653     }
2654
2655     return false;
2656   }
2657   else
2658   {
2659     // The other type of errors can not be caused by naming conflicts.
2660     // Log a message for the repair tool.
2661     logger.error(ERR_ERROR_REPLAYING_OPERATION,
2662         op, ctx.getCSN(), result, op.getErrorMessage());
2663     return true;
2664   }
2665 }
2666
2667/**
2668 * Solve a conflict detected when replaying a Modify DN operation.
2669 *
2670 * @param op The operation that triggered the conflict detection.
2671 * @param msg The operation that triggered the conflict detection.
2672 * @return true if the process is completed, false if it must continue.
2673 * @throws Exception When the operation is not valid.
2674 */
2675private boolean solveNamingConflict(ModifyDNOperation op, LDAPUpdateMsg msg)
2676    throws Exception
2677{
2678  ResultCode result = op.getResultCode();
2679  ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
2680  String entryUUID = ctx.getEntryUUID();
2681  String newSuperiorID = ctx.getNewSuperiorEntryUUID();
2682
2683  /*
2684   * four possible cases :
2685   * - the modified entry has been renamed
2686   * - the new parent has been renamed
2687   * - the operation is replayed for the second time.
2688   * - the entry has been deleted
2689   * action :
2690   *  - change the target dn and the new parent dn and
2691   *        restart the operation,
2692   *  - don't do anything if the operation is replayed.
2693   */
2694
2695  // get the current DN of this entry in the database.
2696  DN currentDN = findEntryDN(entryUUID);
2697
2698  // Construct the new DN to use for the entry.
2699  DN entryDN = op.getEntryDN();
2700  DN newSuperior;
2701  RDN newRDN = op.getNewRDN();
2702
2703  if (newSuperiorID != null)
2704  {
2705    newSuperior = findEntryDN(newSuperiorID);
2706  }
2707  else
2708  {
2709    newSuperior = entryDN.parent();
2710  }
2711
2712  //If we could not find the new parent entry, we missed this entry
2713  // earlier or it has disappeared from the database
2714  // Log this information for the repair tool and mark the entry
2715  // as conflicting.
2716  // stop the processing.
2717  if (newSuperior == null)
2718  {
2719    markConflictEntry(op, currentDN, currentDN.parent().child(newRDN));
2720    numUnresolvedNamingConflicts.incrementAndGet();
2721    return true;
2722  }
2723
2724  DN newDN = newSuperior.child(newRDN);
2725
2726  if (currentDN == null)
2727  {
2728    // The entry targeted by the Modify DN is not in the database
2729    // anymore.
2730    // This is a conflict between a delete and this modify DN.
2731    // The entry has been deleted, we can safely assume
2732    // that the operation is completed.
2733    numResolvedNamingConflicts.incrementAndGet();
2734    return true;
2735  }
2736
2737  // if the newDN and the current DN match then the operation
2738  // is a no-op (this was probably a second replay)
2739  // don't do anything.
2740  if (newDN.equals(currentDN))
2741  {
2742    numResolvedNamingConflicts.incrementAndGet();
2743    return true;
2744  }
2745
2746  if (result == ResultCode.NO_SUCH_OBJECT
2747      || result == ResultCode.UNWILLING_TO_PERFORM
2748      || result == ResultCode.OBJECTCLASS_VIOLATION)
2749  {
2750    /*
2751     * The entry or it's new parent has not been found
2752     * reconstruct the operation with the DN we just built
2753     */
2754    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
2755    modifyDnMsg.setDN(currentDN);
2756    modifyDnMsg.setNewSuperior(newSuperior.toString());
2757    numResolvedNamingConflicts.incrementAndGet();
2758    return false;
2759  }
2760  else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
2761  {
2762    /*
2763     * This may happen when two modifyDn operation
2764     * are done on different servers but with the same target DN
2765     * add the conflict object class to the entry
2766     * and rename it using its entryuuid.
2767     */
2768    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
2769    markConflictEntry(op, op.getEntryDN(), newDN);
2770    modifyDnMsg.setNewRDN(generateConflictRDN(entryUUID,
2771                          modifyDnMsg.getNewRDN()));
2772    modifyDnMsg.setNewSuperior(newSuperior.toString());
2773    numUnresolvedNamingConflicts.incrementAndGet();
2774    return false;
2775  }
2776  else
2777  {
2778    // The other type of errors can not be caused by naming conflicts.
2779    // Log a message for the repair tool.
2780    logger.error(ERR_ERROR_REPLAYING_OPERATION,
2781        op, ctx.getCSN(), result, op.getErrorMessage());
2782    return true;
2783  }
2784}
2785
2786  /**
2787   * Solve a conflict detected when replaying a ADD operation.
2788   *
2789   * @param op The operation that triggered the conflict detection.
2790   * @param msg The message that triggered the conflict detection.
2791   * @return true if the process is completed, false if it must continue.
2792   * @throws Exception When the operation is not valid.
2793   */
2794  private boolean solveNamingConflict(AddOperation op, AddMsg msg)
2795      throws Exception
2796  {
2797    ResultCode result = op.getResultCode();
2798    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
2799    String entryUUID = ctx.getEntryUUID();
2800    String parentUniqueId = ctx.getParentEntryUUID();
2801
2802    if (result == ResultCode.NO_SUCH_OBJECT)
2803    {
2804      /*
2805       * This can happen if the parent has been renamed or deleted
2806       * find the parent dn and calculate a new dn for the entry
2807       */
2808      if (parentUniqueId == null)
2809      {
2810        /*
2811         * This entry is the base dn of the backend.
2812         * It is quite surprising that the operation result be NO_SUCH_OBJECT.
2813         * There is nothing more we can do except log a
2814         * message for the repair tool to look at this problem.
2815         * TODO : Log the message
2816         */
2817        return true;
2818      }
2819      DN parentDn = findEntryDN(parentUniqueId);
2820      if (parentDn == null)
2821      {
2822        /*
2823         * The parent has been deleted
2824         * rename the entry as a conflicting entry.
2825         * The action taken here must be consistent with the actions
2826         * done when in the solveNamingConflict(DeleteOperation) method
2827         * when we are deleting an entry that have some child entries.
2828         */
2829        addConflict(msg);
2830
2831        String conflictRDN =
2832            generateConflictRDN(entryUUID, op.getEntryDN().rdn().toString());
2833        msg.setDN(DN.valueOf(conflictRDN + "," + getBaseDN()));
2834        // reset the parent entryUUID so that the check done is the
2835        // handleConflict phase does not fail.
2836        msg.setParentEntryUUID(null);
2837        numUnresolvedNamingConflicts.incrementAndGet();
2838      }
2839      else
2840      {
2841        msg.setDN(DN.valueOf(msg.getDN().rdn() + "," + parentDn));
2842        numResolvedNamingConflicts.incrementAndGet();
2843      }
2844      return false;
2845    }
2846    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
2847    {
2848      /*
2849       * This can happen if
2850       *  - two adds are done on different servers but with the
2851       *    same target DN.
2852       *  - the same ADD is being replayed for the second time on this server.
2853       * if the entryUUID already exist, assume this is a replay and
2854       *        don't do anything
2855       * if the entry unique id do not exist, generate conflict.
2856       */
2857      if (findEntryDN(entryUUID) != null)
2858      {
2859        // entry already exist : this is a replay
2860        return true;
2861      }
2862      else
2863      {
2864        addConflict(msg);
2865        String conflictRDN =
2866            generateConflictRDN(entryUUID, msg.getDN().toString());
2867        msg.setDN(DN.valueOf(conflictRDN));
2868        numUnresolvedNamingConflicts.incrementAndGet();
2869        return false;
2870      }
2871    }
2872    else
2873    {
2874      // The other type of errors can not be caused by naming conflicts.
2875      // log a message for the repair tool.
2876      logger.error(ERR_ERROR_REPLAYING_OPERATION,
2877          op, ctx.getCSN(), result, op.getErrorMessage());
2878      return true;
2879    }
2880  }
2881
2882  /**
2883   * Find all the entries below the provided DN and rename them
2884   * so that they stay below the baseDN of this replicationDomain and
2885   * use the conflicting name and attribute.
2886   *
2887   * @param entryDN    The DN of the entry whose child must be renamed.
2888   * @param conflictOp The Operation that generated the conflict.
2889   */
2890  private boolean findAndRenameChild(DN entryDN, Operation conflictOp)
2891  {
2892    /*
2893     * TODO JNR Ludo thinks that: "Ideally, the operation should verify that the
2894     * entryUUID has not changed or try to use the entryUUID rather than the
2895     * DN.". entryUUID can be obtained from the caller of the current method.
2896     */
2897    boolean conflict = false;
2898
2899    // Find and rename child entries.
2900    final SearchRequest request = newSearchRequest(entryDN, SearchScope.SINGLE_LEVEL)
2901        .addAttribute(ENTRYUUID_ATTRIBUTE_NAME, HISTORICAL_ATTRIBUTE_NAME);
2902    InternalSearchOperation op = conn.processSearch(request);
2903    if (op.getResultCode() == ResultCode.SUCCESS)
2904    {
2905      for (SearchResultEntry entry : op.getSearchEntries())
2906      {
2907        /*
2908         * Check the ADD and ModRDN date of the child entry
2909         * (All of them, not only the one that are newer than the DEL op)
2910         * and keep the entry as a conflicting entry.
2911         */
2912        conflict = true;
2913        renameConflictEntry(conflictOp, entry.getName(), getEntryUUID(entry));
2914      }
2915    }
2916    else
2917    {
2918      // log error and information for the REPAIR tool.
2919      logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, entryDN, conflictOp, op.getResultCode());
2920    }
2921
2922    return conflict;
2923  }
2924
2925  /**
2926   * Rename an entry that was conflicting so that it stays below the
2927   * baseDN of the replicationDomain.
2928   *
2929   * @param conflictOp The Operation that caused the conflict.
2930   * @param dn         The DN of the entry to be renamed.
2931   * @param entryUUID        The uniqueID of the entry to be renamed.
2932   */
2933  private void renameConflictEntry(Operation conflictOp, DN dn,
2934      String entryUUID)
2935  {
2936    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(dn);
2937    DirectoryServer.sendAlertNotification(this,
2938        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2939
2940    RDN newRDN = generateDeleteConflictDn(entryUUID, dn);
2941    ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true);
2942
2943    if (newOp.getResultCode() != ResultCode.SUCCESS)
2944    {
2945      // log information for the repair tool.
2946      logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY,
2947          dn, conflictOp, newOp.getResultCode());
2948    }
2949  }
2950
2951  /**
2952   * Generate a modification to add the conflict attribute to an entry
2953   * whose Dn is now conflicting with another entry.
2954   *
2955   * @param op        The operation causing the conflict.
2956   * @param currentDN The current DN of the operation to mark as conflicting.
2957   * @param conflictDN     The newDn on which the conflict happened.
2958   */
2959  private void markConflictEntry(Operation op, DN currentDN, DN conflictDN)
2960  {
2961    // create new internal modify operation and run it.
2962    AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(DS_SYNC_CONFLICT);
2963    Attribute attr = Attributes.create(attrType, conflictDN.toString());
2964    List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
2965
2966    ModifyOperation newOp = new ModifyOperationBasis(
2967          conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
2968          currentDN, mods);
2969    runAsSynchronizedOperation(newOp);
2970
2971    if (newOp.getResultCode() != ResultCode.SUCCESS)
2972    {
2973      // Log information for the repair tool.
2974      logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, op, newOp.getResultCode());
2975    }
2976
2977    // Generate an alert to let the administration know that some
2978    // conflict could not be solved.
2979    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN);
2980    DirectoryServer.sendAlertNotification(this,
2981        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2982  }
2983
2984  /**
2985   * Add the conflict attribute to an entry that could
2986   * not be added because it is conflicting with another entry.
2987   *
2988   * @param msg            The conflicting Add Operation.
2989   *
2990   * @throws DecodeException When an encoding error happened manipulating the
2991   *                       msg.
2992   */
2993  private void addConflict(AddMsg msg) throws DecodeException
2994  {
2995    String normalizedDN = msg.getDN().toString();
2996
2997    // Generate an alert to let the administrator know that some
2998    // conflict could not be solved.
2999    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN);
3000    DirectoryServer.sendAlertNotification(this,
3001        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
3002
3003    // Add the conflict attribute
3004    msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN);
3005  }
3006
3007  /**
3008   * Generate the Dn to use for a conflicting entry.
3009   *
3010   * @param entryUUID The unique identifier of the entry involved in the
3011   * conflict.
3012   * @param rdn Original rdn.
3013   * @return The generated RDN for a conflicting entry.
3014   */
3015  private String generateConflictRDN(String entryUUID, String rdn)
3016  {
3017    return "entryuuid=" + entryUUID + "+" + rdn;
3018  }
3019
3020  /**
3021   * Generate the RDN to use for a conflicting entry whose father was deleted.
3022   *
3023   * @param entryUUID The unique identifier of the entry involved in the
3024   *                 conflict.
3025   * @param dn       The original DN of the entry.
3026   *
3027   * @return The generated RDN for a conflicting entry.
3028   */
3029  private RDN generateDeleteConflictDn(String entryUUID, DN dn)
3030  {
3031    String newRDN =  "entryuuid=" + entryUUID + "+" + dn.rdn();
3032    try
3033    {
3034      return RDN.decode(newRDN);
3035    } catch (DirectoryException e)
3036    {
3037      // cannot happen
3038      return null;
3039    }
3040  }
3041
3042  /**
3043   * Check if the domain solve conflicts.
3044   *
3045   * @return a boolean indicating if the domain should solve conflicts.
3046   */
3047  boolean solveConflict()
3048  {
3049    return solveConflictFlag;
3050  }
3051
3052  /**
3053   * Disable the replication on this domain.
3054   * The session to the replication server will be stopped.
3055   * The domain will not be destroyed but call to the pre-operation
3056   * methods will result in failure.
3057   * The listener thread will be destroyed.
3058   * The monitor informations will still be accessible.
3059   */
3060  public void disable()
3061  {
3062    state.save();
3063    state.clearInMemory();
3064    disabled = true;
3065    disableService(); // This will cut the session and wake up the listener
3066  }
3067
3068  /**
3069   * Do what necessary when the data have changed : load state, load
3070   * generation Id.
3071   * If there is no such information check if there is a
3072   * ReplicaUpdateVector entry and translate it into a state
3073   * and generationId.
3074   * @exception DirectoryException Thrown when an error occurs.
3075   */
3076  private void loadDataState() throws DirectoryException
3077  {
3078    state.clearInMemory();
3079    state.loadState();
3080    getGenerator().adjust(state.getMaxCSN(getServerId()));
3081
3082    // Retrieves the generation ID associated with the data imported
3083    generationId = loadGenerationId();
3084  }
3085
3086  /**
3087   * Enable back the domain after a previous disable.
3088   * The domain will connect back to a replication Server and
3089   * will recreate threads to listen for messages from the Synchronization
3090   * server.
3091   * The generationId will be retrieved or computed if necessary.
3092   * The ServerState will also be read again from the local database.
3093   */
3094  public void enable()
3095  {
3096    try
3097    {
3098      loadDataState();
3099    }
3100    catch (Exception e)
3101    {
3102      /* TODO should mark that replicationServer service is
3103       * not available, log an error and retry upon timeout
3104       * should we stop the modifications ?
3105       */
3106      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
3107      return;
3108    }
3109
3110    enableService();
3111
3112    disabled = false;
3113  }
3114
3115  /**
3116   * Compute the data generationId associated with the current data present
3117   * in the backend for this domain.
3118   * @return The computed generationId.
3119   * @throws DirectoryException When an error occurs.
3120   */
3121  private long computeGenerationId() throws DirectoryException
3122  {
3123    final long genId = exportBackend(null, true);
3124    if (logger.isTraceEnabled())
3125    {
3126      logger.trace("Computed generationId: generationId=" + genId);
3127    }
3128    return genId;
3129  }
3130
3131  /**
3132   * Run a modify operation to update the entry whose DN is given as
3133   * a parameter with the generationID information.
3134   *
3135   * @param entryDN       The DN of the entry to be updated.
3136   * @param generationId  The value of the generationID to be saved.
3137   *
3138   * @return A ResultCode indicating if the operation was successful.
3139   */
3140  private ResultCode runSaveGenerationId(DN entryDN, long generationId)
3141  {
3142    // The generationId is stored in the root entry of the domain.
3143    final ByteString asn1BaseDn = ByteString.valueOf(entryDN.toString());
3144
3145    LDAPAttribute attr = new LDAPAttribute(REPLICATION_GENERATION_ID, Long.toString(generationId));
3146    List<RawModification> mods = new ArrayList<>(1);
3147    mods.add(new LDAPModification(ModificationType.REPLACE, attr));
3148
3149    ModifyOperation op = new ModifyOperationBasis(
3150          conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
3151          asn1BaseDn, mods);
3152    runAsSynchronizedOperation(op);
3153    return op.getResultCode();
3154  }
3155
3156  /**
3157   * Stores the value of the generationId.
3158   * @param generationId The value of the generationId.
3159   * @return a ResultCode indicating if the method was successful.
3160   */
3161  private ResultCode saveGenerationId(long generationId)
3162  {
3163    ResultCode result = runSaveGenerationId(getBaseDN(), generationId);
3164    if (result != ResultCode.SUCCESS)
3165    {
3166      generationIdSavedStatus = false;
3167      if (result == ResultCode.NO_SUCH_OBJECT)
3168      {
3169        // If the base entry does not exist, save the generation
3170        // ID in the config entry
3171        result = runSaveGenerationId(config.dn(), generationId);
3172      }
3173
3174      if (result != ResultCode.SUCCESS)
3175      {
3176        logger.error(ERR_UPDATING_GENERATION_ID, result.getName(), getBaseDN());
3177      }
3178    }
3179    else
3180    {
3181      generationIdSavedStatus = true;
3182    }
3183    return result;
3184  }
3185
3186  /**
3187   * Load the GenerationId from the root entry of the domain
3188   * from the REPLICATION_GENERATION_ID attribute in database
3189   * to memory, or compute it if not found.
3190   *
3191   * @return generationId The retrieved value of generationId
3192   * @throws DirectoryException When an error occurs.
3193   */
3194  private long loadGenerationId() throws DirectoryException
3195  {
3196    if (logger.isTraceEnabled())
3197    {
3198      logger.trace("Attempt to read generation ID from DB " + getBaseDN());
3199    }
3200
3201    // Search the database entry that is used to periodically save the generation id
3202    final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
3203        .addAttribute(REPLICATION_GENERATION_ID);
3204    InternalSearchOperation search = conn.processSearch(request);
3205    if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT)
3206    {
3207      // if the base entry does not exist look for the generationID
3208      // in the config entry.
3209      request.setName(config.dn());
3210      search = conn.processSearch(request);
3211    }
3212
3213    boolean found = false;
3214    long aGenerationId = -1;
3215    if (search.getResultCode() != ResultCode.SUCCESS)
3216    {
3217      if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
3218      {
3219        String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
3220        logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN());
3221      }
3222    }
3223    else
3224    {
3225      List<SearchResultEntry> result = search.getSearchEntries();
3226      SearchResultEntry resultEntry = result.get(0);
3227      if (resultEntry != null)
3228      {
3229        AttributeType synchronizationGenIDType =
3230          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
3231        List<Attribute> attrs =
3232          resultEntry.getAttribute(synchronizationGenIDType);
3233        if (attrs != null)
3234        {
3235          Attribute attr = attrs.get(0);
3236          if (attr.size()>1)
3237          {
3238            String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
3239            logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
3240          }
3241          else if (attr.size() == 1)
3242          {
3243            found = true;
3244            try
3245            {
3246              aGenerationId = Long.decode(attr.iterator().next().toString());
3247            }
3248            catch(Exception e)
3249            {
3250              logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
3251            }
3252          }
3253        }
3254      }
3255    }
3256
3257    if (!found)
3258    {
3259      aGenerationId = computeGenerationId();
3260      saveGenerationId(aGenerationId);
3261
3262      if (logger.isTraceEnabled())
3263      {
3264        logger.trace("Generation ID created for domain baseDN=" + getBaseDN() + " generationId=" + aGenerationId);
3265      }
3266    }
3267    else
3268    {
3269      generationIdSavedStatus = true;
3270      if (logger.isTraceEnabled())
3271      {
3272        logger.trace("Generation ID successfully read from domain baseDN=" + getBaseDN()
3273            + " generationId=" + aGenerationId);
3274      }
3275    }
3276    return aGenerationId;
3277  }
3278
3279  /**
3280   * Do whatever is needed when a backup is started.
3281   * We need to make sure that the serverState is correctly save.
3282   */
3283  void backupStart()
3284  {
3285    state.save();
3286  }
3287
  /**
   * Do whatever is needed when a backup is finished.
   * This is currently a no-op.
   */
  void backupEnd()
  {
    // Nothing is needed at the moment
  }
3293
3294  /*
3295   * Total Update >>
3296   */
3297
  /**
   * Triggers an export of the replicated data.
   * <p>
   * This delegates to {@link #exportBackend(OutputStream, boolean)} with the
   * checksum mode turned off; the value returned by that method is ignored.
   *
   * @param output               The OutputStream where the export should
   *                             be produced.
   * @throws DirectoryException  When the delegated export fails.
   */
  @Override
  protected void exportBackend(OutputStream output) throws DirectoryException
  {
    // false: regular export written to 'output', not a checksum computation.
    exportBackend(output, false);
  }
3310
3311  /**
3312   * Export the entries from the backend and/or compute the generation ID.
3313   * The ieContext must have been set before calling.
3314   *
3315   * @param output              The OutputStream where the export should
3316   *                            be produced.
3317   * @param checksumOutput      A boolean indicating if this export is
3318   *                            invoked to perform a checksum only
3319   *
3320   * @return The computed       GenerationID.
3321   *
3322   * @throws DirectoryException when an error occurred
3323   */
3324  private long exportBackend(OutputStream output, boolean checksumOutput)
3325      throws DirectoryException
3326  {
3327    Backend<?> backend = getBackend();
3328
3329    //  Acquire a shared lock for the backend.
3330    try
3331    {
3332      String lockFile = LockFileManager.getBackendLockFileName(backend);
3333      StringBuilder failureReason = new StringBuilder();
3334      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
3335      {
3336        LocalizableMessage message =
3337            ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason);
3338        logger.error(message);
3339        throw new DirectoryException(ResultCode.OTHER, message);
3340      }
3341    }
3342    catch (Exception e)
3343    {
3344      LocalizableMessage message =
3345          ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(),
3346              stackTraceToSingleLineString(e));
3347      logger.error(message);
3348      throw new DirectoryException(ResultCode.OTHER, message);
3349    }
3350
3351    long numberOfEntries = backend.getNumberOfEntriesInBaseDN(getBaseDN());
3352    long entryCount = Math.min(numberOfEntries, 1000);
3353    OutputStream os;
3354    ReplLDIFOutputStream ros = null;
3355    if (checksumOutput)
3356    {
3357      ros = new ReplLDIFOutputStream(entryCount);
3358      os = ros;
3359      try
3360      {
3361        os.write(Long.toString(numberOfEntries).getBytes());
3362      }
3363      catch(Exception e)
3364      {
3365        // Should never happen
3366      }
3367    }
3368    else
3369    {
3370      os = output;
3371    }
3372
3373    // baseDN branch is the only one included in the export
3374    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
3375    exportConfig.setIncludeBranches(newArrayList(getBaseDN()));
3376
3377    // For the checksum computing mode, only consider the 'stable' attributes
3378    if (checksumOutput)
3379    {
3380      String includeAttributeStrings[] = { "objectclass", "sn", "cn", "entryuuid" };
3381      Set<AttributeType> includeAttributes = new HashSet<>();
3382      for (String attrName : includeAttributeStrings)
3383      {
3384        includeAttributes.add(DirectoryServer.getAttributeTypeOrDefault(attrName));
3385      }
3386      exportConfig.setIncludeAttributes(includeAttributes);
3387    }
3388
3389    //  Launch the export.
3390    long genID = 0;
3391    try
3392    {
3393      backend.exportLDIF(exportConfig);
3394    }
3395    catch (DirectoryException de)
3396    {
3397      if (ros == null || ros.getNumExportedEntries() < entryCount)
3398      {
3399        LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
3400        logger.error(message);
3401        throw new DirectoryException(ResultCode.OTHER, message);
3402      }
3403    }
3404    catch (Exception e)
3405    {
3406      LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(stackTraceToSingleLineString(e));
3407      logger.error(message);
3408      throw new DirectoryException(ResultCode.OTHER, message);
3409    }
3410    finally
3411    {
3412      // Clean up after the export by closing the export config.
3413      // Will also flush the export and export the remaining entries.
3414      exportConfig.close();
3415
3416      if (checksumOutput)
3417      {
3418        genID = ros.getChecksumValue();
3419      }
3420
3421      //  Release the shared lock on the backend.
3422      try
3423      {
3424        String lockFile = LockFileManager.getBackendLockFileName(backend);
3425        StringBuilder failureReason = new StringBuilder();
3426        if (! LockFileManager.releaseLock(lockFile, failureReason))
3427        {
3428          LocalizableMessage message =
3429              WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason);
3430          logger.warn(message);
3431          throw new DirectoryException(ResultCode.OTHER, message);
3432        }
3433      }
3434      catch (Exception e)
3435      {
3436        LocalizableMessage message =
3437            WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(),
3438                stackTraceToSingleLineString(e));
3439        logger.warn(message);
3440        throw new DirectoryException(ResultCode.OTHER, message);
3441      }
3442    }
3443    return genID;
3444  }
3445
3446  /**
3447   * Process backend before import.
3448   *
3449   * @param backend
3450   *          The backend.
3451   * @throws DirectoryException
3452   *           If the backend could not be disabled or locked exclusively.
3453   */
3454  private void preBackendImport(Backend<?> backend) throws DirectoryException
3455  {
3456    // Stop saving state
3457    stateSavingDisabled = true;
3458
3459    // FIXME setBackendEnabled should be part of TaskUtils ?
3460    TaskUtils.disableBackend(backend.getBackendID());
3461
3462    // Acquire an exclusive lock for the backend.
3463    String lockFile = LockFileManager.getBackendLockFileName(backend);
3464    StringBuilder failureReason = new StringBuilder();
3465    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
3466    {
3467      LocalizableMessage message = ERR_INIT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason);
3468      logger.error(message);
3469      throw new DirectoryException(ResultCode.OTHER, message);
3470    }
3471  }
3472
  /**
   * Triggers an import of the replicated data into the backend of this domain.
   * <p>
   * Failures are recorded in the current import/export context (only when no
   * exception was recorded before). Once the cleanup is done, the exception
   * held by the context, if any, is thrown.
   *
   * @param input                The InputStream from which the data are read.
   * @throws DirectoryException  When the import/export context holds an
   *                             exception once the import and its cleanup
   *                             have completed.
   */
  @Override
  protected void importBackend(InputStream input) throws DirectoryException
  {
    Backend<?> backend = getBackend();

    LDIFImportConfig importConfig = null;
    ImportExportContext ieCtx = getImportExportContext();
    try
    {
      if (!backend.supports(BackendOperation.LDIF_IMPORT))
      {
        ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
            ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
        // NOTE(review): the finally block below still runs, but this return
        // bypasses the final "throw ieCtx.getException()"; the failure is then
        // only visible through the context - confirm callers check it.
        return;
      }

      importConfig = new LDIFImportConfig(input);
      // Only the base DN of this domain is imported, replacing existing data.
      importConfig.setIncludeBranches(newLinkedHashSet(getBaseDN()));
      importConfig.setAppendToExistingData(false);
      importConfig.setSkipDNValidation(true);
      // We should not validate schema for replication
      importConfig.setValidateSchema(false);
      // Allow fractional replication ldif import plugin to be called
      importConfig.setInvokeImportPlugins(true);
      // Reset the follow import flag and message before starting the import
      importErrorMessageId = -1;

      // TODO How to deal with rejected entries during the import
      File rejectsFile =
          getFileForPath("logs" + File.separator + "replInitRejectedEntries");
      importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(),
          ExistingFileBehavior.OVERWRITE);

      // Process import
      preBackendImport(backend);
      backend.importLDIF(importConfig, DirectoryServer.getInstance().getServerContext());

      // Only reached when the import completed without exception.
      // NOTE(review): preBackendImport() sets this flag to true, so it stays
      // true when the import fails - confirm this is intended.
      stateSavingDisabled = false;
    }
    catch(Exception e)
    {
      // Record the failure; it is thrown after the cleanup below.
      ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
          ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e))));
    }
    finally
    {
      try
      {
        // Cleanup
        if (importConfig != null)
        {
          importConfig.close();
          closeBackendImport(backend); // Re-enable backend
          // Look the backend up again now that it has been re-enabled.
          backend = getBackend();
        }

        loadDataState();

        if (ieCtx.getException() != null)
        {
          // When an error occurred during an import, most of times
          // the generationId coming in the root entry of the imported data,
          // is not valid anymore (partial data in the backend).
          generationId = computeGenerationId();
          saveGenerationId(generationId);
        }
      }
      catch (DirectoryException fe)
      {
        // If we already catch an Exception it's quite possible
        // that the loadDataState() and setGenerationId() fail
        // so we don't bother about the new Exception.
        // However if there was no Exception before we want
        // to return this Exception to the task creator.
        ieCtx.setExceptionIfNoneSet(new DirectoryException(
            ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe))));
      }
    }

    // Report the first failure recorded during the import or its cleanup.
    if (ieCtx.getException() != null)
    {
      throw ieCtx.getException();
    }
  }
3564
3565  /**
3566   * Make post import operations.
3567   * @param backend The backend implied in the import.
3568   * @exception DirectoryException Thrown when an error occurs.
3569   */
3570  private void closeBackendImport(Backend<?> backend) throws DirectoryException
3571  {
3572    String lockFile = LockFileManager.getBackendLockFileName(backend);
3573    StringBuilder failureReason = new StringBuilder();
3574
3575    // Release lock
3576    if (!LockFileManager.releaseLock(lockFile, failureReason))
3577    {
3578      LocalizableMessage message =
3579          WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason);
3580      logger.warn(message);
3581      throw new DirectoryException(ResultCode.OTHER, message);
3582    }
3583
3584    TaskUtils.enableBackend(backend.getBackendID());
3585  }
3586
3587  /**
3588   * Retrieves a replication domain based on the baseDN.
3589   *
3590   * @param baseDN The baseDN of the domain to retrieve
3591   * @return The domain retrieved
3592   * @throws DirectoryException When an error occurred or no domain
3593   * match the provided baseDN.
3594   */
3595  public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDN)
3596      throws DirectoryException
3597  {
3598    LDAPReplicationDomain replicationDomain = null;
3599
3600    // Retrieves the domain
3601    for (SynchronizationProvider<?> provider :
3602      DirectoryServer.getSynchronizationProviders())
3603    {
3604      if (!(provider instanceof MultimasterReplication))
3605      {
3606        LocalizableMessage message = ERR_INVALID_PROVIDER.get();
3607        throw new DirectoryException(ResultCode.OTHER, message);
3608      }
3609
3610      // From the domainDN retrieves the replication domain
3611      LDAPReplicationDomain domain =
3612        MultimasterReplication.findDomain(baseDN, null);
3613      if (domain == null)
3614      {
3615        break;
3616      }
3617      if (replicationDomain != null)
3618      {
3619        // Should never happen
3620        LocalizableMessage message = ERR_MULTIPLE_MATCHING_DOMAIN.get();
3621        throw new DirectoryException(ResultCode.OTHER, message);
3622      }
3623      replicationDomain = domain;
3624    }
3625
3626    if (replicationDomain == null)
3627    {
3628      throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(baseDN));
3629    }
3630    return replicationDomain;
3631  }
3632
  /**
   * Returns the backend associated to this domain, i.e. the backend that
   * the directory server returns for the base DN of this domain.
   *
   * @return The associated backend.
   */
  private Backend<?> getBackend()
  {
    // Looked up on each call rather than cached.
    return DirectoryServer.getBackend(getBaseDN());
  }
3641
3642  /*
3643   * <<Total Update
3644   */
3645
3646  /**
3647   * Push the schema modifications contained in the given parameter as a
3648   * modification that would happen on a local server. The modifications are not
3649   * applied to the local schema backend and historical information is not
3650   * updated; but a CSN is generated and the ServerState associated to the
3651   * schema domain is updated.
3652   *
3653   * @param modifications
3654   *          The schema modifications to push
3655   */
3656  void synchronizeSchemaModifications(List<Modification> modifications)
3657  {
3658    ModifyOperation op = new ModifyOperationBasis(
3659        conn, nextOperationID(), nextMessageID(), null,
3660        DirectoryServer.getSchemaDN(), modifications);
3661
3662    final Entry schema;
3663    try
3664    {
3665      schema = DirectoryServer.getEntry(DirectoryServer.getSchemaDN());
3666    }
3667    catch (DirectoryException e)
3668    {
3669      logger.traceException(e);
3670      logger.error(ERR_BACKEND_SEARCH_ENTRY.get(DirectoryServer.getSchemaDN().toString(),
3671              stackTraceToSingleLineString(e)));
3672      return;
3673    }
3674
3675    LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op);
3676    CSN csn = generateCSN(localOp);
3677    OperationContext ctx = new ModifyContext(csn, getEntryUUID(schema));
3678    localOp.setAttachment(SYNCHROCONTEXT, ctx);
3679    localOp.setResultCode(ResultCode.SUCCESS);
3680    synchronize(localOp);
3681  }
3682
3683  /**
3684   * Check if the provided configuration is acceptable for add.
3685   *
3686   * @param configuration The configuration to check.
3687   * @param unacceptableReasons When the configuration is not acceptable, this
3688   *                            table is use to return the reasons why this
3689   *                            configuration is not acceptable.
3690   *
3691   * @return true if the configuration is acceptable, false other wise.
3692   */
3693  static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration,
3694      List<LocalizableMessage> unacceptableReasons)
3695  {
3696    // Check that there is not already a domain with the same DN
3697    final DN dn = configuration.getBaseDN();
3698    LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
3699    if (domain != null && domain.getBaseDN().equals(dn))
3700    {
3701      unacceptableReasons.add(ERR_SYNC_INVALID_DN.get());
3702      return false;
3703    }
3704
3705    // Check that the base DN is configured as a base-dn of the directory server
3706    if (DirectoryServer.getBackend(dn) == null)
3707    {
3708      unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn));
3709      return false;
3710    }
3711
3712    // Check fractional configuration
3713    try
3714    {
3715      isFractionalConfigAcceptable(configuration);
3716    } catch (ConfigException e)
3717    {
3718      unacceptableReasons.add(e.getMessageObject());
3719      return false;
3720    }
3721
3722    return true;
3723  }
3724
3725  @Override
3726  public ConfigChangeResult applyConfigurationChange(
3727         ReplicationDomainCfg configuration)
3728  {
3729    this.config = configuration;
3730    changeConfig(configuration);
3731
3732    // Read assured + fractional configuration and each time reconnect if needed
3733    readAssuredConfig(configuration, true);
3734    readFractionalConfig(configuration, true);
3735
3736    solveConflictFlag = isSolveConflict(configuration);
3737
3738    final ConfigChangeResult ccr = new ConfigChangeResult();
3739    try
3740    {
3741      storeECLConfiguration(configuration);
3742    }
3743    catch(Exception e)
3744    {
3745      ccr.setResultCode(ResultCode.OTHER);
3746    }
3747    return ccr;
3748  }
3749
3750  @Override
3751  public boolean isConfigurationChangeAcceptable(
3752         ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
3753  {
3754    // Check that a import/export is not in progress
3755    if (ieRunning())
3756    {
3757      unacceptableReasons.add(
3758          NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get());
3759      return false;
3760    }
3761
3762    // Check fractional configuration
3763    try
3764    {
3765      isFractionalConfigAcceptable(configuration);
3766      return true;
3767    }
3768    catch (ConfigException e)
3769    {
3770      unacceptableReasons.add(e.getMessageObject());
3771      return false;
3772    }
3773  }
3774
3775  @Override
3776  public Map<String, String> getAlerts()
3777  {
3778    Map<String, String> alerts = new LinkedHashMap<>();
3779
3780    alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT,
3781               ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT);
3782    return alerts;
3783  }
3784
  /**
   * Returns the class name of this component.
   *
   * @return The value of the {@code CLASS_NAME} constant.
   */
  @Override
  public String getClassName()
  {
    return CLASS_NAME;
  }
3790
  /**
   * Returns the DN of the configuration entry of this domain.
   *
   * @return The DN held by the current configuration object.
   */
  @Override
  public DN getComponentEntryDN()
  {
    return config.dn();
  }
3796
  /**
   * Starts the Replication Domain: the flush thread is started first, then
   * the listen service.
   */
  public void start()
  {
    // Start the ServerStateFlush thread
    flushThread.start();

    startListenService();
  }
3805
3806  /** Remove the configuration of the external changelog from this domain configuration. */
3807  private void removeECLDomainCfg()
3808  {
3809    try
3810    {
3811      DN eclConfigEntryDN = DN.valueOf("cn=external changeLog," + config.dn());
3812      if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
3813      {
3814        DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null);
3815      }
3816    }
3817    catch(Exception e)
3818    {
3819      logger.traceException(e);
3820      logger.error(ERR_CHECK_CREATE_REPL_BACKEND_FAILED, stackTraceToSingleLineString(e));
3821    }
3822  }
3823
3824  /**
3825   * Store the provided ECL configuration for the domain.
3826   * @param  domCfg       The provided configuration.
3827   * @throws ConfigException When an error occurred.
3828   */
3829  private void storeECLConfiguration(ReplicationDomainCfg domCfg)
3830      throws ConfigException
3831  {
3832    ExternalChangelogDomainCfg eclDomCfg = null;
3833    // create the ecl config if it does not exist
3834    // There may not be any config entry related to this domain in some
3835    // unit test cases
3836    try
3837    {
3838      DN configDn = config.dn();
3839      if (DirectoryServer.getConfigHandler().entryExists(configDn))
3840      {
3841        try
3842        { eclDomCfg = domCfg.getExternalChangelogDomain();
3843        } catch(Exception e) { /* do nothing */ }
3844        // domain with no config entry only when running unit tests
3845        if (eclDomCfg == null)
3846        {
3847          // no ECL config provided hence create a default one
3848          // create the default one
3849          DN eclConfigEntryDN = DN.valueOf("cn=external changelog," + configDn);
3850          if (!DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
3851          {
3852            // no entry exist yet for the ECL config for this domain
3853            // create it
3854            String ldif = makeLdif(
3855                "dn: cn=external changelog," + configDn,
3856                "objectClass: top",
3857                "objectClass: ds-cfg-external-changelog-domain",
3858                "cn: external changelog",
3859                "ds-cfg-enabled: " + !getBackend().isPrivateBackend());
3860            LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
3861                new StringReader(ldif));
3862            // No need to validate schema in replication
3863            ldifImportConfig.setValidateSchema(false);
3864            LDIFReader reader = new LDIFReader(ldifImportConfig);
3865            Entry eclEntry = reader.readEntry();
3866            DirectoryServer.getConfigHandler().addEntry(eclEntry, null);
3867            ldifImportConfig.close();
3868          }
3869        }
3870      }
3871      eclDomCfg = domCfg.getExternalChangelogDomain();
3872      if (eclDomain != null)
3873      {
3874        eclDomain.applyConfigurationChange(eclDomCfg);
3875      }
3876      else
3877      {
3878        // Create the ECL domain object
3879        eclDomain = new ExternalChangelogDomain(this, eclDomCfg);
3880      }
3881    }
3882    catch (Exception e)
3883    {
3884      throw new ConfigException(NOTE_ERR_UNABLE_TO_ENABLE_ECL.get(
3885          "Replication Domain on " + getBaseDN(), stackTraceToSingleLineString(e)), e);
3886    }
3887  }
3888
3889  private static String makeLdif(String... lines)
3890  {
3891    final StringBuilder buffer = new StringBuilder();
3892    for (String line : lines) {
3893      buffer.append(line).append(EOL);
3894    }
3895    // Append an extra line so we can append LDIF Strings.
3896    buffer.append(EOL);
3897    return buffer.toString();
3898  }
3899
3900  @Override
3901  public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
3902  {
3903    // Check domain fractional configuration consistency with local
3904    // configuration variables
3905    forceBadDataSet = !isBackendFractionalConfigConsistent();
3906
3907    super.sessionInitiated(initStatus, rsState);
3908
3909    // Now for bad data set status if needed
3910    if (forceBadDataSet)
3911    {
3912      signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
3913      logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDN());
3914      return; // Do not send changes to the replication server
3915    }
3916
3917    try
3918    {
3919      /*
3920       * We must not publish changes to a replicationServer that has
3921       * not seen all our previous changes because this could cause
3922       * some other ldap servers to miss those changes.
3923       * Check that the ReplicationServer has seen all our previous
3924       * changes.
3925       */
3926      CSN replServerMaxCSN = rsState.getCSN(getServerId());
3927
3928      // we don't want to update from here (a DS) an empty RS because
3929      // normally the RS should have been updated by other RSes except for
3930      // very last changes lost if the local connection was broken
3931      // ... hence the RS we are connected to should not be empty
3932      // ... or if it is empty, it is due to a voluntary reset
3933      // and we don't want to update it with our changes that could be huge.
3934      if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0)
3935      {
3936        CSN ourMaxCSN = state.getMaxCSN(getServerId());
3937        if (ourMaxCSN != null
3938            && !ourMaxCSN.isOlderThanOrEqualTo(replServerMaxCSN))
3939        {
3940          pendingChanges.setRecovering(true);
3941          broker.setRecoveryRequired(true);
3942          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
3943          if (this.rsUpdater.compareAndSet(null, rsUpdater))
3944          {
3945            rsUpdater.start();
3946          }
3947        }
3948      }
3949    } catch (Exception e)
3950    {
3951      logger.error(ERR_PUBLISHING_FAKE_OPS, getBaseDN(), stackTraceToSingleLineString(e));
3952    }
3953  }
3954
  /**
   * Build the list of changes that have been processed by this server after the
   * CSN given as a parameter and publish them using the given session.
   * <p>
   * The changes are searched by time intervals of 10 seconds. The method
   * gives up and returns {@code false} as soon as the shutdown flag is set.
   *
   * @param startCSN
   *          The CSN where we need to start the search
   * @param session
   *          The session to use to publish the changes
   * @return A boolean indicating the success of the operation: {@code true}
   *         when the last search ended with the SUCCESS result code,
   *         {@code false} otherwise or when a shutdown was detected.
   * @throws Exception
   *           if an Exception happens during the search.
   */
  boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session)
      throws Exception
  {
    // Trim the changes in replayOperations that are older than the startCSN.
    synchronized (replayOperations)
    {
      Iterator<CSN> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (shutdown.get())
        {
          return false;
        }
        // Stop at the first CSN newer than startCSN; everything before it is
        // removed.
        if (it.next().isNewerThan(startCSN))
        {
          break;
        }
        it.remove();
      }
    }

    CSN lastRetrievedChange;
    InternalSearchOperation op;
    CSN currentStartCSN = startCSN;
    do
    {
      if (shutdown.get())
      {
        return false;
      }

      lastRetrievedChange = null;
      // We can't do the search in one go because we need to store the results
      // so that we are sure we send the operations in order and because the
      // list might be large.
      // So we search by interval of 10 seconds and store the results in the
      // replayOperations list so that they are sorted before sending them.
      long missingChangesDelta = currentStartCSN.getTime() + 10000;
      // NOTE(review): 0xffffffff is the int literal -1; presumably meant as
      // the highest possible sequence number - confirm against CSN.
      CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, getServerId());

      ScanSearchListener listener =
        new ScanSearchListener(currentStartCSN, endCSN);
      op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN,
              listener);

      // Publish and remove all the changes from the replayOperations list
      // that are older than the endCSN.
      final List<FakeOperation> opsToSend = new LinkedList<>();
      synchronized (replayOperations)
      {
        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
        while (itOp.hasNext())
        {
          if (shutdown.get())
          {
            return false;
          }
          FakeOperation fakeOp = itOp.next();
          if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
              || !state.cover(fakeOp.getCSN())
              // do not look for replay operations in the future
              || currentStartCSN.isNewerThan(now()))
          {
            break;
          }

          lastRetrievedChange = fakeOp.getCSN();
          opsToSend.add(fakeOp);
          itOp.remove();
        }
      }

      // The messages are published once the replayOperations lock is released.
      for (FakeOperation opToSend : opsToSend)
      {
        if (shutdown.get())
        {
          return false;
        }
        session.publishRecovery(opToSend.generateMessage());
      }

      if (lastRetrievedChange != null)
      {
        if (logger.isInfoEnabled())
        {
          logger.info(LocalizableMessage.raw("publish loop"
                  + " >=" + currentStartCSN + " <=" + endCSN
                  + " nentries=" + op.getEntriesSent()
                  + " result=" + op.getResultCode()
                  + " lastRetrievedChange=" + lastRetrievedChange));
        }
        // Resume the next search from the last change that was published.
        currentStartCSN = lastRetrievedChange;
      }
      else
      {
        if (logger.isInfoEnabled())
        {
          logger.info(LocalizableMessage.raw("publish loop"
                  + " >=" + currentStartCSN + " <=" + endCSN
                  + " nentries=" + op.getEntriesSent()
                  + " result=" + op.getResultCode()
                  + " no changes"));
        }
        // Nothing published in this interval: move on to the next one.
        currentStartCSN = endCSN;
      }
    } while (pendingChanges.recoveryUntil(currentStartCSN)
          && op.getResultCode().equals(ResultCode.SUCCESS));

    return op.getResultCode().equals(ResultCode.SUCCESS);
  }
4077
  /**
   * Builds a CSN from the current time of {@code TimeThread}, with the
   * sequence number and the server id both set to {@code Integer.MAX_VALUE}.
   *
   * @return a CSN representing "now".
   */
  private static CSN now()
  {
    // ensure now() will always come last with isNewerThan() test,
    // even though the timestamp, or the timestamp and seqnum would be the same
    return new CSN(TimeThread.getTime(), Integer.MAX_VALUE, Integer.MAX_VALUE);
  }
4084
4085  /**
4086   * Search for the changes that happened since fromCSN based on the historical
4087   * attribute. The only changes that will be send will be the one generated on
4088   * the serverId provided in fromCSN.
4089   *
4090   * @param baseDN
4091   *          the base DN
4092   * @param fromCSN
4093   *          The CSN from which we want the changes
4094   * @param lastCSN
4095   *          The max CSN that the search should return
4096   * @param resultListener
4097   *          The listener that will process the entries returned
4098   * @return the internal search operation
4099   * @throws Exception
4100   *           when raised.
4101   */
4102  private static InternalSearchOperation searchForChangedEntries(DN baseDN,
4103      CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener)
4104      throws Exception
4105  {
4106    String maxValueForId;
4107    if (lastCSN == null)
4108    {
4109      final Integer serverId = fromCSN.getServerId();
4110      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
4111                      + "ffffffff";
4112    }
4113    else
4114    {
4115      maxValueForId = lastCSN.toString();
4116    }
4117
4118    String filter =
4119        "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" +
4120          "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))";
4121    SearchRequest request = Requests.newSearchRequest(baseDN, SearchScope.WHOLE_SUBTREE, filter)
4122        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
4123    return getRootConnection().processSearch(request, resultListener);
4124  }
4125
  /**
   * Search for the changes that happened since fromCSN based on the historical
   * attribute. The only changes that will be sent will be the ones generated
   * on the serverId provided in fromCSN.
   * <p>
   * This is the four-argument variant called with a {@code null} last CSN.
   *
   * @param baseDN
   *          the base DN
   * @param fromCSN
   *          The CSN from which we want the changes
   * @param resultListener
   *          that will process the entries returned.
   * @return the internal search operation
   * @throws Exception
   *           when raised.
   */
  static InternalSearchOperation searchForChangedEntries(DN baseDN,
      CSN fromCSN, InternalSearchListener resultListener) throws Exception
  {
    // null: no explicit upper bound on the CSNs to return.
    return searchForChangedEntries(baseDN, fromCSN, null, resultListener);
  }
4146
4147  /**
4148   * This method should return the total number of objects in the
4149   * replicated domain.
4150   * This count will be used for reporting.
4151   *
4152   * @throws DirectoryException when needed.
4153   *
4154   * @return The number of objects in the replication domain.
4155   */
4156  @Override
4157  public long countEntries() throws DirectoryException
4158  {
4159    Backend<?> backend = getBackend();
4160    if (!backend.supports(BackendOperation.LDIF_EXPORT))
4161    {
4162      LocalizableMessage msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID());
4163      logger.error(msg);
4164      throw new DirectoryException(ResultCode.OTHER, msg);
4165    }
4166
4167    return backend.getNumberOfEntriesInBaseDN(getBaseDN());
4168  }
4169
4170  @Override
4171  public boolean processUpdate(UpdateMsg updateMsg)
4172  {
4173    // Ignore message if fractional configuration is inconsistent and
4174    // we have been passed into bad data set status
4175    if (forceBadDataSet)
4176    {
4177      return false;
4178    }
4179
4180    if (updateMsg instanceof LDAPUpdateMsg)
4181    {
4182      LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg;
4183
4184      // Put the UpdateMsg in the RemotePendingChanges list.
4185      if (!remotePendingChanges.putRemoteUpdate(msg))
4186      {
4187        /*
4188         * Already received this change so ignore it. This may happen if there
4189         * are uncommitted changes in the queue and session failover occurs
4190         * causing a recovery of all changes since the current committed server
4191         * state. See OPENDJ-1115.
4192         */
4193        if (logger.isTraceEnabled())
4194        {
4195          logger.trace(
4196                  "LDAPReplicationDomain.processUpdate: ignoring "
4197                  + "duplicate change %s", msg.getCSN());
4198        }
4199        return true;
4200      }
4201
4202      // Put update message into the replay queue
4203      // (block until some place in the queue is available)
4204      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
4205      while (!isListenerShuttingDown())
4206      {
4207        // loop until we can offer to the queue or shutdown was initiated
4208        try
4209        {
4210          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
4211          {
4212            // successful offer to the queue, let's exit the loop
4213            break;
4214          }
4215        }
4216        catch (InterruptedException e)
4217        {
4218          // Thread interrupted: check for shutdown.
4219          Thread.currentThread().interrupt();
4220        }
4221      }
4222
4223      return false;
4224    }
4225
4226    // unknown message type, this should not happen, just ignore it.
4227    return true;
4228  }
4229
4230  /**
4231   * Monitoring information for the LDAPReplicationDomain.
4232   *
4233   * @return Monitoring attributes specific to the LDAPReplicationDomain.
4234   */
4235  @Override
4236  public Collection<Attribute> getAdditionalMonitoring()
4237  {
4238    List<Attribute> attributes = new ArrayList<>();
4239
4240    // number of updates in the pending list
4241    addMonitorData(attributes, "pending-updates", pendingChanges.size());
4242
4243    addMonitorData(attributes, "replayed-updates-ok",
4244        numReplayedPostOpCalled.get());
4245    addMonitorData(attributes, "resolved-modify-conflicts",
4246        numResolvedModifyConflicts.get());
4247    addMonitorData(attributes, "resolved-naming-conflicts",
4248        numResolvedNamingConflicts.get());
4249    addMonitorData(attributes, "unresolved-naming-conflicts",
4250        numUnresolvedNamingConflicts.get());
4251    addMonitorData(attributes, "remote-pending-changes-size",
4252        remotePendingChanges.getQueueSize());
4253
4254    return attributes;
4255  }
4256
4257  /**
4258   * Verifies that the given string represents a valid source
4259   * from which this server can be initialized.
4260   * @param sourceString The string representing the source
4261   * @return The source as a integer value
4262   * @throws DirectoryException if the string is not valid
4263   */
4264  public int decodeSource(String sourceString) throws DirectoryException
4265  {
4266    int source = 0;
4267    try
4268    {
4269      source = Integer.decode(sourceString);
4270      if (source >= -1 && source != getServerId())
4271      {
4272        // TODO Verifies serverID is in the domain
4273        // We should check here that this is a server implied
4274        // in the current domain.
4275        return source;
4276      }
4277    }
4278    catch (Exception e)
4279    {
4280      LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(
4281          getBaseDN(), getServerId(), sourceString, stackTraceToSingleLineString(e));
4282      throw new DirectoryException(ResultCode.OTHER, message, e);
4283    }
4284
4285    LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDN(), getServerId(), source, "");
4286    throw new DirectoryException(ResultCode.OTHER, message);
4287  }
4288
4289  /**
4290   * Called by synchronize post op plugin in order to add the entry historical
4291   * attributes to the UpdateMsg.
4292   * @param msg an replication update message
4293   * @param op  the operation in progress
4294   */
4295  private void addEntryAttributesForCL(UpdateMsg msg,
4296      PostOperationOperation op)
4297  {
4298    if (op instanceof PostOperationDeleteOperation)
4299    {
4300      PostOperationDeleteOperation delOp = (PostOperationDeleteOperation) op;
4301      final Set<String> names = getEclIncludesForDeletes();
4302      Entry entry = delOp.getEntryToDelete();
4303      final DeleteMsg deleteMsg = (DeleteMsg) msg;
4304      deleteMsg.setEclIncludes(getIncludedAttributes(entry, names));
4305
4306      // For delete only, add the Authorized DN since it's required in the
4307      // ECL entry but is not part of rest of the message.
4308      DN deleterDN = delOp.getAuthorizationDN();
4309      if (deleterDN != null)
4310      {
4311        deleteMsg.setInitiatorsName(deleterDN.toString());
4312      }
4313    }
4314    else if (op instanceof PostOperationModifyOperation)
4315    {
4316      PostOperationModifyOperation modOp = (PostOperationModifyOperation) op;
4317      Set<String> names = getEclIncludes();
4318      Entry entry = modOp.getCurrentEntry();
4319      ((ModifyMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4320    }
4321    else if (op instanceof PostOperationModifyDNOperation)
4322    {
4323      PostOperationModifyDNOperation modDNOp =
4324        (PostOperationModifyDNOperation) op;
4325      Set<String> names = getEclIncludes();
4326      Entry entry = modDNOp.getOriginalEntry();
4327      ((ModifyDNMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4328    }
4329    else if (op instanceof PostOperationAddOperation)
4330    {
4331      PostOperationAddOperation addOp = (PostOperationAddOperation) op;
4332      Set<String> names = getEclIncludes();
4333      Entry entry = addOp.getEntryToAdd();
4334      ((AddMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4335    }
4336  }
4337
4338  private Collection<Attribute> getIncludedAttributes(Entry entry,
4339      Set<String> names)
4340  {
4341    if (names.isEmpty())
4342    {
4343      // Fast-path.
4344      return Collections.emptySet();
4345    }
4346    else if (names.size() == 1 && names.contains("*"))
4347    {
4348      // Potential fast-path for delete operations.
4349      List<Attribute> attributes = new LinkedList<>();
4350      for (List<Attribute> attributeList : entry.getUserAttributes().values())
4351      {
4352        attributes.addAll(attributeList);
4353      }
4354      Attribute objectClassAttribute = entry.getObjectClassAttribute();
4355      if (objectClassAttribute != null)
4356      {
4357        attributes.add(objectClassAttribute);
4358      }
4359      return attributes;
4360    }
4361    else
4362    {
4363      // Expand @objectclass references in attribute list if needed.
4364      // We do this now in order to take into account dynamic schema changes.
4365      final Set<String> expandedNames = getExpandedNames(names);
4366      final Entry filteredEntry =
4367          entry.filterEntry(expandedNames, false, false, false);
4368      return filteredEntry.getAttributes();
4369    }
4370  }
4371
4372  private Set<String> getExpandedNames(Set<String> names)
4373  {
4374    // Only rebuild the attribute set if necessary.
4375    if (!needsExpanding(names))
4376    {
4377      return names;
4378    }
4379
4380    final Set<String> expandedNames = new HashSet<>(names.size());
4381    for (String name : names)
4382    {
4383      if (name.startsWith("@"))
4384      {
4385        String ocName = name.substring(1);
4386        ObjectClass objectClass =
4387            DirectoryServer.getObjectClass(toLowerCase(ocName));
4388        if (objectClass != null)
4389        {
4390          for (AttributeType at : objectClass.getRequiredAttributeChain())
4391          {
4392            expandedNames.add(at.getNameOrOID());
4393          }
4394          for (AttributeType at : objectClass.getOptionalAttributeChain())
4395          {
4396            expandedNames.add(at.getNameOrOID());
4397          }
4398        }
4399      }
4400      else
4401      {
4402        expandedNames.add(name);
4403      }
4404    }
4405    return expandedNames;
4406  }
4407
4408  private boolean needsExpanding(Set<String> names)
4409  {
4410    for (String name : names)
4411    {
4412      if (name.startsWith("@"))
4413      {
4414        return true;
4415      }
4416    }
4417    return false;
4418  }
4419
4420  /**
4421   * Gets the fractional configuration of this domain.
4422   * @return The fractional configuration of this domain.
4423   */
4424  FractionalConfig getFractionalConfig()
4425  {
4426    return fractionalConfig;
4427  }
4428
4429  /**
4430   * This bean is a utility class used for holding the parsing
4431   * result of a fractional configuration. It also contains some facility
4432   * methods like fractional configuration comparison...
4433   */
4434  static class FractionalConfig
4435  {
4436    /**
4437     * Tells if fractional replication is enabled or not (some fractional
4438     * constraints have been put in place). If this is true then
4439     * fractionalExclusive explains the configuration mode and either
4440     * fractionalSpecificClassesAttributes or fractionalAllClassesAttributes or
4441     * both should be filled with something.
4442     */
4443    private boolean fractional;
4444
4445    /**
4446     * - If true, tells that the configured fractional replication is exclusive:
4447     * Every attributes contained in fractionalSpecificClassesAttributes and
4448     * fractionalAllClassesAttributes should be ignored when replaying operation
4449     * in local backend.
4450     * - If false, tells that the configured fractional replication is
4451     * inclusive:
4452     * Only attributes contained in fractionalSpecificClassesAttributes and
4453     * fractionalAllClassesAttributes should be taken into account in local
4454     * backend.
4455     */
4456    private boolean fractionalExclusive = true;
4457
4458    /**
4459     * Used in fractional replication: holds attributes of a specific object class.
4460     * - key = object class (name or OID of the class)
4461     * - value = the attributes of that class that should be taken into account
4462     * (inclusive or exclusive fractional replication) (name or OID of the
4463     * attribute)
4464     * When an operation coming from the network is to be locally replayed, if
4465     * the concerned entry has an objectClass attribute equals to 'key':
4466     * - inclusive mode: only the attributes in 'value' will be added/deleted/modified
4467     * - exclusive mode: the attributes in 'value' will not be added/deleted/modified
4468     */
4469    private Map<String, Set<String>> fractionalSpecificClassesAttributes = new HashMap<>();
4470
4471    /**
4472     * Used in fractional replication: holds attributes of any object class.
4473     * When an operation coming from the network is to be locally replayed:
4474     * - inclusive mode: only attributes of the matching entry not present in
4475     * fractionalAllClassesAttributes will be added/deleted/modified
4476     * - exclusive mode: attributes of the matching entry present in
4477     * fractionalAllClassesAttributes will not be added/deleted/modified
4478     * The attributes may be in human readable form of OID form.
4479     */
4480    private Set<String> fractionalAllClassesAttributes = new HashSet<>();
4481
4482    /** Base DN the fractional configuration is for. */
4483    private final DN baseDN;
4484
4485    /**
4486     * Constructs a new fractional configuration object.
4487     * @param baseDN The base DN the object is for.
4488     */
4489    private FractionalConfig(DN baseDN)
4490    {
4491      this.baseDN = baseDN;
4492    }
4493
4494    /**
4495     * Getter for fractional.
4496     * @return True if the configuration has fractional enabled
4497     */
4498    boolean isFractional()
4499    {
4500      return fractional;
4501    }
4502
4503    /**
4504     * Set the fractional parameter.
4505     * @param fractional The fractional parameter
4506     */
4507    private void setFractional(boolean fractional)
4508    {
4509      this.fractional = fractional;
4510    }
4511
4512    /**
4513     * Getter for fractionalExclusive.
4514     * @return True if the configuration has fractional exclusive enabled
4515     */
4516    boolean isFractionalExclusive()
4517    {
4518      return fractionalExclusive;
4519    }
4520
4521    /**
4522     * Set the fractionalExclusive parameter.
4523     * @param fractionalExclusive The fractionalExclusive parameter
4524     */
4525    private void setFractionalExclusive(boolean fractionalExclusive)
4526    {
4527      this.fractionalExclusive = fractionalExclusive;
4528    }
4529
4530    /**
4531     * Getter for fractionalSpecificClassesAttributes attribute.
4532     * @return The fractionalSpecificClassesAttributes attribute.
4533     */
4534    Map<String, Set<String>> getFractionalSpecificClassesAttributes()
4535    {
4536      return fractionalSpecificClassesAttributes;
4537    }
4538
4539    /**
4540     * Set the fractionalSpecificClassesAttributes parameter.
4541     * @param fractionalSpecificClassesAttributes The
4542     * fractionalSpecificClassesAttributes parameter to set.
4543     */
4544    private void setFractionalSpecificClassesAttributes(
4545        Map<String, Set<String>> fractionalSpecificClassesAttributes)
4546    {
4547      this.fractionalSpecificClassesAttributes =
4548        fractionalSpecificClassesAttributes;
4549    }
4550
4551    /**
4552     * Getter for fractionalSpecificClassesAttributes attribute.
4553     * @return The fractionalSpecificClassesAttributes attribute.
4554     */
4555    Set<String> getFractionalAllClassesAttributes()
4556    {
4557      return fractionalAllClassesAttributes;
4558    }
4559
4560    /**
4561     * Set the fractionalAllClassesAttributes parameter.
4562     * @param fractionalAllClassesAttributes The
4563     * fractionalSpecificClassesAttributes parameter to set.
4564     */
4565    private void setFractionalAllClassesAttributes(
4566        Set<String> fractionalAllClassesAttributes)
4567    {
4568      this.fractionalAllClassesAttributes = fractionalAllClassesAttributes;
4569    }
4570
4571    /**
4572     * Getter for the base baseDN.
4573     * @return The baseDN attribute.
4574     */
4575    DN getBaseDn()
4576    {
4577      return baseDN;
4578    }
4579
4580    /**
4581     * Extract the fractional configuration from the passed domain configuration
4582     * entry.
4583     * @param configuration The configuration object
4584     * @return The fractional replication configuration.
4585     * @throws ConfigException If an error occurred.
4586     */
4587    static FractionalConfig toFractionalConfig(
4588      ReplicationDomainCfg configuration) throws ConfigException
4589    {
4590      // Prepare fractional configuration variables to parse
4591      Iterator<String> exclIt = configuration.getFractionalExclude().iterator();
4592      Iterator<String> inclIt = configuration.getFractionalInclude().iterator();
4593
4594      // Get potentially new fractional configuration
4595      Map<String, Set<String>> newFractionalSpecificClassesAttributes = new HashMap<>();
4596      Set<String> newFractionalAllClassesAttributes = new HashSet<>();
4597
4598      int newFractionalMode = parseFractionalConfig(exclIt, inclIt,
4599        newFractionalSpecificClassesAttributes,
4600        newFractionalAllClassesAttributes);
4601
4602      // Create matching parsed config object
4603      FractionalConfig result = new FractionalConfig(configuration.getBaseDN());
4604      switch (newFractionalMode)
4605      {
4606        case NOT_FRACTIONAL:
4607          result.setFractional(false);
4608          result.setFractionalExclusive(true);
4609          break;
4610        case EXCLUSIVE_FRACTIONAL:
4611        case INCLUSIVE_FRACTIONAL:
4612          result.setFractional(true);
4613          result.setFractionalExclusive(
4614              newFractionalMode == EXCLUSIVE_FRACTIONAL);
4615          break;
4616      }
4617      result.setFractionalSpecificClassesAttributes(
4618        newFractionalSpecificClassesAttributes);
4619      result.setFractionalAllClassesAttributes(
4620        newFractionalAllClassesAttributes);
4621      return result;
4622    }
4623
4624    /**
4625     * Parses a fractional replication configuration, filling the empty passed
4626     * variables and returning the used fractional mode. The 2 passed variables
4627     * to fill should be initialized (not null) and empty.
4628     * @param exclIt The list of fractional exclude configuration values (may be
4629     *               null)
4630     * @param inclIt The list of fractional include configuration values (may be
4631     *               null)
4632     * @param fractionalSpecificClassesAttributes An empty map to be filled with
4633     *        what is read from the fractional configuration properties.
4634     * @param fractionalAllClassesAttributes An empty list to be filled with
4635     *        what is read from the fractional configuration properties.
4636     * @return the fractional mode deduced from the passed configuration:
4637     *         not fractional, exclusive fractional or inclusive fractional
4638     *         modes
4639     */
4640    private static int parseFractionalConfig (
4641      Iterator<String> exclIt, Iterator<String> inclIt,
4642      Map<String, Set<String>> fractionalSpecificClassesAttributes,
4643      Set<String> fractionalAllClassesAttributes) throws ConfigException
4644    {
4645      // Determine if fractional-exclude or fractional-include property is used:
4646      // only one of them is allowed
4647      int fractionalMode;
4648      Iterator<String> iterator;
4649      if (exclIt != null && exclIt.hasNext())
4650      {
4651        if (inclIt != null && inclIt.hasNext())
4652        {
4653          throw new ConfigException(
4654            NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get());
4655        }
4656
4657        fractionalMode = EXCLUSIVE_FRACTIONAL;
4658        iterator = exclIt;
4659      }
4660      else
4661      {
4662        if (inclIt != null && inclIt.hasNext())
4663        {
4664          fractionalMode = INCLUSIVE_FRACTIONAL;
4665          iterator = inclIt;
4666        }
4667        else
4668        {
4669          return NOT_FRACTIONAL;
4670        }
4671      }
4672
4673      while (iterator.hasNext())
4674      {
4675        // Parse a value with the form class:attr1,attr2...
4676        // or *:attr1,attr2...
4677        String fractCfgStr = iterator.next();
4678        StringTokenizer st = new StringTokenizer(fractCfgStr, ":");
4679        int nTokens = st.countTokens();
4680        if (nTokens < 2)
4681        {
4682          throw new ConfigException(NOTE_ERR_FRACTIONAL_CONFIG_WRONG_FORMAT.
4683            get(fractCfgStr));
4684        }
4685        // Get the class name
4686        String classNameLower = st.nextToken().toLowerCase();
4687        boolean allClasses = "*".equals(classNameLower);
4688        // Get the attributes
4689        String attributes = st.nextToken();
4690        st = new StringTokenizer(attributes, ",");
4691        while (st.hasMoreTokens())
4692        {
4693          String attrNameLower = st.nextToken().toLowerCase();
4694          // Store attribute in the appropriate variable
4695          if (allClasses)
4696          {
4697            fractionalAllClassesAttributes.add(attrNameLower);
4698          }
4699          else
4700          {
4701            Set<String> attrList = fractionalSpecificClassesAttributes.get(classNameLower);
4702            if (attrList == null)
4703            {
4704              attrList = new LinkedHashSet<>();
4705              fractionalSpecificClassesAttributes.put(classNameLower, attrList);
4706            }
4707            attrList.add(attrNameLower);
4708          }
4709        }
4710      }
4711      return fractionalMode;
4712    }
4713
4714    /** Return type of the parseFractionalConfig method. */
4715    private static final int NOT_FRACTIONAL = 0;
4716    private static final int EXCLUSIVE_FRACTIONAL = 1;
4717    private static final int INCLUSIVE_FRACTIONAL = 2;
4718
4719    /**
4720     * Get an integer representation of the domain fractional configuration.
4721     * @return An integer representation of the domain fractional configuration.
4722     */
4723    private int fractionalConfigToInt()
4724    {
4725      if (!fractional)
4726      {
4727        return NOT_FRACTIONAL;
4728      }
4729      else if (fractionalExclusive)
4730      {
4731        return EXCLUSIVE_FRACTIONAL;
4732      }
4733      return INCLUSIVE_FRACTIONAL;
4734    }
4735
4736    /**
4737     * Compare 2 fractional replication configurations and returns true if they
4738     * are equivalent.
4739     * @param cfg1 First fractional configuration
4740     * @param cfg2 Second fractional configuration
4741     * @return True if both configurations are equivalent.
4742     * @throws ConfigException If some classes or attributes could not be
4743     * retrieved from the schema.
4744     */
4745    private static boolean isFractionalConfigEquivalent(FractionalConfig cfg1,
4746        FractionalConfig cfg2) throws ConfigException
4747    {
4748      // Compare base DNs just to be consistent
4749      if (!cfg1.getBaseDn().equals(cfg2.getBaseDn()))
4750      {
4751        return false;
4752      }
4753
4754      // Compare modes
4755      if (cfg1.isFractional() != cfg2.isFractional()
4756          || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())
4757      {
4758        return false;
4759      }
4760
4761      // Compare all classes attributes
4762      Set<String> allClassesAttrs1 = cfg1.getFractionalAllClassesAttributes();
4763      Set<String> allClassesAttrs2 = cfg2.getFractionalAllClassesAttributes();
4764      if (!areAttributesEquivalent(allClassesAttrs1, allClassesAttrs2))
4765      {
4766        return false;
4767      }
4768
4769      // Compare specific classes attributes
4770      Map<String, Set<String>> specificClassesAttrs1 =
4771          cfg1.getFractionalSpecificClassesAttributes();
4772      Map<String, Set<String>> specificClassesAttrs2 =
4773          cfg2.getFractionalSpecificClassesAttributes();
4774      if (specificClassesAttrs1.size() != specificClassesAttrs2.size())
4775      {
4776        return false;
4777      }
4778
4779      /*
4780       * Check consistency of specific classes attributes
4781       *
4782       * For each class in specificClassesAttributes1, check that the attribute
4783       * list is equivalent to specificClassesAttributes2 attribute list
4784       */
4785      Schema schema = DirectoryServer.getSchema();
4786      for (String className1 : specificClassesAttrs1.keySet())
4787      {
4788        // Get class from specificClassesAttributes1
4789        ObjectClass objectClass1 = schema.getObjectClass(className1);
4790        if (objectClass1 == null)
4791        {
4792          throw new ConfigException(
4793            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className1));
4794        }
4795
4796        // Look for matching one in specificClassesAttributes2
4797        boolean foundClass = false;
4798        for (String className2 : specificClassesAttrs2.keySet())
4799        {
4800          ObjectClass objectClass2 = schema.getObjectClass(className2);
4801          if (objectClass2 == null)
4802          {
4803            throw new ConfigException(
4804              NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className2));
4805          }
4806          if (objectClass1.equals(objectClass2))
4807          {
4808            foundClass = true;
4809            // Now compare the 2 attribute lists
4810            Set<String> attributes1 = specificClassesAttrs1.get(className1);
4811            Set<String> attributes2 = specificClassesAttrs2.get(className2);
4812            if (!areAttributesEquivalent(attributes1, attributes2))
4813            {
4814              return false;
4815            }
4816            break;
4817          }
4818        }
4819        // Found matching class ?
4820        if (!foundClass)
4821        {
4822          return false;
4823        }
4824      }
4825
4826      return true;
4827    }
4828  }
4829
4830  /**
4831   * Specifies whether this domain is enabled/disabled regarding the ECL.
4832   * @return enabled/disabled for the ECL.
4833   */
4834  boolean isECLEnabled()
4835  {
4836    return this.eclDomain.isEnabled();
4837  }
4838
4839  /**
4840   * Return the minimum time (in ms) that the domain keeps the historical
4841   * information necessary to solve conflicts.
4842   *
4843   * @return the purge delay.
4844   */
4845  long getHistoricalPurgeDelay()
4846  {
4847    return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
4848  }
4849
4850  /**
4851   * Check if the operation that just happened has cleared a conflict : Clearing
4852   * a conflict happens if the operation has freed a DN for which another entry
4853   * was in conflict.
4854   * <p>
4855   * Steps:
4856   * <ul>
4857   * <li>get the DN freed by a DELETE or MODRDN op</li>
4858   * <li>search for entries put in the conflict space (dn=entryUUID'+'....)
4859   * because the expected DN was not available (ds-sync-conflict=expected DN)
4860   * </li>
4861   * <li>retain the entry with the oldest conflict</li>
4862   * <li>rename this entry with the freedDN as it was expected originally</li>
4863   * </ul>
4864   *
4865   * @param task
4866   *          the task raising this purge.
4867   * @param endDate
4868   *          the date to stop this task whether the job is done or not.
4869   * @throws DirectoryException
4870   *           when an exception happens.
4871   */
4872  public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
4873      long endDate) throws DirectoryException
4874  {
4875     logger.trace("[PURGE] purgeConflictsHistorical "
4876         + "on domain: " + getBaseDN()
4877         + "endDate:" + new Date(endDate)
4878         + "lastCSNPurgedFromHist: "
4879         + lastCSNPurgedFromHist.toStringUI());
4880
4881    String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")";
4882    SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
4883        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
4884    InternalSearchOperation searchOp = conn.processSearch(request);
4885
4886     int count = 0;
4887     if (task != null)
4888     {
4889       task.setProgressStats(lastCSNPurgedFromHist, count);
4890     }
4891
4892     for (SearchResultEntry entry : searchOp.getSearchEntries())
4893     {
4894       long maxTimeToRun = endDate - TimeThread.getTime();
4895       if (maxTimeToRun < 0)
4896       {
4897        throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
4898            LocalizableMessage.raw(" end date reached"));
4899       }
4900
4901       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
4902       lastCSNPurgedFromHist = entryHist.getOldestCSN();
4903       entryHist.setPurgeDelay(getHistoricalPurgeDelay());
4904       Attribute attr = entryHist.encodeAndPurge();
4905       count += entryHist.getLastPurgedValuesCount();
4906       List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
4907
4908       ModifyOperation newOp = new ModifyOperationBasis(
4909           conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
4910           entry.getName(), mods);
4911       runAsSynchronizedOperation(newOp);
4912
4913       if (newOp.getResultCode() != ResultCode.SUCCESS)
4914       {
4915         // Log information for the repair tool.
4916         logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
4917       }
4918       else if (task != null)
4919       {
4920         task.setProgressStats(lastCSNPurgedFromHist, count);
4921       }
4922     }
4923  }
4924}