001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server;
028
029import java.net.SocketException;
030
031import org.forgerock.i18n.LocalizableMessage;
032import org.opends.server.api.DirectoryThread;
033import org.forgerock.i18n.slf4j.LocalizedLogger;
034import org.opends.server.replication.common.ServerStatus;
035import org.opends.server.replication.protocol.*;
036
037import static org.opends.messages.ReplicationMessages.*;
038import static org.opends.server.replication.common.ServerStatus.*;
039import static org.opends.server.util.StaticUtils.*;
040
041/**
042 * This class implement the part of the replicationServer that is reading
043 * the connection from the LDAP servers to get all the updates that
044 * were done on this replica and forward them to other servers.
045 *
046 * A single thread is dedicated to this work.
047 * It waits in a blocking mode on the connection from the LDAP server
048 * and upon receiving an update puts in into the replicationServer cache
049 * from where the other servers will grab it.
050 */
051public class ServerReader extends DirectoryThread
052{
053  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
054  private final Session session;
055  private final ServerHandler handler;
056
057  /**
058   * Constructor for the LDAP server reader part of the replicationServer.
059   *
060   * @param session
061   *          The Session from which to read the data.
062   * @param handler
063   *          The server handler for this server reader.
064   */
065  public ServerReader(Session session, ServerHandler handler)
066  {
067    super("Replication server RS(" + handler.getReplicationServerId()
068        + ") reading from " + handler + " at "
069        + session.getReadableRemoteAddress());
070    this.session = session;
071    this.handler = handler;
072  }
073
074  /**
075   * Create a loop that reads changes and hands them off to be processed.
076   */
077  @Override
078  public void run()
079  {
080    if (logger.isTraceEnabled())
081    {
082      logger.trace(getName() + " starting");
083    }
084    /*
085     * wait on input stream
086     * grab all incoming messages and publish them to the
087     * replicationServerDomain
088     */
089    LocalizableMessage errMessage = null;
090    try
091    {
092      while (true)
093      {
094        try
095        {
096          final ReplicationMsg msg = session.receive();
097
098          if (logger.isTraceEnabled())
099          {
100            logger.trace("In " + getName() + " receives " + msg);
101          }
102
103          if (msg instanceof AckMsg)
104          {
105            handler.checkWindow();
106            handler.processAck((AckMsg) msg);
107          }
108          else if (msg instanceof UpdateMsg)
109          {
110            final UpdateMsg updateMsg = (UpdateMsg) msg;
111            if (!isUpdateMsgFiltered(updateMsg))
112            {
113              handler.put(updateMsg);
114            }
115          }
116          else if (msg instanceof WindowMsg)
117          {
118            handler.updateWindow((WindowMsg) msg);
119          }
120          else if (msg instanceof MonitorRequestMsg)
121          {
122            handler.processMonitorRequestMsg((MonitorRequestMsg) msg);
123          }
124          else if (msg instanceof MonitorMsg)
125          {
126            handler.processMonitorMsg((MonitorMsg) msg);
127          }
128          else if (msg instanceof RoutableMsg)
129          {
130            /*
131             * Note that we handle monitor messages separately since they in
132             * fact never need "routing" and are instead sent directly between
133             * connected peers. Doing so allows us to more clearly decouple
134             * write IO from the reader thread (see OPENDJ-1354).
135             */
136            handler.process((RoutableMsg) msg);
137          }
138          else if (msg instanceof ResetGenerationIdMsg)
139          {
140            handler.processResetGenId((ResetGenerationIdMsg) msg);
141          }
142          else if (msg instanceof WindowProbeMsg)
143          {
144            handler.replyToWindowProbe();
145          }
146          else if (msg instanceof TopologyMsg)
147          {
148            ReplicationServerHandler rsh = (ReplicationServerHandler) handler;
149            rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
150          }
151          else if (msg instanceof ChangeStatusMsg)
152          {
153            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
154            try
155            {
156              DataServerHandler dsh = (DataServerHandler) handler;
157              dsh.receiveNewStatus(csMsg);
158            }
159            catch (Exception e)
160            {
161              errMessage = ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
162                  handler.getBaseDN(), handler.getServerId(), csMsg);
163              logger.error(errMessage);
164            }
165          }
166          else if (msg instanceof ChangeTimeHeartbeatMsg)
167          {
168            handler.process((ChangeTimeHeartbeatMsg) msg);
169          }
170          else if (msg instanceof StopMsg)
171          {
172            /*
173             * Peer server is properly disconnecting: go out of here to properly
174             * close the server handler going to finally block.
175             */
176            if (logger.isTraceEnabled())
177            {
178              logger.trace(handler
179                  + " has properly disconnected from this replication server "
180                  + handler.getReplicationServerId());
181            }
182            return;
183          }
184          else if (msg == null)
185          {
186            /*
187             * The remote server has sent an unknown message, close the
188             * connection.
189             */
190            errMessage = NOTE_READER_NULL_MSG.get(handler);
191            logger.info(errMessage);
192            return;
193          }
194        }
195        catch (NotSupportedOldVersionPDUException e)
196        {
197          /*
198           * Received a V1 PDU we do not need to support: we just trash the
199           * message and log the event for debug purpose, then continue
200           * receiving messages.
201           */
202          logException(e);
203        }
204      }
205    }
206    catch (SocketException e)
207    {
208      /*
209       * The connection has been broken Log a message and exit from this loop So
210       * that this handler is stopped.
211       */
212      logException(e);
213      if (!handler.shuttingDown())
214      {
215        errMessage = handler.getBadlyDisconnectedErrorMessage();
216        logger.error(errMessage);
217      }
218    }
219    catch (Exception e)
220    {
221      /*
222       * The remote server has sent an unknown message, close the connection.
223       */
224      errMessage = NOTE_READER_EXCEPTION.get(handler,
225          stackTraceToSingleLineString(e));
226      logger.info(errMessage);
227    }
228    finally
229    {
230      /*
231       * The thread only exits the loop above if some error condition happen.
232       * Attempt to close the socket and stop the server handler.
233       */
234      if (logger.isTraceEnabled())
235      {
236        logger.trace("In " + getName() + " closing the session");
237      }
238      session.close();
239      handler.doStop();
240      if (logger.isTraceEnabled())
241      {
242        logger.trace(getName() + " stopped: " + errMessage);
243      }
244    }
245  }
246
247  /**
248   * Returns whether the update message is filtered in one of those cases:
249   * <ul>
250   * <li>Ignore updates from DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS</li>
251   * <li>Ignore updates from RS with bad gen id</li>
252   * </ul>
253   */
254  private boolean isUpdateMsgFiltered(UpdateMsg updateMsg)
255  {
256    if (handler.isDataServer())
257    {
258      /**
259       * Ignore updates from DS in bad BAD_GENID_STATUS or
260       * FULL_UPDATE_STATUS
261       *
262       * The RSD lock should not be taken here as it is acceptable to
263       * have a delay between the time the server has a wrong status and
264       * the fact we detect it: the updates that succeed to pass during
265       * this time will have no impact on remote server. But it is
266       * interesting to not saturate uselessly the network if the
267       * updates are not necessary so this check to stop sending updates
268       * is interesting anyway. Not taking the RSD lock allows to have
269       * better performances in normal mode (most of the time).
270       */
271      final ServerStatus dsStatus = handler.getStatus();
272      if (dsStatus == BAD_GEN_ID_STATUS)
273      {
274        logger.warn(WARN_IGNORING_UPDATE_FROM_DS_BADGENID,
275            handler.getReplicationServerId(), updateMsg.getCSN(),
276            handler.getBaseDN(), handler.getServerId(),
277            session.getReadableRemoteAddress(),
278            handler.getGenerationId(), handler.getReferenceGenId());
279        return true;
280      }
281      else if (dsStatus == FULL_UPDATE_STATUS)
282      {
283        logger.warn(WARN_IGNORING_UPDATE_FROM_DS_FULLUP,
284            handler.getReplicationServerId(), updateMsg.getCSN(),
285            handler.getBaseDN(), handler.getServerId(),
286            session.getReadableRemoteAddress());
287        return true;
288      }
289    }
290    else
291    {
292      /**
293       * Ignore updates from RS with bad gen id
294       * (no system managed status for a RS)
295       */
296      long referenceGenerationId = handler.getReferenceGenId();
297      if (referenceGenerationId > 0
298          && referenceGenerationId != handler.getGenerationId())
299      {
300        logger.error(WARN_IGNORING_UPDATE_FROM_RS,
301            handler.getReplicationServerId(), updateMsg.getCSN(),
302            handler.getBaseDN(), handler.getServerId(),
303            session.getReadableRemoteAddress(),
304            handler.getGenerationId(), referenceGenerationId);
305        return true;
306      }
307    }
308    return false;
309  }
310
311  private void logException(Exception e)
312  {
313    if (logger.isTraceEnabled())
314    {
315      logger.trace("In " + getName() + " " + stackTraceToSingleLineString(e));
316    }
317  }
318}