001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2012-2015 ForgeRock AS. 026 */ 027package org.opends.server.replication.server; 028 029import java.util.TreeMap; 030 031import org.forgerock.i18n.slf4j.LocalizedLogger; 032import org.opends.server.replication.common.CSN; 033import org.opends.server.replication.protocol.UpdateMsg; 034 035import static org.opends.messages.ReplicationMessages.*; 036 037/** 038 * This class is used to build ordered lists of UpdateMsg. 039 * The order is defined by the order of the CSN of the UpdateMsg. 040 * @ThreadSafe 041 */ 042public class MsgQueue 043{ 044 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 045 046 private TreeMap<CSN, UpdateMsg> map = new TreeMap<>(); 047 /** 048 * FIXME JNR to be investigated: 049 * I strongly suspect that we could replace this field 050 * by using the synchronized keyword on each method. 051 * However, MessageHandler is weirdly synchronizing on msgQueue field 052 * even though it is touching the lateQueue field (?!?). 053 */ 054 private final Object lock = new Object(); 055 056 /** The total number of bytes for all the message in the queue. */ 057 private int bytesCount; 058 059 /** 060 * Return the first UpdateMsg in the MsgQueue. 061 * 062 * @return The first UpdateMsg in the MsgQueue. 063 */ 064 public UpdateMsg first() 065 { 066 synchronized (lock) 067 { 068 return map.get(map.firstKey()); 069 } 070 } 071 072 /** 073 * Returns the number of elements in this MsgQueue. 074 * 075 * @return The number of elements in this MsgQueue. 076 */ 077 public int count() 078 { 079 synchronized (lock) 080 { 081 return map.size(); 082 } 083 } 084 085 /** 086 * Returns the number of bytes in this MsgQueue. 087 * 088 * @return The number of bytes in this MsgQueue. 089 */ 090 public int bytesCount() 091 { 092 synchronized (lock) 093 { 094 return bytesCount; 095 } 096 } 097 098 /** 099 * Returns <tt>true</tt> if this MsgQueue contains no UpdateMsg. 100 * 101 * @return <tt>true</tt> if this MsgQueue contains no UpdateMsg. 102 */ 103 public boolean isEmpty() 104 { 105 synchronized (lock) 106 { 107 return map.isEmpty(); 108 } 109 } 110 111 /** 112 * Add an UpdateMsg to this MessageQueue. 113 * 114 * @param update The UpdateMsg to add to this MessageQueue. 115 */ 116 public void add(UpdateMsg update) 117 { 118 synchronized (lock) 119 { 120 final UpdateMsg msgSameCSN = map.put(update.getCSN(), update); 121 if (msgSameCSN != null) 122 { 123 try 124 { 125 if (msgSameCSN.getBytes().length != update.getBytes().length 126 || msgSameCSN.isAssured() != update.isAssured() 127 || msgSameCSN.getVersion() != update.getVersion()) 128 { 129 // Adding 2 msgs with the same CSN is ok only when the 2 msgs are the same 130 bytesCount += update.size() - msgSameCSN.size(); 131 logger.error(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CSN, msgSameCSN.getCSN(), msgSameCSN, update); 132 } 133 } 134 catch (Exception e) 135 { 136 logger.traceException(e); 137 } 138 } 139 else 140 { 141 // it is really an ADD 142 bytesCount += update.size(); 143 } 144 } 145 } 146 147 /** 148 * Get and remove the first UpdateMsg in this MessageQueue. 149 * 150 * @return The first UpdateMsg in this MessageQueue. 151 */ 152 public UpdateMsg removeFirst() 153 { 154 synchronized (lock) 155 { 156 // FIXME JNR replace next 2 lines with just that one: 157 // final UpdateMsg update = map.pollFirstEntry().getValue(); 158 final UpdateMsg update = map.get(map.firstKey()); 159 map.remove(update.getCSN()); 160 bytesCount -= update.size(); 161 if (map.isEmpty() && bytesCount != 0) 162 { 163 // should never happen 164 logger.error(ERR_BYTE_COUNT, bytesCount); 165 bytesCount = 0; 166 } 167 return update; 168 } 169 } 170 171 /** 172 * Returns <tt>true</tt> if this map contains an UpdateMsg 173 * with the same CSN as the given UpdateMsg. 174 * 175 * @param msg UpdateMsg whose presence in this queue is to be tested. 176 * 177 * @return <tt>true</tt> if this map contains an UpdateMsg 178 * with the same CSN as the given UpdateMsg. 179 */ 180 public boolean contains(UpdateMsg msg) 181 { 182 synchronized (lock) 183 { 184 return map.containsKey(msg.getCSN()); 185 } 186 } 187 188 /** Removes all UpdateMsg form this queue. */ 189 public void clear() 190 { 191 synchronized (lock) 192 { 193 map.clear(); 194 bytesCount = 0; 195 } 196 } 197 198 /** 199 * Consumes all the messages in this queue up to and including the passed in 200 * message. If the passed in message is not contained in the current queue, 201 * then all messages will be removed from it. 202 * 203 * @param finalMsg 204 * the final message to reach when consuming messages from this queue 205 */ 206 public void consumeUpTo(UpdateMsg finalMsg) 207 { 208 // FIXME this code could be more efficient if the msgQueue could call the 209 // following code (to be tested): 210 // if (!map.containsKey(finalMsg.getCSN())) { 211 // map.clear(); 212 // } else { 213 // map.headMap(finalMsg.getCSN(), true).clear(); 214 // } 215 216 final CSN finalCSN = finalMsg.getCSN(); 217 UpdateMsg msg; 218 do 219 { 220 msg = removeFirst(); 221 } 222 while (!finalCSN.equals(msg.getCSN())); 223 } 224 225 @Override 226 public String toString() 227 { 228 return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue=" + map.values(); 229 } 230}