001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *      Copyright 2014-2015 ForgeRock AS
024 */
025package org.opends.server.replication.server.changelog.file;
026
027import java.util.concurrent.atomic.AtomicReference;
028
029import org.opends.server.replication.common.CSN;
030import org.opends.server.replication.protocol.ReplicaOfflineMsg;
031import org.opends.server.replication.protocol.UpdateMsg;
032import org.opends.server.replication.server.changelog.api.ChangelogException;
033import org.opends.server.replication.server.changelog.api.DBCursor;
034import org.opends.server.replication.server.changelog.api.ReplicaId;
035import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
036
037/**
038 * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
039 * <p>
040 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
041 * return replica offline messages when the decorated DBCursor is exhausted and
042 * the offline CSN is newer than the last returned update CSN.
043 */
044public class ReplicaCursor implements DBCursor<UpdateMsg>
045{
046  /** @NonNull */
047  private final DBCursor<UpdateMsg> cursor;
048  private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg = new AtomicReference<>();
049  private UpdateMsg currentRecord;
050
051  private final ReplicaId replicaId;
052  private final ReplicationDomainDB domainDB;
053
054  /**
055   * Creates a ReplicaCursor object with a cursor to decorate
056   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
057   *
058   * @param cursor
059   *          the non-null underlying cursor that needs to be exhausted before
060   *          we return a ReplicaOfflineMsg
061   * @param offlineCSN
062   *          the offline CSN from which to builder the
063   *          {@link ReplicaOfflineMsg} to return
064   * @param replicaId
065   *          the replica identifier
066   * @param domainDB
067   *          the DB for the provided replication domain
068   */
069  public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN, ReplicaId replicaId, ReplicationDomainDB domainDB)
070  {
071    this.cursor = cursor;
072    this.replicaId = replicaId;
073    this.domainDB = domainDB;
074    setOfflineCSN(offlineCSN);
075  }
076
077  /**
078   * Sets the offline CSN to be returned by this cursor.
079   *
080   * @param offlineCSN
081   *          The offline CSN to be returned by this cursor.
082   *          If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
083   */
084  public void setOfflineCSN(CSN offlineCSN)
085  {
086    this.replicaOfflineMsg.set(
087        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
088  }
089
090  /** {@inheritDoc} */
091  @Override
092  public UpdateMsg getRecord()
093  {
094    return currentRecord;
095  }
096
097  /**
098   * Returns the replica identifier that this cursor is associated to.
099   *
100   * @return the replica identifier that this cursor is associated to
101   */
102  public ReplicaId getReplicaId()
103  {
104    return replicaId;
105  }
106
107  /** {@inheritDoc} */
108  @Override
109  public boolean next() throws ChangelogException
110  {
111    final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
112    if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
113    {
114      replicaOfflineMsg.compareAndSet(offlineMsg1, null);
115    }
116
117    // now verify if new changes have been added to the DB
118    // (cursors are automatically restarted)
119    final UpdateMsg lastUpdate = cursor.getRecord();
120    final boolean hasNext = cursor.next();
121    if (hasNext)
122    {
123      currentRecord = cursor.getRecord();
124      return true;
125    }
126
127    // replicaDB just happened to be exhausted now
128    final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
129    if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
130    {
131      replicaOfflineMsg.compareAndSet(offlineMsg2, null);
132      currentRecord = null;
133      return false;
134    }
135    currentRecord = offlineMsg2;
136    return currentRecord != null;
137  }
138
139  /** It could also mean that the replica offline message has already been consumed. */
140  private boolean isReplicaOfflineMsgOutdated(
141      final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
142  {
143    return offlineMsg != null
144        && updateMsg != null
145        && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
146  }
147
148  /** {@inheritDoc} */
149  @Override
150  public void close()
151  {
152    cursor.close();
153    domainDB.unregisterCursor(this);
154  }
155
156  /** {@inheritDoc} */
157  @Override
158  public String toString()
159  {
160    final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
161    return getClass().getSimpleName()
162        + " currentRecord=" + currentRecord
163        + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
164        + " cursor=" + cursor.toString().split("", 2)[1];
165  }
166
167}