001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * Copyright 2014-2015 ForgeRock AS 024 */ 025package org.opends.server.replication.server.changelog.file; 026 027import java.util.concurrent.atomic.AtomicReference; 028 029import org.opends.server.replication.common.CSN; 030import org.opends.server.replication.protocol.ReplicaOfflineMsg; 031import org.opends.server.replication.protocol.UpdateMsg; 032import org.opends.server.replication.server.changelog.api.ChangelogException; 033import org.opends.server.replication.server.changelog.api.DBCursor; 034import org.opends.server.replication.server.changelog.api.ReplicaId; 035import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 036 037/** 038 * {@link DBCursor} over a replica returning {@link UpdateMsg}s. 039 * <p> 040 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly 041 * return replica offline messages when the decorated DBCursor is exhausted and 042 * the offline CSN is newer than the last returned update CSN. 043 */ 044public class ReplicaCursor implements DBCursor<UpdateMsg> 045{ 046 /** @NonNull */ 047 private final DBCursor<UpdateMsg> cursor; 048 private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg = new AtomicReference<>(); 049 private UpdateMsg currentRecord; 050 051 private final ReplicaId replicaId; 052 private final ReplicationDomainDB domainDB; 053 054 /** 055 * Creates a ReplicaCursor object with a cursor to decorate 056 * and an offlineCSN to return as part of a ReplicaOfflineMsg. 057 * 058 * @param cursor 059 * the non-null underlying cursor that needs to be exhausted before 060 * we return a ReplicaOfflineMsg 061 * @param offlineCSN 062 * the offline CSN from which to builder the 063 * {@link ReplicaOfflineMsg} to return 064 * @param replicaId 065 * the replica identifier 066 * @param domainDB 067 * the DB for the provided replication domain 068 */ 069 public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN, ReplicaId replicaId, ReplicationDomainDB domainDB) 070 { 071 this.cursor = cursor; 072 this.replicaId = replicaId; 073 this.domainDB = domainDB; 074 setOfflineCSN(offlineCSN); 075 } 076 077 /** 078 * Sets the offline CSN to be returned by this cursor. 079 * 080 * @param offlineCSN 081 * The offline CSN to be returned by this cursor. 082 * If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg 083 */ 084 public void setOfflineCSN(CSN offlineCSN) 085 { 086 this.replicaOfflineMsg.set( 087 offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null); 088 } 089 090 /** {@inheritDoc} */ 091 @Override 092 public UpdateMsg getRecord() 093 { 094 return currentRecord; 095 } 096 097 /** 098 * Returns the replica identifier that this cursor is associated to. 099 * 100 * @return the replica identifier that this cursor is associated to 101 */ 102 public ReplicaId getReplicaId() 103 { 104 return replicaId; 105 } 106 107 /** {@inheritDoc} */ 108 @Override 109 public boolean next() throws ChangelogException 110 { 111 final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get(); 112 if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord)) 113 { 114 replicaOfflineMsg.compareAndSet(offlineMsg1, null); 115 } 116 117 // now verify if new changes have been added to the DB 118 // (cursors are automatically restarted) 119 final UpdateMsg lastUpdate = cursor.getRecord(); 120 final boolean hasNext = cursor.next(); 121 if (hasNext) 122 { 123 currentRecord = cursor.getRecord(); 124 return true; 125 } 126 127 // replicaDB just happened to be exhausted now 128 final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get(); 129 if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate)) 130 { 131 replicaOfflineMsg.compareAndSet(offlineMsg2, null); 132 currentRecord = null; 133 return false; 134 } 135 currentRecord = offlineMsg2; 136 return currentRecord != null; 137 } 138 139 /** It could also mean that the replica offline message has already been consumed. */ 140 private boolean isReplicaOfflineMsgOutdated( 141 final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg) 142 { 143 return offlineMsg != null 144 && updateMsg != null 145 && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN()); 146 } 147 148 /** {@inheritDoc} */ 149 @Override 150 public void close() 151 { 152 cursor.close(); 153 domainDB.unregisterCursor(this); 154 } 155 156 /** {@inheritDoc} */ 157 @Override 158 public String toString() 159 { 160 final ReplicaOfflineMsg msg = replicaOfflineMsg.get(); 161 return getClass().getSimpleName() 162 + " currentRecord=" + currentRecord 163 + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null) 164 + " cursor=" + cursor.toString().split("", 2)[1]; 165 } 166 167}