001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2009 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import java.net.SocketException;
030
031import org.forgerock.i18n.LocalizableMessage;
032import org.opends.server.api.DirectoryThread;
033import org.forgerock.i18n.slf4j.LocalizedLogger;
034import org.opends.server.replication.common.ServerStatus;
035import org.opends.server.replication.protocol.ReplicaOfflineMsg;
036import org.opends.server.replication.protocol.Session;
037import org.opends.server.replication.protocol.UpdateMsg;
038import org.opends.server.replication.service.DSRSShutdownSync;
039
040import static org.opends.messages.ReplicationMessages.*;
041import static org.opends.server.replication.common.ServerStatus.*;
042import static org.opends.server.util.StaticUtils.*;
043
044/**
045 * This class defines a server writer, which is used to send changes to a
046 * directory server.
047 */
048public class ServerWriter extends DirectoryThread
049{
050  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
051
052  private final Session session;
053  private final ServerHandler handler;
054  private final ReplicationServerDomain replicationServerDomain;
055  private final DSRSShutdownSync dsrsShutdownSync;
056
057  /**
058   * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
059   * for new updates and forward them to the server
060   *
061   * @param session
062   *          the Session that will be used to send updates.
063   * @param handler
064   *          handler for which the ServerWriter is created.
065   * @param replicationServerDomain
066   *          The ReplicationServerDomain of this ServerWriter.
067   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
068   */
069  public ServerWriter(Session session, ServerHandler handler,
070      ReplicationServerDomain replicationServerDomain,
071      DSRSShutdownSync dsrsShutdownSync)
072  {
073    // Session may be null for ECLServerWriter.
074    super("Replication server RS(" + handler.getReplicationServerId()
075        + ") writing to " + handler + " at "
076        + (session != null ? session.getReadableRemoteAddress() : "unknown"));
077
078    this.session = session;
079    this.handler = handler;
080    this.replicationServerDomain = replicationServerDomain;
081    this.dsrsShutdownSync = dsrsShutdownSync;
082  }
083
084  /**
085   * Run method for the ServerWriter.
086   * Loops waiting for changes from the ReplicationServerDomain and forward them
087   * to the other servers
088   */
089  @Override
090  public void run()
091  {
092    if (logger.isTraceEnabled())
093    {
094      logger.trace(getName() + " starting");
095    }
096
097    LocalizableMessage errMessage = null;
098    try
099    {
100      boolean shutdown = false;
101      while (!shutdown
102          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
103      {
104        final UpdateMsg updateMsg = this.handler.take();
105        if (updateMsg == null)
106        {
107          // this connection is closing
108          errMessage = LocalizableMessage.raw(
109           "Connection closure: null update returned by domain.");
110          shutdown = true;
111        }
112        else if (!isUpdateMsgFiltered(updateMsg))
113        {
114          // Publish the update to the remote server using a protocol version it supports
115          session.publish(updateMsg);
116          if (updateMsg instanceof ReplicaOfflineMsg)
117          {
118            dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
119          }
120        }
121      }
122    }
123    catch (SocketException e)
124    {
125      /*
126       * The remote host has disconnected and this particular Tree is going to
127       * be removed, just ignore the exception and let the thread die as well
128       */
129      errMessage = handler.getBadlyDisconnectedErrorMessage();
130      logger.error(errMessage);
131    }
132    catch (Exception e)
133    {
134      /*
135       * An unexpected error happened.
136       * Log an error and close the connection.
137       */
138      errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler +
139                        " " +  stackTraceToSingleLineString(e));
140      logger.error(errMessage);
141    }
142    finally {
143      session.close();
144      replicationServerDomain.stopServer(handler, false);
145      if (logger.isTraceEnabled())
146      {
147        logger.trace(getName() + " stopped " + errMessage);
148      }
149    }
150  }
151
152  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
153  {
154    if (handler.isDataServer())
155    {
156      /**
157       * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
158       *
159       * The RSD lock should not be taken here as it is acceptable to have a delay
160       * between the time the server has a wrong status and the fact we detect it:
161       * the updates that succeed to pass during this time will have no impact on remote server.
162       * But it is interesting to not saturate uselessly the network
163       * if the updates are not necessary so this check to stop sending updates is interesting anyway.
164       * Not taking the RSD lock allows to have better performances in normal mode (most of the time).
165       */
166      final ServerStatus dsStatus = handler.getStatus();
167      if (dsStatus == BAD_GEN_ID_STATUS)
168      {
169        logger.warn(WARN_IGNORING_UPDATE_TO_DS_BADGENID, handler.getReplicationServerId(),
170            updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
171            session.getReadableRemoteAddress(),
172            handler.getGenerationId(),
173            replicationServerDomain.getGenerationId());
174        return true;
175      }
176      else if (dsStatus == FULL_UPDATE_STATUS)
177      {
178        logger.warn(WARN_IGNORING_UPDATE_TO_DS_FULLUP, handler.getReplicationServerId(),
179            updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
180            session.getReadableRemoteAddress());
181        return true;
182      }
183    }
184    else
185    {
186      /**
187       * Ignore updates to RS with bad gen id
188       * (no system managed status for a RS)
189       */
190      final long referenceGenerationId = replicationServerDomain.getGenerationId();
191      if (referenceGenerationId != handler.getGenerationId()
192          || referenceGenerationId == -1 || handler.getGenerationId() == -1)
193      {
194        logger.error(WARN_IGNORING_UPDATE_TO_RS,
195            handler.getReplicationServerId(),
196            updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(),
197            session.getReadableRemoteAddress(),
198            handler.getGenerationId(),
199            referenceGenerationId);
200        return true;
201      }
202    }
203    return false;
204  }
205}