001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2008-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.service;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.replication.common.AssuredMode.*;
031import static org.opends.server.replication.common.StatusMachine.*;
032import static org.opends.server.util.CollectionUtils.*;
033
034import java.io.BufferedOutputStream;
035import java.io.IOException;
036import java.io.InputStream;
037import java.io.OutputStream;
038import java.net.SocketTimeoutException;
039import java.util.*;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.TimeoutException;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.forgerock.i18n.LocalizableMessage;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigException;
048import org.forgerock.opendj.ldap.ResultCode;
049import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
050import org.opends.server.admin.std.server.ReplicationDomainCfg;
051import org.opends.server.api.DirectoryThread;
052import org.opends.server.backends.task.Task;
053import org.opends.server.replication.common.*;
054import org.opends.server.replication.protocol.*;
055import org.opends.server.tasks.InitializeTargetTask;
056import org.opends.server.tasks.InitializeTask;
057import org.opends.server.types.Attribute;
058import org.opends.server.types.DN;
059import org.opends.server.types.DirectoryException;
060
061/**
062 * This class should be used as a base for Replication implementations.
063 * <p>
064 * It is intended that developers in need of a replication mechanism
065 * subclass this class with their own implementation.
066 * <p>
067 *   The startup phase of the ReplicationDomain subclass,
068 *   should read the list of replication servers from the configuration,
069 *   instantiate a {@link ServerState} then start the publish service
070 *   by calling {@link #startPublishService()}.
071 *   At this point it can start calling the {@link #publish(UpdateMsg)}
072 *   method if needed.
073 * <p>
074 *   When the startup phase reaches the point where the subclass is ready
075 *   to handle updates, the Replication Domain implementation should call the
076 *   {@link #startListenService()} method.
077 *   At this point a Listener thread is created on the Replication Service,
078 *   which can then start receiving updates.
079 * <p>
080 *   When updates are received the Replication Service calls the
081 *   {@link #processUpdate(UpdateMsg)} method.
082 *   ReplicationDomain implementation should implement the appropriate code
083 *   for replaying the update on the local repository.
084 *   When fully done the subclass must call the
085 *   {@link #processUpdateDone(UpdateMsg, String)} method.
086 *   This allows the update to be processed asynchronously if necessary.
087 *
088 * <p>
089 *   To propagate changes to other replica, a ReplicationDomain implementation
090 *   must use the {@link #publish(UpdateMsg)} method.
091 * <p>
092 *   If the Full Initialization process is needed then implementation
093 *   for {@code importBackend(InputStream)} and
094 *   {@code exportBackend(OutputStream)} must be
095 *   provided.
096 * <p>
097 *   Full Initialization of a replica can be triggered by LDAP clients
098 *   by creating InitializeTasks or InitializeTargetTask.
099 *   Full initialization can also be triggered from the ReplicationDomain
100 *   implementation using methods {@link #initializeRemote(int, Task)}
101 *   or {@link #initializeFromRemote(int, Task)}.
102 * <p>
103 *   At shutdown time, the {@link #disableService()} method should be called to
104 *   cleanly stop the replication service.
105 */
106public abstract class ReplicationDomain
107{
108
109  /**
110   * Contains all the attributes included for the ECL (External Changelog).
111   */
112  // @Immutable
113  private static final class ECLIncludes
114  {
115
116    final Map<Integer, Set<String>> includedAttrsByServer;
117    final Set<String> includedAttrsAllServers;
118
119    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
120    final Set<String> includedAttrsForDeletesAllServers;
121
122    private ECLIncludes(
123        Map<Integer, Set<String>> includedAttrsByServer,
124        Set<String> includedAttrsAllServers,
125        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
126        Set<String> includedAttrsForDeletesAllServers)
127    {
128      this.includedAttrsByServer = includedAttrsByServer;
129      this.includedAttrsAllServers = includedAttrsAllServers;
130
131      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
132      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
133    }
134
135    @SuppressWarnings("unchecked")
136    public ECLIncludes()
137    {
138      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
139          Collections.EMPTY_SET);
140    }
141
142    /**
143     * Add attributes to be included in the ECL.
144     *
145     * @param serverId
146     *          Server where these attributes are configured.
147     * @param includeAttributes
148     *          Attributes to be included with all change records, may include
149     *          wild-cards.
150     * @param includeAttributesForDeletes
151     *          Additional attributes to be included with delete change records,
152     *          may include wild-cards.
153     * @return a new {@link ECLIncludes} object if included attributes have
154     *         changed, or the current object otherwise.
155     */
156    public ECLIncludes addIncludedAttributes(int serverId,
157        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
158    {
159      boolean configurationChanged = false;
160
161      Set<String> s1 = new HashSet<>(includeAttributes);
162
163      // Combine all+delete attributes.
164      Set<String> s2 = new HashSet<>(s1);
165      s2.addAll(includeAttributesForDeletes);
166
167      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
168      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
169      {
170        configurationChanged = true;
171        eclIncludesByServer = new HashMap<>(this.includedAttrsByServer);
172        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
173      }
174
175      Map<Integer, Set<String>> eclIncludesForDeletesByServer = this.includedAttrsForDeletesByServer;
176      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
177      {
178        configurationChanged = true;
179        eclIncludesForDeletesByServer = new HashMap<>(this.includedAttrsForDeletesByServer);
180        eclIncludesForDeletesByServer.put(serverId, Collections.unmodifiableSet(s2));
181      }
182
183      if (!configurationChanged)
184      {
185        return this;
186      }
187
188      // and rebuild the global list to be ready for usage
189      Set<String> eclIncludesAllServer = new HashSet<>();
190      for (Set<String> attributes : eclIncludesByServer.values())
191      {
192        eclIncludesAllServer.addAll(attributes);
193      }
194
195      Set<String> eclIncludesForDeletesAllServer = new HashSet<>();
196      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
197      {
198        eclIncludesForDeletesAllServer.addAll(attributes);
199      }
200      return new ECLIncludes(eclIncludesByServer,
201          Collections.unmodifiableSet(eclIncludesAllServer),
202          eclIncludesForDeletesByServer,
203          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
204    }
205  }
206
207  /**
208   * Current status of this replicated domain; starts as NOT_CONNECTED_STATUS.
209   */
210  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
211  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
212
213  /** The configuration of the replication domain. */
214  protected volatile ReplicationDomainCfg config;
215  /**
216   * The assured configuration of the replication domain. It is a duplicate of
217   * {@link #config} because of its update model.
218   *
219   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
220   */
221  private volatile ReplicationDomainCfg assuredConfig;
222
223  /**
224   * The ReplicationBroker that is used by this ReplicationDomain to
225   * connect to the ReplicationService.
226   */
227  protected ReplicationBroker broker;
228
229  /**
230   * Outgoing assured messages still waiting for an ack, keyed by CSN, kept
231   * in order to be able to correlate the acks coming back with the original
232   * operation.
233   */
234  private final Map<CSN, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<>();
235  /**
236   * The context related to an import or export being processed.
237   * Holds null when none is being processed.
238   */
239  private final AtomicReference<ImportExportContext> importExportContext = new AtomicReference<>();
240
241  /**
242   * The Thread waiting for incoming update messages for this domain and pushing
243   * them to the global incoming update message queue for later processing by
244   * replay threads.
245   */
246  private volatile DirectoryThread listenerThread;
247
248  /** Monitoring counters: number of updates processed, received and sent. */
249  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
250  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
251  private AtomicInteger numSentUpdates = new AtomicInteger(0);
252
253  /* Assured replication monitoring counters. */
254
255  /** Number of updates sent in Assured Mode, Safe Read. */
256  private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0);
257  /**
258   * Number of updates sent in Assured Mode, Safe Read, that have been
259   * successfully acknowledged.
260   */
261  private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0);
262  /**
263   * Number of updates sent in Assured Mode, Safe Read, that have not been
264   * successfully acknowledged (either because of timeout, wrong status or error
265   * at replay).
266   */
267  private AtomicInteger assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
268  /**
269   * Number of updates sent in Assured Mode, Safe Read, that have not been
270   * successfully acknowledged because of timeout.
271   */
272  private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0);
273  /**
274   * Number of updates sent in Assured Mode, Safe Read, that have not been
275   * successfully acknowledged because of wrong status.
276   */
277  private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0);
278  /**
279   * Number of updates sent in Assured Mode, Safe Read, that have not been
280   * successfully acknowledged because of replay error.
281   */
282  private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0);
283  /**
284   * Per server count of the updates sent in Assured Mode, Safe Read,
285   * that have not been successfully acknowledged (either because of timeout,
286   * wrong status or error at replay) by a particular server (DS or RS).
287   * <p>
288   * Map key: server id; map value: number of failed updates for that server.
289   */
290  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<>();
291  /** Number of updates received in Assured Mode, Safe Read request. */
292  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
293  /**
294   * Number of updates received in Assured Mode, Safe Read request that we have
295   * acked without errors.
296   */
297  private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
298  /**
299   * Number of updates received in Assured Mode, Safe Read request that we have
300   * acked with errors.
301   */
302  private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
303  /** Number of updates sent in Assured Mode, Safe Data. */
304  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
305  /**
306   * Number of updates sent in Assured Mode, Safe Data, that have been
307   * successfully acknowledged.
308   */
309  private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0);
310  /**
311   * Number of updates sent in Assured Mode, Safe Data, that have not been
312   * successfully acknowledged because of timeout.
313   */
314  private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0);
315  /**
316   * Per server count of the updates sent in Assured Mode, Safe Data,
317   * that have not been successfully acknowledged because of timeout by a
318   * particular RS.
319   * <p>
320   * Map key: server id; map value: number of failed updates for that server.
321   */
322  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<>();
323
324  /* Status related monitoring fields */
325
326  /**
327   * Indicates the date when the status changed. This may be used to indicate
328   * the date the session with the current replication server started (when
329   * status is NORMAL for instance). All the above assured monitoring fields
330   * are also reset each time the status is changed.
331   */
332  private Date lastStatusChangeDate = new Date();
333
334  /**
335   * The state maintained by the Concrete Class.
336   */
337  private final ServerState state;
338
339  /**
340   * The generator that will be used to generate {@link CSN}
341   * for this domain.
342   */
343  private final CSNGenerator generator;
344
345  private final AtomicReference<ECLIncludes> eclIncludes = new AtomicReference<>(new ECLIncludes());
346
347  /**
348   * An object used to protect the initialization of the underlying broker
349   * session of this ReplicationDomain.
350   */
351  private final Object sessionLock = new Object();
352
353  /**
354   * The generationId for this replication domain. It is made of a hash of the
355   * first 1000 entries for this domain.
356   */
357  protected volatile long generationId;
358
359  /**
360   * Returns the {@link CSNGenerator} that will be used to
361   * generate {@link CSN} for this domain.
362   *
363   * @return The {@link CSNGenerator} that will be used to
364   *         generate {@link CSN} for this domain.
365   */
366  public CSNGenerator getGenerator()
367  {
368    return generator;
369  }
370
371  /**
372   * Creates a ReplicationDomain with the provided parameters.
373   *
374   * @param config
375   *          The configuration object for this ReplicationDomain
376   * @param generationId
377   *          the generation of this ReplicationDomain
378   */
379  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
380  {
381    this(config, generationId, new ServerState());
382  }
383
384  /**
385   * Creates a ReplicationDomain with the provided parameters. (for unit test
386   * purpose only)
387   *
388   * @param config
389   *          The configuration object for this ReplicationDomain
390   * @param generationId
391   *          the generation of this ReplicationDomain
392   * @param serverState
393   *          The serverState to use
394   */
395  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
396      ServerState serverState)
397  {
398    this.config = config;
399    this.assuredConfig = config;
400    this.generationId = generationId;
401    this.state = serverState;
402    this.generator = new CSNGenerator(getServerId(), state);
403  }
404
405  /**
406   * Set the initial status of the domain and perform necessary initializations.
407   * This method will be called by the Broker each time the ReplicationBroker
408   * establish a new session to a Replication Server.
409   *
410   * Implementations may override this method when they need to perform
411   * additional computing after session establishment.
412   * The default implementation should be sufficient for ReplicationDomains
413   * that don't need to perform additional computing.
414   *
415   * @param initStatus              The status to enter the state machine with.
416   * @param rsState                 The ServerState of the ReplicationServer
417   *                                with which the session was established.
418   */
419  public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
420  {
421    // Sanity check: is it a valid initial status?
422    if (!isValidInitialStatus(initStatus))
423    {
424      logger.error(ERR_DS_INVALID_INIT_STATUS, initStatus, getBaseDN(), getServerId());
425    }
426    else
427    {
428      status = initStatus;
429    }
430    generator.adjust(state);
431    generator.adjust(rsState);
432  }
433
434  /**
435   * Processes an incoming ChangeStatusMsg. Compute new status according to
436   * given order. Then update domain for being compliant with new status
437   * definition.
438   * @param csMsg The received status message
439   */
440  private void receiveChangeStatus(ChangeStatusMsg csMsg)
441  {
442    if (logger.isTraceEnabled())
443    {
444      logger.trace("Replication domain " + getBaseDN() +
445        " received change status message:\n" + csMsg);
446    }
447
448    ServerStatus reqStatus = csMsg.getRequestedStatus();
449
450    // Translate requested status to a state machine event
451    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
452    if (event == StatusMachineEvent.INVALID_EVENT)
453    {
454      logger.error(ERR_DS_INVALID_REQUESTED_STATUS, reqStatus, getBaseDN(), getServerId());
455      return;
456    }
457
458    // Set the new status to the requested one
459    setNewStatus(event);
460  }
461
462  /**
463   * Called when first connection or disconnection detected.
464   */
465  void toNotConnectedStatus()
466  {
467    // Go into not connected status
468    setNewStatus(StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT);
469  }
470
471  /**
472   * Perform whatever actions are needed to apply properties for being
473   * compliant with new status. Must be called in synchronized section for
474   * status. The new status is already set in status variable.
475   */
476  private void updateDomainForNewStatus()
477  {
478    switch (status)
479    {
480      case FULL_UPDATE_STATUS:
481        // Signal RS we just entered the full update status
482        broker.signalStatusChange(status);
483        break;
484      case NOT_CONNECTED_STATUS:
485      case NORMAL_STATUS:
486      case DEGRADED_STATUS:
487      case BAD_GEN_ID_STATUS:
488        break;
489      default:
490        if (logger.isTraceEnabled())
491        {
492          logger.trace("updateDomainForNewStatus: unexpected status: " + status);
493        }
494    }
495  }
496
497  /**
498   * Gets the status for this domain.
499   * @return The status for this domain.
500   */
501  public ServerStatus getStatus()
502  {
503    return status;
504  }
505
506  /**
507   * Returns the base DN of this ReplicationDomain. All Replication Domain using
508   * this baseDN will be connected through the Replication Service.
509   *
510   * @return The base DN of this ReplicationDomain
511   */
512  public DN getBaseDN()
513  {
514    return config.getBaseDN();
515  }
516
517  /**
518   * Get the server ID. The identifier of this Replication Domain inside the
519   * Replication Service. Each Domain must use a unique ServerID.
520   *
521   * @return The server ID.
522   */
523  public int getServerId()
524  {
525    return config.getServerId();
526  }
527
528  /**
529   * Window size used during initialization .. between - the
530   * initializer/exporter DS that listens/waits acknowledges and that slows down
531   * data msg publishing based on the slowest server - and each
532   * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
533   * received.
534   *
535   * @return the initWindow
536   */
537  protected int getInitWindow()
538  {
539    return config.getInitializationWindowSize();
540  }
541
542  /**
543   * Tells if assured replication is enabled for this domain.
544   * @return True if assured replication is enabled for this domain.
545   */
546  public boolean isAssured()
547  {
548    return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
549        || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
550  }
551
552  /**
553   * Gives the mode for the assured replication of the domain. Only used when
554   * assured is true).
555   *
556   * @return The mode for the assured replication of the domain.
557   */
558  public AssuredMode getAssuredMode()
559  {
560    switch (assuredConfig.getAssuredType())
561    {
562    case SAFE_DATA:
563    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
564      return AssuredMode.SAFE_DATA_MODE;
565    case SAFE_READ:
566      return AssuredMode.SAFE_READ_MODE;
567    }
568    return null; // should never happen
569  }
570
571  /**
572   * Gives the assured Safe Data level of the replication of the domain. (used
573   * when assuredMode is SAFE_DATA).
574   *
575   * @return The assured level of the replication of the domain.
576   */
577  public byte getAssuredSdLevel()
578  {
579    return (byte) assuredConfig.getAssuredSdLevel();
580  }
581
582  /**
583   * Gives the assured timeout of the replication of the domain (in ms).
584   * @return The assured timeout of the replication of the domain.
585   */
586  public long getAssuredTimeout()
587  {
588    return assuredConfig.getAssuredTimeout();
589  }
590
591  /**
592   * Gets the group id for this domain.
593   * @return The group id for this domain.
594   */
595  public byte getGroupId()
596  {
597    return (byte) config.getGroupId();
598  }
599
600  /**
601   * Gets the referrals URLs this domain publishes. Referrals urls to be
602   * published to other servers of the topology.
603   * <p>
604   * TODO: fill that with all currently opened urls if no urls configured
605   *
606   * @return The referrals URLs this domain publishes.
607   */
608  public Set<String> getRefUrls()
609  {
610    return config.getReferralsUrl();
611  }
612
613  /**
614   * Gets the info for Replicas in the topology (except us).
615   * @return The info for Replicas in the topology (except us)
616   */
617  public Map<Integer, DSInfo> getReplicaInfos()
618  {
619    return broker.getReplicaInfos();
620  }
621
622  /**
623   * Returns information about the DS server related to the provided serverId.
624   * based on the TopologyMsg we received when the remote replica connected or
625   * disconnected. Return null when no server with the provided serverId is
626   * connected.
627   *
628   * @param  dsId The provided serverId of the remote replica
629   * @return the info related to this remote server if it is connected,
630   *                  null is the server is NOT connected.
631   */
632  private DSInfo getConnectedRemoteDS(int dsId)
633  {
634    return getReplicaInfos().get(dsId);
635  }
636
637  /**
638   * Gets the States of all the Replicas currently in the
639   * Topology.
640   * When this method is called, a Monitoring message will be sent
641   * to the Replication Server to which this domain is currently connected
642   * so that it computes a table containing information about
643   * all Directory Servers in the topology.
644   * This Computation involves communications will all the servers
645   * currently connected and
646   *
647   * @return The States of all Replicas in the topology (except us)
648   */
649  public Map<Integer, ServerState> getReplicaStates()
650  {
651    return broker.getReplicaStates();
652  }
653
654  /**
655   * Gets the info for RSs in the topology (except the one we are connected
656   * to).
657   * @return The info for RSs in the topology (except the one we are connected
658   * to)
659   */
660  public List<RSInfo> getRsInfos()
661  {
662    return broker.getRsInfos();
663  }
664
665
666  /**
667   * Gets the server ID of the Replication Server to which the domain
668   * is currently connected.
669   *
670   * @return The server ID of the Replication Server to which the domain
671   *         is currently connected.
672   */
673  public int getRsServerId()
674  {
675    return broker.getRsServerId();
676  }
677
678  /**
679   * Increment the number of processed updates.
680   */
681  private void incProcessedUpdates()
682  {
683    numProcessedUpdates.incrementAndGet();
684  }
685
686  /**
687   * Get the number of updates replayed by the replication.
688   *
689   * @return The number of updates replayed by the replication
690   */
691  int getNumProcessedUpdates()
692  {
693    if (numProcessedUpdates != null)
694    {
695      return numProcessedUpdates.get();
696    }
697    return 0;
698  }
699
700  /**
701   * Get the number of updates received by the replication plugin.
702   *
703   * @return the number of updates received
704   */
705  int getNumRcvdUpdates()
706  {
707    if (numRcvdUpdates != null)
708    {
709      return numRcvdUpdates.get();
710    }
711    return 0;
712  }
713
714  /**
715   * Get the number of updates sent by the replication plugin.
716   *
717   * @return the number of updates sent
718   */
719  int getNumSentUpdates()
720  {
721    if (numSentUpdates != null)
722    {
723      return numSentUpdates.get();
724    }
725    return 0;
726  }
727
728  /**
729   * Receives an update message from the replicationServer.
730   * The other types of messages are processed in an opaque way for the caller.
731   * Also responsible for updating the list of pending changes
732   * @return the received message - null if none
733   */
734  private UpdateMsg receive()
735  {
736    UpdateMsg update = null;
737
738    while (update == null)
739    {
740      InitializeRequestMsg initReqMsg = null;
741      ReplicationMsg msg;
742      try
743      {
744        msg = broker.receive(true, true, false);
745        if (msg == null)
746        {
747          // The server is in the shutdown process
748          return null;
749        }
750
751        if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg))
752        {
753          logger.trace("LocalizableMessage received <" + msg + ">");
754        }
755
756        if (msg instanceof AckMsg)
757        {
758          AckMsg ack = (AckMsg) msg;
759          receiveAck(ack);
760        }
761        else if (msg instanceof InitializeRequestMsg)
762        {
763          // Another server requests us to provide entries
764          // for a total update
765          initReqMsg = (InitializeRequestMsg)msg;
766        }
767        else if (msg instanceof InitializeTargetMsg)
768        {
769          // Another server is exporting its entries to us
770          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
771
772          /*
773          This must be done while we are still holding the broker lock
774          because we are now going to receive a bunch of entries from the
775          remote server and we want the import thread to catch them and
776          not the ListenerThread.
777          */
778          initialize(initTargetMsg, initTargetMsg.getSenderID());
779        }
780        else if (msg instanceof ErrorMsg)
781        {
782          ErrorMsg errorMsg = (ErrorMsg)msg;
783          ImportExportContext ieCtx = importExportContext.get();
784          if (ieCtx != null)
785          {
786            /*
787            This is an error termination for the 2 following cases :
788            - either during an export
789            - or before an import really started
790            For example, when we publish a request and the
791            replicationServer did not find the import source.
792
793            A remote error during the import will be received in the
794            receiveEntryBytes() method.
795            */
796            if (logger.isTraceEnabled())
797            {
798              logger.trace(
799                  "[IE] processErrorMsg:" + getServerId() +
800                  " baseDN: " + getBaseDN() +
801                  " Error Msg received: " + errorMsg);
802            }
803
804            if (errorMsg.getCreationTime() > ieCtx.startTime)
805            {
806              // consider only ErrorMsg that relate to the current import/export
807              processErrorMsg(errorMsg, ieCtx);
808            }
809            else
810            {
811              /*
812              Simply log - happen when the ErrorMsg relates to a previous
813              attempt of initialization while we have started a new one
814              on this side.
815              */
816              logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
817            }
818          }
819          else
820          {
821            // Simply log - happen if import/export has been terminated
822            // on our side before receiving this ErrorMsg.
823            logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
824          }
825        }
826        else if (msg instanceof ChangeStatusMsg)
827        {
828          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
829          receiveChangeStatus(csMsg);
830        }
831        else if (msg instanceof UpdateMsg)
832        {
833          update = (UpdateMsg) msg;
834          generator.adjust(update.getCSN());
835        }
836        else if (msg instanceof InitializeRcvAckMsg)
837        {
838          ImportExportContext ieCtx = importExportContext.get();
839          if (ieCtx != null)
840          {
841            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
842            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
843          }
844          // Trash this msg When no input/export is running/should never happen
845        }
846      }
847      catch (SocketTimeoutException e)
848      {
849        // just retry
850      }
851      /*
852      Test if we have received and export request message and
853      if that's the case handle it now.
854      This must be done outside of the portion of code protected
855      by the broker lock so that we keep receiving update
856      when we are doing and export and so that a possible
857      closure of the socket happening when we are publishing the
858      entries to the remote can be handled by the other
859      replay thread when they call this method and therefore the
860      broker.receive() method.
861      */
862      if (initReqMsg != null)
863      {
864        // Do this work in a thread to allow replay thread continue working
865        ExportThread exportThread = new ExportThread(
866            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
867        exportThread.start();
868      }
869    }
870
871    numRcvdUpdates.incrementAndGet();
872    if (update.isAssured()
873        && broker.getRsGroupId() == getGroupId()
874        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
875    {
876      assuredSrReceivedUpdates.incrementAndGet();
877    }
878    return update;
879  }
880
881  /**
882   * Updates the passed monitoring list of errors received for assured messages
883   * (safe data or safe read, depending of the passed list to update) for a
884   * particular server in the list. This increments the counter of error for the
885   * passed server, or creates an initial value of 1 error for it if the server
886   * is not yet present in the map.
887   * @param errorsByServer map of number of errors per serverID
888   * @param sid the ID of the server which produced an error
889   */
890  private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
891    Integer sid)
892  {
893    synchronized (errorsByServer)
894    {
895      Integer serverErrCount = errorsByServer.get(sid);
896      if (serverErrCount == null)
897      {
898        // Server not present in list, create an entry with an
899        // initial number of errors set to 1
900        errorsByServer.put(sid, 1);
901      } else
902      {
903        // Server already present in list, just increment number of
904        // errors for the server
905        int val = serverErrCount;
906        val++;
907        errorsByServer.put(sid, val);
908      }
909    }
910  }
911
912  /**
913   * Do the necessary processing when an AckMsg is received.
914   *
915   * @param ack The AckMsg that was received.
916   */
917  private void receiveAck(AckMsg ack)
918  {
919    CSN csn = ack.getCSN();
920
921    // Remove the message for pending ack list (this may already make the thread
922    // that is waiting for the ack be aware of its reception)
923    UpdateMsg update = waitingAckMsgs.remove(csn);
924
925    // Signal waiting thread ack has been received
926    if (update != null)
927    {
928      synchronized (update)
929      {
930        update.notify();
931      }
932
933      // Analyze status of embedded in the ack to see if everything went well
934      boolean hasTimeout = ack.hasTimeout();
935      boolean hasReplayErrors = ack.hasReplayError();
936      boolean hasWrongStatus = ack.hasWrongStatus();
937
938      AssuredMode updateAssuredMode = update.getAssuredMode();
939
940      if ( hasTimeout || hasReplayErrors || hasWrongStatus)
941      {
942        /*
943        Some problems detected: message did not correctly reach every
944        requested servers. Log problem
945        */
946        logger.info(NOTE_DS_RECEIVED_ACK_ERROR, getBaseDN(), getServerId(), update, ack.errorsToString());
947
948        List<Integer> failedServers = ack.getFailedServers();
949
950        // Increment assured replication monitoring counters
951        switch (updateAssuredMode)
952        {
953          case SAFE_READ_MODE:
954            assuredSrNotAcknowledgedUpdates.incrementAndGet();
955            if (hasTimeout)
956            {
957              assuredSrTimeoutUpdates.incrementAndGet();
958            }
959            if (hasReplayErrors)
960            {
961              assuredSrReplayErrorUpdates.incrementAndGet();
962            }
963            if (hasWrongStatus)
964            {
965              assuredSrWrongStatusUpdates.incrementAndGet();
966            }
967            if (failedServers != null) // This should always be the case !
968            {
969              for(Integer sid : failedServers)
970              {
971                updateAssuredErrorsByServer(
972                  assuredSrServerNotAcknowledgedUpdates, sid);
973              }
974            }
975            break;
976          case SAFE_DATA_MODE:
977            // The only possible cause of ack error in safe data mode is timeout
978            if (hasTimeout) // So should always be the case
979            {
980              assuredSdTimeoutUpdates.incrementAndGet();
981            }
982            if (failedServers != null) // This should always be the case !
983            {
984              for(Integer sid : failedServers)
985              {
986                updateAssuredErrorsByServer(
987                  assuredSdServerTimeoutUpdates, sid);
988              }
989            }
990            break;
991          default:
992          // Should not happen
993        }
994      } else
995      {
996        // Update has been acknowledged without errors
997        // Increment assured replication monitoring counters
998        switch (updateAssuredMode)
999        {
1000          case SAFE_READ_MODE:
1001            assuredSrAcknowledgedUpdates.incrementAndGet();
1002            break;
1003          case SAFE_DATA_MODE:
1004            assuredSdAcknowledgedUpdates.incrementAndGet();
1005            break;
1006          default:
1007          // Should not happen
1008        }
1009      }
1010    }
1011  }
1012
1013
1014  /*
1015   * After this point the code is related to the Total Update.
1016   */
1017
1018  /**
1019   * This thread is launched when we want to export data to another server.
1020   *
1021   * When a task is created locally (so this local server is the initiator)
1022   * of the export (Example: dsreplication initialize-all),
1023   * this thread is NOT used but the task thread is running the export instead).
1024   */
1025  private class ExportThread extends DirectoryThread
1026  {
1027    /** Id of server that will be initialized. */
1028    private final int serverIdToInitialize;
1029    private final int initWindow;
1030
1031
1032
1033    /**
1034     * Constructor for the ExportThread.
1035     *
1036     * @param serverIdToInitialize
1037     *          serverId of server that will receive entries
1038     * @param initWindow
1039     *          The value of the initialization window for flow control between
1040     *          the importer and the exporter.
1041     */
1042    public ExportThread(int serverIdToInitialize, int initWindow)
1043    {
1044      super("Export thread from serverId=" + getServerId() + " to serverId="
1045          + serverIdToInitialize);
1046      this.serverIdToInitialize = serverIdToInitialize;
1047      this.initWindow = initWindow;
1048    }
1049
1050
1051
1052    /**
1053     * Run method for this class.
1054     */
1055    @Override
1056    public void run()
1057    {
1058      if (logger.isTraceEnabled())
1059      {
1060        logger.trace("[IE] starting " + getName());
1061      }
1062      try
1063      {
1064        initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
1065            initWindow);
1066      } catch (DirectoryException de)
1067      {
1068        /*
1069        An error message has been sent to the peer
1070        This server is not the initiator of the export so there is
1071        nothing more to do locally.
1072        */
1073      }
1074
1075      if (logger.isTraceEnabled())
1076      {
1077        logger.trace("[IE] ending " + getName());
1078      }
1079    }
1080  }
1081
1082  /**
1083   * This class contains the context related to an import or export launched on
1084   * the domain.
1085   */
1086  protected class ImportExportContext
1087  {
1088    /** The private task that initiated the operation. */
1089    private Task initializeTask;
1090    /** The destination in the case of an export. */
1091    private int exportTarget = RoutableMsg.UNKNOWN_SERVER;
1092    /** The source in the case of an import. */
1093    private int importSource = RoutableMsg.UNKNOWN_SERVER;
1094
1095    /** The total entry count expected to be processed. */
1096    private long entryCount;
1097    /** The count for the entry not yet processed. */
1098    private long entryLeftCount;
1099
1100    /** Exception raised during the initialization. */
1101    private DirectoryException exception;
1102
1103    /** Whether the context is related to an import or an export. */
1104    private final boolean importInProgress;
1105
1106    /** Current counter of messages exchanged during the initialization. */
1107    private int msgCnt;
1108
1109    /**
1110     * Number of connections lost when we start the initialization. Will help
1111     * counting connections lost during initialization,
1112     */
1113    private int initNumLostConnections;
1114
1115    /**
1116     * Request message sent when this server has the initializeFromRemote task.
1117     */
1118    private InitializeRequestMsg initReqMsgSent;
1119
1120    /**
1121     * Start time of the initialization process. ErrorMsg timestamped before
1122     * this startTime will be ignored.
1123     */
1124    private final long startTime;
1125
1126    /** List for replicas (DS) connected to the topology when initialization started. */
1127    private final Set<Integer> startList = new HashSet<>(0);
1128
1129    /**
1130     * List for replicas (DS) with a failure (disconnected from the topology)
1131     * since the initialization started.
1132     */
1133    private final Set<Integer> failureList = new HashSet<>(0);
1134
1135    /**
1136     * Flow control during initialization: for each remote server, counter of
1137     * messages received.
1138     */
1139    private final Map<Integer, Integer> ackVals = new HashMap<>();
1140    /** ServerId of the slowest server (the one with the smallest non null counter). */
1141    private int slowestServerId = -1;
1142
1143    private short exporterProtocolVersion = -1;
1144
1145    /** Window used during this initialization. */
1146    private int initWindow;
1147
1148    /** Number of attempt already done for this initialization. */
1149    private short attemptCnt;
1150
1151    /**
1152     * Creates a new IEContext.
1153     *
1154     * @param importInProgress true if the IEContext will be used
1155     *                         for and import, false if the IEContext
1156     *                         will be used for and export.
1157     */
1158    private ImportExportContext(boolean importInProgress)
1159    {
1160      this.importInProgress = importInProgress;
1161      this.startTime = System.currentTimeMillis();
1162      this.attemptCnt = 0;
1163    }
1164
1165    /**
1166     * Returns a boolean indicating if a total update import is currently in
1167     * Progress.
1168     *
1169     * @return A boolean indicating if a total update import is currently in
1170     *         Progress.
1171     */
1172    boolean importInProgress()
1173    {
1174      return importInProgress;
1175    }
1176
1177    /**
1178     * Returns the total number of entries to be processed when a total update
1179     * is in progress.
1180     *
1181     * @return The total number of entries to be processed when a total update
1182     *         is in progress.
1183     */
1184    long getTotalEntryCount()
1185    {
1186      return entryCount;
1187    }
1188
1189    /**
1190     * Returns the number of entries still to be processed when a total update
1191     * is in progress.
1192     *
1193     * @return The number of entries still to be processed when a total update
1194     *         is in progress.
1195     */
1196    long getLeftEntryCount()
1197    {
1198      return entryLeftCount;
1199    }
1200
1201    /**
1202     * Initializes the import/export counters with the provider value.
1203     * @param total Total number of entries to be processed.
1204     * @throws DirectoryException if an error occurred.
1205     */
1206    private void initializeCounters(long total) throws DirectoryException
1207    {
1208      entryCount = total;
1209      entryLeftCount = total;
1210
1211      if (initializeTask instanceof InitializeTask)
1212      {
1213        final InitializeTask task = (InitializeTask) initializeTask;
1214        task.setTotal(entryCount);
1215        task.setLeft(entryCount);
1216      }
1217      else if (initializeTask instanceof InitializeTargetTask)
1218      {
1219        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
1220        task.setTotal(entryCount);
1221        task.setLeft(entryCount);
1222      }
1223    }
1224
1225    /**
1226     * Update the counters of the task for each entry processed during
1227     * an import or export.
1228     *
1229     * @param  entriesDone The number of entries that were processed
1230     *                     since the last time this method was called.
1231     *
1232     * @throws DirectoryException if an error occurred.
1233     */
1234    private void updateCounters(int entriesDone) throws DirectoryException
1235    {
1236      entryLeftCount -= entriesDone;
1237
1238      if (initializeTask != null)
1239      {
1240        if (initializeTask instanceof InitializeTask)
1241        {
1242          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
1243        }
1244        else if (initializeTask instanceof InitializeTargetTask)
1245        {
1246          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
1247        }
1248      }
1249    }
1250
1251    /** {@inheritDoc} */
1252    @Override
1253    public String toString()
1254    {
1255      return "[Entry count=" + this.entryCount +
1256             ", Entry left count=" + this.entryLeftCount + "]";
1257    }
1258
1259    /**
1260     * Gets the server id of the exporting server.
1261     * @return the server id of the exporting server.
1262     */
1263    public int getExportTarget()
1264    {
1265      return exportTarget;
1266    }
1267
1268    /**
1269     * Gets the server id of the importing server.
1270     * @return the server id of the importing server.
1271     */
1272    public int getImportSource()
1273    {
1274      return importSource;
1275    }
1276
1277    /**
1278     * Get the exception that occurred during the import/export.
1279     * @return the exception that occurred during the import/export.
1280     */
1281    public DirectoryException getException()
1282    {
1283      return exception;
1284    }
1285
1286    /**
1287     * Set the exception that occurred during the import/export.
1288     * @param exception the exception that occurred during the import/export.
1289     */
1290    public void setException(DirectoryException exception)
1291    {
1292      this.exception = exception;
1293    }
1294
1295    /**
1296     * Only sets the exception that occurred during the import/export if none
1297     * was already set on this object.
1298     *
1299     * @param exception the exception that occurred during the import/export.
1300     */
1301    public void setExceptionIfNoneSet(DirectoryException exception)
1302    {
1303      if (exception == null)
1304      {
1305        this.exception = exception;
1306      }
1307    }
1308
1309    /**
1310     * Set the id of the EntryMsg acknowledged from a receiver (importer)server.
1311     * (updated via the listener thread)
1312     * @param serverId serverId of the acknowledger/receiver/importer server.
1313     * @param numAck   id of the message received.
1314     */
1315    private void setAckVal(int serverId, int numAck)
1316    {
1317      if (logger.isTraceEnabled())
1318      {
1319        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
1320      }
1321
1322      this.ackVals.put(serverId, numAck);
1323
1324      // Recompute the server with the minAck returned,means the slowest server.
1325      slowestServerId = serverId;
1326      for (Integer sid : importExportContext.get().ackVals.keySet())
1327      {
1328        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
1329        {
1330          slowestServerId = sid;
1331        }
1332      }
1333    }
1334
1335    /**
1336     * Returns the serverId of the server that acknowledged the smallest
1337     * EntryMsg id.
1338     * @return serverId of the server with latest acknowledge.
1339     *                  0 when no ack has been received yet.
1340     */
1341    public int getSlowestServer()
1342    {
1343      if (logger.isTraceEnabled())
1344      {
1345        logger.trace("[IE] getSlowestServer" + slowestServerId
1346            + " " + this.ackVals.get(slowestServerId));
1347      }
1348
1349      return this.slowestServerId;
1350    }
1351
1352  }
1353
1354  /**
1355   * Verifies that the given string represents a valid source
1356   * from which this server can be initialized.
1357   *
1358   * @param targetString The string representing the source
1359   * @return The source as a integer value
1360   * @throws DirectoryException if the string is not valid
1361   */
1362  public int decodeTarget(String targetString) throws DirectoryException
1363  {
1364    if ("all".equalsIgnoreCase(targetString))
1365    {
1366      return RoutableMsg.ALL_SERVERS;
1367    }
1368
1369    // So should be a serverID
1370    try
1371    {
1372      int target = Integer.decode(targetString);
1373      if (target >= 0)
1374      {
1375        // FIXME Could we check now that it is a know server in the domain ?
1376        // JNR: Yes please
1377      }
1378      return target;
1379    }
1380    catch (Exception e)
1381    {
1382      ResultCode resultCode = ResultCode.OTHER;
1383      LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get();
1384      throw new DirectoryException(resultCode, message, e);
1385    }
1386  }
1387
1388  /**
1389   * Initializes a remote server from this server.
1390   * <p>
1391   * The {@code exportBackend(OutputStream)} will therefore be called
1392   * on this server, and the {@code importBackend(InputStream)}
1393   * will be called on the remote server.
1394   * <p>
1395   * The InputStream and OutputStream given as a parameter to those
1396   * methods will be connected through the replication protocol.
1397   *
1398   * @param target   The server-id of the server that should be initialized.
1399   *                 The target can be discovered using the
1400   *                 {@link #getReplicaInfos()} method.
1401   * @param initTask The task that triggers this initialization and that should
1402   *                 be updated with its progress.
1403   *
1404   * @throws DirectoryException If it was not possible to publish the
1405   *                            Initialization message to the Topology.
1406   */
1407  public void initializeRemote(int target, Task initTask)
1408  throws DirectoryException
1409  {
1410    initializeRemote(target, getServerId(), initTask, getInitWindow());
1411  }
1412
1413  /**
1414   * Process the initialization of some other server or servers in the topology
1415   * specified by the target argument when this initialization specifying the
1416   * server that requests the initialization.
1417   *
1418   * @param serverToInitialize The target server that should be initialized.
1419   * @param serverRunningTheTask The server that initiated the export. It can
1420   * be the serverID of this server, or the serverID of a remote server.
1421   * @param initTask The task in this server that triggers this initialization
1422   * and that should be updated with its progress. Null when the export is done
1423   * following a request coming from a remote server (task is remote).
1424   * @param initWindow The value of the initialization window for flow control
1425   * between the importer and the exporter.
1426   *
1427   * @exception DirectoryException When an error occurs. No exception raised
1428   * means success.
1429   */
1430  protected void initializeRemote(int serverToInitialize,
1431      int serverRunningTheTask, Task initTask, int initWindow)
1432  throws DirectoryException
1433  {
1434    final ImportExportContext ieCtx = acquireIEContext(false);
1435
1436    /*
1437    We manage the list of servers to initialize in order :
1438    - to test at the end that all expected servers have reconnected
1439    after their import and with the right genId
1440    - to update the task with the server(s) where this test failed
1441    */
1442
1443    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
1444    {
1445      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
1446          countEntries(), getBaseDN(), getServerId());
1447
1448      ieCtx.startList.addAll(getReplicaInfos().keySet());
1449
1450      // We manage the list of servers with which a flow control can be enabled
1451      for (DSInfo dsi : getReplicaInfos().values())
1452      {
1453        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
1454        {
1455          ieCtx.setAckVal(dsi.getDsId(), 0);
1456        }
1457      }
1458    }
1459    else
1460    {
1461      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(),
1462          getBaseDN(), getServerId(), serverToInitialize);
1463
1464      ieCtx.startList.add(serverToInitialize);
1465
1466      // We manage the list of servers with which a flow control can be enabled
1467      for (DSInfo dsi : getReplicaInfos().values())
1468      {
1469        if (dsi.getDsId() == serverToInitialize &&
1470            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
1471        {
1472          ieCtx.setAckVal(dsi.getDsId(), 0);
1473        }
1474      }
1475    }
1476
1477    DirectoryException exportRootException = null;
1478
1479    // loop for the case where the exporter is the initiator
1480    int attempt = 0;
1481    boolean done = false;
1482    while (!done && ++attempt < 2) // attempt loop
1483    {
1484      try
1485      {
1486        ieCtx.exportTarget = serverToInitialize;
1487        if (initTask != null)
1488        {
1489          ieCtx.initializeTask = initTask;
1490        }
1491        ieCtx.initializeCounters(countEntries());
1492        ieCtx.msgCnt = 0;
1493        ieCtx.initNumLostConnections = broker.getNumLostConnections();
1494        ieCtx.initWindow = initWindow;
1495
1496        // Send start message to the peer
1497        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
1498            getBaseDN(), getServerId(), serverToInitialize,
1499            serverRunningTheTask, ieCtx.entryCount, initWindow);
1500
1501        broker.publish(initTargetMsg);
1502
1503        // Wait for all servers to be ok
1504        waitForRemoteStartOfInit(ieCtx);
1505
1506        // Servers that left in the list are those for which we could not test
1507        // that they have been successfully initialized.
1508        if (!ieCtx.failureList.isEmpty())
1509        {
1510          throw new DirectoryException(
1511              ResultCode.OTHER,
1512              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDN(), ieCtx.failureList));
1513        }
1514
1515        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
1516
1517        // Notify the peer of the success
1518        broker.publish(
1519            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
1520      }
1521      catch(DirectoryException exportException)
1522      {
1523        // Give priority to the first exception raised - stored in the context
1524        final DirectoryException ieEx = ieCtx.exception;
1525        exportRootException = ieEx != null ? ieEx : exportException;
1526      }
1527
1528      if (logger.isTraceEnabled())
1529      {
1530        logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
1531            + " export ends with connected=" + broker.isConnected()
1532            + " exportRootException=" + exportRootException);
1533      }
1534
1535      if (exportRootException != null)
1536      {
1537        try
1538        {
1539          /*
1540          Handling the errors during export
1541
1542          Note: we could have lost the connection and another thread
1543          the listener one) has already managed to reconnect.
1544          So we MUST rely on the test broker.isConnected()
1545          ONLY to do 'wait to be reconnected by another thread'
1546          (if not yet reconnected already).
1547          */
1548          if (!broker.isConnected())
1549          {
1550            // We are still disconnected, so we wait for the listener thread
1551            // to reconnect - wait 10s
1552            if (logger.isTraceEnabled())
1553            {
1554              logger.trace("[IE] Exporter wait for reconnection by the listener thread");
1555            }
1556            int att=0;
1557            while (!broker.shuttingDown()
1558                && !broker.isConnected()
1559                && ++att < 100)
1560            {
1561              try { Thread.sleep(100); }
1562              catch(Exception e){ /* do nothing */ }
1563            }
1564          }
1565
1566          if (initTask != null
1567              && broker.isConnected()
1568              && serverToInitialize != RoutableMsg.ALL_SERVERS)
1569          {
1570            /*
1571            NewAttempt case : In the case where
1572            - it's not an InitializeAll
1573            - AND the previous export attempt failed
1574            - AND we are (now) connected
1575            - and we own the task and this task is not an InitializeAll
1576            Let's :
1577            - sleep to let time to the other peer to reconnect if needed
1578            - and launch another attempt
1579            */
1580            try { Thread.sleep(1000); }
1581            catch(Exception e){ /* do nothing */ }
1582
1583            logger.info(NOTE_RESENDING_INIT_TARGET, exportRootException.getLocalizedMessage());
1584            continue;
1585          }
1586
1587          broker.publish(new ErrorMsg(
1588              serverToInitialize, exportRootException.getMessageObject()));
1589        }
1590        catch(Exception e)
1591        {
1592          // Ignore the failure raised while proceeding the root failure
1593        }
1594      }
1595
1596      // We are always done for this export ...
1597      // ... except in the NewAttempt case (see above)
1598      done = true;
1599
1600    } // attempt loop
1601
1602    // Wait for all servers to be ok, and build the failure list
1603    waitForRemoteEndOfInit(ieCtx);
1604
1605    // Servers that left in the list are those for which we could not test
1606    // that they have been successfully initialized.
1607    if (!ieCtx.failureList.isEmpty() && exportRootException == null)
1608    {
1609      exportRootException = new DirectoryException(ResultCode.OTHER,
1610              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(getGenerationID(), ieCtx.failureList));
1611    }
1612
1613    // Don't forget to release IEcontext acquired at beginning.
1614    releaseIEContext(); // FIXME should not this be in a finally?
1615
1616    final String cause = exportRootException == null ? ""
1617        : exportRootException.getLocalizedMessage();
1618    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
1619    {
1620      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL,
1621          getBaseDN(), getServerId(), cause);
1622    }
1623    else
1624    {
1625      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END,
1626          getBaseDN(), getServerId(), serverToInitialize, cause);
1627    }
1628
1629
1630    if (exportRootException != null)
1631    {
1632      throw exportRootException;
1633    }
1634  }
1635
1636  /**
1637   * For all remote servers in the start list:
1638   * - wait it has finished the import and present the expected generationID,
1639   * - build the failureList.
1640   */
1641  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
1642  {
1643    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
1644
1645    if (logger.isTraceEnabled())
1646    {
1647      logger.trace("[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
1648    }
1649
1650    int waitResultAttempt = 0;
1651    boolean done;
1652    do
1653    {
1654      done = true;
1655      for (DSInfo dsi : getReplicaInfos().values())
1656      {
1657        if (logger.isTraceEnabled())
1658        {
1659          logger.trace(
1660            "[IE] wait for start dsId " + dsi.getDsId()
1661            + " " + dsi.getStatus()
1662            + " " + dsi.getGenerationId()
1663            + " " + getGenerationID());
1664        }
1665        if (ieCtx.startList.contains(dsi.getDsId()))
1666        {
1667          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
1668          {
1669            // this one is still not doing the Full Update ... retry later
1670            done = false;
1671            try { Thread.sleep(100);
1672            }
1673            catch (InterruptedException e) {
1674              Thread.currentThread().interrupt();
1675            }
1676            waitResultAttempt++;
1677            break;
1678          }
1679          else
1680          {
1681            // this one is ok
1682            replicasWeAreWaitingFor.remove(dsi.getDsId());
1683          }
1684        }
1685      }
1686    }
1687    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
1688
1689    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
1690
1691    if (logger.isTraceEnabled())
1692    {
1693      logger.trace("[IE] wait for start ends with " + ieCtx.failureList);
1694    }
1695  }
1696
1697  /**
1698   * For all remote servers in the start list:
1699   * - wait it has finished the import and present the expected generationID,
1700   * - build the failureList.
1701   */
1702  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
1703  {
1704    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
1705
1706    if (logger.isTraceEnabled())
1707    {
1708      logger.trace("[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
1709    }
1710
1711    /*
1712    In case some new servers appear during the init, we want them to be
1713    considered in the processing of sorting the successfully initialized
1714    and the others
1715    */
1716    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
1717
1718    boolean done;
1719    do
1720    {
1721      done = true;
1722      int reconnectMaxDelayInSec = 10;
1723      int reconnectWait = 0;
1724      Iterator<Integer> it = replicasWeAreWaitingFor.iterator();
1725      while (it.hasNext())
1726      {
1727        int serverId = it.next();
1728        if (ieCtx.failureList.contains(serverId))
1729        {
1730          /*
1731          this server has already been in error during initialization
1732          don't wait for it
1733          */
1734          continue;
1735        }
1736
1737        DSInfo dsInfo = getConnectedRemoteDS(serverId);
1738        if (dsInfo == null)
1739        {
1740          /*
1741          this server is disconnected
1742          may be for a long time if it crashed or had been stopped
1743          may be just the time to reconnect after import : should be short
1744          */
1745          if (++reconnectWait<reconnectMaxDelayInSec)
1746          {
1747            // let's still wait to give a chance to this server to reconnect
1748            done = false;
1749          }
1750          // Else we left enough time to the servers to reconnect
1751        }
1752        else
1753        {
1754          // this server is connected
1755          if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
1756          {
1757            // this one is still doing the Full Update ... retry later
1758            done = false;
1759            break;
1760          }
1761
1762          if (dsInfo.getGenerationId() == getGenerationID())
1763          { // and with the expected generationId
1764            // We're done with this server
1765            it.remove();
1766          }
1767        }
1768      }
1769
1770      // loop and wait
1771      if (!done)
1772      {
1773        try { Thread.sleep(1000); }
1774        catch (InterruptedException e) {
1775          Thread.currentThread().interrupt();
1776        } // 1sec
1777      }
1778    }
1779    while (!done && !broker.shuttingDown()); // infinite wait
1780
1781    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
1782
1783    if (logger.isTraceEnabled())
1784    {
1785      logger.trace("[IE] wait for end ends with " + ieCtx.failureList);
1786    }
1787  }
1788
1789  /**
1790   * Get the ServerState maintained by the Concrete class.
1791   *
1792   * @return the ServerState maintained by the Concrete class.
1793   */
1794  public ServerState getServerState()
1795  {
1796    return state;
1797  }
1798
1799  /**
1800   * Acquire and initialize the import/export context, verifying no other
1801   * import/export is in progress.
1802   */
1803  private ImportExportContext acquireIEContext(boolean importInProgress)
1804      throws DirectoryException
1805  {
1806    final ImportExportContext ieCtx = new ImportExportContext(importInProgress);
1807    if (!importExportContext.compareAndSet(null, ieCtx))
1808    {
1809      // Rejects 2 simultaneous exports
1810      LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
1811      throw new DirectoryException(ResultCode.OTHER, message);
1812    }
1813    return ieCtx;
1814  }
1815
1816  private void releaseIEContext()
1817  {
1818    importExportContext.set(null);
1819  }
1820
1821  /**
1822   * Processes an error message received while an export is
1823   * on going, or an import will start.
1824   *
1825   * @param errorMsg The error message received.
1826   */
1827  private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx)
1828  {
1829    //Exporting must not be stopped on the first error, if we run initialize-all
1830    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
1831    {
1832      // The ErrorMsg is received while we have started an initialization
1833      ieCtx.setExceptionIfNoneSet(new DirectoryException(
1834          ResultCode.OTHER, errorMsg.getDetails()));
1835
1836      /*
1837       * This can happen :
1838       * - on the first InitReqMsg sent when source in not known for example
1839       * - on the next attempt when source crashed and did not reconnect
1840       *   even after the nextInitAttemptDelay
1841       * During the import, the ErrorMsg will be received by receiveEntryBytes
1842       */
1843      if (ieCtx.initializeTask instanceof InitializeTask)
1844      {
1845        // Update the task that initiated the import
1846        ((InitializeTask) ieCtx.initializeTask)
1847            .updateTaskCompletionState(ieCtx.getException());
1848
1849        releaseIEContext();
1850      }
1851    }
1852  }
1853
1854  /**
1855   * Receives bytes related to an entry in the context of an import to
1856   * initialize the domain (called by ReplLDIFInputStream).
1857   *
1858   * @return The bytes. Null when the Done or Err message has been received
1859   */
1860  protected byte[] receiveEntryBytes()
1861  {
1862    ReplicationMsg msg;
1863    while (true)
1864    {
1865      ImportExportContext ieCtx = importExportContext.get();
1866      try
1867      {
1868        // In the context of the total update, we don't want any automatic
1869        // re-connection done transparently by the broker because of a better
1870        // RS or because of a connection failure.
1871        // We want to be notified of topology change in order to track a
1872        // potential disconnection of the exporter.
1873        msg = broker.receive(false, false, true);
1874
1875        if (logger.isTraceEnabled())
1876        {
1877          logger.trace("[IE] In "
1878              + broker.getReplicationMonitorInstanceName()
1879              + ", receiveEntryBytes " + msg);
1880        }
1881
1882        if (msg == null)
1883        {
1884          if (broker.shuttingDown())
1885          {
1886            // The server is in the shutdown process
1887            return null;
1888          }
1889          else
1890          {
1891            // Handle connection issues
1892            ieCtx.setExceptionIfNoneSet(new DirectoryException(
1893                ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
1894                    .get(broker.getReplicationServer())));
1895            return null;
1896          }
1897        }
1898
1899        // Check good ordering of msg received
1900        if (msg instanceof EntryMsg)
1901        {
1902          EntryMsg entryMsg = (EntryMsg)msg;
1903          byte[] entryBytes = entryMsg.getEntryBytes();
1904          ieCtx.updateCounters(countEntryLimits(entryBytes));
1905
1906          if (ieCtx.exporterProtocolVersion >=
1907            ProtocolVersion.REPLICATION_PROTOCOL_V4)
1908          {
1909            // check the msgCnt of the msg received to check ordering
1910            if (++ieCtx.msgCnt != entryMsg.getMsgId())
1911            {
1912              ieCtx.setExceptionIfNoneSet(new DirectoryException(
1913                  ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(ieCtx.msgCnt, entryMsg.getMsgId())));
1914              return null;
1915            }
1916
1917            // send the ack of flow control mgmt
1918            if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
1919            {
1920              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
1921                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
1922              broker.publish(amsg, false);
1923              if (logger.isTraceEnabled())
1924              {
1925                logger.trace("[IE] In "
1926                    + broker.getReplicationMonitorInstanceName()
1927                    + ", publish InitializeRcvAckMsg" + amsg);
1928              }
1929            }
1930          }
1931          return entryBytes;
1932        }
1933        else if (msg instanceof DoneMsg)
1934        {
1935          /*
1936          This is the normal termination of the import
1937          No error is stored and the import is ended by returning null
1938          */
1939          return null;
1940        }
1941        else if (msg instanceof ErrorMsg)
1942        {
1943          /*
1944          This is an error termination during the import
1945          The error is stored and the import is ended by returning null
1946          */
1947          if (ieCtx.getException() == null)
1948          {
1949            ErrorMsg errMsg = (ErrorMsg)msg;
1950            if (errMsg.getCreationTime() > ieCtx.startTime)
1951            {
1952              ieCtx.setException(
1953                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
1954              return null;
1955            }
1956          }
1957        }
1958        else
1959        {
1960          // Other messages received during an import are trashed except
1961          // the topologyMsg.
1962          if (msg instanceof TopologyMsg
1963              && getConnectedRemoteDS(ieCtx.importSource) == null)
1964          {
1965            LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get(
1966                getBaseDN(), getServerId(), ieCtx.importSource);
1967            ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, errMsg));
1968            return null;
1969          }
1970        }
1971      }
1972      catch(Exception e)
1973      {
1974        ieCtx.setExceptionIfNoneSet(new DirectoryException(
1975            ResultCode.OTHER,
1976            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
1977      }
1978    }
1979  }
1980
1981  /**
1982   * Count the number of entries in the provided byte[].
1983   * This is based on the hypothesis that the entries are separated
1984   * by a "\n\n" String.
1985   *
1986   * @param   entryBytes the set of bytes containing one or more entries.
1987   * @return  The number of entries in the provided byte[].
1988   */
1989  private int countEntryLimits(byte[] entryBytes)
1990  {
1991    return countEntryLimits(entryBytes, 0, entryBytes.length);
1992  }
1993
1994  /**
1995   * Count the number of entries in the provided byte[].
1996   * This is based on the hypothesis that the entries are separated
1997   * by a "\n\n" String.
1998   *
1999   * @param   entryBytes the set of bytes containing one or more entries.
2000   * @return  The number of entries in the provided byte[].
2001   */
2002  private int countEntryLimits(byte[] entryBytes, int pos, int length)
2003  {
2004    int entryCount = 0;
2005    int count = 0;
2006    while (count<=length-2)
2007    {
2008      if (entryBytes[pos+count] == '\n' && entryBytes[pos+count+1] == '\n')
2009      {
2010        entryCount++;
2011        count++;
2012      }
2013      count++;
2014    }
2015    return entryCount;
2016  }
2017
2018  /**
2019   * Exports an entry in LDIF format.
2020   *
2021   * @param lDIFEntry The entry to be exported in byte[] form.
2022   * @param pos       The starting Position in the array.
2023   * @param length    Number of array elements to be copied.
2024   *
2025   * @throws IOException when an error occurred.
2026   */
2027  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
2028      throws IOException
2029  {
2030    if (logger.isTraceEnabled())
2031    {
2032      logger.trace("[IE] Entering exportLDIFEntry entry=" + Arrays.toString(lDIFEntry));
2033    }
2034
2035    // build the message
2036    ImportExportContext ieCtx = importExportContext.get();
2037    EntryMsg entryMessage = new EntryMsg(
2038        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
2039        ++ieCtx.msgCnt);
2040
2041    // Waiting the slowest loop
2042    while (!broker.shuttingDown())
2043    {
2044      /*
2045      If an error was raised - like receiving an ErrorMsg from a remote
2046      server that have been stored by the listener thread in the ieContext,
2047      we just abandon the export by throwing an exception.
2048      */
2049      if (ieCtx.getException() != null)
2050      {
2051        throw new IOException(ieCtx.getException().getMessage());
2052      }
2053
2054      int slowestServerId = ieCtx.getSlowestServer();
2055      if (getConnectedRemoteDS(slowestServerId) == null)
2056      {
2057        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
2058            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer())));
2059
2060        throw new IOException("IOException with nested DirectoryException",
2061            ieCtx.getException());
2062      }
2063
2064      int ourLastExportedCnt = ieCtx.msgCnt;
2065      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
2066
2067      if (logger.isTraceEnabled())
2068      {
2069        logger.trace("[IE] Entering exportLDIFEntry waiting " +
2070            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
2071      }
2072
2073      if (ourLastExportedCnt - slowestCnt > ieCtx.initWindow)
2074      {
2075        if (logger.isTraceEnabled())
2076        {
2077          logger.trace("[IE] Entering exportLDIFEntry waiting");
2078        }
2079
2080        // our export is too far beyond the slowest importer - let's wait
2081        try { Thread.sleep(100); }
2082        catch(Exception e) { /* do nothing */ }
2083
2084        // process any connection error
2085        if (broker.hasConnectionError()
2086          || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
2087        {
2088          // publish failed - store the error in the ieContext ...
2089          DirectoryException de = new DirectoryException(ResultCode.OTHER,
2090              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId()));
2091          ieCtx.setExceptionIfNoneSet(de);
2092          // .. and abandon the export by throwing an exception.
2093          throw new IOException(de.getMessage());
2094        }
2095      }
2096      else
2097      {
2098        if (logger.isTraceEnabled())
2099        {
2100          logger.trace("[IE] slowest got to us => stop waiting");
2101        }
2102        break;
2103      }
2104    } // Waiting the slowest loop
2105
2106    if (logger.isTraceEnabled())
2107    {
2108      logger.trace("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry));
2109    }
2110
2111    boolean sent = broker.publish(entryMessage, false);
2112
2113    // process any publish error
2114    if (!sent
2115        || broker.hasConnectionError()
2116        || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
2117    {
2118      // publish failed - store the error in the ieContext ...
2119      DirectoryException de = new DirectoryException(ResultCode.OTHER,
2120          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId()));
2121      ieCtx.setExceptionIfNoneSet(de);
2122      // .. and abandon the export by throwing an exception.
2123      throw new IOException(de.getMessage());
2124    }
2125
2126    // publish succeeded
2127    try
2128    {
2129      ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
2130    }
2131    catch (DirectoryException de)
2132    {
2133      ieCtx.setExceptionIfNoneSet(de);
2134      // .. and abandon the export by throwing an exception.
2135      throw new IOException(de.getMessage());
2136    }
2137  }
2138
2139  /**
2140   * Initializes asynchronously this domain from a remote source server.
2141   * Before returning from this call, for the provided task :
2142   * - the progressing counters are updated during the initialization using
2143   *   setTotal() and setLeft().
2144   * - the end of the initialization using updateTaskCompletionState().
2145   * <p>
2146   * When this method is called, a request for initialization is sent to the
2147   * remote source server requesting initialization.
2148   * <p>
2149   *
2150   * @param source   The server-id of the source from which to initialize.
2151   *                 The source can be discovered using the
2152   *                 {@link #getReplicaInfos()} method.
2153   *
2154   * @param initTask The task that launched the initialization
2155   *                 and should be updated of its progress.
2156   *
2157   * @throws DirectoryException If it was not possible to publish the
2158   *                            Initialization message to the Topology.
2159   *                            The task state is updated.
2160   */
2161  public void initializeFromRemote(int source, Task initTask)
2162  throws DirectoryException
2163  {
2164    if (logger.isTraceEnabled())
2165    {
2166      logger.trace("[IE] Entering initializeFromRemote for " + this);
2167    }
2168
2169    LocalizableMessage errMsg = !broker.isConnected()
2170        ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDN())
2171        : null;
2172
2173    /*
2174    We must not test here whether the remote source is connected to
2175    the topology by testing if it stands in the replicas list since.
2176    In the case of a re-attempt of initialization, the listener thread is
2177    running this method directly coming from initialize() method and did
2178    not processed any topology message in between the failure and the
2179    new attempt.
2180    */
2181    try
2182    {
2183      /*
2184      We must immediately acquire a context to store the task inside
2185      The context will be used when we (the listener thread) will receive
2186      the InitializeTargetMsg, process the import, and at the end
2187      update the task.
2188      */
2189
2190      final ImportExportContext ieCtx = acquireIEContext(true);
2191      ieCtx.initializeTask = initTask;
2192      ieCtx.attemptCnt = 0;
2193      ieCtx.initReqMsgSent = new InitializeRequestMsg(
2194          getBaseDN(), getServerId(), source, getInitWindow());
2195      broker.publish(ieCtx.initReqMsgSent);
2196
2197      /*
2198      The normal success processing is now to receive InitTargetMsg then
2199      entries from the remote server.
2200      The error cases are :
2201      - either local error immediately caught below
2202      - a remote error we will receive as an ErrorMsg
2203      */
2204    }
2205    catch(DirectoryException de)
2206    {
2207      errMsg = de.getMessageObject();
2208    }
2209    catch(Exception e)
2210    {
2211      // Should not happen
2212      errMsg = LocalizableMessage.raw(e.getLocalizedMessage());
2213      logger.error(errMsg);
2214    }
2215
2216    // When error, update the task and raise the error to the caller
2217    if (errMsg != null)
2218    {
2219      // No need to call here updateTaskCompletionState - will be done
2220      // by the caller
2221      releaseIEContext();
2222      throw new DirectoryException(ResultCode.OTHER, errMsg);
2223    }
2224  }
2225
2226  /**
2227   * Processes an InitializeTargetMsg received from a remote server
2228   * meaning processes an initialization from the entries expected to be
2229   * received now.
2230   *
2231   * @param initTargetMsgReceived The message received from the remote server.
2232   *
2233   * @param requesterServerId The serverId of the server that requested the
2234   *                          initialization meaning the server where the
2235   *                          task has initially been created (this server,
2236   *                          or the remote server).
2237   */
2238  private void initialize(InitializeTargetMsg initTargetMsgReceived, int requesterServerId)
2239  {
2240    if (logger.isTraceEnabled())
2241    {
2242      logger.trace("[IE] Entering initialize - domain=" + this);
2243    }
2244
2245    InitializeTask initFromTask = null;
2246    int source = initTargetMsgReceived.getSenderID();
2247    ImportExportContext ieCtx = importExportContext.get();
2248    try
2249    {
2250      // Log starting
2251      logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START, getBaseDN(),
2252          initTargetMsgReceived.getSenderID(), getServerId());
2253
2254      // Go into full update status
2255      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
2256
2257      // Acquire an import context if no already done (and initialize).
2258      if (initTargetMsgReceived.getInitiatorID() != getServerId())
2259      {
2260        /*
2261        The initTargetMsgReceived is for an import initiated by the remote server.
2262        Test and set if no import already in progress
2263        */
2264        ieCtx = acquireIEContext(true);
2265      }
2266
2267      // Initialize stuff
2268      ieCtx.importSource = source;
2269      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
2270      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
2271      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
2272      initFromTask = (InitializeTask) ieCtx.initializeTask;
2273
2274      // Launch the import
2275      importBackend(new ReplInputStream(this));
2276    }
2277    catch (DirectoryException e)
2278    {
2279      /*
2280      Store the exception raised. It will be considered if no other exception
2281      has been previously stored in  the context
2282      */
2283      ieCtx.setExceptionIfNoneSet(e);
2284    }
2285    finally
2286    {
2287      if (logger.isTraceEnabled())
2288      {
2289        logger.trace("[IE] Domain=" + this
2290          + " ends import with exception=" + ieCtx.getException()
2291          + " connected=" + broker.isConnected());
2292      }
2293
2294      /*
2295      It is necessary to restart (reconnect to RS) for different reasons
2296      - when everything went well, reconnect in order to exchange
2297      new state, new generation ID
2298      - when we have connection failure, reconnect to retry a new import
2299      right here, right now
2300      we never want retryOnFailure if we fails reconnecting in the restart.
2301      */
2302      broker.reStart(false);
2303
2304      if (ieCtx.getException() != null
2305          && broker.isConnected()
2306          && initFromTask != null
2307          && ++ieCtx.attemptCnt < 2)
2308      {
2309          /*
2310          Worth a new attempt
2311          since initFromTask is in this server, connection is ok
2312          */
2313          try
2314          {
2315            /*
2316            Wait for the exporter to stabilize - eventually reconnect as
2317            well if it was connected to the same RS than the one we lost ...
2318            */
2319            Thread.sleep(1000);
2320
2321            /*
2322            Restart the whole import protocol exchange by sending again
2323            the request
2324            */
2325            logger.info(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST,
2326                ieCtx.getException().getLocalizedMessage());
2327
2328            broker.publish(ieCtx.initReqMsgSent);
2329
2330            ieCtx.initializeCounters(0);
2331            ieCtx.exception = null;
2332            ieCtx.msgCnt = 0;
2333
2334            // Processing of the received initTargetMsgReceived is done
2335            // let's wait for the next one
2336            return;
2337          }
2338          catch(Exception e)
2339          {
2340            /*
2341            An error occurs when sending a new request for a new import.
2342            This error is not stored, preferring to keep the initial one.
2343            */
2344            logger.error(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST,
2345                e.getLocalizedMessage(), ieCtx.getException().getLocalizedMessage());
2346          }
2347      }
2348
2349      // ===================
2350      // No new attempt case
2351
2352      if (logger.isTraceEnabled())
2353      {
2354        logger.trace("[IE] Domain=" + this
2355          + " ends initialization with exception=" + ieCtx.getException()
2356          + " connected=" + broker.isConnected()
2357          + " task=" + initFromTask
2358          + " attempt=" + ieCtx.attemptCnt);
2359      }
2360
2361      try
2362      {
2363        if (broker.isConnected() && ieCtx.getException() != null)
2364        {
2365          // Let's notify the exporter
2366          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
2367              ieCtx.getException().getMessageObject());
2368          broker.publish(errorMsg);
2369        }
2370        /*
2371        Update the task that initiated the import must be the last thing.
2372        Particularly, broker.restart() after import success must be done
2373        before some other operations/tasks to be launched,
2374        like resetting the generation ID.
2375        */
2376        if (initFromTask != null)
2377        {
2378          initFromTask.updateTaskCompletionState(ieCtx.getException());
2379        }
2380      }
2381      finally
2382      {
2383        String errorMsg = ieCtx.getException() != null ? ieCtx.getException().getLocalizedMessage() : "";
2384        logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END,
2385            getBaseDN(), initTargetMsgReceived.getSenderID(), getServerId(), errorMsg);
2386        releaseIEContext();
2387      } // finally
2388    } // finally
2389  }
2390
2391  /**
2392   * Return the protocol version of the DS related to the provided serverId.
2393   * Returns -1 when the protocol version is not known.
2394   * @param dsServerId The provided serverId.
2395   * @return The protocol version.
2396   */
2397  private short getProtocolVersion(int dsServerId)
2398  {
2399    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
2400    if (dsInfo != null)
2401    {
2402      return dsInfo.getProtocolVersion();
2403    }
2404    return -1;
2405  }
2406
2407  /**
2408   * Sets the status to a new value depending of the passed status machine
2409   * event.
2410   * @param event The event that may make the status be changed
2411   */
2412  protected void signalNewStatus(StatusMachineEvent event)
2413  {
2414    setNewStatus(event);
2415    broker.signalStatusChange(status);
2416  }
2417
2418  private void setNewStatus(StatusMachineEvent event)
2419  {
2420    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
2421    if (newStatus == ServerStatus.INVALID_STATUS)
2422    {
2423      logger.error(ERR_DS_CANNOT_CHANGE_STATUS, getBaseDN(), getServerId(), status, event);
2424      return;
2425    }
2426
2427    if (newStatus != status)
2428    {
2429      // Reset status date
2430      lastStatusChangeDate = new Date();
2431      // Reset monitoring counters if reconnection
2432      if (newStatus == ServerStatus.NOT_CONNECTED_STATUS)
2433      {
2434        resetMonitoringCounters();
2435      }
2436
2437      status = newStatus;
2438      if (logger.isTraceEnabled())
2439      {
2440        logger.trace("Replication domain " + getBaseDN()
2441            + " new status is: " + status);
2442      }
2443
2444      // Perform whatever actions are needed to apply properties for being
2445      // compliant with new status
2446      updateDomainForNewStatus();
2447    }
2448  }
2449
2450  /**
2451   * Returns a boolean indicating if an import or export is currently
2452   * processed.
2453   *
2454   * @return The status
2455   */
2456  public boolean ieRunning()
2457  {
2458    return importExportContext.get() != null;
2459  }
2460
2461  /**
2462   * Check the value of the Replication Servers generation ID.
2463   *
2464   * @param generationID        The expected value of the generation ID.
2465   *
2466   * @throws DirectoryException When the generation ID of the Replication
2467   *                            Servers is not the expected value.
2468   */
2469  private void checkGenerationID(long generationID) throws DirectoryException
2470  {
2471    boolean allSet = true;
2472
2473    for (int i = 0; i< 50; i++)
2474    {
2475      allSet = true;
2476      for (RSInfo rsInfo : getRsInfos())
2477      {
2478        // the 'empty' RSes (generationId==-1) are considered as good citizens
2479        if (rsInfo.getGenerationId() != -1 &&
2480            rsInfo.getGenerationId() != generationID)
2481        {
2482          try
2483          {
2484            Thread.sleep(i*100);
2485          } catch (InterruptedException e)
2486          {
2487            Thread.currentThread().interrupt();
2488          }
2489          allSet = false;
2490          break;
2491        }
2492      }
2493      if (allSet)
2494      {
2495        break;
2496      }
2497    }
2498    if (!allSet)
2499    {
2500      LocalizableMessage message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDN());
2501      throw new DirectoryException(ResultCode.OTHER, message);
2502    }
2503  }
2504
2505  /**
2506   * Reset the Replication Log.
2507   * Calling this method will remove all the Replication information that
2508   * was kept on all the Replication Servers currently connected in the
2509   * topology.
2510   *
2511   * @throws DirectoryException If this ReplicationDomain is not currently
2512   *                           connected to a Replication Server or it
2513   *                           was not possible to contact it.
2514   */
2515  void resetReplicationLog() throws DirectoryException
2516  {
2517    // Reset the Generation ID to -1 to clean the ReplicationServers.
2518    resetGenerationId(-1L);
2519
2520    // check that at least one ReplicationServer did change its generation-id
2521    checkGenerationID(-1);
2522
2523    // Reconnect to the Replication Server so that it adopts our GenerationID.
2524    restartService();
2525
2526    // wait for the domain to reconnect.
2527    int count = 0;
2528    while (!isConnected() && count < 10)
2529    {
2530      try
2531      {
2532        Thread.sleep(100);
2533      } catch (InterruptedException e)
2534      {
2535        Thread.currentThread().interrupt();
2536      }
2537    }
2538
2539    resetGenerationId(getGenerationID());
2540
2541    // check that at least one ReplicationServer did change its generation-id
2542    checkGenerationID(getGenerationID());
2543  }
2544
2545  /**
2546   * Reset the generationId of this domain in the whole topology.
2547   * A message is sent to the Replication Servers for them to reset
2548   * their change dbs.
2549   *
2550   * @param generationIdNewValue  The new value of the generation Id.
2551   * @throws DirectoryException   When an error occurs
2552   */
2553  public void resetGenerationId(Long generationIdNewValue)
2554      throws DirectoryException
2555  {
2556    if (logger.isTraceEnabled())
2557    {
2558      logger.trace("Server id " + getServerId() + " and domain "
2559          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
2560    }
2561
2562    ResetGenerationIdMsg genIdMessage =
2563        new ResetGenerationIdMsg(getGenId(generationIdNewValue));
2564
2565    if (!isConnected())
2566    {
2567      LocalizableMessage message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDN(),
2568          getServerId(), genIdMessage.getGenerationId());
2569      throw new DirectoryException(ResultCode.OTHER, message);
2570    }
2571    broker.publish(genIdMessage);
2572
2573    // check that at least one ReplicationServer did change its generation-id
2574    checkGenerationID(getGenId(generationIdNewValue));
2575  }
2576
2577  private long getGenId(Long generationIdNewValue)
2578  {
2579    if (generationIdNewValue != null)
2580    {
2581      return generationIdNewValue;
2582    }
2583    return getGenerationID();
2584  }
2585
2586
2587  /*
2588   ******** End of The total Update code *********
2589   */
2590
2591  /*
2592   ******* Start of Monitoring Code **********
2593   */
2594
2595  /**
2596   * Get the maximum receive window size.
2597   *
2598   * @return The maximum receive window size.
2599   */
2600  int getMaxRcvWindow()
2601  {
2602    if (broker != null)
2603    {
2604      return broker.getMaxRcvWindow();
2605    }
2606    return 0;
2607  }
2608
2609  /**
2610   * Get the current receive window size.
2611   *
2612   * @return The current receive window size.
2613   */
2614  int getCurrentRcvWindow()
2615  {
2616    if (broker != null)
2617    {
2618      return broker.getCurrentRcvWindow();
2619    }
2620    return 0;
2621  }
2622
2623  /**
2624   * Get the maximum send window size.
2625   *
2626   * @return The maximum send window size.
2627   */
2628  int getMaxSendWindow()
2629  {
2630    if (broker != null)
2631    {
2632      return broker.getMaxSendWindow();
2633    }
2634    return 0;
2635  }
2636
2637  /**
2638   * Get the current send window size.
2639   *
2640   * @return The current send window size.
2641   */
2642  int getCurrentSendWindow()
2643  {
2644    if (broker != null)
2645    {
2646      return broker.getCurrentSendWindow();
2647    }
2648    return 0;
2649  }
2650
2651  /**
2652   * Get the number of times the replication connection was lost.
2653   * @return The number of times the replication connection was lost.
2654   */
2655  int getNumLostConnections()
2656  {
2657    if (broker != null)
2658    {
2659      return broker.getNumLostConnections();
2660    }
2661    return 0;
2662  }
2663
2664  /**
2665   * Determine whether the connection to the replication server is encrypted.
2666   * @return true if the connection is encrypted, false otherwise.
2667   */
2668  boolean isSessionEncrypted()
2669  {
2670    return broker != null && broker.isSessionEncrypted();
2671  }
2672
2673  /**
2674   * Check if the domain is connected to a ReplicationServer.
2675   *
2676   * @return true if the server is connected, false if not.
2677   */
2678  public boolean isConnected()
2679  {
2680    return broker != null && broker.isConnected();
2681  }
2682
2683  /**
2684   * Check if the domain has a connection error.
2685   * A Connection error happens when the broker could not be created
2686   * or when the broker could not find any ReplicationServer to connect to.
2687   *
2688   * @return true if the domain has a connection error.
2689   */
2690  public boolean hasConnectionError()
2691  {
2692    return broker == null || broker.hasConnectionError();
2693  }
2694
2695  /**
2696   * Get the name of the replicationServer to which this domain is currently
2697   * connected.
2698   *
2699   * @return the name of the replicationServer to which this domain
2700   *         is currently connected.
2701   */
2702  public String getReplicationServer()
2703  {
2704    if (broker != null)
2705    {
2706      return broker.getReplicationServer();
2707    }
2708    return ReplicationBroker.NO_CONNECTED_SERVER;
2709  }
2710
2711  /**
2712   * Gets the number of updates sent in assured safe read mode.
2713   * @return The number of updates sent in assured safe read mode.
2714   */
2715  public int getAssuredSrSentUpdates()
2716  {
2717    return assuredSrSentUpdates.get();
2718  }
2719
2720  /**
2721   * Gets the number of updates sent in assured safe read mode that have been
2722   * acknowledged without errors.
2723   * @return The number of updates sent in assured safe read mode that have been
2724   * acknowledged without errors.
2725   */
2726  public int getAssuredSrAcknowledgedUpdates()
2727  {
2728    return assuredSrAcknowledgedUpdates.get();
2729  }
2730
2731  /**
2732   * Gets the number of updates sent in assured safe read mode that have not
2733   * been acknowledged.
2734   * @return The number of updates sent in assured safe read mode that have not
2735   * been acknowledged.
2736   */
2737  public int getAssuredSrNotAcknowledgedUpdates()
2738  {
2739    return assuredSrNotAcknowledgedUpdates.get();
2740  }
2741
2742  /**
2743   * Gets the number of updates sent in assured safe read mode that have not
2744   * been acknowledged due to timeout error.
2745   * @return The number of updates sent in assured safe read mode that have not
2746   * been acknowledged due to timeout error.
2747   */
2748  public int getAssuredSrTimeoutUpdates()
2749  {
2750    return assuredSrTimeoutUpdates.get();
2751  }
2752
2753  /**
2754   * Gets the number of updates sent in assured safe read mode that have not
2755   * been acknowledged due to wrong status error.
2756   * @return The number of updates sent in assured safe read mode that have not
2757   * been acknowledged due to wrong status error.
2758   */
2759  public int getAssuredSrWrongStatusUpdates()
2760  {
2761    return assuredSrWrongStatusUpdates.get();
2762  }
2763
2764  /**
2765   * Gets the number of updates sent in assured safe read mode that have not
2766   * been acknowledged due to replay error.
2767   * @return The number of updates sent in assured safe read mode that have not
2768   * been acknowledged due to replay error.
2769   */
2770  public int getAssuredSrReplayErrorUpdates()
2771  {
2772    return assuredSrReplayErrorUpdates.get();
2773  }
2774
2775  /**
2776   * Gets the number of updates sent in assured safe read mode that have not
2777   * been acknowledged per server.
2778   * @return A copy of the map that contains the number of updates sent in
2779   * assured safe read mode that have not been acknowledged per server.
2780   */
2781  public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
2782  {
2783    synchronized(assuredSrServerNotAcknowledgedUpdates)
2784    {
2785      return new HashMap<>(assuredSrServerNotAcknowledgedUpdates);
2786    }
2787  }
2788
2789  /**
2790   * Gets the number of updates received in assured safe read mode request.
2791   * @return The number of updates received in assured safe read mode request.
2792   */
2793  public int getAssuredSrReceivedUpdates()
2794  {
2795    return assuredSrReceivedUpdates.get();
2796  }
2797
2798  /**
2799   * Gets the number of updates received in assured safe read mode that we acked
2800   * without error (no replay error).
2801   * @return The number of updates received in assured safe read mode that we
2802   * acked without error (no replay error).
2803   */
2804  public int getAssuredSrReceivedUpdatesAcked()
2805  {
2806    return this.assuredSrReceivedUpdatesAcked.get();
2807  }
2808
2809  /**
2810   * Gets the number of updates received in assured safe read mode that we did
2811   * not ack due to error (replay error).
2812   * @return The number of updates received in assured safe read mode that we
2813   * did not ack due to error (replay error).
2814   */
2815  public int getAssuredSrReceivedUpdatesNotAcked()
2816  {
2817    return this.assuredSrReceivedUpdatesNotAcked.get();
2818  }
2819
2820  /**
2821   * Gets the number of updates sent in assured safe data mode.
2822   * @return The number of updates sent in assured safe data mode.
2823   */
2824  public int getAssuredSdSentUpdates()
2825  {
2826    return assuredSdSentUpdates.get();
2827  }
2828
2829  /**
2830   * Gets the number of updates sent in assured safe data mode that have been
2831   * acknowledged without errors.
2832   * @return The number of updates sent in assured safe data mode that have been
2833   * acknowledged without errors.
2834   */
2835  public int getAssuredSdAcknowledgedUpdates()
2836  {
2837    return assuredSdAcknowledgedUpdates.get();
2838  }
2839
2840  /**
2841   * Gets the number of updates sent in assured safe data mode that have not
2842   * been acknowledged due to timeout error.
2843   * @return The number of updates sent in assured safe data mode that have not
2844   * been acknowledged due to timeout error.
2845   */
2846  public int getAssuredSdTimeoutUpdates()
2847  {
2848    return assuredSdTimeoutUpdates.get();
2849  }
2850
2851  /**
2852   * Gets the number of updates sent in assured safe data mode that have not
2853   * been acknowledged due to timeout error per server.
2854   * @return A copy of the map that contains the number of updates sent in
2855   * assured safe data mode that have not been acknowledged due to timeout
2856   * error per server.
2857   */
2858  public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
2859  {
2860    synchronized(assuredSdServerTimeoutUpdates)
2861    {
2862      return new HashMap<>(assuredSdServerTimeoutUpdates);
2863    }
2864  }
2865
2866  /**
2867   * Gets the date of the last status change.
2868   * @return The date of the last status change.
2869   */
2870  public Date getLastStatusChangeDate()
2871  {
2872    return lastStatusChangeDate;
2873  }
2874
2875  /**
2876   * Resets the values of the monitoring counters.
2877   */
2878  private void resetMonitoringCounters()
2879  {
2880    numProcessedUpdates = new AtomicInteger(0);
2881    numRcvdUpdates = new AtomicInteger(0);
2882    numSentUpdates = new AtomicInteger(0);
2883
2884    assuredSrSentUpdates = new AtomicInteger(0);
2885    assuredSrAcknowledgedUpdates = new AtomicInteger(0);
2886    assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
2887    assuredSrTimeoutUpdates = new AtomicInteger(0);
2888    assuredSrWrongStatusUpdates = new AtomicInteger(0);
2889    assuredSrReplayErrorUpdates = new AtomicInteger(0);
2890    synchronized (assuredSrServerNotAcknowledgedUpdates)
2891    {
2892      assuredSrServerNotAcknowledgedUpdates.clear();
2893    }
2894    assuredSrReceivedUpdates = new AtomicInteger(0);
2895    assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
2896    assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
2897    assuredSdSentUpdates = new AtomicInteger(0);
2898    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
2899    assuredSdTimeoutUpdates = new AtomicInteger(0);
2900    synchronized (assuredSdServerTimeoutUpdates)
2901    {
2902      assuredSdServerTimeoutUpdates.clear();
2903    }
2904  }
2905
2906  /*
2907   ********** End of Monitoring Code **************
2908   */
2909
2910  /**
2911   * Start the publish mechanism of the Replication Service. After this method
2912   * has been called, the publish service can be used by calling the
2913   * {@link #publish(UpdateMsg)} method.
2914   *
2915   * @throws ConfigException
2916   *           If the DirectoryServer configuration was incorrect.
2917   */
2918  public void startPublishService() throws ConfigException
2919  {
2920    synchronized (sessionLock)
2921    {
2922      if (broker == null)
2923      {
2924        // create the broker object used to publish and receive changes
2925        broker = new ReplicationBroker(
2926            this, state, config, new ReplSessionSecurity());
2927        broker.start();
2928      }
2929    }
2930  }
2931
2932  /**
2933   * Starts the receiver side of the Replication Service.
2934   * <p>
2935   * After this method has been called, the Replication Service will start
2936   * calling the {@link #processUpdate(UpdateMsg)}.
2937   * <p>
2938   * This method must be called once and must be called after the
2939   * {@link #startPublishService()}.
2940   */
2941  public void startListenService()
2942  {
2943    synchronized (sessionLock)
2944    {
2945      final String threadName = "Replica DS(" + getServerId()
2946          + ") listener for domain \"" + getBaseDN() + "\"";
2947
2948      listenerThread = new DirectoryThread(new Runnable()
2949      {
2950        @Override
2951        public void run()
2952        {
2953          if (logger.isTraceEnabled())
2954          {
2955            logger.trace("Replication Listener thread starting.");
2956          }
2957
2958          // Loop processing any incoming update messages.
2959          while (!listenerThread.isShutdownInitiated())
2960          {
2961            final UpdateMsg updateMsg = receive();
2962            if (updateMsg == null)
2963            {
2964              // The server is shutting down.
2965              listenerThread.initiateShutdown();
2966            }
2967            else if (processUpdate(updateMsg)
2968                && updateMsg.contributesToDomainState())
2969            {
2970              /*
2971               * Warning: in synchronous mode, no way to tell the replay of an
2972               * update went wrong Just put null in processUpdateDone so that if
2973               * assured replication is used the ack is sent without error at
2974               * replay flag.
2975               */
2976              processUpdateDone(updateMsg, null);
2977              state.update(updateMsg.getCSN());
2978            }
2979          }
2980
2981          if (logger.isTraceEnabled())
2982          {
2983            logger.trace("Replication Listener thread stopping.");
2984          }
2985        }
2986      }, threadName);
2987
2988      listenerThread.start();
2989    }
2990  }
2991
2992  /**
2993   * Temporarily disable the Replication Service.
2994   * The Replication Service can be enabled again using
2995   * {@link #enableService()}.
2996   * <p>
2997   * It can be useful to disable the Replication Service when the
2998   * repository where the replicated information is stored becomes
2999   * temporarily unavailable and replicated updates can therefore not
3000   * be replayed during a while. This method is not MT safe.
3001   */
3002  public void disableService()
3003  {
3004    synchronized (sessionLock)
3005    {
3006      /*
3007      Stop the broker first in order to prevent the listener from
3008      reconnecting - see OPENDJ-457.
3009      */
3010      if (broker != null)
3011      {
3012        broker.stop();
3013      }
3014
3015      // Stop the listener thread
3016      if (listenerThread != null)
3017      {
3018        listenerThread.initiateShutdown();
3019        try
3020        {
3021          listenerThread.join();
3022        }
3023        catch (InterruptedException e)
3024        {
3025          // Give up waiting.
3026        }
3027        listenerThread = null;
3028      }
3029    }
3030  }
3031
3032  /**
3033   * Returns {@code true} if the listener thread is shutting down or has
3034   * shutdown.
3035   *
3036   * @return {@code true} if the listener thread is shutting down or has
3037   *         shutdown.
3038   */
3039  protected final boolean isListenerShuttingDown()
3040  {
3041    final DirectoryThread tmp = listenerThread;
3042    return tmp == null || tmp.isShutdownInitiated();
3043  }
3044
3045  /**
3046   * Restart the Replication service after a {@link #disableService()}.
3047   * <p>
3048   * The Replication Service will restart from the point indicated by the
3049   * {@link ServerState} that was given as a parameter to the
3050   * {@link #startPublishService()} at startup time.
3051   * <p>
3052   * If some data have changed in the repository during the period of time when
3053   * the Replication Service was disabled, this {@link ServerState} should
3054   * therefore be updated by the Replication Domain subclass before calling this
3055   * method. This method is not MT safe.
3056   */
3057  public void enableService()
3058  {
3059    synchronized (sessionLock)
3060    {
3061      broker.start();
3062      startListenService();
3063    }
3064  }
3065
3066  /**
3067   * Change some ReplicationDomain parameters.
3068   *
3069   * @param config
3070   *          The new configuration that this domain should now use.
3071   */
3072  protected void changeConfig(ReplicationDomainCfg config)
3073  {
3074    if (broker != null && broker.changeConfig(config))
3075    {
3076      restartService();
3077    }
3078  }
3079
3080  /**
3081   * Applies a configuration change to the attributes which should be included
3082   * in the ECL.
3083   *
3084   * @param includeAttributes
3085   *          attributes to be included with all change records.
3086   * @param includeAttributesForDeletes
3087   *          additional attributes to be included with delete change records.
3088   */
3089  public void changeConfig(Set<String> includeAttributes,
3090      Set<String> includeAttributesForDeletes)
3091  {
3092    final boolean attrsModified = setEclIncludes(
3093        getServerId(), includeAttributes, includeAttributesForDeletes);
3094    if (attrsModified && broker != null)
3095    {
3096      restartService();
3097    }
3098  }
3099
3100  private void restartService()
3101  {
3102    disableService();
3103    enableService();
3104  }
3105
3106  /**
3107   * This method should trigger an export of the replicated data.
3108   * to the provided outputStream.
3109   * When finished the outputStream should be flushed and closed.
3110   *
3111   * @param output               The OutputStream where the export should
3112   *                             be produced.
3113   * @throws DirectoryException  When needed.
3114   */
3115  protected abstract void exportBackend(OutputStream output)
3116           throws DirectoryException;
3117
3118  /**
3119   * This method should trigger an import of the replicated data.
3120   *
3121   * @param input                The InputStream from which
3122   *                             the import should be reading entries.
3123   *
3124   * @throws DirectoryException  When needed.
3125   */
3126  protected abstract void importBackend(InputStream input)
3127           throws DirectoryException;
3128
3129  /**
3130   * This method should return the total number of objects in the
3131   * replicated domain.
3132   * This count will be used for reporting.
3133   *
3134   * @throws DirectoryException when needed.
3135   *
3136   * @return The number of objects in the replication domain.
3137   */
3138  public abstract long countEntries() throws DirectoryException;
3139
3140
3141
3142  /**
3143   * This method should handle the processing of {@link UpdateMsg} receive from
3144   * remote replication entities.
3145   * <p>
3146   * This method will be called by a single thread and should therefore should
3147   * not be blocking.
3148   *
3149   * @param updateMsg
3150   *          The {@link UpdateMsg} that was received.
3151   * @return A boolean indicating if the processing is completed at return time.
3152   *         If <code> true </code> is returned, no further processing is
3153   *         necessary. If <code> false </code> is returned, the subclass should
3154   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
3155   *         update the ServerState When this processing is complete.
3156   */
3157  public abstract boolean processUpdate(UpdateMsg updateMsg);
3158
3159  /**
3160   * This method must be called after each call to
3161   * {@link #processUpdate(UpdateMsg)} when the processing of the
3162   * update is completed.
3163   * <p>
3164   * It is useful for implementation needing to process the update in an
3165   * asynchronous way or using several threads, but must be called even by
3166   * implementation doing it in a synchronous, single-threaded way.
3167   *
3168   * @param msg
3169   *          The UpdateMsg whose processing was completed.
3170   * @param replayErrorMsg
3171   *          if not null, this means an error occurred during the replay of
3172   *          this update, and this is the matching human readable message
3173   *          describing the problem.
3174   */
3175  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
3176  {
3177    broker.updateWindowAfterReplay();
3178
3179    /*
3180    Send an ack if it was requested and the group id is the same of the RS
3181    one. Only Safe Read mode makes sense in DS for returning an ack.
3182    */
3183    // Assured feature is supported starting from replication protocol V2
3184    if (msg.isAssured()
3185      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
3186    {
3187      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
3188      {
3189        if (broker.getRsGroupId() == getGroupId())
3190        {
3191          // Send the ack
3192          AckMsg ackMsg = new AckMsg(msg.getCSN());
3193          if (replayErrorMsg != null)
3194          {
3195            // Mark the error in the ack
3196            //   -> replay error occurred
3197            ackMsg.setHasReplayError(true);
3198            //   -> replay error occurred in our server
3199            ackMsg.setFailedServers(newArrayList(getServerId()));
3200          }
3201          broker.publish(ackMsg);
3202          if (replayErrorMsg != null)
3203          {
3204            assuredSrReceivedUpdatesNotAcked.incrementAndGet();
3205          }
3206          else
3207          {
3208            assuredSrReceivedUpdatesAcked.incrementAndGet();
3209          }
3210        }
3211      }
3212      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
3213      {
3214        logger.error(ERR_DS_UNKNOWN_ASSURED_MODE, getServerId(), msg.getAssuredMode(), getBaseDN(), msg);
3215      }
3216        // Nothing to do in Assured safe data mode, only RS ack updates.
3217    }
3218
3219    incProcessedUpdates();
3220  }
3221
3222  /**
3223   * Prepare a message if it is to be sent in assured mode.
3224   * If the assured mode is enabled, this method should be called before
3225   * publish(UpdateMsg msg) method. This will configure the update accordingly
3226   * before it is sent and will prepare the mechanism that will block until the
3227   * matching ack is received. To wait for the ack after publish call, use
3228   * the waitForAckIfAssuredEnabled() method.
3229   * The expected typical usage in a service inheriting from this class is
3230   * the following sequence:
3231   * UpdateMsg msg = xxx;
3232   * prepareWaitForAckIfAssuredEnabled(msg);
3233   * publish(msg);
3234   * waitForAckIfAssuredEnabled(msg);
3235   *
3236   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
3237   * no effect if assured replication is disabled.
3238   * Note: this mechanism should not be used if using publish(byte[] msg)
3239   * version as usage of these methods is already hidden inside.
3240   *
3241   * @param msg The update message to be sent soon.
3242   */
3243  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
3244  {
3245    /*
3246     * If assured configured, set message accordingly to request an ack in the
3247     * right assured mode.
3248     * No ack requested for a RS with a different group id.
3249     * Assured replication supported for the same locality,
3250     * i.e: a topology working in the same geographical location).
3251     * If we are connected to a RS which is not in our locality,
3252     * no need to ask for an ack.
3253     */
3254    if (needsAck())
3255    {
3256      msg.setAssured(true);
3257      msg.setAssuredMode(getAssuredMode());
3258      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
3259      {
3260        msg.setSafeDataLevel(getAssuredSdLevel());
3261      }
3262
3263      // Add the assured message to the list of update that are waiting for acks
3264      waitingAckMsgs.put(msg.getCSN(), msg);
3265    }
3266  }
3267
3268  private boolean needsAck()
3269  {
3270    return isAssured() && broker.getRsGroupId() == getGroupId();
3271  }
3272
3273  /**
3274   * Wait for the processing of an assured message after it has been sent, if
3275   * assured replication is configured, otherwise, do nothing.
3276   * The prepareWaitForAckIfAssuredEnabled method should have been called
3277   * before, see its comment for the full picture.
3278   *
3279   * @param msg The UpdateMsg for which we are waiting for an ack.
3280   * @throws TimeoutException When the configured timeout occurs waiting for the
3281   * ack.
3282   */
3283  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
3284    throws TimeoutException
3285  {
3286    if (needsAck())
3287    {
3288      // Increment assured replication monitoring counters
3289      switch (getAssuredMode())
3290      {
3291        case SAFE_READ_MODE:
3292          assuredSrSentUpdates.incrementAndGet();
3293          break;
3294        case SAFE_DATA_MODE:
3295          assuredSdSentUpdates.incrementAndGet();
3296          break;
3297        default:
3298        // Should not happen
3299      }
3300    } else
3301    {
3302      // Not assured or bad group id, return immediately
3303      return;
3304    }
3305
3306    // Wait for the ack to be received, timing out if necessary
3307    long startTime = System.currentTimeMillis();
3308    synchronized (msg)
3309    {
3310      CSN csn = msg.getCSN();
3311      while (waitingAckMsgs.containsKey(csn))
3312      {
3313        try
3314        {
3315          /*
3316          WARNING: this timeout may be difficult to optimize: too low, it
3317          may use too much CPU, too high, it may penalize performance...
3318          */
3319          msg.wait(10);
3320        } catch (InterruptedException e)
3321        {
3322          if (logger.isTraceEnabled())
3323          {
3324            logger.trace("waitForAck method interrupted for replication " +
3325              "baseDN: " + getBaseDN());
3326          }
3327          break;
3328        }
3329        // Timeout ?
3330        if (System.currentTimeMillis() - startTime >= getAssuredTimeout())
3331        {
3332          /*
3333          Timeout occurred, be sure that ack is not being received and if so,
3334          remove the update from the wait list, log the timeout error and
3335          also update assured monitoring counters
3336          */
3337          final UpdateMsg update = waitingAckMsgs.remove(csn);
3338          if (update == null)
3339          {
3340            // Ack received just before timeout limit: we can exit
3341            break;
3342          }
3343
3344          // No luck, this is a real timeout
3345          // Increment assured replication monitoring counters
3346          switch (msg.getAssuredMode())
3347          {
3348          case SAFE_READ_MODE:
3349            assuredSrNotAcknowledgedUpdates.incrementAndGet();
3350            assuredSrTimeoutUpdates.incrementAndGet();
3351            // Increment number of errors for our RS
3352            updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
3353                broker.getRsServerId());
3354            break;
3355          case SAFE_DATA_MODE:
3356            assuredSdTimeoutUpdates.incrementAndGet();
3357            // Increment number of errors for our RS
3358            updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
3359                broker.getRsServerId());
3360            break;
3361          default:
3362            // Should not happen
3363          }
3364
3365          throw new TimeoutException("No ack received for message csn: " + csn
3366              + " and replication domain: " + getBaseDN() + " after "
3367              + getAssuredTimeout() + " ms.");
3368        }
3369      }
3370    }
3371  }
3372
3373  /**
3374   * Publish an {@link UpdateMsg} to the Replication Service.
3375   * <p>
3376   * The Replication Service will handle the delivery of this {@link UpdateMsg}
3377   * to all the participants of this Replication Domain. These members will be
3378   * receive this {@link UpdateMsg} through a call of the
3379   * {@link #processUpdate(UpdateMsg)} message.
3380   *
3381   * @param msg The UpdateMsg that should be published.
3382   */
3383  public void publish(UpdateMsg msg)
3384  {
3385    broker.publish(msg);
3386    if (msg.contributesToDomainState())
3387    {
3388      state.update(msg.getCSN());
3389    }
3390    numSentUpdates.incrementAndGet();
3391  }
3392
3393  /**
3394   * Publishes a replica offline message if all pending changes for current
3395   * replica have been sent out.
3396   */
3397  public void publishReplicaOfflineMsg()
3398  {
3399    // Here to be overridden
3400  }
3401
3402  /**
3403   * This method should return the generationID to use for this
3404   * ReplicationDomain.
3405   * This method can be called at any time after the ReplicationDomain
3406   * has been started.
3407   *
3408   * @return The GenerationID.
3409   */
3410  public long getGenerationID()
3411  {
3412    return generationId;
3413  }
3414
3415  /**
3416   * Sets the generationId for this replication domain.
3417   *
3418   * @param generationId
3419   *          the generationId to set
3420   */
3421  public void setGenerationID(long generationId)
3422  {
3423    this.generationId = generationId;
3424  }
3425
3426  /**
3427   * Subclasses should use this method to add additional monitoring information
3428   * in the ReplicationDomain.
3429   *
3430   * @return Additional monitoring attributes that will be added in the
3431   *         ReplicationDomain monitoring entry.
3432   */
3433  public Collection<Attribute> getAdditionalMonitoring()
3434  {
3435    return new ArrayList<>();
3436  }
3437
3438  /**
3439   * Returns the Import/Export context associated to this ReplicationDomain.
3440   *
3441   * @return the Import/Export context associated to this ReplicationDomain
3442   */
3443  protected ImportExportContext getImportExportContext()
3444  {
3445    return importExportContext.get();
3446  }
3447
3448  /**
3449   * Returns the local address of this replication domain, or the empty string
3450   * if it is not yet connected.
3451   *
3452   * @return The local address.
3453   */
3454  String getLocalUrl()
3455  {
3456    final ReplicationBroker tmp = broker;
3457    return tmp != null ? tmp.getLocalUrl() : "";
3458  }
3459
3460  /**
3461   * Set the attributes configured on a server to be included in the ECL.
3462   *
3463   * @param serverId
3464   *          Server where these attributes are configured.
3465   * @param includeAttributes
3466   *          Attributes to be included with all change records, may include
3467   *          wild-cards.
3468   * @param includeAttributesForDeletes
3469   *          Additional attributes to be included with delete change records,
3470   *          may include wild-cards.
3471   * @return {@code true} if the set of attributes was modified.
3472   */
3473  public boolean setEclIncludes(int serverId,
3474      Set<String> includeAttributes,
3475      Set<String> includeAttributesForDeletes)
3476  {
3477    ECLIncludes current;
3478    ECLIncludes updated;
3479    do
3480    {
3481      current = this.eclIncludes.get();
3482      updated = current.addIncludedAttributes(
3483          serverId, includeAttributes, includeAttributesForDeletes);
3484    }
3485    while (!this.eclIncludes.compareAndSet(current, updated));
3486    return current != updated;
3487  }
3488
3489
3490
3491  /**
3492   * Get the attributes to include in each change for the ECL.
3493   *
3494   * @return The attributes to include in each change for the ECL.
3495   */
3496  public Set<String> getEclIncludes()
3497  {
3498    return eclIncludes.get().includedAttrsAllServers;
3499  }
3500
3501
3502
3503  /**
3504   * Get the attributes to include in each delete change for the ECL.
3505   *
3506   * @return The attributes to include in each delete change for the ECL.
3507   */
3508  public Set<String> getEclIncludesForDeletes()
3509  {
3510    return eclIncludes.get().includedAttrsForDeletesAllServers;
3511  }
3512
3513
3514
3515  /**
3516   * Get the attributes to include in each change for the ECL for a given
3517   * serverId.
3518   *
3519   * @param serverId
3520   *          The serverId for which we want the include attributes.
3521   * @return The attributes.
3522   */
3523  Set<String> getEclIncludes(int serverId)
3524  {
3525    return eclIncludes.get().includedAttrsByServer.get(serverId);
3526  }
3527
3528
3529
3530  /**
3531   * Get the attributes to include in each change for the ECL for a given
3532   * serverId.
3533   *
3534   * @param serverId
3535   *          The serverId for which we want the include attributes.
3536   * @return The attributes.
3537   */
3538  Set<String> getEclIncludesForDeletes(int serverId)
3539  {
3540    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
3541  }
3542
3543  /**
3544   * Returns the CSN of the last Change that was fully processed by this
3545   * ReplicationDomain.
3546   *
3547   * @return The CSN of the last Change that was fully processed by this
3548   *         ReplicationDomain.
3549   */
3550  public CSN getLastLocalChange()
3551  {
3552    return state.getCSN(getServerId());
3553  }
3554
3555  /**
3556   * Gets and stores the assured replication configuration parameters. Returns a
3557   * boolean indicating if the passed configuration has changed compared to
3558   * previous values and the changes require a reconnection.
3559   *
3560   * @param config
3561   *          The configuration object
3562   * @param allowReconnection
3563   *          Tells if one must reconnect if significant changes occurred
3564   */
3565  protected void readAssuredConfig(ReplicationDomainCfg config,
3566      boolean allowReconnection)
3567  {
3568    // Disconnect if required: changing configuration values before
3569    // disconnection would make assured replication used immediately and
3570    // disconnection could cause some timeouts error.
3571    if (needReconnection(config) && allowReconnection)
3572    {
3573      disableService();
3574
3575      assuredConfig = config;
3576
3577      enableService();
3578    }
3579  }
3580
3581  private boolean needReconnection(ReplicationDomainCfg cfg)
3582  {
3583    final AssuredMode assuredMode = getAssuredMode();
3584    switch (cfg.getAssuredType())
3585    {
3586    case NOT_ASSURED:
3587      if (isAssured())
3588      {
3589        return true;
3590      }
3591      break;
3592    case SAFE_DATA:
3593      if (!isAssured() || assuredMode == SAFE_READ_MODE)
3594      {
3595        return true;
3596      }
3597      break;
3598    case SAFE_READ:
3599      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
3600      {
3601        return true;
3602      }
3603      break;
3604    }
3605
3606    return isAssured()
3607        && assuredMode == SAFE_DATA_MODE
3608        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
3609  }
3610
3611  /** {@inheritDoc} */
3612  @Override
3613  public String toString()
3614  {
3615    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
3616  }
3617}