001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2009-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.server.changelog.je; 028 029import java.util.ArrayList; 030import java.util.List; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicLong; 033 034import org.forgerock.i18n.slf4j.LocalizedLogger; 035import org.forgerock.opendj.config.server.ConfigException; 036import org.opends.server.admin.std.server.MonitorProviderCfg; 037import org.opends.server.api.MonitorProvider; 038import org.opends.server.core.DirectoryServer; 039import org.opends.server.replication.common.CSN; 040import org.opends.server.replication.server.changelog.api.*; 041import org.opends.server.replication.server.changelog.je.DraftCNDB.*; 042import org.opends.server.types.*; 043 044/** 045 * This class is used for managing the replicationServer database for each 046 * server in the topology. It is responsible for efficiently saving the updates 047 * that is received from each master server into stable storage. This class is 048 * also able to generate a {@link DBCursor} that can be used to read all changes 049 * from a given change number. 050 * <p> 051 * This class publishes some monitoring information below <code> 052 * cn=monitor</code>. 053 */ 054public class JEChangeNumberIndexDB implements ChangeNumberIndexDB 055{ 056 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 057 private static final int NO_KEY = 0; 058 059 private DraftCNDB db; 060 /** 061 * The newest changenumber stored in the DB. It is used to avoid purging the 062 * record with the newest changenumber. The newest record in the changenumber 063 * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is 064 * then retrieved on server startup. 065 */ 066 private volatile long newestChangeNumber = NO_KEY; 067 /** 068 * The last generated value for the change number. It is kept separate from 069 * the {@link #newestChangeNumber} because there is an opportunity for a race 070 * condition between: 071 * <ol> 072 * <li>this atomic long being incremented for a new record ('recordB')</li> 073 * <li>the current newest record ('recordA') being purged from the DB</li> 074 * <li>'recordB' failing to be inserted in the DB</li> 075 * </ol> 076 */ 077 private final AtomicLong lastGeneratedChangeNumber; 078 private DbMonitorProvider dbMonitor = new DbMonitorProvider(); 079 private final AtomicBoolean shutdown = new AtomicBoolean(false); 080 081 082 /** 083 * Creates a new JEChangeNumberIndexDB associated to a given LDAP server. 084 * 085 * @param dbEnv the Database Env to use to create the ReplicationServer DB. 086 * server for this domain. 087 * @throws ChangelogException If a database problem happened 088 */ 089 public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException 090 { 091 db = new DraftCNDB(dbEnv); 092 final ChangeNumberIndexRecord newestRecord = db.readLastRecord(); 093 newestChangeNumber = getChangeNumber(newestRecord); 094 // initialization of the lastGeneratedChangeNumber from the DB content 095 // if DB is empty => last record does not exist => default to 0 096 lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber); 097 098 // Monitoring registration 099 DirectoryServer.deregisterMonitorProvider(dbMonitor); 100 DirectoryServer.registerMonitorProvider(dbMonitor); 101 } 102 103 private long getChangeNumber(ChangeNumberIndexRecord record) 104 throws ChangelogException 105 { 106 if (record != null) 107 { 108 return record.getChangeNumber(); 109 } 110 return NO_KEY; 111 } 112 113 /** {@inheritDoc} */ 114 @Override 115 public long addRecord(ChangeNumberIndexRecord record) 116 throws ChangelogException 117 { 118 long changeNumber = nextChangeNumber(); 119 final ChangeNumberIndexRecord newRecord = 120 new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN()); 121 db.addRecord(newRecord); 122 newestChangeNumber = changeNumber; 123 124 logger.trace("In JEChangeNumberIndexDB.add, added: %s", newRecord); 125 return changeNumber; 126 } 127 128 /** {@inheritDoc} */ 129 @Override 130 public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException 131 { 132 return db.readFirstRecord(); 133 } 134 135 /** {@inheritDoc} */ 136 @Override 137 public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException 138 { 139 return db.readLastRecord(); 140 } 141 142 private long nextChangeNumber() 143 { 144 return lastGeneratedChangeNumber.incrementAndGet(); 145 } 146 147 /** {@inheritDoc} */ 148 @Override 149 public long getLastGeneratedChangeNumber() 150 { 151 return lastGeneratedChangeNumber.get(); 152 } 153 154 /** 155 * Get the number of changes. 156 * @return Returns the number of changes. 157 */ 158 public long count() 159 { 160 return db.count(); 161 } 162 163 /** 164 * Returns whether this database is empty. 165 * 166 * @return <code>true</code> if this database is empty, <code>false</code> 167 * otherwise 168 * @throws ChangelogException 169 * if a database problem occurs. 170 */ 171 public boolean isEmpty() throws ChangelogException 172 { 173 return getNewestRecord() == null; 174 } 175 176 /** {@inheritDoc} */ 177 @Override 178 public DBCursor<ChangeNumberIndexRecord> getCursorFrom(long startChangeNumber) 179 throws ChangelogException 180 { 181 return new JEChangeNumberIndexDBCursor(db, startChangeNumber); 182 } 183 184 /** 185 * Shutdown this DB. 186 */ 187 public void shutdown() 188 { 189 if (shutdown.compareAndSet(false, true)) 190 { 191 db.shutdown(); 192 DirectoryServer.deregisterMonitorProvider(dbMonitor); 193 } 194 } 195 196 /** 197 * Synchronously purges the change number index DB up to and excluding the 198 * provided timestamp. 199 * 200 * @param purgeCSN 201 * the timestamp up to which purging must happen 202 * @return the oldest non purged CSN. 203 * @throws ChangelogException 204 * if a database problem occurs. 205 */ 206 public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException 207 { 208 if (isEmpty() || purgeCSN == null) 209 { 210 return null; 211 } 212 213 final DraftCNDBCursor cursor = db.openDeleteCursor(); 214 try 215 { 216 while (!mustShutdown(shutdown) && cursor.next()) 217 { 218 final ChangeNumberIndexRecord record = cursor.currentRecord(); 219 if (record.getChangeNumber() != newestChangeNumber 220 && record.getCSN().isOlderThan(purgeCSN)) 221 { 222 cursor.delete(); 223 } 224 else 225 { 226 // 1- Current record is not old enough to purge. 227 // 2- Do not purge the newest record to avoid having the last 228 // generated changenumber dropping back to 0 when the server restarts 229 return record.getCSN(); 230 } 231 } 232 233 return null; 234 } 235 catch (ChangelogException e) 236 { 237 cursor.abort(); 238 throw e; 239 } 240 catch (Exception e) 241 { 242 cursor.abort(); 243 throw new ChangelogException(e); 244 } 245 finally 246 { 247 cursor.close(); 248 } 249 } 250 251 /** 252 * Clear the changes from this DB (from both memory cache and DB storage) for 253 * the provided baseDN. 254 * 255 * @param baseDNToClear 256 * The baseDN for which we want to remove all records from this DB, 257 * null means all. 258 * @throws ChangelogException 259 * if a database problem occurs. 260 */ 261 public void removeDomain(DN baseDNToClear) throws ChangelogException 262 { 263 if (isEmpty()) 264 { 265 return; 266 } 267 268 final DraftCNDBCursor cursor = db.openDeleteCursor(); 269 try 270 { 271 while (!mustShutdown(shutdown) && cursor.next()) 272 { 273 final ChangeNumberIndexRecord record = cursor.currentRecord(); 274 if (record.getChangeNumber() == newestChangeNumber) 275 { 276 // do not purge the newest record to avoid having the last generated 277 // changenumber dropping back to 0 if the server restarts 278 return; 279 } 280 281 if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear)) 282 { 283 cursor.delete(); 284 } 285 } 286 } 287 catch (ChangelogException e) 288 { 289 cursor.abort(); 290 throw e; 291 } 292 finally 293 { 294 cursor.close(); 295 } 296 } 297 298 private boolean mustShutdown(AtomicBoolean shutdown) 299 { 300 return shutdown != null && shutdown.get(); 301 } 302 303 /** 304 * This internal class is used to implement the Monitoring capabilities of the 305 * JEChangeNumberIndexDB. 306 */ 307 private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> 308 { 309 /** {@inheritDoc} */ 310 @Override 311 public List<Attribute> getMonitorData() 312 { 313 List<Attribute> attributes = new ArrayList<>(); 314 attributes.add(createChangeNumberAttribute(true)); 315 attributes.add(createChangeNumberAttribute(false)); 316 attributes.add(Attributes.create("count", Long.toString(count()))); 317 return attributes; 318 } 319 320 private Attribute createChangeNumberAttribute(boolean isFirst) 321 { 322 final String attributeName = 323 isFirst ? "first-draft-changenumber" : "last-draft-changenumber"; 324 final String changeNumber = String.valueOf(readChangeNumber(isFirst)); 325 return Attributes.create(attributeName, changeNumber); 326 } 327 328 private long readChangeNumber(boolean isFirst) 329 { 330 try 331 { 332 return getChangeNumber( 333 isFirst ? db.readFirstRecord() : db.readLastRecord()); 334 } 335 catch (ChangelogException e) 336 { 337 logger.traceException(e); 338 return NO_KEY; 339 } 340 } 341 342 /** {@inheritDoc} */ 343 @Override 344 public String getMonitorInstanceName() 345 { 346 return "ChangeNumber Index Database"; 347 } 348 349 /** {@inheritDoc} */ 350 @Override 351 public void initializeMonitorProvider(MonitorProviderCfg configuration) 352 throws ConfigException,InitializationException 353 { 354 // Nothing to do for now 355 } 356 } 357 358 /** {@inheritDoc} */ 359 @Override 360 public String toString() 361 { 362 return getClass().getSimpleName() + ", newestChangeNumber=" 363 + newestChangeNumber; 364 } 365 366 /** 367 * Clear the changes from this DB (from both memory cache and DB storage). 368 * 369 * @throws ChangelogException 370 * if a database problem occurs. 371 */ 372 public void clear() throws ChangelogException 373 { 374 db.clear(); 375 newestChangeNumber = NO_KEY; 376 } 377 378}