001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.plugin;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
031import static org.opends.server.util.ServerConstants.*;
032import static org.opends.server.util.StaticUtils.*;
033
034import java.util.ArrayList;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.BlockingQueue;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.forgerock.i18n.LocalizableMessage;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigChangeResult;
048import org.forgerock.opendj.config.server.ConfigException;
049import org.forgerock.opendj.ldap.ResultCode;
050import org.opends.server.admin.server.ConfigurationAddListener;
051import org.opends.server.admin.server.ConfigurationChangeListener;
052import org.opends.server.admin.server.ConfigurationDeleteListener;
053import org.opends.server.admin.std.server.ReplicationDomainCfg;
054import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
055import org.opends.server.api.Backend;
056import org.opends.server.api.BackupTaskListener;
057import org.opends.server.api.ExportTaskListener;
058import org.opends.server.api.ImportTaskListener;
059import org.opends.server.api.RestoreTaskListener;
060import org.opends.server.api.SynchronizationProvider;
061import org.opends.server.core.DirectoryServer;
062import org.opends.server.replication.service.DSRSShutdownSync;
063import org.opends.server.types.BackupConfig;
064import org.opends.server.types.Control;
065import org.opends.server.types.DN;
066import org.opends.server.types.DirectoryException;
067import org.opends.server.types.Entry;
068import org.opends.server.types.LDIFExportConfig;
069import org.opends.server.types.LDIFImportConfig;
070import org.opends.server.types.Modification;
071import org.opends.server.types.Operation;
072import org.opends.server.types.RestoreConfig;
073import org.opends.server.types.SynchronizationProviderResult;
074import org.opends.server.types.operation.PluginOperation;
075import org.opends.server.types.operation.PostOperationAddOperation;
076import org.opends.server.types.operation.PostOperationDeleteOperation;
077import org.opends.server.types.operation.PostOperationModifyDNOperation;
078import org.opends.server.types.operation.PostOperationModifyOperation;
079import org.opends.server.types.operation.PostOperationOperation;
080import org.opends.server.types.operation.PreOperationAddOperation;
081import org.opends.server.types.operation.PreOperationDeleteOperation;
082import org.opends.server.types.operation.PreOperationModifyDNOperation;
083import org.opends.server.types.operation.PreOperationModifyOperation;
084
085/**
086 * This class is used to load the Replication code inside the JVM
087 * and to trigger initialization of the replication.
088 *
089 * It also extends the SynchronizationProvider class in order to have some
090 * replication code running during the operation process
091 * as pre-op, conflictResolution, and post-op.
092 */
093public class MultimasterReplication
094       extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
095       implements ConfigurationAddListener<ReplicationDomainCfg>,
096                  ConfigurationDeleteListener<ReplicationDomainCfg>,
097                  ConfigurationChangeListener
098                  <ReplicationSynchronizationProviderCfg>,
099                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
100                  ExportTaskListener
101{
102
  /** Logger used by this class. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /**
   * Listener created in initializeSynchronizationProvider() and shut down in
   * finalizeSynchronizationProvider().
   */
  private ReplicationServerListener replicationServerListener;
  /** The registered replication domains, keyed by their base DN. */
  private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<>(4);
  /**
   * Single instance handed to every LDAPReplicationDomain and to the
   * ReplicationServerListener created by this class.
   */
  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
  /**
   * The queue of received update messages, to be treated by the ReplayThread
   * threads. Its capacity is bounded to 10000 elements.
   */
  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<>(10000);
  /** The list of ReplayThread threads. */
  private static final List<ReplayThread> replayThreads = new ArrayList<>();
  /**
   * The configurable number of replay threads. Overwritten from the
   * configuration in initializeSynchronizationProvider() and
   * applyConfigurationChange().
   */
  private static int replayThreadNumber = 10;

  /** Enum that symbolizes the state of the multimaster replication. */
  private static enum State
  {
    STARTING, RUNNING, STOPPING
  }

  /**
   * Current state of the multimaster replication. The reference object is also
   * used as the monitor on which isECLEnabledDomain() waits and on which
   * setState() notifies.
   */
  private static final AtomicReference<State> state = new AtomicReference<>(State.STARTING);

  /**
   * The configurable connection/handshake timeout, in milliseconds. Overwritten
   * from the configuration in initializeSynchronizationProvider() and
   * applyConfigurationChange().
   */
  private static volatile int connectionTimeoutMS = 5000;
125
126  /**
127   * Finds the domain for a given DN.
128   *
129   * @param dn         The DN for which the domain must be returned.
130   * @param pluginOp   An optional operation for which the check is done.
131   *                   Can be null is the request has no associated operation.
132   * @return           The domain for this DN.
133   */
134  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
135  {
136    /*
137     * Don't run the special replication code on Operation that are
138     * specifically marked as don't synchronize.
139     */
140    if (pluginOp instanceof Operation)
141    {
142        final Operation op = (Operation) pluginOp;
143        if (op.dontSynchronize())
144        {
145          return null;
146        }
147
148        /*
149         * Check if the provided operation is a repair operation and set the
150         * synchronization flags if necessary.
151         * The repair operations are tagged as synchronization operations so
152         * that the core server let the operation modify the entryuuid and
153         * ds-sync-hist attributes.
154         * They are also tagged as dontSynchronize so that the replication code
155         * running later do not generate CSN, solve conflicts and forward the
156         * operation to the replication server.
157         */
158        final List<Control> controls = op.getRequestControls();
159        for (Iterator<Control> iter = controls.iterator(); iter.hasNext();)
160        {
161          Control c = iter.next();
162          if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID()))
163          {
164            op.setSynchronizationOperation(true);
165            op.setDontSynchronize(true);
166            /*
167            remove this control from the list of controls since it has now been
168            processed and the local backend will fail if it finds a control that
169            it does not know about and that is marked as critical.
170            */
171            iter.remove();
172            return null;
173          }
174        }
175    }
176
177
178    LDAPReplicationDomain domain = null;
179    DN temp = dn;
180    while (domain == null && temp != null)
181    {
182      domain = domains.get(temp);
183      temp = temp.getParentDNInSuffix();
184    }
185
186    return domain;
187  }
188
189  /**
190   * Creates a new domain from its configEntry, do the
191   * necessary initialization and starts it so that it is
192   * fully operational when this method returns.
193   * @param configuration The entry with the configuration of this domain.
194   * @return The domain created.
195   * @throws ConfigException When the configuration is not valid.
196   */
197  public static LDAPReplicationDomain createNewDomain(
198      ReplicationDomainCfg configuration)
199      throws ConfigException
200  {
201    try
202    {
203      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
204          configuration, updateToReplayQueue, dsrsShutdownSync);
205      if (domains.isEmpty())
206      {
207        // Create the threads that will process incoming update messages
208        createReplayThreads();
209      }
210
211      domains.put(domain.getBaseDN(), domain);
212      return domain;
213    }
214    catch (ConfigException e)
215    {
216      logger.error(ERR_COULD_NOT_START_REPLICATION, configuration.dn(),
217          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
218    }
219    return null;
220  }
221
222  /**
223   * Creates a new domain from its configEntry, do the necessary initialization
224   * and starts it so that it is fully operational when this method returns. It
225   * is only used for tests so far.
226   *
227   * @param configuration The entry with the configuration of this domain.
228   * @param queue         The BlockingQueue that this domain will use.
229   *
230   * @return              The domain created.
231   *
232   * @throws ConfigException When the configuration is not valid.
233   */
234  static LDAPReplicationDomain createNewDomain(
235      ReplicationDomainCfg configuration,
236      BlockingQueue<UpdateToReplay> queue)
237      throws ConfigException
238  {
239    final LDAPReplicationDomain domain =
240        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
241    domains.put(domain.getBaseDN(), domain);
242    return domain;
243  }
244
245  /**
246   * Deletes a domain.
247   * @param dn : the base DN of the domain to delete.
248   */
249  public static void deleteDomain(DN dn)
250  {
251    LDAPReplicationDomain domain = domains.remove(dn);
252    if (domain != null)
253    {
254      domain.delete();
255    }
256
257    // No replay threads running if no replication need
258    if (domains.isEmpty()) {
259      stopReplayThreads();
260    }
261  }
262
  /**
   * {@inheritDoc}
   * <p>
   * Clears the domain registry, creates the replication server listener,
   * registers this object as configuration listener, reads the number of
   * replay threads and the connection timeout from the configuration, creates
   * the domains that are already configured, processes offline schema changes
   * if there are any, and finally registers this object as backup, restore,
   * export and import task listener and registers the replication repair
   * control as a supported control.
   */
  @Override
  public void initializeSynchronizationProvider(
      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
  {
    // Previously registered domains are only dropped from the map here;
    // no shutdown() or delete() is invoked on them by this method.
    domains.clear();
    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);

    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
    cfg.addReplicationDomainAddListener(this);
    cfg.addReplicationDomainDeleteListener(this);

    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    cfg.addReplicationChangeListener(this);

    // Must be set before the domains are created below: the first call to
    // createNewDomain() creates replayThreadNumber replay threads.
    replayThreadNumber = cfg.getNumUpdateReplayThreads();
    // Clamp the configured timeout so that the narrowing cast cannot overflow.
    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);

    //  Create the list of domains that are already defined.
    for (String name : cfg.listReplicationDomains())
    {
      // The returned domain is ignored: a domain that could not be created is
      // logged by createNewDomain() and simply skipped.
      createNewDomain(cfg.getReplicationDomain(name));
    }

    // If any schema changes were made with the server offline, then handle them now.
    List<Modification> offlineSchemaChanges =
         DirectoryServer.getOfflineSchemaChanges();
    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
    {
      processSchemaChange(offlineSchemaChanges);
    }

    // These registrations are undone in finalizeSynchronizationProvider().
    DirectoryServer.registerBackupTaskListener(this);
    DirectoryServer.registerRestoreTaskListener(this);
    DirectoryServer.registerExportTaskListener(this);
    DirectoryServer.registerImportTaskListener(this);

    DirectoryServer.registerSupportedControl(
        ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
  }
