001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2009-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.server.changelog.je;
028
029import java.util.ArrayList;
030import java.util.List;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicLong;
033
034import org.forgerock.i18n.slf4j.LocalizedLogger;
035import org.forgerock.opendj.config.server.ConfigException;
036import org.opends.server.admin.std.server.MonitorProviderCfg;
037import org.opends.server.api.MonitorProvider;
038import org.opends.server.core.DirectoryServer;
039import org.opends.server.replication.common.CSN;
040import org.opends.server.replication.server.changelog.api.*;
041import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
042import org.opends.server.types.*;
043
044/**
045 * This class is used for managing the replicationServer database for each
046 * server in the topology. It is responsible for efficiently saving the updates
047 * that is received from each master server into stable storage. This class is
048 * also able to generate a {@link DBCursor} that can be used to read all changes
049 * from a given change number.
050 * <p>
051 * This class publishes some monitoring information below <code>
052 * cn=monitor</code>.
053 */
054public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
055{
056  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
057  private static final int NO_KEY = 0;
058
059  private DraftCNDB db;
060  /**
061   * The newest changenumber stored in the DB. It is used to avoid purging the
062   * record with the newest changenumber. The newest record in the changenumber
063   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
064   * then retrieved on server startup.
065   */
066  private volatile long newestChangeNumber = NO_KEY;
067  /**
068   * The last generated value for the change number. It is kept separate from
069   * the {@link #newestChangeNumber} because there is an opportunity for a race
070   * condition between:
071   * <ol>
072   * <li>this atomic long being incremented for a new record ('recordB')</li>
073   * <li>the current newest record ('recordA') being purged from the DB</li>
074   * <li>'recordB' failing to be inserted in the DB</li>
075   * </ol>
076   */
077  private final AtomicLong lastGeneratedChangeNumber;
078  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
079  private final AtomicBoolean shutdown = new AtomicBoolean(false);
080
081
082  /**
083   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
084   *
085   * @param dbEnv the Database Env to use to create the ReplicationServer DB.
086   * server for this domain.
087   * @throws ChangelogException If a database problem happened
088   */
089  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
090  {
091    db = new DraftCNDB(dbEnv);
092    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
093    newestChangeNumber = getChangeNumber(newestRecord);
094    // initialization of the lastGeneratedChangeNumber from the DB content
095    // if DB is empty => last record does not exist => default to 0
096    lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
097
098    // Monitoring registration
099    DirectoryServer.deregisterMonitorProvider(dbMonitor);
100    DirectoryServer.registerMonitorProvider(dbMonitor);
101  }
102
103  private long getChangeNumber(ChangeNumberIndexRecord record)
104      throws ChangelogException
105  {
106    if (record != null)
107    {
108      return record.getChangeNumber();
109    }
110    return NO_KEY;
111  }
112
113  /** {@inheritDoc} */
114  @Override
115  public long addRecord(ChangeNumberIndexRecord record)
116      throws ChangelogException
117  {
118    long changeNumber = nextChangeNumber();
119    final ChangeNumberIndexRecord newRecord =
120        new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
121    db.addRecord(newRecord);
122    newestChangeNumber = changeNumber;
123
124    logger.trace("In JEChangeNumberIndexDB.add, added: %s", newRecord);
125    return changeNumber;
126  }
127
128  /** {@inheritDoc} */
129  @Override
130  public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException
131  {
132    return db.readFirstRecord();
133  }
134
135  /** {@inheritDoc} */
136  @Override
137  public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException
138  {
139    return db.readLastRecord();
140  }
141
142  private long nextChangeNumber()
143  {
144    return lastGeneratedChangeNumber.incrementAndGet();
145  }
146
147  /** {@inheritDoc} */
148  @Override
149  public long getLastGeneratedChangeNumber()
150  {
151    return lastGeneratedChangeNumber.get();
152  }
153
154  /**
155   * Get the number of changes.
156   * @return Returns the number of changes.
157   */
158  public long count()
159  {
160    return db.count();
161  }
162
163  /**
164   * Returns whether this database is empty.
165   *
166   * @return <code>true</code> if this database is empty, <code>false</code>
167   *         otherwise
168   * @throws ChangelogException
169   *           if a database problem occurs.
170   */
171  public boolean isEmpty() throws ChangelogException
172  {
173    return getNewestRecord() == null;
174  }
175
176  /** {@inheritDoc} */
177  @Override
178  public DBCursor<ChangeNumberIndexRecord> getCursorFrom(long startChangeNumber)
179      throws ChangelogException
180  {
181    return new JEChangeNumberIndexDBCursor(db, startChangeNumber);
182  }
183
184  /**
185   * Shutdown this DB.
186   */
187  public void shutdown()
188  {
189    if (shutdown.compareAndSet(false, true))
190    {
191      db.shutdown();
192      DirectoryServer.deregisterMonitorProvider(dbMonitor);
193    }
194  }
195
196  /**
197   * Synchronously purges the change number index DB up to and excluding the
198   * provided timestamp.
199   *
200   * @param purgeCSN
201   *          the timestamp up to which purging must happen
202   * @return the oldest non purged CSN.
203   * @throws ChangelogException
204   *           if a database problem occurs.
205   */
206  public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException
207  {
208    if (isEmpty() || purgeCSN == null)
209    {
210      return null;
211    }
212
213    final DraftCNDBCursor cursor = db.openDeleteCursor();
214    try
215    {
216      while (!mustShutdown(shutdown) && cursor.next())
217      {
218        final ChangeNumberIndexRecord record = cursor.currentRecord();
219        if (record.getChangeNumber() != newestChangeNumber
220            && record.getCSN().isOlderThan(purgeCSN))
221        {
222          cursor.delete();
223        }
224        else
225        {
226          // 1- Current record is not old enough to purge.
227          // 2- Do not purge the newest record to avoid having the last
228          // generated changenumber dropping back to 0 when the server restarts
229          return record.getCSN();
230        }
231      }
232
233      return null;
234    }
235    catch (ChangelogException e)
236    {
237      cursor.abort();
238      throw e;
239    }
240    catch (Exception e)
241    {
242      cursor.abort();
243      throw new ChangelogException(e);
244    }
245    finally
246    {
247      cursor.close();
248    }
249  }
250
251  /**
252   * Clear the changes from this DB (from both memory cache and DB storage) for
253   * the provided baseDN.
254   *
255   * @param baseDNToClear
256   *          The baseDN for which we want to remove all records from this DB,
257   *          null means all.
258   * @throws ChangelogException
259   *           if a database problem occurs.
260   */
261  public void removeDomain(DN baseDNToClear) throws ChangelogException
262  {
263    if (isEmpty())
264    {
265      return;
266    }
267
268    final DraftCNDBCursor cursor = db.openDeleteCursor();
269    try
270    {
271      while (!mustShutdown(shutdown) && cursor.next())
272      {
273        final ChangeNumberIndexRecord record = cursor.currentRecord();
274        if (record.getChangeNumber() == newestChangeNumber)
275        {
276          // do not purge the newest record to avoid having the last generated
277          // changenumber dropping back to 0 if the server restarts
278          return;
279        }
280
281        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
282        {
283          cursor.delete();
284        }
285      }
286    }
287    catch (ChangelogException e)
288    {
289      cursor.abort();
290      throw e;
291    }
292    finally
293    {
294      cursor.close();
295    }
296  }
297
298  private boolean mustShutdown(AtomicBoolean shutdown)
299  {
300    return shutdown != null && shutdown.get();
301  }
302
303  /**
304   * This internal class is used to implement the Monitoring capabilities of the
305   * JEChangeNumberIndexDB.
306   */
307  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
308  {
309    /** {@inheritDoc} */
310    @Override
311    public List<Attribute> getMonitorData()
312    {
313      List<Attribute> attributes = new ArrayList<>();
314      attributes.add(createChangeNumberAttribute(true));
315      attributes.add(createChangeNumberAttribute(false));
316      attributes.add(Attributes.create("count", Long.toString(count())));
317      return attributes;
318    }
319
320    private Attribute createChangeNumberAttribute(boolean isFirst)
321    {
322      final String attributeName =
323          isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
324      final String changeNumber = String.valueOf(readChangeNumber(isFirst));
325      return Attributes.create(attributeName, changeNumber);
326    }
327
328    private long readChangeNumber(boolean isFirst)
329    {
330      try
331      {
332        return getChangeNumber(
333            isFirst ? db.readFirstRecord() : db.readLastRecord());
334      }
335      catch (ChangelogException e)
336      {
337        logger.traceException(e);
338        return NO_KEY;
339      }
340    }
341
342    /** {@inheritDoc} */
343    @Override
344    public String getMonitorInstanceName()
345    {
346      return "ChangeNumber Index Database";
347    }
348
349    /** {@inheritDoc} */
350    @Override
351    public void initializeMonitorProvider(MonitorProviderCfg configuration)
352                            throws ConfigException,InitializationException
353    {
354      // Nothing to do for now
355    }
356  }
357
358  /** {@inheritDoc} */
359  @Override
360  public String toString()
361  {
362    return getClass().getSimpleName() + ", newestChangeNumber="
363        + newestChangeNumber;
364  }
365
366  /**
367   * Clear the changes from this DB (from both memory cache and DB storage).
368   *
369   * @throws ChangelogException
370   *           if a database problem occurs.
371   */
372  public void clear() throws ChangelogException
373  {
374    db.clear();
375    newestChangeNumber = NO_KEY;
376  }
377
378}