001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS
026 */
027package org.opends.server.tasks;
028
029import java.util.List;
030
031import org.forgerock.i18n.LocalizableMessage;
032import org.forgerock.i18n.LocalizableMessageBuilder;
033import org.opends.messages.TaskMessages;
034import org.opends.server.backends.task.Task;
035import org.opends.server.backends.task.TaskState;
036import org.forgerock.i18n.slf4j.LocalizedLogger;
037import org.opends.server.replication.common.CSN;
038import org.opends.server.replication.plugin.LDAPReplicationDomain;
039import org.opends.server.types.*;
040import org.forgerock.opendj.ldap.ResultCode;
041import org.opends.server.util.TimeThread;
042
043import static org.opends.server.config.ConfigConstants.*;
044import static org.opends.server.core.DirectoryServer.*;
045
046/**
047 * This class provides an implementation of a Directory Server task that can
048 * be used to purge the replication historical informations stored in the
049 * user entries to solve conflicts.
050 */
051public class PurgeConflictsHistoricalTask extends Task
052{
053  /** The default value for the maximum duration of the purge expressed in seconds. */
054  public static final int DEFAULT_MAX_DURATION = 60 * 60;
055  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
056
057  private String domainString;
058  private LDAPReplicationDomain domain;
059
060  /**
061   *                 current historical purge delay
062   *                <--------------------------------->
063   * -----------------------------------------------------------------> t
064   *               |                           |            |
065   *           current                      task           task
066   *           CSN being purged           start date    max end date
067   *                                           <------------>
068   *                                          config.purgeMaxDuration
069   *
070   * The task will start purging the change with the oldest CSN found.
071   * The task run as long as :
072   *  - the end date (computed from the configured max duration) is not reached
073   *  - the CSN purged is oldest than the configured historical purge delay
074   */
075  private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION;
076
077  private TaskState initState;
078
079
080  private static void debugInfo(String s)
081  {
082    if (logger.isTraceEnabled())
083    {
084      System.out.println(LocalizableMessage.raw(s));
085      logger.trace(s);
086    }
087  }
088
089  /** {@inheritDoc} */
090  @Override
091  public LocalizableMessage getDisplayName() {
092    return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get();
093  }
094
095  /** {@inheritDoc} */
096  @Override public void initializeTask() throws DirectoryException
097  {
098    if (TaskState.isDone(getTaskState()))
099    {
100      return;
101    }
102
103    // FIXME -- Do we need any special authorization here?
104    Entry taskEntry = getTaskEntry();
105
106    AttributeType typeDomainBase = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN);
107    List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase);
108    domainString = TaskUtils.getSingleValueString(attrList);
109
110    try
111    {
112      DN dn = DN.valueOf(domainString);
113      // We can assume that this is an LDAP replication domain
114      domain = LDAPReplicationDomain.retrievesReplicationDomain(dn);
115    }
116    catch(DirectoryException e)
117    {
118      LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
119      mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
120      mb.append(e.getMessage());
121      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage());
122    }
123
124    AttributeType typeMaxDuration = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION);
125    attrList = taskEntry.getAttribute(typeMaxDuration);
126    String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList);
127
128    if (maxDurationStringInSec != null)
129    {
130      try
131      {
132        purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec);
133      }
134      catch(Exception e)
135      {
136        throw new DirectoryException(
137            ResultCode.UNWILLING_TO_PERFORM,
138            TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get(
139        ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage()));
140      }
141    }
142  }
143
144  /** {@inheritDoc} */
145  @Override
146  protected TaskState runTask()
147  {
148    Boolean purgeCompletedInTime = false;
149    if (logger.isTraceEnabled())
150    {
151      debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting "
152          + "on domain: " + domain.getBaseDN()
153          + "max duration (sec):" + purgeTaskMaxDurationInSec);
154    }
155    try
156    {
157      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
158
159      // launch the task
160      domain.purgeConflictsHistorical(this, TimeThread.getTime() + purgeTaskMaxDurationInSec*1000);
161
162      purgeCompletedInTime = true;
163      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
164
165      initState =  TaskState.COMPLETED_SUCCESSFULLY;
166    }
167    catch(DirectoryException de)
168    {
169      debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage());
170      if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
171      {
172        // Error raised at submission time
173        logger.error(de.getMessageObject());
174        initState = TaskState.STOPPED_BY_ERROR;
175      }
176      else
177      {
178        initState =  TaskState.COMPLETED_SUCCESSFULLY;
179      }
180    }
181    finally
182    {
183      try
184      {
185        // sets in the attributes the last stats values
186        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
187        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
188        debugInfo("[PURGE] PurgeConflictsHistoricalTask write  attrs ");
189      }
190      catch(Exception e)
191      {
192        debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + e.getLocalizedMessage());
193        initState = TaskState.STOPPED_BY_ERROR;
194      }
195    }
196
197    if (logger.isTraceEnabled())
198    {
199      debugInfo("[PURGE] PurgeConflictsHistoricalTask is ending with state:" + initState +
200            " completedInTime:" + purgeCompletedInTime);
201    }
202    return initState;
203  }
204
205  private int updateAttrPeriod;
206  private CSN lastCSN;
207  private int purgeCount;
208
209  /**
210   * Set the last CSN purged and the count of purged values in order to monitor
211   * the historical purge.
212   *
213   * @param lastCSN
214   *          the last CSN purged.
215   * @param purgeCount
216   *          the count of purged values.
217   */
218  public void setProgressStats(CSN lastCSN, int purgeCount)
219  {
220    try
221    {
222      if (purgeCount == 0)
223      {
224        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CSN, lastCSN.toStringUI());
225      }
226
227      // we don't want the update of the task to overload too much task duration
228      this.purgeCount = purgeCount;
229      this.lastCSN = lastCSN;
230      if (++updateAttrPeriod % 100 == 0)
231      {
232        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
233        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
234        debugInfo("[PURGE] PurgeConflictsHistoricalTask write  attrs " + purgeCount);
235      }
236    }
237    catch(DirectoryException de)
238    {
239      debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage());
240      initState = TaskState.STOPPED_BY_ERROR;
241    }
242  }
243}