305
306  /**
307   * Create the threads that will wait for incoming update messages.
308   */
309  private static synchronized void createReplayThreads()
310  {
311    replayThreads.clear();
312
313    for (int i = 0; i < replayThreadNumber; i++)
314    {
315      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
316      replayThread.start();
317      replayThreads.add(replayThread);
318    }
319  }
320
321  /**
322   * Stop the threads that are waiting for incoming update messages.
323   */
324  private static synchronized void stopReplayThreads()
325  {
326    //  stop the replay threads
327    for (ReplayThread replayThread : replayThreads)
328    {
329      replayThread.shutdown();
330    }
331
332    for (ReplayThread replayThread : replayThreads)
333    {
334      try
335      {
336        replayThread.join();
337      }
338      catch(InterruptedException e)
339      {
340        Thread.currentThread().interrupt();
341      }
342    }
343    replayThreads.clear();
344  }
345
346  /** {@inheritDoc} */
347  @Override
348  public boolean isConfigurationAddAcceptable(
349      ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
350  {
351    return LDAPReplicationDomain.isConfigurationAcceptable(
352      configuration, unacceptableReasons);
353  }
354
355  /** {@inheritDoc} */
356  @Override
357  public ConfigChangeResult applyConfigurationAdd(
358     ReplicationDomainCfg configuration)
359  {
360    ConfigChangeResult ccr = new ConfigChangeResult();
361    try
362    {
363      LDAPReplicationDomain rd = createNewDomain(configuration);
364      if (State.RUNNING.equals(state.get()))
365      {
366        rd.start();
367        if (State.STOPPING.equals(state.get())) {
368          rd.shutdown();
369        }
370      }
371    } catch (ConfigException e)
372    {
373      // we should never get to this point because the configEntry has
374      // already been validated in isConfigurationAddAcceptable()
375      ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION);
376    }
377    return ccr;
378  }
379
380  /** {@inheritDoc} */
381  @Override
382  public void doPostOperation(PostOperationAddOperation addOperation)
383  {
384    DN dn = addOperation.getEntryDN();
385    genericPostOperation(addOperation, dn);
386  }
387
388
389  /** {@inheritDoc} */
390  @Override
391  public void doPostOperation(PostOperationDeleteOperation deleteOperation)
392  {
393    DN dn = deleteOperation.getEntryDN();
394    genericPostOperation(deleteOperation, dn);
395  }
396
397  /** {@inheritDoc} */
398  @Override
399  public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation)
400  {
401    DN dn = modifyDNOperation.getEntryDN();
402    genericPostOperation(modifyDNOperation, dn);
403  }
404
405  /** {@inheritDoc} */
406  @Override
407  public void doPostOperation(PostOperationModifyOperation modifyOperation)
408  {
409    DN dn = modifyOperation.getEntryDN();
410    genericPostOperation(modifyOperation, dn);
411  }
412
413  /** {@inheritDoc} */
414  @Override
415  public SynchronizationProviderResult handleConflictResolution(
416      PreOperationModifyOperation modifyOperation)
417  {
418    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
419    if (domain != null)
420    {
421      return domain.handleConflictResolution(modifyOperation);
422    }
423    return new SynchronizationProviderResult.ContinueProcessing();
424  }
425
426  /** {@inheritDoc} */
427  @Override
428  public SynchronizationProviderResult handleConflictResolution(
429      PreOperationAddOperation addOperation) throws DirectoryException
430  {
431    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
432    if (domain != null)
433    {
434      return domain.handleConflictResolution(addOperation);
435    }
436    return new SynchronizationProviderResult.ContinueProcessing();
437  }
438
439  /** {@inheritDoc} */
440  @Override
441  public SynchronizationProviderResult handleConflictResolution(
442      PreOperationDeleteOperation deleteOperation) throws DirectoryException
443  {
444    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
445    if (domain != null)
446    {
447      return domain.handleConflictResolution(deleteOperation);
448    }
449    return new SynchronizationProviderResult.ContinueProcessing();
450  }
451
452  /** {@inheritDoc} */
453  @Override
454  public SynchronizationProviderResult handleConflictResolution(
455      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
456  {
457    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
458    if (domain != null)
459    {
460      return domain.handleConflictResolution(modifyDNOperation);
461    }
462    return new SynchronizationProviderResult.ContinueProcessing();
463  }
464
465  /** {@inheritDoc} */
466  @Override
467  public SynchronizationProviderResult
468         doPreOperation(PreOperationModifyOperation modifyOperation)
469  {
470    DN operationDN = modifyOperation.getEntryDN();
471    LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation);
472
473    if (domain == null || !domain.solveConflict())
474    {
475      return new SynchronizationProviderResult.ContinueProcessing();
476    }
477
478    EntryHistorical historicalInformation = (EntryHistorical)
479      modifyOperation.getAttachment(EntryHistorical.HISTORICAL);
480    if (historicalInformation == null)
481    {
482      Entry entry = modifyOperation.getModifiedEntry();
483      historicalInformation = EntryHistorical.newInstanceFromEntry(entry);
484      modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
485          historicalInformation);
486    }
487    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
488    historicalInformation.setHistoricalAttrToOperation(modifyOperation);
489
490    if (modifyOperation.getModifications().isEmpty())
491    {
492      /*
493       * This operation becomes a no-op due to conflict resolution
494       * stop the processing and send an OK result
495       */
496      return new SynchronizationProviderResult.StopProcessing(
497          ResultCode.SUCCESS, null);
498    }
499
500    return new SynchronizationProviderResult.ContinueProcessing();
501  }
502
  /**
   * {@inheritDoc}
   * <p>
   * Nothing is done here for delete operations: processing always continues.
   */
  @Override
  public SynchronizationProviderResult doPreOperation(
         PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    return new SynchronizationProviderResult.ContinueProcessing();
  }
