001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2014 ForgeRock AS. 026 */ 027 028package org.forgerock.opendj.ldif; 029 030import java.util.NoSuchElementException; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.LinkedBlockingQueue; 033import java.util.concurrent.TimeUnit; 034 035import org.forgerock.opendj.ldap.Connection; 036import org.forgerock.opendj.ldap.LdapException; 037import org.forgerock.opendj.ldap.LdapPromise; 038import org.forgerock.opendj.ldap.ResultCode; 039import org.forgerock.opendj.ldap.LdapResultHandler; 040import org.forgerock.opendj.ldap.SearchResultHandler; 041import org.forgerock.opendj.ldap.SearchResultReferenceIOException; 042import org.forgerock.opendj.ldap.requests.SearchRequest; 043import org.forgerock.opendj.ldap.responses.Response; 044import org.forgerock.opendj.ldap.responses.Responses; 045import org.forgerock.opendj.ldap.responses.Result; 046import org.forgerock.opendj.ldap.responses.SearchResultEntry; 047import org.forgerock.opendj.ldap.responses.SearchResultReference; 048import org.forgerock.util.Reject; 049 050import static org.forgerock.opendj.ldap.LdapException.*; 051 052/** 053 * A {@code ConnectionEntryReader} is a bridge from {@code Connection}s to 054 * {@code EntryReader}s. A connection entry reader allows applications to 055 * iterate over search results as they are returned from the server during a 056 * search operation. 057 * <p> 058 * The Search operation is performed synchronously, blocking until a search 059 * result entry is received. If a search result indicates that the search 060 * operation has failed for some reason then the error result is propagated to 061 * the caller using an {@code LdapException}. If a search result 062 * reference is returned then it is propagated to the caller using a 063 * {@code SearchResultReferenceIOException}. 064 * <p> 065 * The following code illustrates how a {@code ConnectionEntryReader} may be 066 * used: 067 * 068 * <pre> 069 * Connection connection = ...; 070 * ConnectionEntryReader reader = connection.search("dc=example,dc=com", 071 * SearchScope.WHOLE_SUBTREE, "(objectClass=person)"); 072 * try 073 * { 074 * while (reader.hasNext()) 075 * { 076 * if (reader.isEntry()) 077 * { 078 * SearchResultEntry entry = reader.readEntry(); 079 * 080 * // Handle entry... 081 * } 082 * else 083 * { 084 * SearchResultReference ref = reader.readReference(); 085 * 086 * // Handle continuation reference... 087 * } 088 * } 089 * 090 * Result result = reader.readResult(); 091 * // Handle controls included with the search result... 092 * } 093 * catch (IOException e) 094 * { 095 * // Handle exceptions... 096 * } 097 * finally 098 * { 099 * reader.close(); 100 * } 101 * </pre> 102 * 103 * <b>NOTE:</b> although this class is non-final, sub-classing is not supported 104 * except when creating mock objects for unit tests. This class has been 105 * selected specifically because it is the only aspect of the {@code Connection} 106 * interface which is not mockable. 107 */ 108public class ConnectionEntryReader implements EntryReader { 109 /* 110 * See OPENDJ-1124 for more discussion about why this class is non-final. 111 */ 112 113 /** 114 * Result handler that places all responses in a queue. 115 */ 116 private static final class BufferHandler implements SearchResultHandler, LdapResultHandler<Result> { 117 private final BlockingQueue<Response> responses; 118 private volatile boolean isInterrupted; 119 120 private BufferHandler(final BlockingQueue<Response> responses) { 121 this.responses = responses; 122 } 123 124 @Override 125 public boolean handleEntry(final SearchResultEntry entry) { 126 try { 127 responses.put(entry); 128 return true; 129 } catch (final InterruptedException e) { 130 // Prevent the reader from waiting for a result that will never 131 // arrive. 132 isInterrupted = true; 133 Thread.currentThread().interrupt(); 134 return false; 135 } 136 } 137 138 @Override 139 public void handleException(final LdapException error) { 140 try { 141 responses.put(error.getResult()); 142 } catch (final InterruptedException e) { 143 // Prevent the reader from waiting for a result that will never 144 // arrive. 145 isInterrupted = true; 146 Thread.currentThread().interrupt(); 147 } 148 } 149 150 @Override 151 public boolean handleReference(final SearchResultReference reference) { 152 try { 153 responses.put(reference); 154 return true; 155 } catch (final InterruptedException e) { 156 // Prevent the reader from waiting for a result that will never 157 // arrive. 158 isInterrupted = true; 159 Thread.currentThread().interrupt(); 160 return false; 161 } 162 } 163 164 @Override 165 public void handleResult(final Result result) { 166 try { 167 responses.put(result); 168 } catch (final InterruptedException e) { 169 // Prevent the reader from waiting for a result that will never 170 // arrive. 171 isInterrupted = true; 172 Thread.currentThread().interrupt(); 173 } 174 } 175 } 176 177 private final BufferHandler buffer; 178 private final LdapPromise<Result> promise; 179 private Response nextResponse; 180 181 /** 182 * Creates a new connection entry reader whose destination is the provided 183 * connection using an unbounded {@code LinkedBlockingQueue}. 184 * 185 * @param connection 186 * The connection to use. 187 * @param searchRequest 188 * The search request to retrieve entries with. 189 * @throws NullPointerException 190 * If {@code connection} was {@code null}. 191 */ 192 public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest) { 193 this(connection, searchRequest, new LinkedBlockingQueue<Response>()); 194 } 195 196 /** 197 * Creates a new connection entry reader whose destination is the provided 198 * connection. 199 * 200 * @param connection 201 * The connection to use. 202 * @param searchRequest 203 * The search request to retrieve entries with. 204 * @param entries 205 * The {@code BlockingQueue} implementation to use when queuing 206 * the returned entries. 207 * @throws NullPointerException 208 * If {@code connection} was {@code null}. 209 */ 210 public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest, 211 final BlockingQueue<Response> entries) { 212 Reject.ifNull(connection); 213 buffer = new BufferHandler(entries); 214 promise = connection.searchAsync(searchRequest, buffer).thenOnResult(buffer).thenOnException(buffer); 215 } 216 217 /** 218 * Closes this connection entry reader, canceling the search request if it 219 * is still active. 220 */ 221 @Override 222 public void close() { 223 // Cancel the search if it is still running. 224 promise.cancel(true); 225 } 226 227 /** {@inheritDoc} */ 228 @Override 229 public boolean hasNext() throws LdapException { 230 // Poll for the next response if needed. 231 final Response r = getNextResponse(); 232 if (!(r instanceof Result)) { 233 // Entry or reference. 234 return true; 235 } 236 237 // Final result. 238 final Result result = (Result) r; 239 if (result.isSuccess()) { 240 return false; 241 } 242 243 throw newLdapException(result); 244 } 245 246 /** 247 * Waits for the next search result entry or reference to become available 248 * and returns {@code true} if it is an entry, or {@code false} if it is a 249 * reference. 250 * 251 * @return {@code true} if the next search result is an entry, or 252 * {@code false} if it is a reference. 253 * @throws LdapException 254 * If there are no more search result entries or references and 255 * the search result code indicates that the search operation 256 * failed for some reason. 257 * @throws NoSuchElementException 258 * If there are no more search result entries or references and 259 * the search result code indicates that the search operation 260 * succeeded. 261 */ 262 public boolean isEntry() throws LdapException { 263 // Throws LdapException if search returned error. 264 if (!hasNext()) { 265 // Search has completed successfully. 266 throw new NoSuchElementException(); 267 } 268 269 // Entry or reference? 270 final Response r = nextResponse; 271 if (r instanceof SearchResultEntry) { 272 return true; 273 } else if (r instanceof SearchResultReference) { 274 return false; 275 } else { 276 throw new RuntimeException("Unexpected response type: " + r.getClass()); 277 } 278 } 279 280 /** 281 * Waits for the next search result entry or reference to become available 282 * and returns {@code true} if it is a reference, or {@code false} if it is 283 * an entry. 284 * 285 * @return {@code true} if the next search result is a reference, or 286 * {@code false} if it is an entry. 287 * @throws LdapException 288 * If there are no more search result entries or references and 289 * the search result code indicates that the search operation 290 * failed for some reason. 291 * @throws NoSuchElementException 292 * If there are no more search result entries or references and 293 * the search result code indicates that the search operation 294 * succeeded. 295 */ 296 public boolean isReference() throws LdapException { 297 return !isEntry(); 298 } 299 300 /** 301 * Waits for the next search result entry or reference to become available 302 * and, if it is an entry, returns it as a {@code SearchResultEntry}. If the 303 * next search response is a reference then this method will throw a 304 * {@code SearchResultReferenceIOException}. 305 * 306 * @return The next search result entry. 307 * @throws SearchResultReferenceIOException 308 * If the next search response was a search result reference. 309 * This connection entry reader may still contain remaining 310 * search results and references which can be retrieved using 311 * additional calls to this method. 312 * @throws LdapException 313 * If there are no more search result entries or references and 314 * the search result code indicates that the search operation 315 * failed for some reason. 316 * @throws NoSuchElementException 317 * If there are no more search result entries or references and 318 * the search result code indicates that the search operation 319 * succeeded. 320 */ 321 @Override 322 public SearchResultEntry readEntry() throws SearchResultReferenceIOException, LdapException { 323 if (isEntry()) { 324 final SearchResultEntry entry = (SearchResultEntry) nextResponse; 325 nextResponse = null; 326 return entry; 327 } else { 328 final SearchResultReference reference = (SearchResultReference) nextResponse; 329 nextResponse = null; 330 throw new SearchResultReferenceIOException(reference); 331 } 332 } 333 334 /** 335 * Waits for the next search result entry or reference to become available 336 * and, if it is a reference, returns it as a {@code SearchResultReference}. 337 * If the next search response is an entry then this method will return 338 * {@code null}. 339 * 340 * @return The next search result reference, or {@code null} if the next 341 * response was a search result entry. 342 * @throws LdapException 343 * If there are no more search result entries or references and 344 * the search result code indicates that the search operation 345 * failed for some reason. 346 * @throws NoSuchElementException 347 * If there are no more search result entries or references and 348 * the search result code indicates that the search operation 349 * succeeded. 350 */ 351 public SearchResultReference readReference() throws LdapException { 352 if (isReference()) { 353 final SearchResultReference reference = (SearchResultReference) nextResponse; 354 nextResponse = null; 355 return reference; 356 } else { 357 return null; 358 } 359 } 360 361 /** 362 * Waits for the next search response to become available and returns it if 363 * it is a search result indicating that the search completed successfully. 364 * If the search result indicates that the search failed then an 365 * {@link LdapException} is thrown. Otherwise, if the search 366 * response represents an entry or reference then an 367 * {@code IllegalStateException} is thrown. 368 * <p> 369 * This method should only be called if {@link #hasNext()} has, or will, 370 * return {@code false}. 371 * <p> 372 * It is not necessary to call this method once all search result entries 373 * have been processed, but it may be useful to do so in order to inspect 374 * any controls which were included with the result. For example, this 375 * method may be called in order to obtain the next paged results cookie 376 * once the current page of results has been processed. 377 * 378 * @return The search result indicating success. 379 * @throws LdapException 380 * If the search result indicates that the search operation 381 * failed for some reason. 382 * @throws IllegalStateException 383 * If there are remaining search result entries or references to 384 * be processed. In other words, if {@link #hasNext()} would 385 * return {@code true}. 386 */ 387 public Result readResult() throws LdapException { 388 if (hasNext()) { 389 throw new IllegalStateException(); 390 } else { 391 return (Result) nextResponse; 392 } 393 } 394 395 private Response getNextResponse() throws LdapException { 396 while (nextResponse == null) { 397 try { 398 nextResponse = buffer.responses.poll(50, TimeUnit.MILLISECONDS); 399 } catch (final InterruptedException e) { 400 throw newLdapException(ResultCode.CLIENT_SIDE_USER_CANCELLED, e); 401 } 402 403 if (nextResponse == null && buffer.isInterrupted) { 404 // The worker thread processing the result was interrupted so no 405 // result will ever arrive. We don't want to hang this thread 406 // forever while we wait, so terminate now. 407 nextResponse = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR); 408 break; 409 } 410 } 411 return nextResponse; 412 } 413}