001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021import static com.google.common.util.concurrent.Platform.isInstanceOfThrowableClass;
022import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
023
024import com.google.common.annotations.Beta;
025import com.google.common.annotations.GwtCompatible;
026import com.google.common.annotations.GwtIncompatible;
027import com.google.common.base.Function;
028import com.google.common.base.Optional;
029import com.google.common.base.Preconditions;
030import com.google.common.collect.ImmutableCollection;
031import com.google.common.collect.ImmutableList;
032import com.google.common.collect.Lists;
033import com.google.common.collect.Queues;
034
035import java.lang.reflect.UndeclaredThrowableException;
036import java.util.Collections;
037import java.util.List;
038import java.util.concurrent.CancellationException;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.Executor;
042import java.util.concurrent.Future;
043import java.util.concurrent.RejectedExecutionException;
044import java.util.concurrent.ScheduledExecutorService;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.TimeoutException;
047import java.util.logging.Level;
048import java.util.logging.Logger;
049
050import javax.annotation.CheckReturnValue;
051import javax.annotation.Nullable;
052
053/**
054 * Static utility methods pertaining to the {@link Future} interface.
055 *
056 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
057 * Guava User Guide article on <a href=
058 * "https://github.com/google/guava/wiki/ListenableFutureExplained">
059 * {@code ListenableFuture}</a>.
060 *
061 * @author Kevin Bourrillion
062 * @author Nishant Thakkar
063 * @author Sven Mawson
064 * @since 1.0
065 */
066@Beta
067@GwtCompatible(emulated = true)
068public final class Futures extends GwtFuturesCatchingSpecialization {
069
070  // A note on memory visibility.
071  // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine)
072  // have two requirements that significantly complicate their design.
073  // 1. Cancellation should propagate from the returned future to the input future(s).
074  // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion.
075  //
076  // A consequence of these requirements is that the delegate futures cannot be stored in
077  // final fields.
078  //
079  // For simplicity the rest of this description will discuss Futures.catching since it is the
080  // simplest instance, though very similar descriptions apply to many other classes in this file.
081  //
082  // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field
083  // 'inputFuture'. That field is non-final and non-volatile.  There are 2 places where the
084  // 'inputFuture' field is read and where we will have to consider visibility of the write
085  // operation in the constructor.
086  //
087  // 1. In the listener that performs the callback.  In this case it is fine since inputFuture is
088  //    assigned prior to calling addListener, and addListener happens-before any invocation of the
089  //    listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible
090  //    to the listener.
091  //
092  // 2. In done() where we may propagate cancellation to the input.  In this case it is _not_ fine.
093  //    There is currently nothing that enforces that the write to inputFuture in the constructor is
094  //    visible to done().  This is because there is no happens before edge between the write and a
095  //    (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue,
096  //    it would just add an edge such that if done() observed non-null, then it would also
097  //    definitely observe all earlier writes, but we still have no guarantee that done() would see
098  //    the inital write (just stronger guarantees if it does).
099  //
100  // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html
101  // For a (long) discussion about this specific issue and the general futility of life.
102  //
103  // For the time being we are OK with the problem discussed above since it requires a caller to
104  // introduce a very specific kind of data-race.  And given the other operations performed by these
105  // methods that involve volatile read/write operations, in practice there is no issue.  Also, the
106  // way in such a visibility issue would surface is most likely as a failure of cancel() to
107  // propagate to the input.  Cancellation propagation is fundamentally racy so this is fine.
108  //
109  // Future versions of the JMM may revise safe construction semantics in such a way that we can
110  // safely publish these objects and we won't need this whole discussion.
111  // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs
112  // that should resolve the issue.  This comes at the cost of adding more write barriers to the
113  // implementations.
114
115  private Futures() {}
116
117  /**
118   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
119   * and a {@link Function} that maps from {@link Exception} instances into the
120   * appropriate checked type.
121   *
122   * <p>The given mapping function will be applied to an
123   * {@link InterruptedException}, a {@link CancellationException}, or an
124   * {@link ExecutionException}.
125   * See {@link Future#get()} for details on the exceptions thrown.
126   *
127   * @since 9.0 (source-compatible since 1.0)
128   */
129  @GwtIncompatible("TODO")
130  @CheckReturnValue
131  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
132      ListenableFuture<V> future, Function<? super Exception, X> mapper) {
133    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
134  }
135
136  private abstract static class ImmediateFuture<V>
137      implements ListenableFuture<V> {
138
139    private static final Logger log =
140        Logger.getLogger(ImmediateFuture.class.getName());
141
142    @Override
143    public void addListener(Runnable listener, Executor executor) {
144      checkNotNull(listener, "Runnable was null.");
145      checkNotNull(executor, "Executor was null.");
146      try {
147        executor.execute(listener);
148      } catch (RuntimeException e) {
149        // ListenableFuture's contract is that it will not throw unchecked
150        // exceptions, so log the bad runnable and/or executor and swallow it.
151        log.log(Level.SEVERE, "RuntimeException while executing runnable "
152            + listener + " with executor " + executor, e);
153      }
154    }
155
156    @Override
157    public boolean cancel(boolean mayInterruptIfRunning) {
158      return false;
159    }
160
161    @Override
162    public abstract V get() throws ExecutionException;
163
164    @Override
165    public V get(long timeout, TimeUnit unit) throws ExecutionException {
166      checkNotNull(unit);
167      return get();
168    }
169
170    @Override
171    public boolean isCancelled() {
172      return false;
173    }
174
175    @Override
176    public boolean isDone() {
177      return true;
178    }
179  }
180
181  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
182    static final ImmediateSuccessfulFuture<Object> NULL =
183        new ImmediateSuccessfulFuture<Object>(null);
184
185    @Nullable private final V value;
186
187    ImmediateSuccessfulFuture(@Nullable V value) {
188      this.value = value;
189    }
190
191    @Override
192    public V get() {
193      return value;
194    }
195  }
196
197  @GwtIncompatible("TODO")
198  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
199      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
200
201    @Nullable private final V value;
202
203    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
204      this.value = value;
205    }
206
207    @Override
208    public V get() {
209      return value;
210    }
211
212    @Override
213    public V checkedGet() {
214      return value;
215    }
216
217    @Override
218    public V checkedGet(long timeout, TimeUnit unit) {
219      checkNotNull(unit);
220      return value;
221    }
222  }
223
224  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
225
226    private final Throwable thrown;
227
228    ImmediateFailedFuture(Throwable thrown) {
229      this.thrown = thrown;
230    }
231
232    @Override
233    public V get() throws ExecutionException {
234      throw new ExecutionException(thrown);
235    }
236  }
237
238  @GwtIncompatible("TODO")
239  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
240
241    private final CancellationException thrown;
242
243    ImmediateCancelledFuture() {
244      this.thrown = new CancellationException("Immediate cancelled future.");
245    }
246
247    @Override
248    public boolean isCancelled() {
249      return true;
250    }
251
252    @Override
253    public V get() {
254      throw AbstractFuture.cancellationExceptionWithCause(
255          "Task was cancelled.", thrown);
256    }
257  }
258
259  @GwtIncompatible("TODO")
260  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
261      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
262
263    private final X thrown;
264
265    ImmediateFailedCheckedFuture(X thrown) {
266      this.thrown = thrown;
267    }
268
269    @Override
270    public V get() throws ExecutionException {
271      throw new ExecutionException(thrown);
272    }
273
274    @Override
275    public V checkedGet() throws X {
276      throw thrown;
277    }
278
279    @Override
280    public V checkedGet(long timeout, TimeUnit unit) throws X {
281      checkNotNull(unit);
282      throw thrown;
283    }
284  }
285
286  /**
287   * Creates a {@code ListenableFuture} which has its value set immediately upon
288   * construction. The getters just return the value. This {@code Future} can't
289   * be canceled or timed out and its {@code isDone()} method always returns
290   * {@code true}.
291   */
292  @CheckReturnValue
293  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
294    if (value == null) {
295      // This cast is safe because null is assignable to V for all V (i.e. it is covariant)
296      @SuppressWarnings({"unchecked", "rawtypes"})
297      ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL;
298      return typedNull;
299    }
300    return new ImmediateSuccessfulFuture<V>(value);
301  }
302
303  /**
304   * Returns a {@code CheckedFuture} which has its value set immediately upon
305   * construction.
306   *
307   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
308   * method always returns {@code true}. Calling {@code get()} or {@code
309   * checkedGet()} will immediately return the provided value.
310   */
311  @GwtIncompatible("TODO")
312  @CheckReturnValue
313  public static <V, X extends Exception> CheckedFuture<V, X>
314      immediateCheckedFuture(@Nullable V value) {
315    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
316  }
317
318  /**
319   * Returns a {@code ListenableFuture} which has an exception set immediately
320   * upon construction.
321   *
322   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
323   * method always returns {@code true}. Calling {@code get()} will immediately
324   * throw the provided {@code Throwable} wrapped in an {@code
325   * ExecutionException}.
326   */
327  @CheckReturnValue
328  public static <V> ListenableFuture<V> immediateFailedFuture(
329      Throwable throwable) {
330    checkNotNull(throwable);
331    return new ImmediateFailedFuture<V>(throwable);
332  }
333
334  /**
335   * Creates a {@code ListenableFuture} which is cancelled immediately upon
336   * construction, so that {@code isCancelled()} always returns {@code true}.
337   *
338   * @since 14.0
339   */
340  @GwtIncompatible("TODO")
341  @CheckReturnValue
342  public static <V> ListenableFuture<V> immediateCancelledFuture() {
343    return new ImmediateCancelledFuture<V>();
344  }
345
346  /**
347   * Returns a {@code CheckedFuture} which has an exception set immediately upon
348   * construction.
349   *
350   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
351   * method always returns {@code true}. Calling {@code get()} will immediately
352   * throw the provided {@code Exception} wrapped in an {@code
353   * ExecutionException}, and calling {@code checkedGet()} will throw the
354   * provided exception itself.
355   */
356  @GwtIncompatible("TODO")
357  @CheckReturnValue
358  public static <V, X extends Exception> CheckedFuture<V, X>
359      immediateFailedCheckedFuture(X exception) {
360    checkNotNull(exception);
361    return new ImmediateFailedCheckedFuture<V, X>(exception);
362  }
363
364  /**
365   * Returns a {@code Future} whose result is taken from the given primary
366   * {@code input} or, if the primary input fails, from the {@code Future}
367   * provided by the {@code fallback}. {@link FutureFallback#create} is not
368   * invoked until the primary input has failed, so if the primary input
369   * succeeds, it is never invoked. If, during the invocation of {@code
370   * fallback}, an exception is thrown, this exception is used as the result of
371   * the output {@code Future}.
372   *
373   * <p>Below is an example of a fallback that returns a default value if an
374   * exception occurs:
375   *
376   * <pre>   {@code
377   *   ListenableFuture<Integer> fetchCounterFuture = ...;
378   *
379   *   // Falling back to a zero counter in case an exception happens when
380   *   // processing the RPC to fetch counters.
381   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
382   *       fetchCounterFuture, new FutureFallback<Integer>() {
383   *         public ListenableFuture<Integer> create(Throwable t) {
384   *           // Returning "0" as the default for the counter when the
385   *           // exception happens.
386   *           return immediateFuture(0);
387   *         }
388   *       });}</pre>
389   *
390   * <p>The fallback can also choose to propagate the original exception when
391   * desired:
392   *
393   * <pre>   {@code
394   *   ListenableFuture<Integer> fetchCounterFuture = ...;
395   *
396   *   // Falling back to a zero counter only in case the exception was a
397   *   // TimeoutException.
398   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
399   *       fetchCounterFuture, new FutureFallback<Integer>() {
400   *         public ListenableFuture<Integer> create(Throwable t) {
401   *           if (t instanceof TimeoutException) {
402   *             return immediateFuture(0);
403   *           }
404   *           return immediateFailedFuture(t);
405   *         }
406   *       });}</pre>
407   *
408   * <p>This overload, which does not accept an executor, uses {@code
409   * directExecutor}, a dangerous choice in some cases. See the discussion in
410   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
411   * documentation. The documentation's warnings about "lightweight listeners"
412   * refer here to the work done during {@code FutureFallback.create}, not to
413   * any work done to complete the returned {@code Future}.
414   *
415   * @param input the primary input {@code Future}
416   * @param fallback the {@link FutureFallback} implementation to be called if
417   *     {@code input} fails
418   * @since 14.0
419   * @deprecated Use {@link #catchingAsync(ListenableFuture, Class,
420   *     AsyncFunction) catchingAsync(input, Throwable.class,
421   *     fallbackImplementedAsAnAsyncFunction)}, usually replacing {@code
422   *     Throwable.class} with the specific type you want to handle. This method
423   *     will be removed in Guava release 20.0.
424   */
425  @Deprecated
426  @CheckReturnValue
427  public static <V> ListenableFuture<V> withFallback(
428      ListenableFuture<? extends V> input,
429      FutureFallback<? extends V> fallback) {
430    return withFallback(input, fallback, directExecutor());
431  }
432
433  /**
434   * Returns a {@code Future} whose result is taken from the given primary
435   * {@code input} or, if the primary input fails, from the {@code Future}
436   * provided by the {@code fallback}. {@link FutureFallback#create} is not
437   * invoked until the primary input has failed, so if the primary input
438   * succeeds, it is never invoked. If, during the invocation of {@code
439   * fallback}, an exception is thrown, this exception is used as the result of
440   * the output {@code Future}.
441   *
442   * <p>Below is an example of a fallback that returns a default value if an
443   * exception occurs:
444   *
445   * <pre>   {@code
446   *   ListenableFuture<Integer> fetchCounterFuture = ...;
447   *
448   *   // Falling back to a zero counter in case an exception happens when
449   *   // processing the RPC to fetch counters.
450   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
451   *       fetchCounterFuture, new FutureFallback<Integer>() {
452   *         public ListenableFuture<Integer> create(Throwable t) {
453   *           // Returning "0" as the default for the counter when the
454   *           // exception happens.
455   *           return immediateFuture(0);
456   *         }
457   *       }, directExecutor());}</pre>
458   *
459   * <p>The fallback can also choose to propagate the original exception when
460   * desired:
461   *
462   * <pre>   {@code
463   *   ListenableFuture<Integer> fetchCounterFuture = ...;
464   *
465   *   // Falling back to a zero counter only in case the exception was a
466   *   // TimeoutException.
467   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
468   *       fetchCounterFuture, new FutureFallback<Integer>() {
469   *         public ListenableFuture<Integer> create(Throwable t) {
470   *           if (t instanceof TimeoutException) {
471   *             return immediateFuture(0);
472   *           }
473   *           return immediateFailedFuture(t);
474   *         }
475   *       }, directExecutor());}</pre>
476   *
477   * <p>When selecting an executor, note that {@code directExecutor} is
478   * dangerous in some cases. See the discussion in the {@link
479   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
480   * The documentation's warnings about "lightweight listeners" refer here to
481   * the work done during {@code FutureFallback.create}, not to any work done to
482   * complete the returned {@code Future}.
483   *
484   * @param input the primary input {@code Future}
485   * @param fallback the {@link FutureFallback} implementation to be called if
486   *     {@code input} fails
487   * @param executor the executor that runs {@code fallback} if {@code input}
488   *     fails
489   * @since 14.0
490   * @deprecated Use {@link #catchingAsync(ListenableFuture, Class,
491   *     AsyncFunction, Executor) catchingAsync(input, Throwable.class,
492   *     fallbackImplementedAsAnAsyncFunction, executor)}, usually replacing
493   *     {@code Throwable.class} with the specific type you want to handle. This method
494   *     will be removed in Guava release 20.0.
495   */
496  @Deprecated
497  @CheckReturnValue
498  public static <V> ListenableFuture<V> withFallback(
499      ListenableFuture<? extends V> input,
500      FutureFallback<? extends V> fallback, Executor executor) {
501    return catchingAsync(
502        input, Throwable.class, asAsyncFunction(fallback), executor);
503  }
504
505  /**
506   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
507   * primary input fails with the given {@code exceptionType}, from the result provided by the
508   * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so
509   * if the primary input succeeds, it is never invoked. If, during the invocation of {@code
510   * fallback}, an exception is thrown, this exception is used as the result of the output {@code
511   * Future}.
512   *
513   * <p>Usage example:
514   *
515   * <pre>   {@code
516   *   ListenableFuture<Integer> fetchCounterFuture = ...;
517   *
518   *   // Falling back to a zero counter in case an exception happens when
519   *   // processing the RPC to fetch counters.
520   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catching(
521   *       fetchCounterFuture, FetchException.class,
522   *       new Function<FetchException, Integer>() {
523   *         public Integer apply(FetchException e) {
524   *           return 0;
525   *         }
526   *       });}</pre>
527   *
528   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
529   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
530   * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight
531   * listeners" refer here to the work done during {@code Function.apply}.
532   *
533   * @param input the primary input {@code Future}
534   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
535   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
536   *     {@code Throwable.class} in particular.
537   * @param fallback the {@link Function} implementation to be called if {@code input} fails with
538   *     the expected exception type
539   * @since 19.0
540   */
541  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
542  @CheckReturnValue
543  public static <V, X extends Throwable> ListenableFuture<V> catching(
544      ListenableFuture<? extends V> input, Class<X> exceptionType,
545      Function<? super X, ? extends V> fallback) {
546    CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback);
547    input.addListener(future, directExecutor());
548    return future;
549  }
550
551  /**
552   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
553   * primary input fails with the given {@code exceptionType}, from the result provided by the
554   * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so
555   * if the primary input succeeds, it is never invoked. If, during the invocation of {@code
556   * fallback}, an exception is thrown, this exception is used as the result of the output {@code
557   * Future}.
558   *
559   * <p>Usage example:
560   *
561   * <pre>   {@code
562   *   ListenableFuture<Integer> fetchCounterFuture = ...;
563   *
564   *   // Falling back to a zero counter in case an exception happens when
565   *   // processing the RPC to fetch counters.
566   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catching(
567   *       fetchCounterFuture, FetchException.class,
568   *       new Function<FetchException, Integer>() {
569   *         public Integer apply(FetchException e) {
570   *           return 0;
571   *         }
572   *       }, directExecutor());}</pre>
573   *
574   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
575   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
576   * documentation. The documentation's warnings about "lightweight listeners" refer here to the
577   * work done during {@code Function.apply}.
578   *
579   * @param input the primary input {@code Future}
580   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
581   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
582   *     {@code Throwable.class} in particular.
583   * @param fallback the {@link Function} implementation to be called if {@code input} fails with
584   *     the expected exception type
585   * @param executor the executor that runs {@code fallback} if {@code input} fails
586   * @since 19.0
587   */
588  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
589  @CheckReturnValue
590  public static <V, X extends Throwable> ListenableFuture<V> catching(
591      ListenableFuture<? extends V> input, Class<X> exceptionType,
592      Function<? super X, ? extends V> fallback, Executor executor) {
593    CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback);
594    input.addListener(future, rejectionPropagatingExecutor(executor, future));
595    return future;
596  }
597
598  /**
599   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
600   * primary input fails with the given {@code exceptionType}, from the result provided by the
601   * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has
602   * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of
603   * {@code fallback}, an exception is thrown, this exception is used as the result of the output
604   * {@code Future}.
605   *
606   * <p>Usage examples:
607   *
608   * <pre>   {@code
609   *   ListenableFuture<Integer> fetchCounterFuture = ...;
610   *
611   *   // Falling back to a zero counter in case an exception happens when
612   *   // processing the RPC to fetch counters.
613   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
614   *       fetchCounterFuture, FetchException.class,
615   *       new AsyncFunction<FetchException, Integer>() {
616   *         public ListenableFuture<Integer> apply(FetchException e) {
617   *           return immediateFuture(0);
618   *         }
619   *       });}</pre>
620   *
621   * <p>The fallback can also choose to propagate the original exception when desired:
622   *
623   * <pre>   {@code
624   *   ListenableFuture<Integer> fetchCounterFuture = ...;
625   *
626   *   // Falling back to a zero counter only in case the exception was a
627   *   // TimeoutException.
628   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
629   *       fetchCounterFuture, FetchException.class,
630   *       new AsyncFunction<FetchException, Integer>() {
631   *         public ListenableFuture<Integer> apply(FetchException e)
632   *             throws FetchException {
633   *           if (omitDataOnFetchFailure) {
634   *             return immediateFuture(0);
635   *           }
636   *           throw e;
637   *         }
638   *       });}</pre>
639   *
640   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
641   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
642   * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight
643   * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done
644   * to complete the returned {@code Future}.
645   *
646   * @param input the primary input {@code Future}
647   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
648   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
649   *     {@code Throwable.class} in particular.
650   * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails
651   *     with the expected exception type
652   * @since 19.0 (similar functionality in 14.0 as {@code withFallback})
653   */
654  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
655  // TODO(kak): @CheckReturnValue
656  public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
657      ListenableFuture<? extends V> input, Class<X> exceptionType,
658      AsyncFunction<? super X, ? extends V> fallback) {
659    AsyncCatchingFuture<V, X> future =
660        new AsyncCatchingFuture<V, X>(input, exceptionType, fallback);
661    input.addListener(future, directExecutor());
662    return future;
663  }
664
665  /**
666   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
667   * primary input fails with the given {@code exceptionType}, from the result provided by the
668   * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has
669   * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of
670   * {@code fallback}, an exception is thrown, this exception is used as the result of the output
671   * {@code Future}.
672   *
673   * <p>Usage examples:
674   *
675   * <pre>   {@code
676   *   ListenableFuture<Integer> fetchCounterFuture = ...;
677   *
678   *   // Falling back to a zero counter in case an exception happens when
679   *   // processing the RPC to fetch counters.
680   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
681   *       fetchCounterFuture, FetchException.class,
682   *       new AsyncFunction<FetchException, Integer>() {
683   *         public ListenableFuture<Integer> apply(FetchException e) {
684   *           return immediateFuture(0);
685   *         }
686   *       }, directExecutor());}</pre>
687   *
688   * <p>The fallback can also choose to propagate the original exception when desired:
689   *
690   * <pre>   {@code
691   *   ListenableFuture<Integer> fetchCounterFuture = ...;
692   *
693   *   // Falling back to a zero counter only in case the exception was a
694   *   // TimeoutException.
695   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
696   *       fetchCounterFuture, FetchException.class,
697   *       new AsyncFunction<FetchException, Integer>() {
698   *         public ListenableFuture<Integer> apply(FetchException e)
699   *             throws FetchException {
700   *           if (omitDataOnFetchFailure) {
701   *             return immediateFuture(0);
702   *           }
703   *           throw e;
704   *         }
705   *       }, directExecutor());}</pre>
706   *
707   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
708   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
709   * documentation. The documentation's warnings about "lightweight listeners" refer here to the
710   * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned
711   * {@code Future}.
712   *
713   * @param input the primary input {@code Future}
714   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
715   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
716   *     {@code Throwable.class} in particular.
717   * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails
718   *     with the expected exception type
719   * @param executor the executor that runs {@code fallback} if {@code input} fails
720   * @since 19.0 (similar functionality in 14.0 as {@code withFallback})
721   */
722  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
723  // TODO(kak): @CheckReturnValue
724  public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
725      ListenableFuture<? extends V> input, Class<X> exceptionType,
726      AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
727    AsyncCatchingFuture<V, X> future =
728        new AsyncCatchingFuture<V, X>(input, exceptionType, fallback);
729    input.addListener(future, rejectionPropagatingExecutor(executor, future));
730    return future;
731  }
732
733  @Deprecated
734  static <V> AsyncFunction<Throwable, V> asAsyncFunction(final FutureFallback<V> fallback) {
735    checkNotNull(fallback);
736    return new AsyncFunction<Throwable, V>() {
737      @Override
738      public ListenableFuture<V> apply(Throwable t) throws Exception {
739        return checkNotNull(fallback.create(t), "FutureFallback.create returned null instead of a "
740            + "Future. Did you mean to return immediateFuture(null)?");
741      }
742    };
743  }
744
745  private abstract static class AbstractCatchingFuture<V, X extends Throwable, F>
746      extends AbstractFuture.TrustedFuture<V> implements Runnable {
747    @Nullable ListenableFuture<? extends V> inputFuture;
748    @Nullable Class<X> exceptionType;
749    @Nullable F fallback;
750
751    AbstractCatchingFuture(
752        ListenableFuture<? extends V> inputFuture, Class<X> exceptionType, F fallback) {
753      this.inputFuture = checkNotNull(inputFuture);
754      this.exceptionType = checkNotNull(exceptionType);
755      this.fallback = checkNotNull(fallback);
756    }
757
758    @Override public final void run() {
759      ListenableFuture<? extends V> localInputFuture = inputFuture;
760      Class<X> localExceptionType = exceptionType;
761      F localFallback = fallback;
762      if (localInputFuture == null | localExceptionType == null | localFallback == null
763          | isCancelled()) {
764        return;
765      }
766      inputFuture = null;
767      exceptionType = null;
768      fallback = null;
769
770      Throwable throwable;
771      try {
772        set(getUninterruptibly(localInputFuture));
773        return;
774      } catch (ExecutionException e) {
775        throwable = e.getCause();
776      } catch (Throwable e) {  // this includes cancellation exception
777        throwable = e;
778      }
779      try {
780        if (isInstanceOfThrowableClass(throwable, localExceptionType)) {
781          @SuppressWarnings("unchecked") // verified safe by isInstance
782          X castThrowable = (X) throwable;
783          doFallback(localFallback, castThrowable);
784        } else {
785          setException(throwable);
786        }
787      } catch (Throwable e) {
788        setException(e);
789      }
790    }
791
792    /** Template method for subtypes to actually run the fallback. */
793    abstract void doFallback(F fallback, X throwable) throws Exception;
794
795    @Override final void done() {
796      maybePropagateCancellation(inputFuture);
797      this.inputFuture = null;
798      this.exceptionType = null;
799      this.fallback = null;
800    }
801  }
802
803  /**
804   * A {@link AbstractCatchingFuture} that delegates to an {@link AsyncFunction}
805   * and {@link #setFuture(ListenableFuture)} to implement {@link #doFallback}
806   */
807  static final class AsyncCatchingFuture<V, X extends Throwable>
808      extends AbstractCatchingFuture<V, X, AsyncFunction<? super X, ? extends V>> {
809
810    AsyncCatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType,
811        AsyncFunction<? super X, ? extends V> fallback) {
812      super(input, exceptionType, fallback);
813    }
814
815    @Override void doFallback(
816        AsyncFunction<? super X, ? extends V> fallback, X cause) throws Exception {
817      ListenableFuture<? extends V> replacement = fallback.apply(cause);
818      checkNotNull(replacement, "AsyncFunction.apply returned null instead of a Future. "
819          + "Did you mean to return immediateFuture(null)?");
820      setFuture(replacement);
821    }
822  }
823
824  /**
825   * A {@link AbstractCatchingFuture} that delegates to a {@link Function}
826   * and {@link #set(Object)} to implement {@link #doFallback}
827   */
828  static final class CatchingFuture<V, X extends Throwable>
829      extends AbstractCatchingFuture<V, X, Function<? super X, ? extends V>> {
830    CatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType,
831        Function<? super X, ? extends V> fallback) {
832      super(input, exceptionType, fallback);
833    }
834
835    @Override void doFallback(Function<? super X, ? extends V> fallback, X cause) throws Exception {
836      V replacement = fallback.apply(cause);
837      set(replacement);
838    }
839  }
840
841  /**
842   * Returns a future that delegates to another but will finish early (via a
843   * {@link TimeoutException} wrapped in an {@link ExecutionException}) if the
844   * specified duration expires.
845   *
846   * <p>The delegate future is interrupted and cancelled if it times out.
847   *
848   * @param delegate The future to delegate to.
849   * @param time when to timeout the future
850   * @param unit the time unit of the time parameter
851   * @param scheduledExecutor The executor service to enforce the timeout.
852   *
853   * @since 19.0
854   */
855  @GwtIncompatible("java.util.concurrent.ScheduledExecutorService")
856  @CheckReturnValue
857  public static <V> ListenableFuture<V> withTimeout(ListenableFuture<V> delegate,
858      long time, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
859    TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
860    TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
861    result.timer = scheduledExecutor.schedule(fire, time, unit);
862    delegate.addListener(fire, directExecutor());
863    return result;
864  }
865
866  /**
867   * Future that delegates to another but will finish early (via a {@link
868   * TimeoutException} wrapped in an {@link ExecutionException}) if the
869   * specified duration expires.
870   * The delegate future is interrupted and cancelled if it times out.
871   */
872  private static final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
873    // Memory visibility of these fields.
874    // There are two cases to consider.
875    // 1. visibility of the writes to these fields to Fire.run
876    //    The initial write to delegateRef is made definitely visible via the semantics of
877    //    addListener/SES.schedule.  The later racy write in cancel() is not guaranteed to be
878    //    observed, however that is fine since the correctness is based on the atomic state in
879    //    our base class.
880    //    The initial write to timer is never definitely visible to Fire.run since it is assigned
881    //    after SES.schedule is called. Therefore Fire.run has to check for null.  However, it
882    //    should be visible if Fire.run is called by delegate.addListener since addListener is
883    //    called after the assignment to timer, and importantly this is the main situation in which
884    //    we need to be able to see the write.
885    // 2. visibility of the writes to cancel
886    //    Since these fields are non-final that means that TimeoutFuture is not being 'safely
887    //    published', thus a motivated caller may be able to expose the reference to another thread
888    //    that would then call cancel() and be unable to cancel the delegate.
889    //    There are a number of ways to solve this, none of which are very pretty, and it is
890    //    currently believed to be a purely theoretical problem (since the other actions should
891    //    supply sufficient write-barriers).
892
893    @Nullable ListenableFuture<V> delegateRef;
894    @Nullable Future<?> timer;
895
896    TimeoutFuture(ListenableFuture<V> delegate) {
897      this.delegateRef = Preconditions.checkNotNull(delegate);
898    }
899
900    /** A runnable that is called when the delegate or the timer completes. */
901    private static final class Fire<V> implements Runnable {
902      @Nullable TimeoutFuture<V> timeoutFutureRef;
903
904      Fire(TimeoutFuture<V> timeoutFuture) {
905        this.timeoutFutureRef = timeoutFuture;
906      }
907
908      @Override public void run() {
909        // If either of these reads return null then we must be after a successful cancel
910        // or another call to this method.
911        TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
912        if (timeoutFuture == null) {
913          return;
914        }
915        ListenableFuture<V> delegate = timeoutFuture.delegateRef;
916        if (delegate == null) {
917          return;
918        }
919
920        /*
921         * If we're about to complete the TimeoutFuture, we want to release our reference to it.
922         * Otherwise, we'll pin it (and its result) in memory until the timeout task is GCed. (The
923         * need to clear our reference to the TimeoutFuture is the reason we use a *static* nested
924         * class with a manual reference back to the "containing" class.)
925         *
926         * This has the nice-ish side effect of limiting reentrancy: run() calls
927         * timeoutFuture.setException() calls run(). That reentrancy would already be harmless,
928         * since timeoutFuture can be set (and delegate cancelled) only once. (And "set only once"
929         * is important for other reasons: run() can still be invoked concurrently in different
930         * threads, even with the above null checks.)
931         */
932        timeoutFutureRef = null;
933        if (delegate.isDone()) {
934          timeoutFuture.setFuture(delegate);
935        } else {
936          try {
937            // TODO(lukes): this stack trace is particularly useless (all it does is point at the
938            // scheduledexecutorservice thread), consider eliminating it altogether?
939            timeoutFuture.setException(new TimeoutException("Future timed out: " + delegate));
940          } finally {
941            delegate.cancel(true);
942          }
943        }
944      }
945    }
946
947    @Override void done() {
948      maybePropagateCancellation(delegateRef);
949
950      Future<?> localTimer = timer;
951      // Try to cancel the timer as an optimization
952      // timer may be null if this call to run was by the timer task since there is no
953      // happens-before edge between the assignment to timer and an execution of the timer task.
954      if (localTimer != null) {
955        localTimer.cancel(false);
956      }
957
958      delegateRef = null;
959      timer = null;
960    }
961  }
962
963  /**
964   * Returns a new {@code ListenableFuture} whose result is asynchronously
965   * derived from the result of the given {@code Future}. More precisely, the
966   * returned {@code Future} takes its result from a {@code Future} produced by
967   * applying the given {@code AsyncFunction} to the result of the original
968   * {@code Future}. Example:
969   *
970   * <pre>   {@code
971   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
972   *   AsyncFunction<RowKey, QueryResult> queryFunction =
973   *       new AsyncFunction<RowKey, QueryResult>() {
974   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
975   *           return dataService.read(rowKey);
976   *         }
977   *       };
978   *   ListenableFuture<QueryResult> queryFuture =
979   *       transform(rowKeyFuture, queryFunction);}</pre>
980   *
981   * <p>This overload, which does not accept an executor, uses {@code
982   * directExecutor}, a dangerous choice in some cases. See the discussion in
983   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
984   * documentation. The documentation's warnings about "lightweight listeners"
985   * refer here to the work done during {@code AsyncFunction.apply}, not to any
986   * work done to complete the returned {@code Future}.
987   *
988   * <p>The returned {@code Future} attempts to keep its cancellation state in
989   * sync with that of the input future and that of the future returned by the
990   * function. That is, if the returned {@code Future} is cancelled, it will
991   * attempt to cancel the other two, and if either of the other two is
992   * cancelled, the returned {@code Future} will receive a callback in which it
993   * will attempt to cancel itself.
994   *
995   * @param input The future to transform
996   * @param function A function to transform the result of the input future
997   *     to the result of the output future
998   * @return A future that holds result of the function (if the input succeeded)
999   *     or the original input's failure (if not)
1000   * @since 11.0
1001   * @deprecated These {@code AsyncFunction} overloads of {@code transform} are
1002   *     being renamed to {@code transformAsync}. (The {@code Function}
1003   *     overloads are keeping the "transform" name.) This method will be removed in Guava release
1004   *     20.0.
1005   */
1006  @Deprecated
1007  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1008      AsyncFunction<? super I, ? extends O> function) {
1009    return transformAsync(input, function);
1010  }
1011
1012  /**
1013   * Returns a new {@code ListenableFuture} whose result is asynchronously
1014   * derived from the result of the given {@code Future}. More precisely, the
1015   * returned {@code Future} takes its result from a {@code Future} produced by
1016   * applying the given {@code AsyncFunction} to the result of the original
1017   * {@code Future}. Example:
1018   *
1019   * <pre>   {@code
1020   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
1021   *   AsyncFunction<RowKey, QueryResult> queryFunction =
1022   *       new AsyncFunction<RowKey, QueryResult>() {
1023   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
1024   *           return dataService.read(rowKey);
1025   *         }
1026   *       };
1027   *   ListenableFuture<QueryResult> queryFuture =
1028   *       transform(rowKeyFuture, queryFunction, executor);}</pre>
1029   *
1030   * <p>When selecting an executor, note that {@code directExecutor} is
1031   * dangerous in some cases. See the discussion in the {@link
1032   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
1033   * The documentation's warnings about "lightweight listeners" refer here to
1034   * the work done during {@code AsyncFunction.apply}, not to any work done to
1035   * complete the returned {@code Future}.
1036   *
1037   * <p>The returned {@code Future} attempts to keep its cancellation state in
1038   * sync with that of the input future and that of the future returned by the
1039   * chain function. That is, if the returned {@code Future} is cancelled, it
1040   * will attempt to cancel the other two, and if either of the other two is
1041   * cancelled, the returned {@code Future} will receive a callback in which it
1042   * will attempt to cancel itself.
1043   *
1044   * @param input The future to transform
1045   * @param function A function to transform the result of the input future
1046   *     to the result of the output future
1047   * @param executor Executor to run the function in.
1048   * @return A future that holds result of the function (if the input succeeded)
1049   *     or the original input's failure (if not)
1050   * @since 11.0
1051   * @deprecated These {@code AsyncFunction} overloads of {@code transform} are
1052   *     being renamed to {@code transformAsync}. (The {@code Function}
1053   *     overloads are keeping the "transform" name.) This method will be removed in Guava release
1054   *     20.0.
1055   */
1056  @Deprecated
1057  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1058      AsyncFunction<? super I, ? extends O> function,
1059      Executor executor) {
1060    return transformAsync(input, function, executor);
1061  }
1062
1063  /**
1064   * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result
1065   * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from
1066   * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the
1067   * original {@code Future}. Example:
1068   *
1069   * <pre>   {@code
1070   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
1071   *   AsyncFunction<RowKey, QueryResult> queryFunction =
1072   *       new AsyncFunction<RowKey, QueryResult>() {
1073   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
1074   *           return dataService.read(rowKey);
1075   *         }
1076   *       };
1077   *   ListenableFuture<QueryResult> queryFuture =
1078   *       transformAsync(rowKeyFuture, queryFunction);}</pre>
1079   *
1080   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
1081   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
1082   * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight
1083   * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done
1084   * to complete the returned {@code Future}.
1085   *
1086   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
1087   * input future and that of the future returned by the function. That is, if the returned {@code
1088   * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two
1089   * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to
1090   * cancel itself.
1091   *
1092   * @param input The future to transform
1093   * @param function A function to transform the result of the input future to the result of the
1094   *     output future
1095   * @return A future that holds result of the function (if the input succeeded) or the original
1096   *     input's failure (if not)
1097   * @since 19.0 (in 11.0 as {@code transform})
1098   */
1099  public static <I, O> ListenableFuture<O> transformAsync(
1100      ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) {
1101    AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function);
1102    input.addListener(output, directExecutor());
1103    return output;
1104  }
1105
1106  /**
1107   * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result
1108   * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from
1109   * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the
1110   * original {@code Future}. Example:
1111   *
1112   * <pre>   {@code
1113   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
1114   *   AsyncFunction<RowKey, QueryResult> queryFunction =
1115   *       new AsyncFunction<RowKey, QueryResult>() {
1116   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
1117   *           return dataService.read(rowKey);
1118   *         }
1119   *       };
1120   *   ListenableFuture<QueryResult> queryFuture =
1121   *       transformAsync(rowKeyFuture, queryFunction, executor);}</pre>
1122   *
1123   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
1124   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
1125   * documentation. The documentation's warnings about "lightweight listeners" refer here to the
1126   * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned
1127   * {@code Future}.
1128   *
1129   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
1130   * input future and that of the future returned by the chain function. That is, if the returned
1131   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
1132   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
1133   * attempt to cancel itself.
1134   *
1135   * @param input The future to transform
1136   * @param function A function to transform the result of the input future to the result of the
1137   *     output future
1138   * @param executor Executor to run the function in.
1139   * @return A future that holds result of the function (if the input succeeded) or the original
1140   *     input's failure (if not)
1141   * @since 19.0 (in 11.0 as {@code transform})
1142   */
1143  public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input,
1144      AsyncFunction<? super I, ? extends O> function, Executor executor) {
1145    checkNotNull(executor);
1146    AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function);
1147    input.addListener(output, rejectionPropagatingExecutor(executor, output));
1148    return output;
1149  }
1150
1151  /**
1152   * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate
1153   * executor to the given {@code future}.
1154   *
1155   * <p>Note, the returned executor can only be used once.
1156   */
1157  private static Executor rejectionPropagatingExecutor(
1158      final Executor delegate, final AbstractFuture<?> future) {
1159    checkNotNull(delegate);
1160    if (delegate == directExecutor()) {
1161      // directExecutor() cannot throw RejectedExecutionException
1162      return delegate;
1163    }
1164    return new Executor() {
1165      volatile boolean thrownFromDelegate = true;
1166      @Override public void execute(final Runnable command) {
1167        try {
1168          delegate.execute(new Runnable() {
1169            @Override public void run() {
1170              thrownFromDelegate = false;
1171              command.run();
1172            }
1173          });
1174        } catch (RejectedExecutionException e) {
1175          if (thrownFromDelegate) {
1176            // wrap exception?
1177            future.setException(e);
1178          }
1179          // otherwise it must have been thrown from a transitive call and the delegate runnable
1180          // should have handled it.
1181        }
1182      }
1183    };
1184  }
1185
1186  /**
1187   * Returns a new {@code ListenableFuture} whose result is the product of
1188   * applying the given {@code Function} to the result of the given {@code
1189   * Future}. Example:
1190   *
1191   * <pre>   {@code
1192   *   ListenableFuture<QueryResult> queryFuture = ...;
1193   *   Function<QueryResult, List<Row>> rowsFunction =
1194   *       new Function<QueryResult, List<Row>>() {
1195   *         public List<Row> apply(QueryResult queryResult) {
1196   *           return queryResult.getRows();
1197   *         }
1198   *       };
1199   *   ListenableFuture<List<Row>> rowsFuture =
1200   *       transform(queryFuture, rowsFunction);}</pre>
1201   *
1202   * <p>This overload, which does not accept an executor, uses {@code
1203   * directExecutor}, a dangerous choice in some cases. See the discussion in
1204   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
1205   * documentation. The documentation's warnings about "lightweight listeners"
1206   * refer here to the work done during {@code Function.apply}.
1207   *
1208   * <p>The returned {@code Future} attempts to keep its cancellation state in
1209   * sync with that of the input future. That is, if the returned {@code Future}
1210   * is cancelled, it will attempt to cancel the input, and if the input is
1211   * cancelled, the returned {@code Future} will receive a callback in which it
1212   * will attempt to cancel itself.
1213   *
1214   * <p>An example use of this method is to convert a serializable object
1215   * returned from an RPC into a POJO.
1216   *
1217   * @param input The future to transform
1218   * @param function A Function to transform the results of the provided future
1219   *     to the results of the returned future.  This will be run in the thread
1220   *     that notifies input it is complete.
1221   * @return A future that holds result of the transformation.
1222   * @since 9.0 (in 1.0 as {@code compose})
1223   */
1224  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1225      final Function<? super I, ? extends O> function) {
1226    checkNotNull(function);
1227    ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function);
1228    input.addListener(output, directExecutor());
1229    return output;
1230  }
1231
1232  /**
1233   * Returns a new {@code ListenableFuture} whose result is the product of
1234   * applying the given {@code Function} to the result of the given {@code
1235   * Future}. Example:
1236   *
1237   * <pre>   {@code
1238   *   ListenableFuture<QueryResult> queryFuture = ...;
1239   *   Function<QueryResult, List<Row>> rowsFunction =
1240   *       new Function<QueryResult, List<Row>>() {
1241   *         public List<Row> apply(QueryResult queryResult) {
1242   *           return queryResult.getRows();
1243   *         }
1244   *       };
1245   *   ListenableFuture<List<Row>> rowsFuture =
1246   *       transform(queryFuture, rowsFunction, executor);}</pre>
1247   *
1248   * <p>When selecting an executor, note that {@code directExecutor} is
1249   * dangerous in some cases. See the discussion in the {@link
1250   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
1251   * The documentation's warnings about "lightweight listeners" refer here to
1252   * the work done during {@code Function.apply}.
1253   *
1254   * <p>The returned {@code Future} attempts to keep its cancellation state in
1255   * sync with that of the input future. That is, if the returned {@code Future}
1256   * is cancelled, it will attempt to cancel the input, and if the input is
1257   * cancelled, the returned {@code Future} will receive a callback in which it
1258   * will attempt to cancel itself.
1259   *
1260   * <p>An example use of this method is to convert a serializable object
1261   * returned from an RPC into a POJO.
1262   *
1263   * @param input The future to transform
1264   * @param function A Function to transform the results of the provided future
1265   *     to the results of the returned future.
1266   * @param executor Executor to run the function in.
1267   * @return A future that holds result of the transformation.
1268   * @since 9.0 (in 2.0 as {@code compose})
1269   */
1270  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1271      final Function<? super I, ? extends O> function, Executor executor) {
1272    checkNotNull(function);
1273    ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function);
1274    input.addListener(output, rejectionPropagatingExecutor(executor, output));
1275    return output;
1276  }
1277
1278  /**
1279   * Like {@link #transform(ListenableFuture, Function)} except that the
1280   * transformation {@code function} is invoked on each call to
1281   * {@link Future#get() get()} on the returned future.
1282   *
1283   * <p>The returned {@code Future} reflects the input's cancellation
1284   * state directly, and any attempt to cancel the returned Future is likewise
1285   * passed through to the input Future.
1286   *
1287   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
1288   * only apply the timeout to the execution of the underlying {@code Future},
1289   * <em>not</em> to the execution of the transformation function.
1290   *
1291   * <p>The primary audience of this method is callers of {@code transform}
1292   * who don't have a {@code ListenableFuture} available and
1293   * do not mind repeated, lazy function evaluation.
1294   *
1295   * @param input The future to transform
1296   * @param function A Function to transform the results of the provided future
1297   *     to the results of the returned future.
1298   * @return A future that returns the result of the transformation.
1299   * @since 10.0
1300   */
1301  @GwtIncompatible("TODO")
1302  @CheckReturnValue
1303  public static <I, O> Future<O> lazyTransform(final Future<I> input,
1304      final Function<? super I, ? extends O> function) {
1305    checkNotNull(input);
1306    checkNotNull(function);
1307    return new Future<O>() {
1308
1309      @Override
1310      public boolean cancel(boolean mayInterruptIfRunning) {
1311        return input.cancel(mayInterruptIfRunning);
1312      }
1313
1314      @Override
1315      public boolean isCancelled() {
1316        return input.isCancelled();
1317      }
1318
1319      @Override
1320      public boolean isDone() {
1321        return input.isDone();
1322      }
1323
1324      @Override
1325      public O get() throws InterruptedException, ExecutionException {
1326        return applyTransformation(input.get());
1327      }
1328
1329      @Override
1330      public O get(long timeout, TimeUnit unit)
1331          throws InterruptedException, ExecutionException, TimeoutException {
1332        return applyTransformation(input.get(timeout, unit));
1333      }
1334
1335      private O applyTransformation(I input) throws ExecutionException {
1336        try {
1337          return function.apply(input);
1338        } catch (Throwable t) {
1339          throw new ExecutionException(t);
1340        }
1341      }
1342    };
1343  }
1344
1345  /**
1346   * An implementation of {@code ListenableFuture} that also implements
1347   * {@code Runnable} so that it can be used to nest ListenableFutures.
1348   * Once the passed-in {@code ListenableFuture} is complete, it calls the
1349   * passed-in {@code Function} to generate the result.
1350   *
1351   * <p>For historical reasons, this class has a special case in its exception
1352   * handling: If the given {@code AsyncFunction} throws an {@code
1353   * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
1354   * and uses its <i>cause</i> as the output future's exception, rather than
1355   * using the {@code UndeclaredThrowableException} itself as it would for other
1356   * exception types. The reason for this is that {@code Futures.transform} used
1357   * to require a {@code Function}, whose {@code apply} method is not allowed to
1358   * throw checked exceptions. Nowadays, {@code Futures.transform} has an
1359   * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
1360   * <i>is</i> allowed to throw checked exception. Users who wish to throw
1361   * checked exceptions should use that overload instead, and <a
1362   * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
1363   * should remove the {@code UndeclaredThrowableException} special case</a>.
1364   */
1365  private abstract static class AbstractChainingFuture<I, O, F>
1366      extends AbstractFuture.TrustedFuture<O> implements Runnable {
1367    // In theory, this field might not be visible to a cancel() call in certain circumstances. For
1368    // details, see the comments on the fields of TimeoutFuture.
1369    @Nullable ListenableFuture<? extends I> inputFuture;
1370    @Nullable F function;
1371
1372    AbstractChainingFuture(ListenableFuture<? extends I> inputFuture, F function) {
1373      this.inputFuture = checkNotNull(inputFuture);
1374      this.function = checkNotNull(function);
1375    }
1376
1377    @Override
1378    public final void run() {
1379      try {
1380        ListenableFuture<? extends I> localInputFuture = inputFuture;
1381        F localFunction = function;
1382        if (isCancelled() | localInputFuture == null | localFunction == null) {
1383          return;
1384        }
1385        inputFuture = null;
1386        function = null;
1387
1388        I sourceResult;
1389        try {
1390          sourceResult = getUninterruptibly(localInputFuture);
1391        } catch (CancellationException e) {
1392          // Cancel this future and return.
1393          // At this point, inputFuture is cancelled and outputFuture doesn't
1394          // exist, so the value of mayInterruptIfRunning is irrelevant.
1395          cancel(false);
1396          return;
1397        } catch (ExecutionException e) {
1398          // Set the cause of the exception as this future's exception
1399          setException(e.getCause());
1400          return;
1401        }
1402        doTransform(localFunction, sourceResult);
1403      } catch (UndeclaredThrowableException e) {
1404        // Set the cause of the exception as this future's exception
1405        setException(e.getCause());
1406      } catch (Throwable t) {
1407        // This exception is irrelevant in this thread, but useful for the
1408        // client
1409        setException(t);
1410      }
1411    }
1412
1413    /** Template method for subtypes to actually run the transform. */
1414    abstract void doTransform(F function, I result) throws Exception;
1415
1416    @Override final void done() {
1417      maybePropagateCancellation(inputFuture);
1418      this.inputFuture = null;
1419      this.function = null;
1420    }
1421  }
1422
1423  /**
1424   * A {@link AbstractChainingFuture} that delegates to an {@link AsyncFunction} and
1425   * {@link #setFuture(ListenableFuture)} to implement {@link #doTransform}.
1426   */
1427  private static final class AsyncChainingFuture<I, O>
1428      extends AbstractChainingFuture<I, O, AsyncFunction<? super I, ? extends O>> {
1429    AsyncChainingFuture(ListenableFuture<? extends I> inputFuture,
1430        AsyncFunction<? super I, ? extends O> function) {
1431      super(inputFuture, function);
1432    }
1433
1434    @Override
1435    void doTransform(AsyncFunction<? super I, ? extends O> function, I input) throws Exception {
1436      ListenableFuture<? extends O> outputFuture = function.apply(input);
1437      checkNotNull(outputFuture, "AsyncFunction.apply returned null instead of a Future. "
1438          + "Did you mean to return immediateFuture(null)?");
1439      setFuture(outputFuture);
1440    }
1441  }
1442
1443  /**
1444   * A {@link AbstractChainingFuture} that delegates to a {@link Function} and
1445   * {@link #set(Object)} to implement {@link #doTransform}.
1446   */
1447  private static final class ChainingFuture<I, O>
1448      extends AbstractChainingFuture<I, O, Function<? super I, ? extends O>> {
1449
1450    ChainingFuture(ListenableFuture<? extends I> inputFuture,
1451        Function<? super I, ? extends O> function) {
1452      super(inputFuture, function);
1453    }
1454
1455    @Override
1456    void doTransform(Function<? super I, ? extends O> function, I input) {
1457      // TODO(lukes): move the UndeclaredThrowable catch block here?
1458      set(function.apply(input));
1459    }
1460  }
1461
1462  /**
1463   * Returns a new {@code ListenableFuture} whose result is the product of
1464   * calling {@code get()} on the {@code Future} nested within the given {@code
1465   * Future}, effectively chaining the futures one after the other.  Example:
1466   *
1467   * <pre>   {@code
1468   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
1469   *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
1470   *
1471   * <p>This call has the same cancellation and execution semantics as {@link
1472   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
1473   * Future} attempts to keep its cancellation state in sync with both the
1474   * input {@code Future} and the nested {@code Future}.  The transformation
1475   * is very lightweight and therefore takes place in the same thread (either
1476   * the thread that called {@code dereference}, or the thread in which the
1477   * dereferenced future completes).
1478   *
1479   * @param nested The nested future to transform.
1480   * @return A future that holds result of the inner future.
1481   * @since 13.0
1482   */
1483  @SuppressWarnings({"rawtypes", "unchecked"})
1484  @CheckReturnValue
1485  public static <V> ListenableFuture<V> dereference(
1486      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
1487    return transformAsync((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
1488  }
1489
1490  /**
1491   * Helper {@code Function} for {@link #dereference}.
1492   */
1493  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
1494      new AsyncFunction<ListenableFuture<Object>, Object>() {
1495        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
1496          return input;
1497        }
1498      };
1499
1500  /**
1501   * Creates a new {@code ListenableFuture} whose value is a list containing the
1502   * values of all its input futures, if all succeed. If any input fails, the
1503   * returned future fails immediately.
1504   *
1505   * <p>The list of results is in the same order as the input list.
1506   *
1507   * <p>Canceling this future will attempt to cancel all the component futures,
1508   * and if any of the provided futures fails or is canceled, this one is,
1509   * too.
1510   *
1511   * @param futures futures to combine
1512   * @return a future that provides a list of the results of the component
1513   *         futures
1514   * @since 10.0
1515   */
1516  @Beta
1517  @SafeVarargs
1518  @CheckReturnValue
1519  public static <V> ListenableFuture<List<V>> allAsList(
1520      ListenableFuture<? extends V>... futures) {
1521    return new ListFuture<V>(ImmutableList.copyOf(futures), true);
1522  }
1523
1524  /**
1525   * Creates a new {@code ListenableFuture} whose value is a list containing the
1526   * values of all its input futures, if all succeed. If any input fails, the
1527   * returned future fails immediately.
1528   *
1529   * <p>The list of results is in the same order as the input list.
1530   *
1531   * <p>Canceling this future will attempt to cancel all the component futures,
1532   * and if any of the provided futures fails or is canceled, this one is,
1533   * too.
1534   *
1535   * @param futures futures to combine
1536   * @return a future that provides a list of the results of the component
1537   *         futures
1538   * @since 10.0
1539   */
1540  @Beta
1541  @CheckReturnValue
1542  public static <V> ListenableFuture<List<V>> allAsList(
1543      Iterable<? extends ListenableFuture<? extends V>> futures) {
1544    return new ListFuture<V>(ImmutableList.copyOf(futures), true);
1545  }
1546
1547  /**
1548   * Creates a new {@code ListenableFuture} whose result is set from the
1549   * supplied future when it completes.  Cancelling the supplied future
1550   * will also cancel the returned future, but cancelling the returned
1551   * future will have no effect on the supplied future.
1552   *
1553   * @since 15.0
1554   */
1555  @GwtIncompatible("TODO")
1556  @CheckReturnValue
1557  public static <V> ListenableFuture<V> nonCancellationPropagating(
1558      ListenableFuture<V> future) {
1559    return new NonCancellationPropagatingFuture<V>(future);
1560  }
1561
1562  /**
1563   * A wrapped future that does not propagate cancellation to its delegate.
1564   */
1565  @GwtIncompatible("TODO")
1566  private static final class NonCancellationPropagatingFuture<V>
1567      extends AbstractFuture.TrustedFuture<V> {
1568    NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1569      delegate.addListener(new Runnable() {
1570        @Override public void run() {
1571          // This prevents cancellation from propagating because we don't assign delegate until
1572          // delegate is already done, so calling cancel() on it is a no-op.
1573          setFuture(delegate);
1574        }
1575      }, directExecutor());
1576    }
1577  }
1578
1579  /**
1580   * Creates a new {@code ListenableFuture} whose value is a list containing the
1581   * values of all its successful input futures. The list of results is in the
1582   * same order as the input list, and if any of the provided futures fails or
1583   * is canceled, its corresponding position will contain {@code null} (which is
1584   * indistinguishable from the future having a successful value of
1585   * {@code null}).
1586   *
1587   * <p>Canceling this future will attempt to cancel all the component futures.
1588   *
1589   * @param futures futures to combine
1590   * @return a future that provides a list of the results of the component
1591   *         futures
1592   * @since 10.0
1593   */
1594  @Beta
1595  @SafeVarargs
1596  @CheckReturnValue
1597  public static <V> ListenableFuture<List<V>> successfulAsList(
1598      ListenableFuture<? extends V>... futures) {
1599    return new ListFuture<V>(ImmutableList.copyOf(futures), false);
1600  }
1601
1602  /**
1603   * Creates a new {@code ListenableFuture} whose value is a list containing the
1604   * values of all its successful input futures. The list of results is in the
1605   * same order as the input list, and if any of the provided futures fails or
1606   * is canceled, its corresponding position will contain {@code null} (which is
1607   * indistinguishable from the future having a successful value of
1608   * {@code null}).
1609   *
1610   * <p>Canceling this future will attempt to cancel all the component futures.
1611   *
1612   * @param futures futures to combine
1613   * @return a future that provides a list of the results of the component
1614   *         futures
1615   * @since 10.0
1616   */
1617  @Beta
1618  @CheckReturnValue
1619  public static <V> ListenableFuture<List<V>> successfulAsList(
1620      Iterable<? extends ListenableFuture<? extends V>> futures) {
1621    return new ListFuture<V>(ImmutableList.copyOf(futures), false);
1622  }
1623
1624  /**
1625   * Returns a list of delegate futures that correspond to the futures received in the order
1626   * that they complete. Delegate futures return the same value or throw the same exception
1627   * as the corresponding input future returns/throws.
1628   *
1629   * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1630   * does not correspond to a specific input future until the appropriate number of input
1631   * futures have completed. At that point, it is too late to cancel the input future.
1632   * The input future's result, which cannot be stored into the cancelled delegate future,
1633   * is ignored.
1634   *
1635   * @since 17.0
1636   */
1637  @Beta
1638  @GwtIncompatible("TODO")
1639  @CheckReturnValue
1640  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1641      Iterable<? extends ListenableFuture<? extends T>> futures) {
1642    // A CLQ may be overkill here.  We could save some pointers/memory by synchronizing on an
1643    // ArrayDeque
1644    final ConcurrentLinkedQueue<SettableFuture<T>> delegates =
1645        Queues.newConcurrentLinkedQueue();
1646    ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1647    // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1648    // atomically and therefore that each returned future is guaranteed to be in completion order.
1649    // N.B. there are some cases where the use of this executor could have possibly surprising
1650    // effects when input futures finish at approximately the same time _and_ the output futures
1651    // have directExecutor listeners. In this situation, the listeners may end up running on a
1652    // different thread than if they were attached to the corresponding input future.  We believe
1653    // this to be a negligible cost since:
1654    // 1. Using the directExecutor implies that your callback is safe to run on any thread.
1655    // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1656    //    a directExecutor listener on one of the output futures which is an antipattern anyway.
1657    SerializingExecutor executor = new SerializingExecutor(directExecutor());
1658    for (final ListenableFuture<? extends T> future : futures) {
1659      SettableFuture<T> delegate = SettableFuture.create();
1660      // Must make sure to add the delegate to the queue first in case the future is already done
1661      delegates.add(delegate);
1662      future.addListener(new Runnable() {
1663        @Override public void run() {
1664          delegates.remove().setFuture(future);
1665        }
1666      }, executor);
1667      listBuilder.add(delegate);
1668    }
1669    return listBuilder.build();
1670  }
1671
1672  /**
1673   * Registers separate success and failure callbacks to be run when the {@code
1674   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1675   * complete} or, if the computation is already complete, immediately.
1676   *
1677   * <p>There is no guaranteed ordering of execution of callbacks, but any
1678   * callback added through this method is guaranteed to be called once the
1679   * computation is complete.
1680   *
1681   * Example: <pre> {@code
1682   * ListenableFuture<QueryResult> future = ...;
1683   * addCallback(future,
1684   *     new FutureCallback<QueryResult> {
1685   *       public void onSuccess(QueryResult result) {
1686   *         storeInCache(result);
1687   *       }
1688   *       public void onFailure(Throwable t) {
1689   *         reportError(t);
1690   *       }
1691   *     });}</pre>
1692   *
1693   * <p>This overload, which does not accept an executor, uses {@code
1694   * directExecutor}, a dangerous choice in some cases. See the discussion in
1695   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
1696   * documentation.
1697   *
1698   * <p>For a more general interface to attach a completion listener to a
1699   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1700   *
1701   * @param future The future attach the callback to.
1702   * @param callback The callback to invoke when {@code future} is completed.
1703   * @since 10.0
1704   */
1705  public static <V> void addCallback(ListenableFuture<V> future,
1706      FutureCallback<? super V> callback) {
1707    addCallback(future, callback, directExecutor());
1708  }
1709
1710  /**
1711   * Registers separate success and failure callbacks to be run when the {@code
1712   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1713   * complete} or, if the computation is already complete, immediately.
1714   *
1715   * <p>The callback is run in {@code executor}.
1716   * There is no guaranteed ordering of execution of callbacks, but any
1717   * callback added through this method is guaranteed to be called once the
1718   * computation is complete.
1719   *
1720   * Example: <pre> {@code
1721   * ListenableFuture<QueryResult> future = ...;
1722   * Executor e = ...
1723   * addCallback(future,
1724   *     new FutureCallback<QueryResult> {
1725   *       public void onSuccess(QueryResult result) {
1726   *         storeInCache(result);
1727   *       }
1728   *       public void onFailure(Throwable t) {
1729   *         reportError(t);
1730   *       }
1731   *     }, e);}</pre>
1732   *
1733   * <p>When selecting an executor, note that {@code directExecutor} is
1734   * dangerous in some cases. See the discussion in the {@link
1735   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
1736   *
1737   * <p>For a more general interface to attach a completion listener to a
1738   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1739   *
1740   * @param future The future attach the callback to.
1741   * @param callback The callback to invoke when {@code future} is completed.
1742   * @param executor The executor to run {@code callback} when the future
1743   *    completes.
1744   * @since 10.0
1745   */
1746  public static <V> void addCallback(final ListenableFuture<V> future,
1747      final FutureCallback<? super V> callback, Executor executor) {
1748    Preconditions.checkNotNull(callback);
1749    Runnable callbackListener = new Runnable() {
1750      @Override
1751      public void run() {
1752        final V value;
1753        try {
1754          // TODO(user): (Before Guava release), validate that this
1755          // is the thing for IE.
1756          value = getUninterruptibly(future);
1757        } catch (ExecutionException e) {
1758          callback.onFailure(e.getCause());
1759          return;
1760        } catch (RuntimeException e) {
1761          callback.onFailure(e);
1762          return;
1763        } catch (Error e) {
1764          callback.onFailure(e);
1765          return;
1766        }
1767        callback.onSuccess(value);
1768      }
1769    };
1770    future.addListener(callbackListener, executor);
1771  }
1772
1773  /**
1774   * Returns the result of {@link Future#get()}, converting most exceptions to a
1775   * new instance of the given checked exception type. This reduces boilerplate
1776   * for a common use of {@code Future} in which it is unnecessary to
1777   * programmatically distinguish between exception types or to extract other
1778   * information from the exception instance.
1779   *
1780   * <p>Exceptions from {@code Future.get} are treated as follows:
1781   * <ul>
1782   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1783   *     {@code X} if the cause is a checked exception, an {@link
1784   *     UncheckedExecutionException} if the cause is a {@code
1785   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1786   *     {@code Error}.
1787   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1788   *     restoring the interrupt).
1789   * <li>Any {@link CancellationException} is propagated untouched, as is any
1790   *     other {@link RuntimeException} (though {@code get} implementations are
1791   *     discouraged from throwing such exceptions).
1792   * </ul>
1793   *
1794   * <p>The overall principle is to continue to treat every checked exception as a
1795   * checked exception, every unchecked exception as an unchecked exception, and
1796   * every error as an error. In addition, the cause of any {@code
1797   * ExecutionException} is wrapped in order to ensure that the new stack trace
1798   * matches that of the current thread.
1799   *
1800   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1801   * public constructor that accepts zero or more arguments, all of type {@code
1802   * String} or {@code Throwable} (preferring constructors with at least one
1803   * {@code String}) and calling the constructor via reflection. If the
1804   * exception did not already have a cause, one is set by calling {@link
1805   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1806   * {@code IllegalArgumentException} is thrown.
1807   *
1808   * @throws X if {@code get} throws any checked exception except for an {@code
1809   *         ExecutionException} whose cause is not itself a checked exception
1810   * @throws UncheckedExecutionException if {@code get} throws an {@code
1811   *         ExecutionException} with a {@code RuntimeException} as its cause
1812   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1813   *         with an {@code Error} as its cause
1814   * @throws CancellationException if {@code get} throws a {@code
1815   *         CancellationException}
1816   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1817   *         RuntimeException} or does not have a suitable constructor
1818   * @since 10.0
1819   * @deprecated Use {@link #getChecked(Future, Class)}. This method will be
1820   *     removed in Guava release 20.0.
1821   */
1822  @Deprecated
1823  @GwtIncompatible("reflection")
1824  public static <V, X extends Exception> V get(
1825      Future<V> future, Class<X> exceptionClass) throws X {
1826    return getChecked(future, exceptionClass);
1827  }
1828
1829  /**
1830   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1831   * exceptions to a new instance of the given checked exception type. This
1832   * reduces boilerplate for a common use of {@code Future} in which it is
1833   * unnecessary to programmatically distinguish between exception types or to
1834   * extract other information from the exception instance.
1835   *
1836   * <p>Exceptions from {@code Future.get} are treated as follows:
1837   * <ul>
1838   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1839   *     {@code X} if the cause is a checked exception, an {@link
1840   *     UncheckedExecutionException} if the cause is a {@code
1841   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1842   *     {@code Error}.
1843   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1844   *     restoring the interrupt).
1845   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1846   * <li>Any {@link CancellationException} is propagated untouched, as is any
1847   *     other {@link RuntimeException} (though {@code get} implementations are
1848   *     discouraged from throwing such exceptions).
1849   * </ul>
1850   *
1851   * <p>The overall principle is to continue to treat every checked exception as a
1852   * checked exception, every unchecked exception as an unchecked exception, and
1853   * every error as an error. In addition, the cause of any {@code
1854   * ExecutionException} is wrapped in order to ensure that the new stack trace
1855   * matches that of the current thread.
1856   *
1857   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1858   * public constructor that accepts zero or more arguments, all of type {@code
1859   * String} or {@code Throwable} (preferring constructors with at least one
1860   * {@code String}) and calling the constructor via reflection. If the
1861   * exception did not already have a cause, one is set by calling {@link
1862   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1863   * {@code IllegalArgumentException} is thrown.
1864   *
1865   * @throws X if {@code get} throws any checked exception except for an {@code
1866   *         ExecutionException} whose cause is not itself a checked exception
1867   * @throws UncheckedExecutionException if {@code get} throws an {@code
1868   *         ExecutionException} with a {@code RuntimeException} as its cause
1869   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1870   *         with an {@code Error} as its cause
1871   * @throws CancellationException if {@code get} throws a {@code
1872   *         CancellationException}
1873   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1874   *         RuntimeException} or does not have a suitable constructor
1875   * @since 10.0
1876   * @deprecated Use {@link #getChecked(Future, Class, long, TimeUnit)}, noting
1877   *     the change in parameter order. This method will be removed in Guava
1878   *     release 20.0.
1879   */
1880  @Deprecated
1881  @GwtIncompatible("reflection")
1882  public static <V, X extends Exception> V get(
1883      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1884      throws X {
1885    return getChecked(future, exceptionClass, timeout, unit);
1886  }
1887
1888  /**
1889   * Returns the result of {@link Future#get()}, converting most exceptions to a
1890   * new instance of the given checked exception type. This reduces boilerplate
1891   * for a common use of {@code Future} in which it is unnecessary to
1892   * programmatically distinguish between exception types or to extract other
1893   * information from the exception instance.
1894   *
1895   * <p>Exceptions from {@code Future.get} are treated as follows:
1896   * <ul>
1897   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1898   *     {@code X} if the cause is a checked exception, an {@link
1899   *     UncheckedExecutionException} if the cause is a {@code
1900   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1901   *     {@code Error}.
1902   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1903   *     restoring the interrupt).
1904   * <li>Any {@link CancellationException} is propagated untouched, as is any
1905   *     other {@link RuntimeException} (though {@code get} implementations are
1906   *     discouraged from throwing such exceptions).
1907   * </ul>
1908   *
1909   * <p>The overall principle is to continue to treat every checked exception as a
1910   * checked exception, every unchecked exception as an unchecked exception, and
1911   * every error as an error. In addition, the cause of any {@code
1912   * ExecutionException} is wrapped in order to ensure that the new stack trace
1913   * matches that of the current thread.
1914   *
1915   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1916   * public constructor that accepts zero or more arguments, all of type {@code
1917   * String} or {@code Throwable} (preferring constructors with at least one
1918   * {@code String}) and calling the constructor via reflection. If the
1919   * exception did not already have a cause, one is set by calling {@link
1920   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1921   * {@code IllegalArgumentException} is thrown.
1922   *
1923   * @throws X if {@code get} throws any checked exception except for an {@code
1924   *     ExecutionException} whose cause is not itself a checked exception
1925   * @throws UncheckedExecutionException if {@code get} throws an {@code
1926   *     ExecutionException} with a {@code RuntimeException} as its cause
1927   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1928   *     with an {@code Error} as its cause
1929   * @throws CancellationException if {@code get} throws a {@code
1930   *     CancellationException}
1931   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1932   *     RuntimeException} or does not have a suitable constructor
1933   * @since 19.0 (in 10.0 as {@code get})
1934   */
1935  @GwtIncompatible("reflection")
1936  public static <V, X extends Exception> V getChecked(
1937      Future<V> future, Class<X> exceptionClass) throws X {
1938    return FuturesGetChecked.getChecked(future, exceptionClass);
1939  }
1940
1941  /**
1942   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1943   * exceptions to a new instance of the given checked exception type. This
1944   * reduces boilerplate for a common use of {@code Future} in which it is
1945   * unnecessary to programmatically distinguish between exception types or to
1946   * extract other information from the exception instance.
1947   *
1948   * <p>Exceptions from {@code Future.get} are treated as follows:
1949   * <ul>
1950   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1951   *     {@code X} if the cause is a checked exception, an {@link
1952   *     UncheckedExecutionException} if the cause is a {@code
1953   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1954   *     {@code Error}.
1955   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1956   *     restoring the interrupt).
1957   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1958   * <li>Any {@link CancellationException} is propagated untouched, as is any
1959   *     other {@link RuntimeException} (though {@code get} implementations are
1960   *     discouraged from throwing such exceptions).
1961   * </ul>
1962   *
1963   * <p>The overall principle is to continue to treat every checked exception as a
1964   * checked exception, every unchecked exception as an unchecked exception, and
1965   * every error as an error. In addition, the cause of any {@code
1966   * ExecutionException} is wrapped in order to ensure that the new stack trace
1967   * matches that of the current thread.
1968   *
1969   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1970   * public constructor that accepts zero or more arguments, all of type {@code
1971   * String} or {@code Throwable} (preferring constructors with at least one
1972   * {@code String}) and calling the constructor via reflection. If the
1973   * exception did not already have a cause, one is set by calling {@link
1974   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1975   * {@code IllegalArgumentException} is thrown.
1976   *
1977   * @throws X if {@code get} throws any checked exception except for an {@code
1978   *     ExecutionException} whose cause is not itself a checked exception
1979   * @throws UncheckedExecutionException if {@code get} throws an {@code
1980   *     ExecutionException} with a {@code RuntimeException} as its cause
1981   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1982   *     with an {@code Error} as its cause
1983   * @throws CancellationException if {@code get} throws a {@code
1984   *     CancellationException}
1985   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1986   *     RuntimeException} or does not have a suitable constructor
1987   * @since 19.0 (in 10.0 as {@code get} and with different parameter order)
1988   */
1989  @GwtIncompatible("reflection")
1990  public static <V, X extends Exception> V getChecked(
1991      Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit)
1992      throws X {
1993    return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit);
1994  }
1995
1996  /**
1997   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1998   * task known not to throw a checked exception. This makes {@code Future} more
1999   * suitable for lightweight, fast-running tasks that, barring bugs in the
2000   * code, will not fail. This gives it exception-handling behavior similar to
2001   * that of {@code ForkJoinTask.join}.
2002   *
2003   * <p>Exceptions from {@code Future.get} are treated as follows:
2004   * <ul>
2005   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
2006   *     {@link UncheckedExecutionException} (if the cause is an {@code
2007   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
2008   *     Error}).
2009   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
2010   *     call. The interrupt is restored before {@code getUnchecked} returns.
2011   * <li>Any {@link CancellationException} is propagated untouched. So is any
2012   *     other {@link RuntimeException} ({@code get} implementations are
2013   *     discouraged from throwing such exceptions).
2014   * </ul>
2015   *
2016   * <p>The overall principle is to eliminate all checked exceptions: to loop to
2017   * avoid {@code InterruptedException}, to pass through {@code
2018   * CancellationException}, and to wrap any exception from the underlying
2019   * computation in an {@code UncheckedExecutionException} or {@code
2020   * ExecutionError}.
2021   *
2022   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
2023   * {@link Uninterruptibles#getUninterruptibly(Future)}.
2024   *
2025   * @throws UncheckedExecutionException if {@code get} throws an {@code
2026   *         ExecutionException} with an {@code Exception} as its cause
2027   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
2028   *         with an {@code Error} as its cause
2029   * @throws CancellationException if {@code get} throws a {@code
2030   *         CancellationException}
2031   * @since 10.0
2032   */
2033  @GwtIncompatible("TODO")
2034  public static <V> V getUnchecked(Future<V> future) {
2035    checkNotNull(future);
2036    try {
2037      return getUninterruptibly(future);
2038    } catch (ExecutionException e) {
2039      wrapAndThrowUnchecked(e.getCause());
2040      throw new AssertionError();
2041    }
2042  }
2043
2044  @GwtIncompatible("TODO")
2045  private static void wrapAndThrowUnchecked(Throwable cause) {
2046    if (cause instanceof Error) {
2047      throw new ExecutionError((Error) cause);
2048    }
2049    /*
2050     * It's a non-Error, non-Exception Throwable. From my survey of such
2051     * classes, I believe that most users intended to extend Exception, so we'll
2052     * treat it like an Exception.
2053     */
2054    throw new UncheckedExecutionException(cause);
2055  }
2056
2057  /*
2058   * Arguably we don't need a timed getUnchecked because any operation slow
2059   * enough to require a timeout is heavyweight enough to throw a checked
2060   * exception and therefore be inappropriate to use with getUnchecked. Further,
2061   * it's not clear that converting the checked TimeoutException to a
2062   * RuntimeException -- especially to an UncheckedExecutionException, since it
2063   * wasn't thrown by the computation -- makes sense, and if we don't convert
2064   * it, the user still has to write a try-catch block.
2065   *
2066   * If you think you would use this method, let us know. You might also also
2067   * look into the Fork-Join framework:
2068   * http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
2069   */
2070
2071  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
2072  private static final class ListFuture<V> extends CollectionFuture<V, List<V>> {
2073    ListFuture(ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
2074        boolean allMustSucceed) {
2075      init(new ListFutureRunningState(futures, allMustSucceed));
2076    }
2077
2078    private final class ListFutureRunningState extends CollectionFutureRunningState {
2079      ListFutureRunningState(ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
2080        boolean allMustSucceed) {
2081        super(futures, allMustSucceed);
2082      }
2083
2084      @Override
2085      public List<V> combine(List<Optional<V>> values) {
2086        List<V> result = Lists.newArrayList();
2087        for (Optional<V> element : values) {
2088          result.add(element != null ? element.orNull() : null);
2089        }
2090        return Collections.unmodifiableList(result);
2091      }
2092    }
2093  }
2094
2095  /**
2096   * A checked future that uses a function to map from exceptions to the
2097   * appropriate checked type.
2098   */
2099  @GwtIncompatible("TODO")
2100  private static class MappingCheckedFuture<V, X extends Exception> extends
2101      AbstractCheckedFuture<V, X> {
2102
2103    final Function<? super Exception, X> mapper;
2104
2105    MappingCheckedFuture(ListenableFuture<V> delegate,
2106        Function<? super Exception, X> mapper) {
2107      super(delegate);
2108
2109      this.mapper = checkNotNull(mapper);
2110    }
2111
2112    @Override
2113    protected X mapException(Exception e) {
2114      return mapper.apply(e);
2115    }
2116  }
2117}