510
511  /** {@inheritDoc} */
512  @Override
513  public SynchronizationProviderResult doPreOperation(
514         PreOperationModifyDNOperation modifyDNOperation)
515         throws DirectoryException
516  {
517    DN operationDN = modifyDNOperation.getEntryDN();
518    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
519
520    if (domain == null || !domain.solveConflict())
521    {
522      return new SynchronizationProviderResult.ContinueProcessing();
523    }
524
525    // The historical object is retrieved from the attachment created
526    // in the HandleConflictResolution phase.
527    EntryHistorical historicalInformation = (EntryHistorical)
528    modifyDNOperation.getAttachment(EntryHistorical.HISTORICAL);
529    if (historicalInformation == null)
530    {
531      // When no Historical attached, create once by loading from the entry
532      // and attach it to the operation
533      Entry entry = modifyDNOperation.getUpdatedEntry();
534      historicalInformation = EntryHistorical.newInstanceFromEntry(entry);
535      modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL,
536          historicalInformation);
537    }
538    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
539
540    // Add to the operation the historical attribute : "dn:changeNumber:moddn"
541    historicalInformation.setHistoricalAttrToOperation(modifyDNOperation);
542
543    return new SynchronizationProviderResult.ContinueProcessing();
544  }
545
546  /** {@inheritDoc} */
547  @Override
548  public SynchronizationProviderResult doPreOperation(
549         PreOperationAddOperation addOperation)
550  {
551    // Check replication domain
552    LDAPReplicationDomain domain =
553      findDomain(addOperation.getEntryDN(), addOperation);
554    if (domain == null)
555    {
556      return new SynchronizationProviderResult.ContinueProcessing();
557    }
558
559    // For LOCAL op only, generate CSN and attach Context
560    if (!addOperation.isSynchronizationOperation())
561    {
562      domain.doPreOperation(addOperation);
563    }
564
565    // Add to the operation the historical attribute : "dn:changeNumber:add"
566    EntryHistorical.setHistoricalAttrToOperation(addOperation);
567
568    return new SynchronizationProviderResult.ContinueProcessing();
569  }
570
  /**
   * {@inheritDoc}
   * <p>
   * Switches the state to STOPPING, shuts down and unregisters every domain,
   * stops the replay threads, shuts down the replication server listener if
   * one was created, and deregisters this object from the backup, restore,
   * export and import task listeners.
   */
  @Override
  public void finalizeSynchronizationProvider()
  {
    // Published first: it wakes up threads blocked in isECLEnabledDomain() and
    // lets applyConfigurationAdd() shut down a domain it has just started.
    setState(State.STOPPING);

    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.shutdown();
    }
    domains.clear();

    stopReplayThreads();

    // The listener is null if initializeSynchronizationProvider() did not
    // get as far as creating it.
    if (replicationServerListener != null)
    {
      replicationServerListener.shutdown();
    }

    // Undo the registrations made in initializeSynchronizationProvider().
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
    DirectoryServer.deregisterExportTaskListener(this);
    DirectoryServer.deregisterImportTaskListener(this);
  }
