/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013-2015 ForgeRock AS
 */
package org.opends.server.tasks;

import java.util.List;

import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.util.TimeThread;

import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.*;

/**
 * This class provides an implementation of a Directory Server task that can
 * be used to purge the replication historical information stored in the
 * user entries to solve conflicts.
 */
public class PurgeConflictsHistoricalTask extends Task
{
  /** The default value for the maximum duration of the purge expressed in seconds. */
  public static final int DEFAULT_MAX_DURATION = 60 * 60;
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /** Raw value of the domain base DN attribute read from the task entry. */
  private String domainString;
  /** Replication domain resolved from {@link #domainString}. */
  private LDAPReplicationDomain domain;

  /**
   * Maximum duration of the purge, in seconds.
   * <pre>{@code
   *                 current historical purge delay
   *                <--------------------------------->
   * -----------------------------------------------------------------> t
   *               |                           |            |
   *           current                      task           task
   *           CSN being purged           start date    max end date
   *                                           <------------>
   *                                          config.purgeMaxDuration
   * }</pre>
   * The task will start purging the change with the oldest CSN found.
   * The task runs as long as:
   * <ul>
   * <li>the end date (computed from the configured max duration) is not reached</li>
   * <li>the CSN purged is older than the configured historical purge delay</li>
   * </ul>
   */
  private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION;

  /** State returned by {@link #runTask()}. */
  private TaskState initState;

  /** Number of calls to {@link #setProgressStats(CSN, int)}; used to throttle attribute writes. */
  private int updateAttrPeriod;
  /** Last CSN reported through {@link #setProgressStats(CSN, int)}; {@code null} until the first report. */
  private CSN lastCSN;
  /** Last purge count reported through {@link #setProgressStats(CSN, int)}. */
  private int purgeCount;


  /**
   * Writes the provided text to the trace log when tracing is enabled.
   *
   * @param s
   *          the text to trace.
   */
  private static void debugInfo(String s)
  {
    // Diagnostics go to the logger only: a server task must not write to stdout.
    if (logger.isTraceEnabled())
    {
      logger.trace(s);
    }
  }

  /** {@inheritDoc} */
  @Override
  public LocalizableMessage getDisplayName() {
    return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get();
  }

  /**
   * Reads the task entry to resolve the replication domain to purge and the
   * optional maximum duration of the purge. Does nothing when the task is
   * already done.
   *
   * @throws DirectoryException
   *           with {@code UNWILLING_TO_PERFORM} if the domain DN cannot be
   *           resolved to a replication domain, or if the maximum duration
   *           attribute cannot be decoded as an integer.
   */
  @Override public void initializeTask() throws DirectoryException
  {
    if (TaskState.isDone(getTaskState()))
    {
      return;
    }

    // FIXME -- Do we need any special authorization here?
    Entry taskEntry = getTaskEntry();

    AttributeType typeDomainBase = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN);
    List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase);
    domainString = TaskUtils.getSingleValueString(attrList);

    try
    {
      DN dn = DN.valueOf(domainString);
      // We can assume that this is an LDAP replication domain
      domain = LDAPReplicationDomain.retrievesReplicationDomain(dn);
    }
    catch (DirectoryException e)
    {
      LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
      mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
      mb.append(e.getMessage());
      // Keep the original exception as the cause so the stack trace is not lost.
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage(), e);
    }

    AttributeType typeMaxDuration = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION);
    attrList = taskEntry.getAttribute(typeMaxDuration);
    String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList);

    if (maxDurationStringInSec != null)
    {
      try
      {
        purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec);
      }
      catch (NumberFormatException e)
      {
        throw new DirectoryException(
            ResultCode.UNWILLING_TO_PERFORM,
            TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get(
                ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage()),
            e);
      }
    }
  }

  /**
   * Runs the purge on the replication domain and publishes the outcome in the
   * task attributes.
   * <p>
   * A {@link DirectoryException} whose result code is
   * {@code ADMIN_LIMIT_EXCEEDED} is reported as a successful completion (the
   * "completed in time" attribute then keeps the value {@code false}); any
   * other {@link DirectoryException} is logged as an error and the task ends
   * in the {@code STOPPED_BY_ERROR} state. A failure to write the final
   * statistics also ends the task in the {@code STOPPED_BY_ERROR} state.
   *
   * @return the final state of the task.
   */
  @Override
  protected TaskState runTask()
  {
    boolean purgeCompletedInTime = false;
    if (logger.isTraceEnabled())
    {
      debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting "
          + "on domain: " + domain.getBaseDN()
          + " max duration (sec):" + purgeTaskMaxDurationInSec);
    }
    try
    {
      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME,
          String.valueOf(purgeCompletedInTime));

      // 1000L forces the multiplication to be done in long arithmetic: with an
      // int multiplication, durations above ~24.8 days overflow and produce an
      // end date in the past.
      final long endDate = TimeThread.getTime() + purgeTaskMaxDurationInSec * 1000L;

      // launch the task
      domain.purgeConflictsHistorical(this, endDate);

      purgeCompletedInTime = true;
      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME,
          String.valueOf(purgeCompletedInTime));

      initState = TaskState.COMPLETED_SUCCESSFULLY;
    }
    catch (DirectoryException de)
    {
      debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage());
      if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
      {
        // Error raised at submission time
        logger.error(de.getMessageObject());
        initState = TaskState.STOPPED_BY_ERROR;
      }
      else
      {
        initState = TaskState.COMPLETED_SUCCESSFULLY;
      }
    }
    finally
    {
      try
      {
        // sets in the attributes the last stats values
        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
        // lastCSN stays null when no progress was ever reported; in that case
        // there is no CSN to publish and this must not turn into an error.
        if (lastCSN != null)
        {
          replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
        }
        debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs ");
      }
      catch (Exception e)
      {
        debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + e.getLocalizedMessage());
        initState = TaskState.STOPPED_BY_ERROR;
      }
    }

    if (logger.isTraceEnabled())
    {
      debugInfo("[PURGE] PurgeConflictsHistoricalTask is ending with state:" + initState +
          " completedInTime:" + purgeCompletedInTime);
    }
    return initState;
  }

  /**
   * Set the last CSN purged and the count of purged values in order to monitor
   * the historical purge.
   * <p>
   * When {@code purgeCount} is zero the provided CSN is published as the first
   * CSN of the purge. The count and last CSN attributes are only rewritten on
   * every 100th call, to limit the cost of updating the task entry.
   *
   * @param lastCSN
   *          the last CSN purged.
   * @param purgeCount
   *          the count of purged values.
   */
  public void setProgressStats(CSN lastCSN, int purgeCount)
  {
    try
    {
      if (purgeCount == 0)
      {
        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CSN, lastCSN.toStringUI());
      }

      // we don't want the update of the task to overload too much task duration
      this.purgeCount = purgeCount;
      this.lastCSN = lastCSN;
      if (++updateAttrPeriod % 100 == 0)
      {
        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
        debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs " + purgeCount);
      }
    }
    catch (DirectoryException de)
    {
      debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage());
      // NOTE(review): runTask() assigns initState itself once the purge returns
      // or throws a DirectoryException, so the value set here is overwritten on
      // those paths -- confirm whether a failed progress update is meant to
      // fail the task.
      initState = TaskState.STOPPED_BY_ERROR;
    }
  }
}