001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2014 ForgeRock AS.
026 */
027
028package org.forgerock.opendj.ldif;
029
030import java.util.NoSuchElementException;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.LinkedBlockingQueue;
033import java.util.concurrent.TimeUnit;
034
035import org.forgerock.opendj.ldap.Connection;
036import org.forgerock.opendj.ldap.LdapException;
037import org.forgerock.opendj.ldap.LdapPromise;
038import org.forgerock.opendj.ldap.ResultCode;
039import org.forgerock.opendj.ldap.LdapResultHandler;
040import org.forgerock.opendj.ldap.SearchResultHandler;
041import org.forgerock.opendj.ldap.SearchResultReferenceIOException;
042import org.forgerock.opendj.ldap.requests.SearchRequest;
043import org.forgerock.opendj.ldap.responses.Response;
044import org.forgerock.opendj.ldap.responses.Responses;
045import org.forgerock.opendj.ldap.responses.Result;
046import org.forgerock.opendj.ldap.responses.SearchResultEntry;
047import org.forgerock.opendj.ldap.responses.SearchResultReference;
048import org.forgerock.util.Reject;
049
050import static org.forgerock.opendj.ldap.LdapException.*;
051
052/**
053 * A {@code ConnectionEntryReader} is a bridge from {@code Connection}s to
054 * {@code EntryReader}s. A connection entry reader allows applications to
055 * iterate over search results as they are returned from the server during a
056 * search operation.
057 * <p>
058 * The Search operation is performed synchronously, blocking until a search
059 * result entry is received. If a search result indicates that the search
060 * operation has failed for some reason then the error result is propagated to
061 * the caller using an {@code LdapException}. If a search result
062 * reference is returned then it is propagated to the caller using a
063 * {@code SearchResultReferenceIOException}.
064 * <p>
065 * The following code illustrates how a {@code ConnectionEntryReader} may be
066 * used:
067 *
068 * <pre>
069 * Connection connection = ...;
070 * ConnectionEntryReader reader = connection.search(&quot;dc=example,dc=com&quot;,
071 *     SearchScope.WHOLE_SUBTREE, &quot;(objectClass=person)&quot;);
072 * try
073 * {
074 *   while (reader.hasNext())
075 *   {
076 *     if (reader.isEntry())
077 *     {
078 *       SearchResultEntry entry = reader.readEntry();
079 *
080 *       // Handle entry...
081 *     }
082 *     else
083 *     {
084 *       SearchResultReference ref = reader.readReference();
085 *
086 *       // Handle continuation reference...
087 *     }
088 *   }
089 *
090 *   Result result = reader.readResult();
091 *   // Handle controls included with the search result...
092 * }
093 * catch (IOException e)
094 * {
095 *   // Handle exceptions...
096 * }
097 * finally
098 * {
099 *   reader.close();
100 * }
101 * </pre>
102 *
103 * <b>NOTE:</b> although this class is non-final, sub-classing is not supported
104 * except when creating mock objects for unit tests. This class has been
105 * selected specifically because it is the only aspect of the {@code Connection}
106 * interface which is not mockable.
107 */
108public class ConnectionEntryReader implements EntryReader {
109    /*
110     * See OPENDJ-1124 for more discussion about why this class is non-final.
111     */
112
113    /**
114     * Result handler that places all responses in a queue.
115     */
116    private static final class BufferHandler implements SearchResultHandler, LdapResultHandler<Result> {
117        private final BlockingQueue<Response> responses;
118        private volatile boolean isInterrupted;
119
120        private BufferHandler(final BlockingQueue<Response> responses) {
121            this.responses = responses;
122        }
123
124        @Override
125        public boolean handleEntry(final SearchResultEntry entry) {
126            try {
127                responses.put(entry);
128                return true;
129            } catch (final InterruptedException e) {
130                // Prevent the reader from waiting for a result that will never
131                // arrive.
132                isInterrupted = true;
133                Thread.currentThread().interrupt();
134                return false;
135            }
136        }
137
138        @Override
139        public void handleException(final LdapException error) {
140            try {
141                responses.put(error.getResult());
142            } catch (final InterruptedException e) {
143                // Prevent the reader from waiting for a result that will never
144                // arrive.
145                isInterrupted = true;
146                Thread.currentThread().interrupt();
147            }
148        }
149
150        @Override
151        public boolean handleReference(final SearchResultReference reference) {
152            try {
153                responses.put(reference);
154                return true;
155            } catch (final InterruptedException e) {
156                // Prevent the reader from waiting for a result that will never
157                // arrive.
158                isInterrupted = true;
159                Thread.currentThread().interrupt();
160                return false;
161            }
162        }
163
164        @Override
165        public void handleResult(final Result result) {
166            try {
167                responses.put(result);
168            } catch (final InterruptedException e) {
169                // Prevent the reader from waiting for a result that will never
170                // arrive.
171                isInterrupted = true;
172                Thread.currentThread().interrupt();
173            }
174        }
175    }
176
177    private final BufferHandler buffer;
178    private final LdapPromise<Result> promise;
179    private Response nextResponse;
180
181    /**
182     * Creates a new connection entry reader whose destination is the provided
183     * connection using an unbounded {@code LinkedBlockingQueue}.
184     *
185     * @param connection
186     *            The connection to use.
187     * @param searchRequest
188     *            The search request to retrieve entries with.
189     * @throws NullPointerException
190     *             If {@code connection} was {@code null}.
191     */
192    public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest) {
193        this(connection, searchRequest, new LinkedBlockingQueue<Response>());
194    }
195
196    /**
197     * Creates a new connection entry reader whose destination is the provided
198     * connection.
199     *
200     * @param connection
201     *            The connection to use.
202     * @param searchRequest
203     *            The search request to retrieve entries with.
204     * @param entries
205     *            The {@code BlockingQueue} implementation to use when queuing
206     *            the returned entries.
207     * @throws NullPointerException
208     *             If {@code connection} was {@code null}.
209     */
210    public ConnectionEntryReader(final Connection connection, final SearchRequest searchRequest,
211        final BlockingQueue<Response> entries) {
212        Reject.ifNull(connection);
213        buffer = new BufferHandler(entries);
214        promise = connection.searchAsync(searchRequest, buffer).thenOnResult(buffer).thenOnException(buffer);
215    }
216
217    /**
218     * Closes this connection entry reader, canceling the search request if it
219     * is still active.
220     */
221    @Override
222    public void close() {
223        // Cancel the search if it is still running.
224        promise.cancel(true);
225    }
226
227    /** {@inheritDoc} */
228    @Override
229    public boolean hasNext() throws LdapException {
230        // Poll for the next response if needed.
231        final Response r = getNextResponse();
232        if (!(r instanceof Result)) {
233            // Entry or reference.
234            return true;
235        }
236
237        // Final result.
238        final Result result = (Result) r;
239        if (result.isSuccess()) {
240            return false;
241        }
242
243        throw newLdapException(result);
244    }
245
246    /**
247     * Waits for the next search result entry or reference to become available
248     * and returns {@code true} if it is an entry, or {@code false} if it is a
249     * reference.
250     *
251     * @return {@code true} if the next search result is an entry, or
252     *         {@code false} if it is a reference.
253     * @throws LdapException
254     *             If there are no more search result entries or references and
255     *             the search result code indicates that the search operation
256     *             failed for some reason.
257     * @throws NoSuchElementException
258     *             If there are no more search result entries or references and
259     *             the search result code indicates that the search operation
260     *             succeeded.
261     */
262    public boolean isEntry() throws LdapException {
263        // Throws LdapException if search returned error.
264        if (!hasNext()) {
265            // Search has completed successfully.
266            throw new NoSuchElementException();
267        }
268
269        // Entry or reference?
270        final Response r = nextResponse;
271        if (r instanceof SearchResultEntry) {
272            return true;
273        } else if (r instanceof SearchResultReference) {
274            return false;
275        } else {
276            throw new RuntimeException("Unexpected response type: " + r.getClass());
277        }
278    }
279
280    /**
281     * Waits for the next search result entry or reference to become available
282     * and returns {@code true} if it is a reference, or {@code false} if it is
283     * an entry.
284     *
285     * @return {@code true} if the next search result is a reference, or
286     *         {@code false} if it is an entry.
287     * @throws LdapException
288     *             If there are no more search result entries or references and
289     *             the search result code indicates that the search operation
290     *             failed for some reason.
291     * @throws NoSuchElementException
292     *             If there are no more search result entries or references and
293     *             the search result code indicates that the search operation
294     *             succeeded.
295     */
296    public boolean isReference() throws LdapException {
297        return !isEntry();
298    }
299
300    /**
301     * Waits for the next search result entry or reference to become available
302     * and, if it is an entry, returns it as a {@code SearchResultEntry}. If the
303     * next search response is a reference then this method will throw a
304     * {@code SearchResultReferenceIOException}.
305     *
306     * @return The next search result entry.
307     * @throws SearchResultReferenceIOException
308     *             If the next search response was a search result reference.
309     *             This connection entry reader may still contain remaining
310     *             search results and references which can be retrieved using
311     *             additional calls to this method.
312     * @throws LdapException
313     *             If there are no more search result entries or references and
314     *             the search result code indicates that the search operation
315     *             failed for some reason.
316     * @throws NoSuchElementException
317     *             If there are no more search result entries or references and
318     *             the search result code indicates that the search operation
319     *             succeeded.
320     */
321    @Override
322    public SearchResultEntry readEntry() throws SearchResultReferenceIOException, LdapException {
323        if (isEntry()) {
324            final SearchResultEntry entry = (SearchResultEntry) nextResponse;
325            nextResponse = null;
326            return entry;
327        } else {
328            final SearchResultReference reference = (SearchResultReference) nextResponse;
329            nextResponse = null;
330            throw new SearchResultReferenceIOException(reference);
331        }
332    }
333
334    /**
335     * Waits for the next search result entry or reference to become available
336     * and, if it is a reference, returns it as a {@code SearchResultReference}.
337     * If the next search response is an entry then this method will return
338     * {@code null}.
339     *
340     * @return The next search result reference, or {@code null} if the next
341     *         response was a search result entry.
342     * @throws LdapException
343     *             If there are no more search result entries or references and
344     *             the search result code indicates that the search operation
345     *             failed for some reason.
346     * @throws NoSuchElementException
347     *             If there are no more search result entries or references and
348     *             the search result code indicates that the search operation
349     *             succeeded.
350     */
351    public SearchResultReference readReference() throws LdapException {
352        if (isReference()) {
353            final SearchResultReference reference = (SearchResultReference) nextResponse;
354            nextResponse = null;
355            return reference;
356        } else {
357            return null;
358        }
359    }
360
361    /**
362     * Waits for the next search response to become available and returns it if
363     * it is a search result indicating that the search completed successfully.
364     * If the search result indicates that the search failed then an
365     * {@link LdapException} is thrown. Otherwise, if the search
366     * response represents an entry or reference then an
367     * {@code IllegalStateException} is thrown.
368     * <p>
369     * This method should only be called if {@link #hasNext()} has, or will,
370     * return {@code false}.
371     * <p>
372     * It is not necessary to call this method once all search result entries
373     * have been processed, but it may be useful to do so in order to inspect
374     * any controls which were included with the result. For example, this
375     * method may be called in order to obtain the next paged results cookie
376     * once the current page of results has been processed.
377     *
378     * @return The search result indicating success.
379     * @throws LdapException
380     *             If the search result indicates that the search operation
381     *             failed for some reason.
382     * @throws IllegalStateException
383     *             If there are remaining search result entries or references to
384     *             be processed. In other words, if {@link #hasNext()} would
385     *             return {@code true}.
386     */
387    public Result readResult() throws LdapException {
388        if (hasNext()) {
389            throw new IllegalStateException();
390        } else {
391            return (Result) nextResponse;
392        }
393    }
394
395    private Response getNextResponse() throws LdapException {
396        while (nextResponse == null) {
397            try {
398                nextResponse = buffer.responses.poll(50, TimeUnit.MILLISECONDS);
399            } catch (final InterruptedException e) {
400                throw newLdapException(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
401            }
402
403            if (nextResponse == null && buffer.isInterrupted) {
404                // The worker thread processing the result was interrupted so no
405                // result will ever arrive. We don't want to hang this thread
406                // forever while we wait, so terminate now.
407                nextResponse = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR);
408                break;
409            }
410        }
411        return nextResponse;
412    }
413}