595
596  /**
597   * This method is called whenever the server detects a modification
598   * of the schema done by directly modifying the backing files
599   * of the schema backend.
600   * Call the schema Domain if it exists.
601   *
602   * @param  modifications  The list of modifications that was
603   *                                      applied to the schema.
604   *
605   */
606  @Override
607  public void processSchemaChange(List<Modification> modifications)
608  {
609    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
610    if (domain != null)
611    {
612      domain.synchronizeSchemaModifications(modifications);
613    }
614  }
615
616  /** {@inheritDoc} */
617  @Override
618  public void processBackupBegin(Backend backend, BackupConfig config)
619  {
620    for (DN dn : backend.getBaseDNs())
621    {
622      LDAPReplicationDomain domain = findDomain(dn, null);
623      if (domain != null)
624      {
625        domain.backupStart();
626      }
627    }
628  }
629
630  /** {@inheritDoc} */
631  @Override
632  public void processBackupEnd(Backend backend, BackupConfig config,
633                               boolean successful)
634  {
635    for (DN dn : backend.getBaseDNs())
636    {
637      LDAPReplicationDomain domain = findDomain(dn, null);
638      if (domain != null)
639      {
640        domain.backupEnd();
641      }
642    }
643  }
644
645  /** {@inheritDoc} */
646  @Override
647  public void processRestoreBegin(Backend backend, RestoreConfig config)
648  {
649    for (DN dn : backend.getBaseDNs())
650    {
651      LDAPReplicationDomain domain = findDomain(dn, null);
652      if (domain != null)
653      {
654        domain.disable();
655      }
656    }
657  }
658
659  /** {@inheritDoc} */
660  @Override
661  public void processRestoreEnd(Backend backend, RestoreConfig config,
662                                boolean successful)
663  {
664    for (DN dn : backend.getBaseDNs())
665    {
666      LDAPReplicationDomain domain = findDomain(dn, null);
667      if (domain != null)
668      {
669        domain.enable();
670      }
671    }
672  }
673
674  /** {@inheritDoc} */
675  @Override
676  public void processImportBegin(Backend backend, LDIFImportConfig config)
677  {
678    for (DN dn : backend.getBaseDNs())
679    {
680      LDAPReplicationDomain domain = findDomain(dn, null);
681      if (domain != null)
682      {
683        domain.disable();
684      }
685    }
686  }
687
688  /** {@inheritDoc} */
689  @Override
690  public void processImportEnd(Backend backend, LDIFImportConfig config,
691                               boolean successful)
692  {
693    for (DN dn : backend.getBaseDNs())
694    {
695      LDAPReplicationDomain domain = findDomain(dn, null);
696      if (domain != null)
697      {
698        domain.enable();
699      }
700    }
701  }
702
703  /** {@inheritDoc} */
704  @Override
705  public void processExportBegin(Backend backend, LDIFExportConfig config)
706  {
707    for (DN dn : backend.getBaseDNs())
708    {
709      LDAPReplicationDomain domain = findDomain(dn, null);
710      if (domain != null)
711      {
712        domain.backupStart();
713      }
714    }
715  }
716
717  /** {@inheritDoc} */
718  @Override
719  public void processExportEnd(Backend backend, LDIFExportConfig config,
720                               boolean successful)
721  {
722    for (DN dn : backend.getBaseDNs())
723    {
724      LDAPReplicationDomain domain = findDomain(dn, null);
725      if (domain != null)
726      {
727        domain.backupEnd();
728      }
729    }
730  }
731
732  /** {@inheritDoc} */
733  @Override
734  public ConfigChangeResult applyConfigurationDelete(
735      ReplicationDomainCfg configuration)
736  {
737    deleteDomain(configuration.getBaseDN());
738
739    return new ConfigChangeResult();
740  }
741
  /**
   * {@inheritDoc}
   * <p>
   * Deleting a replication domain is always accepted; the arguments are not
   * inspected.
   */
  @Override
  public boolean isConfigurationDeleteAcceptable(
      ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
  {
    return true;
  }
749
750  /**
751   * Generic code for all the postOperation entry point.
752   *
753   * @param operation The Operation for which the post-operation is called.
754   * @param dn The Dn for which the post-operation is called.
755   */
756  private void genericPostOperation(PostOperationOperation operation, DN dn)
757  {
758    LDAPReplicationDomain domain = findDomain(dn, operation);
759    if (domain != null) {
760      domain.synchronize(operation);
761    }
762  }
763
  /**
   * Returns the replication server listener associated to that Multimaster
   * Replication.
   *
   * @return the listener, or null if initializeSynchronizationProvider() has
   *         not created it.
   */
  public ReplicationServerListener getReplicationServerListener()
  {
    return replicationServerListener;
  }
773
  /**
   * {@inheritDoc}
   * <p>
   * Every configuration change is accepted; the arguments are not inspected.
   */
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationSynchronizationProviderCfg configuration,
      List<LocalizableMessage> unacceptableReasons)
  {
    return true;
  }
