001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2013-2014 ForgeRock AS 025 */ 026package org.opends.server.core; 027 028import java.util.concurrent.atomic.AtomicInteger; 029 030import org.opends.server.types.DirectoryException; 031import org.opends.server.types.Operation; 032 033/** 034 * A QueueingStrategy that concurrently enqueues a bounded number of operations 035 * to the DirectoryServer work queue. If the maximum number of concurrently 036 * enqueued operations has been reached or if the work queue if full, then the 037 * operation will be executed on the current thread. 038 */ 039public class BoundedWorkQueueStrategy implements QueueingStrategy 040{ 041 042 /** 043 * The number of concurrently running operations for this 044 * BoundedWorkQueueStrategy. 045 */ 046 private final AtomicInteger nbRunningOperations = new AtomicInteger(0); 047 /** Maximum number of concurrent operations. 0 means "unlimited". */ 048 private final int maxNbConcurrentOperations; 049 050 /** 051 * Constructor for BoundedWorkQueueStrategy. 052 * 053 * @param maxNbConcurrentOperations 054 * the maximum number of operations that can be concurrently enqueued 055 * to the DirectoryServer work queue 056 */ 057 public BoundedWorkQueueStrategy(Integer maxNbConcurrentOperations) 058 { 059 if (maxNbConcurrentOperations != null) 060 { 061 this.maxNbConcurrentOperations = maxNbConcurrentOperations; 062 } 063 else 064 { 065 int cpus = Runtime.getRuntime().availableProcessors(); 066 this.maxNbConcurrentOperations = 067 Math.max(cpus, getNumWorkerThreads() * 25 / 100); 068 } 069 } 070 071 /** 072 * Return the maximum number of worker threads that can be used by the 073 * WorkQueue (The WorkQueue could have a thread pool which adjusts its size). 074 * 075 * @return the maximum number of worker threads that can be used by the 076 * WorkQueue 077 */ 078 protected int getNumWorkerThreads() 079 { 080 return DirectoryServer.getWorkQueue().getNumWorkerThreads(); 081 } 082 083 /** {@inheritDoc} */ 084 @Override 085 public void enqueueRequest(final Operation operation) 086 throws DirectoryException 087 { 088 if (!operation.getClientConnection().isConnectionValid()) 089 { 090 // do not bother enqueueing 091 return; 092 } 093 094 if (maxNbConcurrentOperations == 0) 095 { // unlimited concurrent operations 096 if (!tryEnqueueRequest(operation)) 097 { // avoid potential deadlocks by running in the current thread 098 operation.run(); 099 } 100 } 101 else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations 102 || !tryEnqueueRequest(wrap(operation))) 103 { // avoid potential deadlocks by running in the current thread 104 try 105 { 106 operation.run(); 107 } 108 finally 109 { 110 // only decrement when the operation is run synchronously. 111 // Otherwise it'll be decremented twice (once more in the wrapper). 112 nbRunningOperations.decrementAndGet(); 113 } 114 } 115 } 116 117 /** 118 * Tries to add the provided operation to the work queue if not full so that 119 * it will be processed by one of the worker threads. 120 * 121 * @param op 122 * The operation to be added to the work queue. 123 * @return true if the operation could be enqueued, false otherwise 124 * @throws DirectoryException 125 * If a problem prevents the operation from being added to the queue 126 * (e.g., the queue is full). 127 */ 128 protected boolean tryEnqueueRequest(Operation op) throws DirectoryException 129 { 130 return DirectoryServer.tryEnqueueRequest(op); 131 } 132 133 private Operation wrap(final Operation operation) 134 { 135 if (operation instanceof AbandonOperation) 136 { 137 return new AbandonOperationWrapper((AbandonOperation) operation) 138 { 139 @Override 140 public void run() 141 { 142 runWrapped(operation); 143 } 144 }; 145 } 146 else if (operation instanceof AddOperation) 147 { 148 return new AddOperationWrapper((AddOperation) operation) 149 { 150 @Override 151 public void run() 152 { 153 runWrapped(operation); 154 } 155 }; 156 } 157 else if (operation instanceof BindOperation) 158 { 159 return new BindOperationWrapper((BindOperation) operation) 160 { 161 @Override 162 public void run() 163 { 164 runWrapped(operation); 165 } 166 }; 167 } 168 else if (operation instanceof CompareOperation) 169 { 170 return new CompareOperationWrapper((CompareOperation) operation) 171 { 172 @Override 173 public void run() 174 { 175 runWrapped(operation); 176 } 177 }; 178 } 179 else if (operation instanceof DeleteOperation) 180 { 181 return new DeleteOperationWrapper((DeleteOperation) operation) 182 { 183 @Override 184 public void run() 185 { 186 runWrapped(operation); 187 } 188 }; 189 } 190 else if (operation instanceof ExtendedOperation) 191 { 192 return new ExtendedOperationWrapper((ExtendedOperation) operation) 193 { 194 @Override 195 public void run() 196 { 197 runWrapped(operation); 198 } 199 }; 200 } 201 else if (operation instanceof ModifyDNOperation) 202 { 203 return new ModifyDNOperationWrapper((ModifyDNOperation) operation) 204 { 205 @Override 206 public void run() 207 { 208 runWrapped(operation); 209 } 210 }; 211 } 212 else if (operation instanceof ModifyOperation) 213 { 214 return new ModifyOperationWrapper((ModifyOperation) operation) 215 { 216 @Override 217 public void run() 218 { 219 runWrapped(operation); 220 } 221 }; 222 } 223 else if (operation instanceof SearchOperation) 224 { 225 return new SearchOperationWrapper((SearchOperation) operation) 226 { 227 @Override 228 public void run() 229 { 230 runWrapped(operation); 231 } 232 }; 233 } 234 else if (operation instanceof UnbindOperation) 235 { 236 return new UnbindOperationWrapper((UnbindOperation) operation) 237 { 238 @Override 239 public void run() 240 { 241 runWrapped(operation); 242 } 243 }; 244 } 245 else 246 { 247 throw new RuntimeException( 248 "Not implemented for " + operation == null ? null : operation 249 .getClass().getName()); 250 } 251 } 252 253 /** 254 * Execute the provided operation and decrement the number of currently 255 * running operations after it has finished executing. 256 * 257 * @param the 258 * operation to execute 259 */ 260 private void runWrapped(final Operation operation) 261 { 262 try 263 { 264 operation.run(); 265 } 266 finally 267 { 268 nbRunningOperations.decrementAndGet(); 269 } 270 } 271}