001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2013-2014 ForgeRock AS
025 */
026package org.opends.server.core;
027
028import java.util.concurrent.atomic.AtomicInteger;
029
030import org.opends.server.types.DirectoryException;
031import org.opends.server.types.Operation;
032
033/**
034 * A QueueingStrategy that concurrently enqueues a bounded number of operations
035 * to the DirectoryServer work queue. If the maximum number of concurrently
036 * enqueued operations has been reached or if the work queue if full, then the
037 * operation will be executed on the current thread.
038 */
039public class BoundedWorkQueueStrategy implements QueueingStrategy
040{
041
042  /**
043   * The number of concurrently running operations for this
044   * BoundedWorkQueueStrategy.
045   */
046  private final AtomicInteger nbRunningOperations = new AtomicInteger(0);
047  /** Maximum number of concurrent operations. 0 means "unlimited". */
048  private final int maxNbConcurrentOperations;
049
050  /**
051   * Constructor for BoundedWorkQueueStrategy.
052   *
053   * @param maxNbConcurrentOperations
054   *          the maximum number of operations that can be concurrently enqueued
055   *          to the DirectoryServer work queue
056   */
057  public BoundedWorkQueueStrategy(Integer maxNbConcurrentOperations)
058  {
059    if (maxNbConcurrentOperations != null)
060    {
061      this.maxNbConcurrentOperations = maxNbConcurrentOperations;
062    }
063    else
064    {
065      int cpus = Runtime.getRuntime().availableProcessors();
066      this.maxNbConcurrentOperations =
067          Math.max(cpus, getNumWorkerThreads() * 25 / 100);
068    }
069  }
070
071  /**
072   * Return the maximum number of worker threads that can be used by the
073   * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
074   *
075   * @return the maximum number of worker threads that can be used by the
076   *         WorkQueue
077   */
078  protected int getNumWorkerThreads()
079  {
080    return DirectoryServer.getWorkQueue().getNumWorkerThreads();
081  }
082
083  /** {@inheritDoc} */
084  @Override
085  public void enqueueRequest(final Operation operation)
086      throws DirectoryException
087  {
088    if (!operation.getClientConnection().isConnectionValid())
089    {
090      // do not bother enqueueing
091      return;
092    }
093
094    if (maxNbConcurrentOperations == 0)
095    { // unlimited concurrent operations
096      if (!tryEnqueueRequest(operation))
097      { // avoid potential deadlocks by running in the current thread
098        operation.run();
099      }
100    }
101    else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations
102        || !tryEnqueueRequest(wrap(operation)))
103    { // avoid potential deadlocks by running in the current thread
104      try
105      {
106        operation.run();
107      }
108      finally
109      {
110        // only decrement when the operation is run synchronously.
111        // Otherwise it'll be decremented twice (once more in the wrapper).
112        nbRunningOperations.decrementAndGet();
113      }
114    }
115  }
116
117  /**
118   * Tries to add the provided operation to the work queue if not full so that
119   * it will be processed by one of the worker threads.
120   *
121   * @param op
122   *          The operation to be added to the work queue.
123   * @return true if the operation could be enqueued, false otherwise
124   * @throws DirectoryException
125   *           If a problem prevents the operation from being added to the queue
126   *           (e.g., the queue is full).
127   */
128  protected boolean tryEnqueueRequest(Operation op) throws DirectoryException
129  {
130    return DirectoryServer.tryEnqueueRequest(op);
131  }
132
133  private Operation wrap(final Operation operation)
134  {
135    if (operation instanceof AbandonOperation)
136    {
137      return new AbandonOperationWrapper((AbandonOperation) operation)
138      {
139        @Override
140        public void run()
141        {
142          runWrapped(operation);
143        }
144      };
145    }
146    else if (operation instanceof AddOperation)
147    {
148      return new AddOperationWrapper((AddOperation) operation)
149      {
150        @Override
151        public void run()
152        {
153          runWrapped(operation);
154        }
155      };
156    }
157    else if (operation instanceof BindOperation)
158    {
159      return new BindOperationWrapper((BindOperation) operation)
160      {
161        @Override
162        public void run()
163        {
164          runWrapped(operation);
165        }
166      };
167    }
168    else if (operation instanceof CompareOperation)
169    {
170      return new CompareOperationWrapper((CompareOperation) operation)
171      {
172        @Override
173        public void run()
174        {
175          runWrapped(operation);
176        }
177      };
178    }
179    else if (operation instanceof DeleteOperation)
180    {
181      return new DeleteOperationWrapper((DeleteOperation) operation)
182      {
183        @Override
184        public void run()
185        {
186          runWrapped(operation);
187        }
188      };
189    }
190    else if (operation instanceof ExtendedOperation)
191    {
192      return new ExtendedOperationWrapper((ExtendedOperation) operation)
193      {
194        @Override
195        public void run()
196        {
197          runWrapped(operation);
198        }
199      };
200    }
201    else if (operation instanceof ModifyDNOperation)
202    {
203      return new ModifyDNOperationWrapper((ModifyDNOperation) operation)
204      {
205        @Override
206        public void run()
207        {
208          runWrapped(operation);
209        }
210      };
211    }
212    else if (operation instanceof ModifyOperation)
213    {
214      return new ModifyOperationWrapper((ModifyOperation) operation)
215      {
216        @Override
217        public void run()
218        {
219          runWrapped(operation);
220        }
221      };
222    }
223    else if (operation instanceof SearchOperation)
224    {
225      return new SearchOperationWrapper((SearchOperation) operation)
226      {
227        @Override
228        public void run()
229        {
230          runWrapped(operation);
231        }
232      };
233    }
234    else if (operation instanceof UnbindOperation)
235    {
236      return new UnbindOperationWrapper((UnbindOperation) operation)
237      {
238        @Override
239        public void run()
240        {
241          runWrapped(operation);
242        }
243      };
244    }
245    else
246    {
247      throw new RuntimeException(
248          "Not implemented for " + operation == null ? null : operation
249              .getClass().getName());
250    }
251  }
252
253  /**
254   * Execute the provided operation and decrement the number of currently
255   * running operations after it has finished executing.
256   *
257   * @param the
258   *          operation to execute
259   */
260  private void runWrapped(final Operation operation)
261  {
262    try
263    {
264      operation.run();
265    }
266    finally
267    {
268      nbRunningOperations.decrementAndGet();
269    }
270  }
271}