782
783  /** {@inheritDoc} */
784  @Override
785  public ConfigChangeResult applyConfigurationChange(
786      ReplicationSynchronizationProviderCfg configuration)
787  {
788    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
789
790    // Stop threads then restart new number of threads
791    stopReplayThreads();
792    replayThreadNumber = numUpdateRepayThread;
793    if (!domains.isEmpty())
794    {
795      createReplayThreads();
796    }
797
798    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
799        Integer.MAX_VALUE);
800
801    return new ConfigChangeResult();
802  }
803
804  /** {@inheritDoc} */
805  @Override
806  public void completeSynchronizationProvider()
807  {
808    for (LDAPReplicationDomain domain : domains.values())
809    {
810      domain.start();
811    }
812    setState(State.RUNNING);
813  }
814
  /**
   * Publishes a new state and wakes up every thread waiting on the state
   * monitor (isECLEnabledDomain() waits on it while the state is STARTING).
   *
   * @param newState The state to switch to.
   */
  private void setState(State newState)
  {
    // The new value is stored before notifying, so that woken threads see it
    // when they re-check the state.
    state.set(newState);
    synchronized (state)
    {
      state.notifyAll();
    }
  }
823
  /**
   * Gets the number of handled domain objects, i.e. the current size of the
   * domain registry.
   *
   * @return The number of handled domain objects
   */
  public static int getNumberOfDomains()
  {
    return domains.size();
  }
