001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2014-2015 ForgeRock AS.
025 */
026package org.opends.server.backends;
027
028import static org.opends.messages.BackendMessages.*;
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.config.ConfigConstants.*;
031import static org.opends.server.replication.plugin.MultimasterReplication.*;
032import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
033import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
034import static org.opends.server.util.LDIFWriter.*;
035import static org.opends.server.util.ServerConstants.*;
036import static org.opends.server.util.StaticUtils.*;
037
038import java.text.SimpleDateFormat;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.Date;
042import java.util.Iterator;
043import java.util.LinkedHashMap;
044import java.util.List;
045import java.util.Map;
046import java.util.Set;
047import java.util.TimeZone;
048import java.util.concurrent.ConcurrentLinkedQueue;
049import java.util.concurrent.ConcurrentSkipListMap;
050import java.util.concurrent.atomic.AtomicReference;
051
052import org.forgerock.i18n.LocalizableMessage;
053import org.forgerock.i18n.slf4j.LocalizedLogger;
054import org.forgerock.opendj.config.server.ConfigException;
055import org.forgerock.opendj.ldap.ByteString;
056import org.forgerock.opendj.ldap.ConditionResult;
057import org.forgerock.opendj.ldap.ModificationType;
058import org.forgerock.opendj.ldap.ResultCode;
059import org.forgerock.opendj.ldap.SearchScope;
060import org.opends.server.admin.Configuration;
061import org.opends.server.api.Backend;
062import org.opends.server.config.ConfigConstants;
063import org.opends.server.controls.EntryChangelogNotificationControl;
064import org.opends.server.controls.ExternalChangelogRequestControl;
065import org.opends.server.core.AddOperation;
066import org.opends.server.core.DeleteOperation;
067import org.opends.server.core.DirectoryServer;
068import org.opends.server.core.ModifyDNOperation;
069import org.opends.server.core.ModifyOperation;
070import org.opends.server.core.PersistentSearch;
071import org.opends.server.core.SearchOperation;
072import org.opends.server.core.ServerContext;
073import org.opends.server.replication.common.CSN;
074import org.opends.server.replication.common.MultiDomainServerState;
075import org.opends.server.replication.common.ServerState;
076import org.opends.server.replication.protocol.AddMsg;
077import org.opends.server.replication.protocol.DeleteMsg;
078import org.opends.server.replication.protocol.LDAPUpdateMsg;
079import org.opends.server.replication.protocol.ModifyCommonMsg;
080import org.opends.server.replication.protocol.ModifyDNMsg;
081import org.opends.server.replication.protocol.UpdateMsg;
082import org.opends.server.replication.server.ReplicationServer;
083import org.opends.server.replication.server.ReplicationServerDomain;
084import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
085import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
086import org.opends.server.replication.server.changelog.api.ChangelogDB;
087import org.opends.server.replication.server.changelog.api.ChangelogException;
088import org.opends.server.replication.server.changelog.api.DBCursor;
089import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
090import org.opends.server.replication.server.changelog.api.ReplicaId;
091import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
092import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
093import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor;
094import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
095import org.opends.server.types.Attribute;
096import org.opends.server.types.AttributeType;
097import org.opends.server.types.Attributes;
098import org.opends.server.types.BackupConfig;
099import org.opends.server.types.BackupDirectory;
100import org.opends.server.types.CanceledOperationException;
101import org.opends.server.types.Control;
102import org.opends.server.types.DN;
103import org.opends.server.types.DirectoryException;
104import org.opends.server.types.Entry;
105import org.opends.server.types.FilterType;
106import org.opends.server.types.IndexType;
107import org.opends.server.types.InitializationException;
108import org.opends.server.types.LDIFExportConfig;
109import org.opends.server.types.LDIFImportConfig;
110import org.opends.server.types.LDIFImportResult;
111import org.opends.server.types.Modification;
112import org.opends.server.types.ObjectClass;
113import org.opends.server.types.Privilege;
114import org.opends.server.types.RDN;
115import org.opends.server.types.RawAttribute;
116import org.opends.server.types.RestoreConfig;
117import org.opends.server.types.SearchFilter;
118import org.opends.server.types.WritabilityMode;
119import org.opends.server.util.StaticUtils;
120
121/**
122 * A backend that provides access to the changelog, i.e. the "cn=changelog"
123 * suffix. It is a read-only backend that is created by a
124 * {@link ReplicationServer} and is not configurable.
125 * <p>
126 * There are two modes to search the changelog:
127 * <ul>
128 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the
129 * request. The cookie provided in the control is used to retrieve entries from
130 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
131 * the entries.</li>
132 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
133 * with the request. The entries are retrieved using the ChangeNumberIndexDB and
134 * their attributes are set with the information from the ReplicasDBs. The
135 * <code>changeNumber</code> attribute value is set from the content of
136 * ChangeNumberIndexDB.</li>
137 * </ul>
138 * <h3>Searches flow</h3>
139 * <p>
140 * Here is the flow of searches within the changelog backend APIs:
141 * <ul>
142 * <li>Normal searches only go through:
143 * <ol>
144 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
145 * </ol>
146 * </li>
147 * <li>Persistent searches with <code>changesOnly=false</code> go through:
148 * <ol>
149 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
150 * (once, single threaded),</li>
151 * <li>
152 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
153 * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
154 * threaded)</li>
155 * </ol>
156 * </li>
157 * <li>Persistent searches with <code>changesOnly=true</code> go through:
158 * <ol>
159 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
160 * (once, single threaded)</li>
161 * <li>
162 * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
163 * threaded)</li>
164 * </ol>
165 * </li>
166 * </ul>
167 *
168 * @see ReplicationServer
169 */
170public class ChangelogBackend extends Backend<Configuration>
171{
172  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
173
174  /** The id of this backend. */
175  public static final String BACKEND_ID = "changelog";
176
177  private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;
178
179  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
180  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
181  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
182
183  /** The set of objectclasses that will be used in root entry. */
184  private static final Map<ObjectClass, String>
185    CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<>(2);
186  static
187  {
188    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
189    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container");
190  }
191
192  /** The set of objectclasses that will be used in ECL entries. */
193  private static final Map<ObjectClass, String>
194    CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<>(2);
195  static
196  {
197    CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
198    CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY);
199  }
200
201  /** The attribute type for the "creatorsName" attribute. */
202  private static final AttributeType CREATORS_NAME_TYPE =
203      DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_CREATORS_NAME_LC);
204  /** The attribute type for the "modifiersName" attribute. */
205  private static final AttributeType MODIFIERS_NAME_TYPE =
206      DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_MODIFIERS_NAME_LC);
207
208  /** The base DN for the external change log. */
209  public static final DN CHANGELOG_BASE_DN;
210
211  static
212  {
213    try
214    {
215      CHANGELOG_BASE_DN = DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT);
216    }
217    catch (DirectoryException e)
218    {
219      throw new RuntimeException(e);
220    }
221  }
222
223  /** The set of base DNs for this backend. */
224  private DN[] baseDNs;
225  /** The set of supported controls for this backend. */
226  private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
227  /** Whether the base changelog entry has subordinates. */
228  private Boolean baseEntryHasSubordinates;
229
230  /** The replication server on which the changelog is read. */
231  private final ReplicationServer replicationServer;
232  private final ECLEnabledDomainPredicate domainPredicate;
233
234  /** The set of cookie-based persistent searches registered with this backend. */
235  private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = new ConcurrentLinkedQueue<>();
236  /** The set of change number-based persistent searches registered with this backend. */
237  private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
238      new ConcurrentLinkedQueue<>();
239
240  /**
241   * Creates a new backend with the provided replication server.
242   *
243   * @param replicationServer
244   *          The replication server on which the changes are read.
245   * @param domainPredicate
246   *          Returns whether a domain is enabled for the external changelog.
247   */
248  public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
249  {
250    this.replicationServer = replicationServer;
251    this.domainPredicate = domainPredicate;
252    setBackendID(BACKEND_ID);
253    setWritabilityMode(WritabilityMode.DISABLED);
254    setPrivateBackend(true);
255  }
256
257  private ChangelogDB getChangelogDB()
258  {
259    return replicationServer.getChangelogDB();
260  }
261
262  /**
263   * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
264   *
265   * @return the ChangelogBackend configured for "cn=changelog" in this directory server
266   * @deprecated instead inject the required object where needed
267   */
268  @Deprecated
269  public static ChangelogBackend getInstance()
270  {
271    return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
272  }
273
274  /** {@inheritDoc} */
275  @Override
276  public void configureBackend(final Configuration config, ServerContext serverContext) throws ConfigException
277  {
278    throw new UnsupportedOperationException("The changelog backend is not configurable");
279  }
280
281  /** {@inheritDoc} */
282  @Override
283  public void openBackend() throws InitializationException
284  {
285    baseDNs = new DN[] { CHANGELOG_BASE_DN };
286
287    try
288    {
289      DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
290    }
291    catch (final DirectoryException e)
292    {
293      throw new InitializationException(
294          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e);
295    }
296  }
297
298  /** {@inheritDoc} */
299  @Override
300  public void closeBackend()
301  {
302    try
303    {
304      DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
305    }
306    catch (final DirectoryException e)
307    {
308      logger.traceException(e);
309    }
310  }
311
312  /** {@inheritDoc} */
313  @Override
314  public DN[] getBaseDNs()
315  {
316    return baseDNs;
317  }
318
319  /** {@inheritDoc} */
320  @Override
321  public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
322  {
323    return true;
324  }
325
326  /** {@inheritDoc} */
327  @Override
328  public Entry getEntry(final DN entryDN) throws DirectoryException
329  {
330    if (entryDN == null)
331    {
332      throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
333          ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID()));
334    }
335    throw new RuntimeException("Not implemented");
336  }
337
338  /** {@inheritDoc} */
339  @Override
340  public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException
341  {
342    if (CHANGELOG_BASE_DN.equals(entryDN))
343    {
344      final Boolean hasSubs = baseChangelogHasSubordinates();
345      if (hasSubs == null)
346      {
347        return ConditionResult.UNDEFINED;
348      }
349      return ConditionResult.valueOf(hasSubs);
350    }
351    return ConditionResult.FALSE;
352  }
353
354  private Boolean baseChangelogHasSubordinates() throws DirectoryException
355  {
356    if (baseEntryHasSubordinates == null)
357    {
358      // compute its value
359      try
360      {
361        final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
362        CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
363        final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
364            new MultiDomainServerState(), options, getExcludedBaseDNs());
365        try
366        {
367          baseEntryHasSubordinates = cursor.next();
368        }
369        finally
370        {
371          close(cursor);
372        }
373      }
374      catch (ChangelogException e)
375      {
376        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get(
377            "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e)));
378      }
379    }
380    return baseEntryHasSubordinates;
381  }
382
383  /** {@inheritDoc} */
384  @Override
385  public long getNumberOfEntriesInBaseDN(final DN baseDN) throws DirectoryException
386  {
387    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
388  }
389
390  /** {@inheritDoc} */
391  @Override
392  public long getNumberOfChildren(final DN parentDN) throws DirectoryException
393  {
394    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
395  }
396
397  /**
398   * Notifies persistent searches of this backend that a new cookie entry was added to it.
399   * <p>
400   * Note: This method correspond to the "persistent search" phase.
401   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
402   * <p>
403   * This method must only be called after the provided data have been persisted to disk.
404   *
405   * @param baseDN
406   *          the baseDN of the newly added entry.
407   * @param updateMsg
408   *          the update message of the newly added entry
409   * @throws ChangelogException
410   *           If a problem occurs while notifying of the newly added entry.
411   */
412  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
413  {
414    if (!(updateMsg instanceof LDAPUpdateMsg))
415    {
416      return;
417    }
418
419    try
420    {
421      for (PersistentSearch pSearch : cookieBasedPersistentSearches)
422      {
423        final SearchOperation searchOp = pSearch.getSearchOperation();
424        final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
425        entrySender.persistentSearchSendEntry(baseDN, updateMsg);
426      }
427    }
428    catch (DirectoryException e)
429    {
430      throw new ChangelogException(e.getMessageObject(), e);
431    }
432  }
433
434  /**
435   * Notifies persistent searches of this backend that a new change number entry was added to it.
436   * <p>
437   * Note: This method correspond to the "persistent search" phase.
438   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
439   * <p>
440   * This method must only be called after the provided data have been persisted to disk.
441   *
442   * @param baseDN
443   *          the baseDN of the newly added entry.
444   * @param changeNumber
445   *          the change number of the newly added entry. It will be greater
446   *          than zero for entries added to the change number index and less
447   *          than or equal to zero for entries added to any replica DB
448   * @param cookieString
449   *          a string representing the cookie of the newly added entry.
450   *          This is only meaningful for entries added to the change number index
451   * @param updateMsg
452   *          the update message of the newly added entry
453   * @throws ChangelogException
454   *           If a problem occurs while notifying of the newly added entry.
455   */
456  public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
457      throws ChangelogException
458  {
459    if (!(updateMsg instanceof LDAPUpdateMsg)
460        || changeNumberBasedPersistentSearches.isEmpty())
461    {
462      return;
463    }
464
465    try
466    {
467      // changeNumber entry can be shared with multiple persistent searches
468      final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
469      for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
470      {
471        final SearchOperation searchOp = pSearch.getSearchOperation();
472        final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
473        entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
474      }
475    }
476    catch (DirectoryException e)
477    {
478      throw new ChangelogException(e.getMessageObject(), e);
479    }
480  }
481
482  private boolean isCookieBased(final SearchOperation searchOp)
483  {
484    for (Control c : searchOp.getRequestControls())
485    {
486      if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
487      {
488        return true;
489      }
490    }
491    return false;
492  }
493
494  /** {@inheritDoc} */
495  @Override
496  public void addEntry(Entry entry, AddOperation addOperation)
497      throws DirectoryException, CanceledOperationException
498  {
499    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
500        ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getName()), getBackendID()));
501  }
502
503  /** {@inheritDoc} */
504  @Override
505  public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
506      throws DirectoryException, CanceledOperationException
507  {
508    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
509        ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID()));
510  }
511
512  /** {@inheritDoc} */
513  @Override
514  public void replaceEntry(Entry oldEntry, Entry newEntry,
515      ModifyOperation modifyOperation) throws DirectoryException,
516      CanceledOperationException
517  {
518    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
519        ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getName()), getBackendID()));
520  }
521
522  /** {@inheritDoc} */
523  @Override
524  public void renameEntry(DN currentDN, Entry entry,
525      ModifyDNOperation modifyDNOperation) throws DirectoryException,
526      CanceledOperationException
527  {
528    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
529        ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID()));
530  }
531
532  /**
533   * {@inheritDoc}
534   * <p>
535   * Runs the "initial search" phase (as opposed to a "persistent search"
536   * phase). The "initial search" phase is the only search run by normal
537   * searches, but it is also run by persistent searches with
538   * <code>changesOnly=false</code>. Persistent searches with
539   * <code>changesOnly=true</code> never execute this code.
540   * <p>
541   * Note: this method is executed only once per persistent search, single
542   * threaded.
543   */
544  @Override
545  public void search(final SearchOperation searchOperation) throws DirectoryException
546  {
547    checkChangelogReadPrivilege(searchOperation);
548
549    final Set<DN> excludedBaseDNs = getExcludedBaseDNs();
550    final MultiDomainServerState cookie = getCookieFromControl(searchOperation, excludedBaseDNs);
551
552    final ChangeNumberRange range = optimizeSearch(searchOperation.getBaseDN(), searchOperation.getFilter());
553    try
554    {
555      final boolean isPersistentSearch = isPersistentSearch(searchOperation);
556      if (cookie != null)
557      {
558        initialSearchFromCookie(
559            getCookieEntrySender(SearchPhase.INITIAL, searchOperation, cookie, excludedBaseDNs, isPersistentSearch));
560      }
561      else
562      {
563        initialSearchFromChangeNumber(
564            getChangeNumberEntrySender(SearchPhase.INITIAL, searchOperation, range, isPersistentSearch));
565      }
566    }
567    catch (ChangelogException e)
568    {
569      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
570          searchOperation.getBaseDN(), searchOperation.getFilter(), stackTraceToSingleLineString(e)));
571    }
572  }
573
574  private MultiDomainServerState getCookieFromControl(final SearchOperation searchOperation, Set<DN> excludedBaseDNs)
575      throws DirectoryException
576  {
577    final ExternalChangelogRequestControl eclRequestControl =
578        searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
579    if (eclRequestControl != null)
580    {
581      final MultiDomainServerState cookie = eclRequestControl.getCookie();
582      validateProvidedCookie(cookie, excludedBaseDNs);
583      return cookie;
584    }
585    return null;
586  }
587
588  /** {@inheritDoc} */
589  @Override
590  public Set<String> getSupportedControls()
591  {
592    return supportedControls;
593  }
594
595  /** {@inheritDoc} */
596  @Override
597  public Set<String> getSupportedFeatures()
598  {
599    return Collections.emptySet();
600  }
601
602  /** {@inheritDoc} */
603  @Override
604  public boolean supports(BackendOperation backendOperation)
605  {
606    return false;
607  }
608
609  /** {@inheritDoc} */
610  @Override
611  public void exportLDIF(final LDIFExportConfig exportConfig)
612      throws DirectoryException
613  {
614    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
615        ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
616  }
617
618  /** {@inheritDoc} */
619  @Override
620  public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext)
621      throws DirectoryException
622  {
623    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
624        ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
625  }
626
627  /** {@inheritDoc} */
628  @Override
629  public void createBackup(BackupConfig backupConfig) throws DirectoryException
630  {
631    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
632        ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
633  }
634
635  /** {@inheritDoc} */
636  @Override
637  public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
638  {
639      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
640          ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
641  }
642
643  /** {@inheritDoc} */
644  @Override
645  public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
646  {
647      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
648          ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
649  }
650
651  /** {@inheritDoc} */
652  @Override
653  public long getEntryCount()
654  {
655    try
656    {
657      return getNumberOfEntriesInBaseDN(CHANGELOG_BASE_DN) + 1;
658    }
659    catch (DirectoryException e)
660    {
661      logger.traceException(e);
662      return -1;
663    }
664  }
665
666  /**
667   * Represent the change number range targeted by a search operation.
668   * <p>
669   * This class should be visible for tests.
670   */
671  static final class ChangeNumberRange
672  {
673    private long lowerBound = -1;
674    private long upperBound = -1;
675
676    /**
677     * Returns the lowest change number to retrieve (inclusive).
678     *
679     * @return the lowest change number
680     */
681    long getLowerBound()
682    {
683      return lowerBound;
684    }
685
686    /**
687     * Returns the highest change number to retrieve (inclusive).
688     *
689     * @return the highest change number
690     */
691    long getUpperBound()
692    {
693      return upperBound;
694    }
695  }
696
697  /**
698   * Returns the set of DNs to exclude from the search.
699   *
700   * @return the DNs corresponding to domains to exclude from the search.
701   * @throws DirectoryException
702   *           If a DN can't be decoded.
703   */
704  private static Set<DN> getExcludedBaseDNs() throws DirectoryException
705  {
706    return getExcludedChangelogDomains();
707  }
708
709  /**
710   * Optimize the search parameters by analyzing the DN and filter.
711   * It also performs validation on some search parameters
712   * for both cookie and change number based changelogs.
713   *
714   * @param baseDN the provided search baseDN.
715   * @param userFilter the provided search filter.
716   * @return the optimized change number range
717   * @throws DirectoryException when an exception occurs.
718   */
719  ChangeNumberRange optimizeSearch(final DN baseDN, final SearchFilter userFilter) throws DirectoryException
720  {
721    SearchFilter equalityFilter = null;
722    switch (baseDN.size())
723    {
724    case 1:
725      // "cn=changelog" : use user-provided search filter.
726      break;
727    case 2:
728      // It is probably "changeNumber=xxx,cn=changelog", use equality filter
729      // But it also could be "<service-id>,cn=changelog" so need to check on attribute
730      equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR);
731      break;
732    default:
733      // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
734      equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN");
735      break;
736    }
737
738    return optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
739  }
740
741  /**
742   * Build a search filter from given DN and attribute.
743   *
744   * @return the search filter or {@code null} if attribute is not present in
745   *         the provided DN
746   */
747  private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr)
748  {
749    final RDN rdn = baseDN.rdn();
750    AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(lowerCaseAttr, upperCaseAttr);
751    final ByteString attrValue = rdn.getAttributeValue(attrType);
752    if (attrValue != null)
753    {
754      return SearchFilter.createEqualityFilter(attrType, attrValue);
755    }
756    return null;
757  }
758
759  private ChangeNumberRange optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
760  {
761    final ChangeNumberRange range = new ChangeNumberRange();
762    if (filter == null)
763    {
764      return range;
765    }
766
767    if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
768    {
769      range.lowerBound = decodeChangeNumber(filter.getAssertionValue());
770    }
771    else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
772    {
773      range.upperBound = decodeChangeNumber(filter.getAssertionValue());
774    }
775    else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
776    {
777      final long number = decodeChangeNumber(filter.getAssertionValue());
778      range.lowerBound = number;
779      range.upperBound = number;
780    }
781    else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
782    {
783      // == exact CSN
784      // validate provided CSN is correct
785      new CSN(filter.getAssertionValue().toString());
786    }
787    else if (filter.getFilterType() == FilterType.AND)
788    {
789      // TODO: it looks like it could be generalized to N components, not only two
790      final Collection<SearchFilter> components = filter.getFilterComponents();
791      final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
792      long upper1 = -1;
793      long lower1 = -1;
794      long upper2 = -1;
795      long lower2 = -1;
796      if (filters.length > 0)
797      {
798        ChangeNumberRange range1 = optimizeSearchUsingFilter(filters[0]);
799        upper1 = range1.upperBound;
800        lower1 = range1.lowerBound;
801      }
802      if (filters.length > 1)
803      {
804        ChangeNumberRange range2 = optimizeSearchUsingFilter(filters[1]);
805        upper2 = range2.upperBound;
806        lower2 = range2.lowerBound;
807      }
808      if (upper1 == -1)
809      {
810        range.upperBound = upper2;
811      }
812      else if (upper2 == -1)
813      {
814        range.upperBound = upper1;
815      }
816      else
817      {
818        range.upperBound = Math.min(upper1, upper2);
819      }
820
821      range.lowerBound = Math.max(lower1, lower2);
822    }
823    return range;
824  }
825
826  private static long decodeChangeNumber(final ByteString assertionValue)
827      throws DirectoryException
828  {
829    try
830    {
831      return Long.decode(assertionValue.toString());
832    }
833    catch (NumberFormatException e)
834    {
835      throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
836          LocalizableMessage.raw("Could not convert value '%s' to long", assertionValue));
837    }
838  }
839
840  private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
841  {
842    return filter.getFilterType() == filterType
843           && filter.getAttributeType() != null
844           && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
845  }
846
847  /**
848   * Search the changelog when a cookie control is provided.
849   */
850  private void initialSearchFromCookie(final CookieEntrySender entrySender)
851      throws DirectoryException, ChangelogException
852  {
853    if (!sendBaseChangelogEntry(entrySender.searchOp))
854    { // only return the base entry: stop here
855      return;
856    }
857
858    ECLMultiDomainDBCursor replicaUpdatesCursor = null;
859    try
860    {
861      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
862      CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
863      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
864          entrySender.cookie, options, entrySender.excludedBaseDNs);
865      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
866
867      if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor))
868      {
869        entrySender.transitioningToPersistentSearchPhase();
870        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
871      }
872    }
873    finally
874    {
875      entrySender.finalizeInitialSearch();
876      StaticUtils.close(replicaUpdatesCursor);
877    }
878  }
879
880  private CookieEntrySender getCookieEntrySender(SearchPhase startPhase, final SearchOperation searchOperation,
881      MultiDomainServerState cookie, Set<DN> excludedBaseDNs, boolean isPersistentSearch)
882  {
883    if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase))
884    {
885      return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
886    }
887    return new CookieEntrySender(searchOperation, startPhase, cookie, excludedBaseDNs);
888  }
889
890  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
891      final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
892  {
893    boolean continueSearch = true;
894    while (continueSearch && replicaUpdatesCursor.next())
895    {
896      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
897      final DN domainBaseDN = replicaUpdatesCursor.getData();
898      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
899    }
900    return continueSearch;
901  }
902
903  private boolean isPersistentSearch(SearchOperation op)
904  {
905    for (PersistentSearch pSearch : getPersistentSearches())
906    {
907      if (op == pSearch.getSearchOperation())
908      {
909        return true;
910      }
911    }
912    return false;
913  }
914
915  /** {@inheritDoc} */
916  @Override
917  public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
918  {
919    initializePersistentSearch(pSearch);
920
921    if (isCookieBased(pSearch.getSearchOperation()))
922    {
923      cookieBasedPersistentSearches.add(pSearch);
924    }
925    else
926    {
927      changeNumberBasedPersistentSearches.add(pSearch);
928    }
929    super.registerPersistentSearch(pSearch);
930  }
931
932  private void initializePersistentSearch(PersistentSearch pSearch) throws DirectoryException
933  {
934    final SearchOperation searchOp = pSearch.getSearchOperation();
935
936    // Validation must be done during registration for changes only persistent searches.
937    // Otherwise, when there is an initial search phase,
938    // validation is performed by the search() method.
939    if (pSearch.isChangesOnly())
940    {
941      checkChangelogReadPrivilege(searchOp);
942    }
943    final ChangeNumberRange range = optimizeSearch(searchOp.getBaseDN(), searchOp.getFilter());
944
945    final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
946    if (isCookieBased(searchOp))
947    {
948      final Set<DN> excludedBaseDNs = getExcludedBaseDNs();
949      final MultiDomainServerState cookie = getCookie(pSearch.isChangesOnly(), searchOp, excludedBaseDNs);
950      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT,
951          new CookieEntrySender(searchOp, startPhase, cookie, excludedBaseDNs));
952    }
953    else
954    {
955      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT,
956          new ChangeNumberEntrySender(searchOp, startPhase, range));
957    }
958  }
959
960  private MultiDomainServerState getCookie(boolean isChangesOnly, SearchOperation searchOp, Set<DN> excludedBaseDNs)
961      throws DirectoryException
962  {
963    if (isChangesOnly)
964    {
965      // this changesOnly persistent search will not go through #initialSearch()
966      // so we must initialize the cookie here
967      return getNewestCookie(searchOp);
968    }
969    return getCookieFromControl(searchOp, excludedBaseDNs);
970  }
971
972  private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
973  {
974    if (!isCookieBased(searchOp))
975    {
976      return null;
977    }
978
979    final MultiDomainServerState cookie = new MultiDomainServerState();
980    for (final Iterator<ReplicationServerDomain> it =
981        replicationServer.getDomainIterator(); it.hasNext();)
982    {
983      final DN baseDN = it.next().getBaseDN();
984      final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
985      cookie.update(baseDN, state);
986    }
987    return cookie;
988  }
989
990  /**
991   * Validates the cookie contained in search parameters by checking its content
992   * with the actual replication server state.
993   *
994   * @throws DirectoryException
995   *           If the state is not valid
996   */
997  private void validateProvidedCookie(final MultiDomainServerState cookie, Set<DN> excludedBaseDNs)
998      throws DirectoryException
999  {
1000    if (cookie != null && !cookie.isEmpty())
1001    {
1002      replicationServer.validateCookie(cookie, excludedBaseDNs);
1003    }
1004  }
1005
1006  /**
1007   * Search the changelog using change number(s).
1008   */
1009  private void initialSearchFromChangeNumber(final ChangeNumberEntrySender entrySender)
1010      throws ChangelogException, DirectoryException
1011  {
1012    if (!sendBaseChangelogEntry(entrySender.searchOp))
1013    { // only return the base entry: stop here
1014      return;
1015    }
1016
1017    DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
1018    final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<>();
1019    try
1020    {
1021      cnIndexDBCursor = getCNIndexDBCursor(entrySender.lowestChangeNumber);
1022      final MultiDomainServerState cookie = new MultiDomainServerState();
1023
1024      if (sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie))
1025      {
1026        entrySender.transitioningToPersistentSearchPhase();
1027        sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie);
1028      }
1029    }
1030    finally
1031    {
1032      entrySender.finalizeInitialSearch();
1033      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get());
1034    }
1035  }
1036
1037  private ChangeNumberEntrySender getChangeNumberEntrySender(SearchPhase startPhase,
1038      final SearchOperation searchOperation, ChangeNumberRange range, boolean isPersistentSearch)
1039  {
1040    if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase))
1041    {
1042      return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
1043    }
1044    return new ChangeNumberEntrySender(searchOperation, SearchPhase.INITIAL, range);
1045  }
1046
1047  private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
1048      DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor,
1049      MultiDomainServerState cookie) throws ChangelogException, DirectoryException
1050  {
1051    boolean continueSearch = true;
1052    while (continueSearch && cnIndexDBCursor.next())
1053    {
1054      // Handle the current cnIndex record
1055      final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
1056      if (replicaUpdatesCursor.get() == null)
1057      {
1058        replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
1059        initializeCookieForChangeNumberMode(cookie, cnIndexRecord);
1060      }
1061      else
1062      {
1063        cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
1064      }
1065      continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
1066      if (continueSearch)
1067      {
1068        final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN());
1069        if (updateMsg != null)
1070        {
1071          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
1072          replicaUpdatesCursor.get().next();
1073        }
1074      }
1075    }
1076    return continueSearch;
1077  }
1078
1079  /** Initialize the provided cookie from the provided change number index record. */
1080  private void initializeCookieForChangeNumberMode(
1081      MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
1082  {
1083    // Initialize the multi domain cursor only from the change number index record.
1084    // The cookie is always empty at this stage.
1085    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN());
1086    MultiDomainServerState unused = new MultiDomainServerState();
1087    MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options);
1088    try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
1089    {
1090      updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord);
1091    }
1092  }
1093
1094  /**
1095   * Rebuilds the changelogcookie starting at the newest change number index record.
1096   * <p>
1097   * It updates the provided cookie with the changes from the provided ECL cursor,
1098   * up to (and including) the provided change number index record.
1099   * <p>
1100   * Therefore, after calling this method, the cursor is positioned
1101   * to the change immediately following the provided change number index record.
1102   *
1103   * @param cookie the cookie to update
1104   * @param cursor the cursor where to read changes from
1105   * @param cnIndexRecord the change number index record to go right after
1106   * @throws ChangelogException if any problem occurs
1107   */
1108  public static void updateCookieToMediumConsistencyPoint(
1109      MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord)
1110          throws ChangelogException
1111  {
1112    if (cnIndexRecord == null)
1113    {
1114      return;
1115    }
1116
1117    while (cursor.next())
1118    {
1119      UpdateMsg updateMsg = cursor.getRecord();
1120      if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0)
1121      {
1122        break;
1123      }
1124      cookie.update(cursor.getData(), updateMsg.getCSN());
1125    }
1126  }
1127
1128  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
1129      final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
1130  {
1131    final MultiDomainServerState state = new MultiDomainServerState();
1132    state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
1133
1134    // No need for ECLMultiDomainDBCursor in this case
1135    // as updateMsg will be matched with cnIndexRecord
1136    CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
1137    final MultiDomainDBCursor replicaUpdatesCursor =
1138        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options);
1139    replicaUpdatesCursor.next();
1140    return replicaUpdatesCursor;
1141  }
1142
1143  /**
1144   * Returns the replica update message corresponding to the provided
1145   * cnIndexRecord.
1146   *
1147   * @return the update message, which may be {@code null} if the update message
1148   *         could not be found because it was purged or because corresponding
1149   *         baseDN was removed from the changelog
1150   * @throws DirectoryException
1151   *           If inconsistency is detected between the available update
1152   *           messages and the provided cnIndexRecord
1153   */
1154  private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn)
1155      throws ChangelogException, DirectoryException
1156  {
1157    while (true)
1158    {
1159      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
1160      final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN());
1161      if (compareIndexWithUpdateMsg < 0) {
1162        // Either update message has been purged or baseDN has been removed from changelogDB,
1163        // ignore current index record and go to the next one
1164        return null;
1165      }
1166      else if (compareIndexWithUpdateMsg == 0)
1167      {
1168        // Found the matching update message
1169        return updateMsg;
1170      }
1171      // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
1172      if (!replicaUpdatesCursor.next())
1173      {
1174        // Should never happen, as it means some messages have disappeared
1175        // TODO : put the correct I18N message
1176        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1177            LocalizableMessage.raw("Could not find replica update message matching index record. " +
1178                "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
1179      }
1180    }
1181  }
1182
1183  /** Returns a cursor on CNIndexDB for the provided first change number. */
1184  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
1185      final long firstChangeNumber) throws ChangelogException
1186  {
1187    final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
1188    long changeNumberToUse = firstChangeNumber;
1189    if (changeNumberToUse <= 1)
1190    {
1191      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
1192      changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
1193    }
1194    return cnIndexDB.getCursorFrom(changeNumberToUse);
1195  }
1196
1197  /**
1198   * Creates a changelog entry.
1199   */
1200  private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
1201      final UpdateMsg msg) throws DirectoryException
1202  {
1203    if (msg instanceof AddMsg)
1204    {
1205      return createAddMsg(baseDN, changeNumber, cookie, msg);
1206    }
1207    else if (msg instanceof ModifyCommonMsg)
1208    {
1209      return createModifyMsg(baseDN, changeNumber, cookie, msg);
1210    }
1211    else if (msg instanceof DeleteMsg)
1212    {
1213      final DeleteMsg delMsg = (DeleteMsg) msg;
1214      return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
1215    }
1216    throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1217        LocalizableMessage.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN,
1218            msg.getClass()));
1219  }
1220
1221  /**
1222   * Creates an entry from an add message.
1223   * <p>
1224   * Map addMsg to an LDIF string for the 'changes' attribute, and pull out
1225   * change initiators name if available which is contained in the creatorsName
1226   * attribute.
1227   */
1228  private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
1229      throws DirectoryException
1230  {
1231    final AddMsg addMsg = (AddMsg) msg;
1232    String changeInitiatorsName = null;
1233    String ldifChanges = null;
1234    try
1235    {
1236      final StringBuilder builder = new StringBuilder(256);
1237      for (Attribute attr : addMsg.getAttributes())
1238      {
1239        if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty())
1240        {
1241          // This attribute is not multi-valued.
1242          changeInitiatorsName = attr.iterator().next().toString();
1243        }
1244        final String attrName = attr.getNameWithOptions();
1245        for (ByteString value : attr)
1246        {
1247          builder.append(attrName);
1248          appendLDIFSeparatorAndValue(builder, value);
1249          builder.append('\n');
1250        }
1251      }
1252      ldifChanges = builder.toString();
1253    }
1254    catch (Exception e)
1255    {
1256      logEncodingMessageError("add", addMsg.getDN(), e);
1257    }
1258
1259    return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
1260  }
1261
1262  /**
1263   * Creates an entry from a modify message.
1264   * <p>
1265   * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
1266   * out change initiators name if available which is contained in the
1267   * modifiersName attribute.
1268   */
1269  private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
1270      final UpdateMsg msg) throws DirectoryException
1271  {
1272    final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
1273    String changeInitiatorsName = null;
1274    String ldifChanges = null;
1275    try
1276    {
1277      final StringBuilder builder = new StringBuilder(128);
1278      for (Modification mod : modifyMsg.getMods())
1279      {
1280        final Attribute attr = mod.getAttribute();
1281        if (mod.getModificationType() == ModificationType.REPLACE
1282            && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE)
1283            && !attr.isEmpty())
1284        {
1285          // This attribute is not multi-valued.
1286          changeInitiatorsName = attr.iterator().next().toString();
1287        }
1288        final String attrName = attr.getNameWithOptions();
1289        builder.append(mod.getModificationType());
1290        builder.append(": ");
1291        builder.append(attrName);
1292        builder.append('\n');
1293
1294        for (ByteString value : attr)
1295        {
1296          builder.append(attrName);
1297          appendLDIFSeparatorAndValue(builder, value);
1298          builder.append('\n');
1299        }
1300        builder.append("-\n");
1301      }
1302      ldifChanges = builder.toString();
1303    }
1304    catch (Exception e)
1305    {
1306      logEncodingMessageError("modify", modifyMsg.getDN(), e);
1307    }
1308
1309    final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
1310    final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
1311        isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
1312
1313    if (isModifyDNMsg)
1314    {
1315      final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
1316      addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
1317      if (modDNMsg.getNewSuperior() != null)
1318      {
1319        addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
1320      }
1321      addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
1322    }
1323    return entry;
1324  }
1325
1326  /**
1327   * Log an encoding message error.
1328   *
1329   * @param messageType
1330   *            String identifying type of message. Should be "add" or "modify".
1331   * @param entryDN
1332   *            DN of original entry
1333   */
1334  private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
1335  {
1336    logger.traceException(exception);
1337    logger.error(LocalizableMessage.raw(
1338        "An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
1339        + entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
1340  }
1341
1342  private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException
1343  {
1344    if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp))
1345    {
1346      throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS,
1347          NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
1348    }
1349  }
1350
1351  /**
1352   * Create a changelog entry from a set of provided information. This is the part of
1353   * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
1354   */
1355  private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
1356      final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
1357      final String changeInitiatorsName) throws DirectoryException
1358  {
1359    final CSN csn = msg.getCSN();
1360    String dnString;
1361    if (changeNumber > 0)
1362    {
1363      // change number mode
1364      dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
1365    }
1366    else
1367    {
1368      // Cookie mode
1369      dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
1370    }
1371
1372    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>();
1373    final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<>();
1374
1375    // Operational standard attributes
1376    addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
1377        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
1378    addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs);
1379    addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs);
1380    addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
1381
1382    // REQUIRED attributes
1383    if (changeNumber > 0)
1384    {
1385      addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
1386    }
1387    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
1388    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
1389    final String format = dateFormat.format(new Date(csn.getTime()));
1390    addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs);
1391    addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs);
1392    addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs);
1393
1394    // NON REQUESTED attributes
1395    addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs);
1396    addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()),
1397        userAttrs, opAttrs);
1398
1399    if (ldifChanges != null)
1400    {
1401      addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs);
1402    }
1403    if (changeInitiatorsName != null)
1404    {
1405      addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
1406    }
1407
1408    final String targetUUID = msg.getEntryUUID();
1409    if (targetUUID != null)
1410    {
1411      addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
1412    }
1413    final String cookie2 = cookie != null ? cookie : "";
1414    addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
1415
1416    final List<RawAttribute> includedAttributes = msg.getEclIncludes();
1417    if (includedAttributes != null && !includedAttributes.isEmpty())
1418    {
1419      final StringBuilder builder = new StringBuilder(256);
1420      for (final RawAttribute includedAttribute : includedAttributes)
1421      {
1422        final String name = includedAttribute.getAttributeType();
1423        for (final ByteString value : includedAttribute.getValues())
1424        {
1425          builder.append(name);
1426          appendLDIFSeparatorAndValue(builder, value);
1427          builder.append('\n');
1428        }
1429      }
1430      final String includedAttributesLDIF = builder.toString();
1431      addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
1432    }
1433
1434    return new Entry(DN.valueOf(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
1435  }
1436
1437  /**
1438   * Sends the entry if it matches the base, scope and filter of the current search operation.
1439   * It will also send the base changelog entry if it needs to be sent and was not sent before.
1440   *
1441   * @return {@code true} if search should continue, {@code false} otherwise
1442   */
1443  private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
1444      throws DirectoryException
1445  {
1446    if (matchBaseAndScopeAndFilter(searchOp, entry))
1447    {
1448      return searchOp.returnEntry(entry, getControls(cookie));
1449    }
1450    // maybe the next entry will match?
1451    return true;
1452  }
1453
1454  /** Indicates if the provided entry matches the filter, base and scope. */
1455  private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
1456  {
1457    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
1458        && searchOp.getFilter().matchesEntry(entry);
1459  }
1460
1461  private static List<Control> getControls(String cookie)
1462  {
1463    if (cookie != null)
1464    {
1465      final Control c = new EntryChangelogNotificationControl(true, cookie);
1466      return Collections.singletonList(c);
1467    }
1468    return Collections.emptyList();
1469  }
1470
1471  /**
1472   * Create and returns the base changelog entry to the underlying search operation.
1473   * <p>
1474   * "initial search" phase must return the base entry immediately.
1475   *
1476   * @return {@code true} if search should continue, {@code false} otherwise
1477   */
1478  private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException
1479  {
1480    final DN baseDN = searchOp.getBaseDN();
1481    final SearchFilter filter = searchOp.getFilter();
1482    final SearchScope scope = searchOp.getScope();
1483
1484    if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
1485    {
1486      final Entry entry = buildBaseChangelogEntry();
1487      if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
1488      {
1489        // Abandon, size limit reached.
1490        return false;
1491      }
1492    }
1493    return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
1494        || !scope.equals(SearchScope.BASE_OBJECT);
1495  }
1496
1497  private Entry buildBaseChangelogEntry() throws DirectoryException
1498  {
1499    final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates());
1500
1501    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>();
1502    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<>();
1503
1504    // We never return the numSubordinates attribute for the base changelog entry
1505    // and there is a very good reason for that:
1506    // - Either we compute it before sending the entries,
1507    // -- then we risk returning more entries if new entries come in after we computed numSubordinates
1508    // --   or we risk returning less entries if purge kicks in      after we computed numSubordinates
1509    // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError
1510
1511    addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
1512    addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
1513        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
1514    addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs);
1515    addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
1516    return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
1517  }
1518
1519  private static void addAttribute(final Entry e, final String attrType, final String attrValue)
1520  {
1521    e.addAttribute(Attributes.create(attrType, attrValue), null);
1522  }
1523
1524  private static void addAttributeByType(String attrNameLowercase,
1525      String attrNameUppercase, String attrValue,
1526      Map<AttributeType, List<Attribute>> userAttrs,
1527      Map<AttributeType, List<Attribute>> operationalAttrs)
1528  {
1529    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
1530  }
1531
1532  private static void addAttributeByUppercaseName(String attrNameLowercase,
1533      String attrNameUppercase,  String attrValue,
1534      Map<AttributeType, List<Attribute>> userAttrs,
1535      Map<AttributeType, List<Attribute>> operationalAttrs)
1536  {
1537    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false);
1538  }
1539
1540  private static void addAttribute(final String attrNameLowercase,
1541      final String attrNameUppercase, final String attrValue,
1542      final Map<AttributeType, List<Attribute>> userAttrs,
1543      final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
1544  {
1545    AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(attrNameLowercase, attrNameUppercase);
1546    final Attribute a = addByType
1547        ? Attributes.create(attrType, attrValue)
1548        : Attributes.create(attrNameUppercase, attrValue);
1549    final List<Attribute> attrList = Collections.singletonList(a);
1550    if (attrType.isOperational())
1551    {
1552      operationalAttrs.put(attrType, attrList);
1553    }
1554    else
1555    {
1556      userAttrs.put(attrType, attrList);
1557    }
1558  }
1559
1560  /**
1561   * Describes the current search phase.
1562   */
1563  private enum SearchPhase
1564  {
1565    /**
1566     * "Initial search" phase. The "initial search" phase is running
1567     * concurrently. All update notifications are ignored.
1568     */
1569    INITIAL,
1570    /**
1571     * Transitioning from the "initial search" phase to the "persistent search"
1572     * phase. "Initial search" phase has finished reading from the DB. It now
1573     * verifies if any more updates have been persisted to the DB since stopping
1574     * and send them. All update notifications are blocked.
1575     */
1576    TRANSITIONING,
1577    /**
1578     * "Persistent search" phase. "Initial search" phase has completed. All
1579     * update notifications are published.
1580     */
1581    PERSISTENT;
1582  }
1583
1584  /**
1585   * Contains data to ensure that the same change is not sent twice to clients
1586   * because of race conditions between the "initial search" phase and the
1587   * "persistent search" phase.
1588   */
1589  private static class SendEntryData<K extends Comparable<K>>
1590  {
1591    private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<>(SearchPhase.INITIAL);
1592    private final Object transitioningLock = new Object();
1593    private volatile K lastKeySentByInitialSearch;
1594
1595    private SendEntryData(SearchPhase startPhase)
1596    {
1597      searchPhase.set(startPhase);
1598    }
1599
1600    private void finalizeInitialSearch()
1601    {
1602      searchPhase.set(SearchPhase.PERSISTENT);
1603      synchronized (transitioningLock)
1604      { // initial search phase has completed, release all persistent searches
1605        transitioningLock.notifyAll();
1606      }
1607    }
1608
1609    public void transitioningToPersistentSearchPhase()
1610    {
1611      searchPhase.set(SearchPhase.TRANSITIONING);
1612    }
1613
1614    private void initialSearchSendsEntry(final K key)
1615    {
1616      lastKeySentByInitialSearch = key;
1617    }
1618
1619    private boolean persistentSearchCanSendEntry(K key)
1620    {
1621      final SearchPhase stateValue = searchPhase.get();
1622      switch (stateValue)
1623      {
1624      case INITIAL:
1625        return false;
1626      case TRANSITIONING:
1627        synchronized (transitioningLock)
1628        {
1629          while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
1630          {
1631            // "initial search" phase is over, and is now verifying whether new
1632            // changes have been published to the DB.
1633            // Wait for this check to complete
1634            try
1635            {
1636              transitioningLock.wait();
1637            }
1638            catch (InterruptedException e)
1639            {
1640              Thread.currentThread().interrupt();
1641              // Shutdown must have been called. Stop sending entries.
1642              return false;
1643            }
1644          }
1645        }
1646        return key.compareTo(lastKeySentByInitialSearch) > 0;
1647      case PERSISTENT:
1648        return true;
1649      default:
1650        throw new RuntimeException("Not implemented for " + stateValue);
1651      }
1652    }
1653  }
1654
1655  /** Sends entries to clients for change number searches. */
1656  private static class ChangeNumberEntrySender
1657  {
1658    private final SearchOperation searchOp;
1659    private final long lowestChangeNumber;
1660    private final long highestChangeNumber;
1661    private final SendEntryData<Long> sendEntryData;
1662
1663    private ChangeNumberEntrySender(SearchOperation searchOp, SearchPhase startPhase, ChangeNumberRange range)
1664    {
1665      this.searchOp = searchOp;
1666      this.sendEntryData = new SendEntryData<>(startPhase);
1667      this.lowestChangeNumber = range.lowerBound;
1668      this.highestChangeNumber = range.upperBound;
1669    }
1670
1671    /**
1672     * Indicates if provided change number is compatible with last change
1673     * number.
1674     *
1675     * @param changeNumber
1676     *          The change number to test.
1677     * @return {@code true} if and only if the provided change number is in the
1678     *         range of the last change number.
1679     */
1680    boolean changeNumberIsInRange(long changeNumber)
1681    {
1682      return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
1683    }
1684
1685    private void finalizeInitialSearch()
1686    {
1687      sendEntryData.finalizeInitialSearch();
1688    }
1689
1690    private void transitioningToPersistentSearchPhase()
1691    {
1692      sendEntryData.transitioningToPersistentSearchPhase();
1693    }
1694
1695    /**
1696     * @return {@code true} if search should continue, {@code false} otherwise
1697     */
1698    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg,
1699        MultiDomainServerState cookie) throws DirectoryException
1700    {
1701      final DN baseDN = cnIndexRecord.getBaseDN();
1702      sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
1703      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
1704      return sendEntryIfMatches(searchOp, entry, null);
1705    }
1706
1707    private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
1708    {
1709      if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
1710      {
1711        sendEntryIfMatches(searchOp, entry, null);
1712      }
1713    }
1714  }
1715
1716  /** Sends entries to clients for cookie-based searches. */
1717  private static class CookieEntrySender {
1718    private final SearchOperation searchOp;
1719    private final SearchPhase startPhase;
1720    private final Set<DN> excludedBaseDNs;
1721    private final MultiDomainServerState cookie;
1722    private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData =
1723        new ConcurrentSkipListMap<>();
1724
1725    private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase, MultiDomainServerState cookie,
1726        Set<DN> excludedBaseDNs)
1727    {
1728      this.searchOp = searchOp;
1729      this.startPhase = startPhase;
1730      this.cookie = cookie;
1731      this.excludedBaseDNs = excludedBaseDNs;
1732    }
1733
1734    private void finalizeInitialSearch()
1735    {
1736      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
1737      {
1738        sendEntryData.finalizeInitialSearch();
1739      }
1740    }
1741
1742    private void transitioningToPersistentSearchPhase()
1743    {
1744      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
1745      {
1746        sendEntryData.transitioningToPersistentSearchPhase();
1747      }
1748    }
1749
1750    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
1751    {
1752      final ReplicaId replicaId = ReplicaId.of(baseDN, csn.getServerId());
1753      SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
1754      if (data == null)
1755      {
1756        final SendEntryData<CSN> newData = new SendEntryData<>(startPhase);
1757        data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
1758        return data == null ? newData : data;
1759      }
1760      return data;
1761    }
1762
1763    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
1764    {
1765      final CSN csn = updateMsg.getCSN();
1766      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
1767      sendEntryData.initialSearchSendsEntry(csn);
1768      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
1769      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
1770      return sendEntryIfMatches(searchOp, entry, cookieString);
1771    }
1772
1773    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
1774        throws DirectoryException
1775    {
1776      final CSN csn = updateMsg.getCSN();
1777      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
1778      if (sendEntryData.persistentSearchCanSendEntry(csn))
1779      {
1780        // multi threaded case: wait for the "initial search" phase to set the cookie
1781        final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
1782        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
1783        // FIXME JNR use this instead of previous line:
1784        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
1785        sendEntryIfMatches(searchOp, cookieEntry, cookieString);
1786      }
1787    }
1788
1789    private String updateCookie(DN baseDN, final CSN csn)
1790    {
1791      synchronized (cookie)
1792      { // forbid concurrent updates to the cookie
1793        cookie.update(baseDN, csn);
1794        return cookie.toString();
1795      }
1796    }
1797  }
1798}