001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2012-2015 ForgeRock AS.
026 */
027package org.opends.server.replication.server;
028
029import java.util.TreeMap;
030
031import org.forgerock.i18n.slf4j.LocalizedLogger;
032import org.opends.server.replication.common.CSN;
033import org.opends.server.replication.protocol.UpdateMsg;
034
035import static org.opends.messages.ReplicationMessages.*;
036
037/**
038 * This class is used to build ordered lists of UpdateMsg.
039 * The order is defined by the order of the CSN of the UpdateMsg.
040 * @ThreadSafe
041 */
042public class MsgQueue
043{
044  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
045
046  private TreeMap<CSN, UpdateMsg> map = new TreeMap<>();
047  /**
048   * FIXME JNR to be investigated:
049   * I strongly suspect that we could replace this field
050   * by using the synchronized keyword on each method.
051   * However, MessageHandler is weirdly synchronizing on msgQueue field
052   * even though it is touching the lateQueue field (?!?).
053   */
054  private final Object lock = new Object();
055
056  /** The total number of bytes for all the message in the queue. */
057  private int bytesCount;
058
059  /**
060   * Return the first UpdateMsg in the MsgQueue.
061   *
062   * @return The first UpdateMsg in the MsgQueue.
063   */
064  public UpdateMsg first()
065  {
066    synchronized (lock)
067    {
068      return map.get(map.firstKey());
069    }
070  }
071
072  /**
073   * Returns the number of elements in this MsgQueue.
074   *
075   * @return The number of elements in this MsgQueue.
076   */
077  public int count()
078  {
079    synchronized (lock)
080    {
081      return map.size();
082    }
083  }
084
085  /**
086   * Returns the number of bytes in this MsgQueue.
087   *
088   * @return The number of bytes in this MsgQueue.
089   */
090  public int bytesCount()
091  {
092    synchronized (lock)
093    {
094      return bytesCount;
095    }
096  }
097
098  /**
099   * Returns <tt>true</tt> if this MsgQueue contains no UpdateMsg.
100   *
101   * @return <tt>true</tt> if this MsgQueue contains no UpdateMsg.
102   */
103  public boolean isEmpty()
104  {
105    synchronized (lock)
106    {
107      return map.isEmpty();
108    }
109  }
110
111  /**
112   * Add an UpdateMsg to this MessageQueue.
113   *
114   * @param update The UpdateMsg to add to this MessageQueue.
115   */
116  public void add(UpdateMsg update)
117  {
118    synchronized (lock)
119    {
120      final UpdateMsg msgSameCSN = map.put(update.getCSN(), update);
121      if (msgSameCSN != null)
122      {
123        try
124        {
125          if (msgSameCSN.getBytes().length != update.getBytes().length
126              || msgSameCSN.isAssured() != update.isAssured()
127              || msgSameCSN.getVersion() != update.getVersion())
128          {
129            // Adding 2 msgs with the same CSN is ok only when the 2 msgs are the same
130            bytesCount += update.size() - msgSameCSN.size();
131            logger.error(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CSN, msgSameCSN.getCSN(), msgSameCSN, update);
132          }
133        }
134        catch (Exception e)
135        {
136          logger.traceException(e);
137        }
138      }
139      else
140      {
141        // it is really an ADD
142        bytesCount += update.size();
143      }
144    }
145  }
146
147  /**
148   * Get and remove the first UpdateMsg in this MessageQueue.
149   *
150   * @return The first UpdateMsg in this MessageQueue.
151   */
152  public UpdateMsg removeFirst()
153  {
154    synchronized (lock)
155    {
156      // FIXME JNR replace next 2 lines with just that one:
157      // final UpdateMsg update = map.pollFirstEntry().getValue();
158      final UpdateMsg update = map.get(map.firstKey());
159      map.remove(update.getCSN());
160      bytesCount -= update.size();
161      if (map.isEmpty() && bytesCount != 0)
162      {
163        // should never happen
164        logger.error(ERR_BYTE_COUNT, bytesCount);
165        bytesCount = 0;
166      }
167      return update;
168    }
169  }
170
171  /**
172   * Returns <tt>true</tt> if this map contains an UpdateMsg
173   * with the same CSN as the given UpdateMsg.
174   *
175   * @param msg UpdateMsg whose presence in this queue is to be tested.
176   *
177   * @return <tt>true</tt> if this map contains an UpdateMsg
178   *         with the same CSN as the given UpdateMsg.
179   */
180  public boolean contains(UpdateMsg msg)
181  {
182    synchronized (lock)
183    {
184      return map.containsKey(msg.getCSN());
185    }
186  }
187
188  /** Removes all UpdateMsg form this queue. */
189  public void clear()
190  {
191    synchronized (lock)
192    {
193      map.clear();
194      bytesCount = 0;
195    }
196  }
197
198  /**
199   * Consumes all the messages in this queue up to and including the passed in
200   * message. If the passed in message is not contained in the current queue,
201   * then all messages will be removed from it.
202   *
203   * @param finalMsg
204   *          the final message to reach when consuming messages from this queue
205   */
206  public void consumeUpTo(UpdateMsg finalMsg)
207  {
208    // FIXME this code could be more efficient if the msgQueue could call the
209    // following code (to be tested):
210    // if (!map.containsKey(finalMsg.getCSN())) {
211    // map.clear();
212    // } else {
213    // map.headMap(finalMsg.getCSN(), true).clear();
214    // }
215
216    final CSN finalCSN = finalMsg.getCSN();
217    UpdateMsg msg;
218    do
219    {
220      msg = removeFirst();
221    }
222    while (!finalCSN.equals(msg.getCSN()));
223  }
224
225  @Override
226  public String toString()
227  {
228    return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue=" + map.values();
229  }
230}