832
833  /**
834   * Gets the Set of domain baseDN which are disabled for the external changelog.
835   *
836   * @return The Set of domain baseDNs which are disabled for the external changelog.
837   * @throws DirectoryException
838   *            if a problem occurs
839   */
840  public static Set<DN> getExcludedChangelogDomains() throws DirectoryException
841  {
842    final Set<DN> disabledBaseDNs = new HashSet<>(domains.size() + 1);
843    disabledBaseDNs.add(DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT));
844    for (LDAPReplicationDomain domain : domains.values())
845    {
846      if (!domain.isECLEnabled())
847      {
848        disabledBaseDNs.add(domain.getBaseDN());
849      }
850    }
851    return disabledBaseDNs;
852  }
853
854  /**
855   * Returns whether the provided baseDN represents a replication domain enabled
856   * for the external changelog.
857   *
858   * @param baseDN
859   *          the replication domain to check
860   * @return true if the provided baseDN is enabled for the external changelog,
861   *         false if the provided baseDN is disabled for the external changelog
862   *         or unknown to multimaster replication.
863   */
864  public static boolean isECLEnabledDomain(DN baseDN)
865  {
866    if (State.STARTING.equals(state.get()))
867    {
868      synchronized (state)
869      {
870        while (State.STARTING.equals(state.get()))
871        {
872          try
873          {
874            state.wait();
875          }
876          catch (InterruptedException ignored)
877          {
878            // loop and check state again
879          }
880        }
881      }
882    }
883    // if state is STOPPING, then we need to return from this method
884    final LDAPReplicationDomain domain = domains.get(baseDN);
885    return domain != null && domain.isECLEnabled();
886  }
887
  /**
   * Returns the connection timeout in milli-seconds. The value is 5000 until
   * it is overwritten from the configuration.
   *
   * @return The connection timeout in milli-seconds.
   */
  public static int getConnectionTimeoutMS()
  {
    return connectionTimeoutMS;
  }
897
898}