001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkNotNull; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static com.google.common.util.concurrent.Platform.isInstanceOfThrowableClass; 022import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 023 024import com.google.common.annotations.Beta; 025import com.google.common.annotations.GwtCompatible; 026import com.google.common.annotations.GwtIncompatible; 027import com.google.common.base.Function; 028import com.google.common.base.Optional; 029import com.google.common.base.Preconditions; 030import com.google.common.collect.ImmutableCollection; 031import com.google.common.collect.ImmutableList; 032import com.google.common.collect.Lists; 033import com.google.common.collect.Queues; 034 035import java.lang.reflect.UndeclaredThrowableException; 036import java.util.Collections; 037import java.util.List; 038import java.util.concurrent.CancellationException; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.Executor; 042import java.util.concurrent.Future; 043import java.util.concurrent.RejectedExecutionException; 044import java.util.concurrent.ScheduledExecutorService; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.TimeoutException; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049 050import javax.annotation.CheckReturnValue; 051import javax.annotation.Nullable; 052 053/** 054 * Static utility methods pertaining to the {@link Future} interface. 055 * 056 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 057 * Guava User Guide article on <a href= 058 * "https://github.com/google/guava/wiki/ListenableFutureExplained"> 059 * {@code ListenableFuture}</a>. 060 * 061 * @author Kevin Bourrillion 062 * @author Nishant Thakkar 063 * @author Sven Mawson 064 * @since 1.0 065 */ 066@Beta 067@GwtCompatible(emulated = true) 068public final class Futures extends GwtFuturesCatchingSpecialization { 069 070 // A note on memory visibility. 071 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 072 // have two requirements that significantly complicate their design. 073 // 1. Cancellation should propagate from the returned future to the input future(s). 074 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 075 // 076 // A consequence of these requirements is that the delegate futures cannot be stored in 077 // final fields. 078 // 079 // For simplicity the rest of this description will discuss Futures.catching since it is the 080 // simplest instance, though very similar descriptions apply to many other classes in this file. 081 // 082 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 083 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 084 // 'inputFuture' field is read and where we will have to consider visibility of the write 085 // operation in the constructor. 086 // 087 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 088 // assigned prior to calling addListener, and addListener happens-before any invocation of the 089 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 090 // to the listener. 091 // 092 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 093 // There is currently nothing that enforces that the write to inputFuture in the constructor is 094 // visible to done(). This is because there is no happens before edge between the write and a 095 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 096 // it would just add an edge such that if done() observed non-null, then it would also 097 // definitely observe all earlier writes, but we still have no guarantee that done() would see 098 // the inital write (just stronger guarantees if it does). 099 // 100 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 101 // For a (long) discussion about this specific issue and the general futility of life. 102 // 103 // For the time being we are OK with the problem discussed above since it requires a caller to 104 // introduce a very specific kind of data-race. And given the other operations performed by these 105 // methods that involve volatile read/write operations, in practice there is no issue. Also, the 106 // way in such a visibility issue would surface is most likely as a failure of cancel() to 107 // propagate to the input. Cancellation propagation is fundamentally racy so this is fine. 108 // 109 // Future versions of the JMM may revise safe construction semantics in such a way that we can 110 // safely publish these objects and we won't need this whole discussion. 111 // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs 112 // that should resolve the issue. This comes at the cost of adding more write barriers to the 113 // implementations. 114 115 private Futures() {} 116 117 /** 118 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 119 * and a {@link Function} that maps from {@link Exception} instances into the 120 * appropriate checked type. 121 * 122 * <p>The given mapping function will be applied to an 123 * {@link InterruptedException}, a {@link CancellationException}, or an 124 * {@link ExecutionException}. 125 * See {@link Future#get()} for details on the exceptions thrown. 126 * 127 * @since 9.0 (source-compatible since 1.0) 128 */ 129 @GwtIncompatible("TODO") 130 @CheckReturnValue 131 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 132 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 133 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 134 } 135 136 private abstract static class ImmediateFuture<V> 137 implements ListenableFuture<V> { 138 139 private static final Logger log = 140 Logger.getLogger(ImmediateFuture.class.getName()); 141 142 @Override 143 public void addListener(Runnable listener, Executor executor) { 144 checkNotNull(listener, "Runnable was null."); 145 checkNotNull(executor, "Executor was null."); 146 try { 147 executor.execute(listener); 148 } catch (RuntimeException e) { 149 // ListenableFuture's contract is that it will not throw unchecked 150 // exceptions, so log the bad runnable and/or executor and swallow it. 151 log.log(Level.SEVERE, "RuntimeException while executing runnable " 152 + listener + " with executor " + executor, e); 153 } 154 } 155 156 @Override 157 public boolean cancel(boolean mayInterruptIfRunning) { 158 return false; 159 } 160 161 @Override 162 public abstract V get() throws ExecutionException; 163 164 @Override 165 public V get(long timeout, TimeUnit unit) throws ExecutionException { 166 checkNotNull(unit); 167 return get(); 168 } 169 170 @Override 171 public boolean isCancelled() { 172 return false; 173 } 174 175 @Override 176 public boolean isDone() { 177 return true; 178 } 179 } 180 181 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 182 static final ImmediateSuccessfulFuture<Object> NULL = 183 new ImmediateSuccessfulFuture<Object>(null); 184 185 @Nullable private final V value; 186 187 ImmediateSuccessfulFuture(@Nullable V value) { 188 this.value = value; 189 } 190 191 @Override 192 public V get() { 193 return value; 194 } 195 } 196 197 @GwtIncompatible("TODO") 198 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 199 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 200 201 @Nullable private final V value; 202 203 ImmediateSuccessfulCheckedFuture(@Nullable V value) { 204 this.value = value; 205 } 206 207 @Override 208 public V get() { 209 return value; 210 } 211 212 @Override 213 public V checkedGet() { 214 return value; 215 } 216 217 @Override 218 public V checkedGet(long timeout, TimeUnit unit) { 219 checkNotNull(unit); 220 return value; 221 } 222 } 223 224 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 225 226 private final Throwable thrown; 227 228 ImmediateFailedFuture(Throwable thrown) { 229 this.thrown = thrown; 230 } 231 232 @Override 233 public V get() throws ExecutionException { 234 throw new ExecutionException(thrown); 235 } 236 } 237 238 @GwtIncompatible("TODO") 239 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 240 241 private final CancellationException thrown; 242 243 ImmediateCancelledFuture() { 244 this.thrown = new CancellationException("Immediate cancelled future."); 245 } 246 247 @Override 248 public boolean isCancelled() { 249 return true; 250 } 251 252 @Override 253 public V get() { 254 throw AbstractFuture.cancellationExceptionWithCause( 255 "Task was cancelled.", thrown); 256 } 257 } 258 259 @GwtIncompatible("TODO") 260 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 261 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 262 263 private final X thrown; 264 265 ImmediateFailedCheckedFuture(X thrown) { 266 this.thrown = thrown; 267 } 268 269 @Override 270 public V get() throws ExecutionException { 271 throw new ExecutionException(thrown); 272 } 273 274 @Override 275 public V checkedGet() throws X { 276 throw thrown; 277 } 278 279 @Override 280 public V checkedGet(long timeout, TimeUnit unit) throws X { 281 checkNotNull(unit); 282 throw thrown; 283 } 284 } 285 286 /** 287 * Creates a {@code ListenableFuture} which has its value set immediately upon 288 * construction. The getters just return the value. This {@code Future} can't 289 * be canceled or timed out and its {@code isDone()} method always returns 290 * {@code true}. 291 */ 292 @CheckReturnValue 293 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 294 if (value == null) { 295 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 296 @SuppressWarnings({"unchecked", "rawtypes"}) 297 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 298 return typedNull; 299 } 300 return new ImmediateSuccessfulFuture<V>(value); 301 } 302 303 /** 304 * Returns a {@code CheckedFuture} which has its value set immediately upon 305 * construction. 306 * 307 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 308 * method always returns {@code true}. Calling {@code get()} or {@code 309 * checkedGet()} will immediately return the provided value. 310 */ 311 @GwtIncompatible("TODO") 312 @CheckReturnValue 313 public static <V, X extends Exception> CheckedFuture<V, X> 314 immediateCheckedFuture(@Nullable V value) { 315 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 316 } 317 318 /** 319 * Returns a {@code ListenableFuture} which has an exception set immediately 320 * upon construction. 321 * 322 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 323 * method always returns {@code true}. Calling {@code get()} will immediately 324 * throw the provided {@code Throwable} wrapped in an {@code 325 * ExecutionException}. 326 */ 327 @CheckReturnValue 328 public static <V> ListenableFuture<V> immediateFailedFuture( 329 Throwable throwable) { 330 checkNotNull(throwable); 331 return new ImmediateFailedFuture<V>(throwable); 332 } 333 334 /** 335 * Creates a {@code ListenableFuture} which is cancelled immediately upon 336 * construction, so that {@code isCancelled()} always returns {@code true}. 337 * 338 * @since 14.0 339 */ 340 @GwtIncompatible("TODO") 341 @CheckReturnValue 342 public static <V> ListenableFuture<V> immediateCancelledFuture() { 343 return new ImmediateCancelledFuture<V>(); 344 } 345 346 /** 347 * Returns a {@code CheckedFuture} which has an exception set immediately upon 348 * construction. 349 * 350 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 351 * method always returns {@code true}. Calling {@code get()} will immediately 352 * throw the provided {@code Exception} wrapped in an {@code 353 * ExecutionException}, and calling {@code checkedGet()} will throw the 354 * provided exception itself. 355 */ 356 @GwtIncompatible("TODO") 357 @CheckReturnValue 358 public static <V, X extends Exception> CheckedFuture<V, X> 359 immediateFailedCheckedFuture(X exception) { 360 checkNotNull(exception); 361 return new ImmediateFailedCheckedFuture<V, X>(exception); 362 } 363 364 /** 365 * Returns a {@code Future} whose result is taken from the given primary 366 * {@code input} or, if the primary input fails, from the {@code Future} 367 * provided by the {@code fallback}. {@link FutureFallback#create} is not 368 * invoked until the primary input has failed, so if the primary input 369 * succeeds, it is never invoked. If, during the invocation of {@code 370 * fallback}, an exception is thrown, this exception is used as the result of 371 * the output {@code Future}. 372 * 373 * <p>Below is an example of a fallback that returns a default value if an 374 * exception occurs: 375 * 376 * <pre> {@code 377 * ListenableFuture<Integer> fetchCounterFuture = ...; 378 * 379 * // Falling back to a zero counter in case an exception happens when 380 * // processing the RPC to fetch counters. 381 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 382 * fetchCounterFuture, new FutureFallback<Integer>() { 383 * public ListenableFuture<Integer> create(Throwable t) { 384 * // Returning "0" as the default for the counter when the 385 * // exception happens. 386 * return immediateFuture(0); 387 * } 388 * });}</pre> 389 * 390 * <p>The fallback can also choose to propagate the original exception when 391 * desired: 392 * 393 * <pre> {@code 394 * ListenableFuture<Integer> fetchCounterFuture = ...; 395 * 396 * // Falling back to a zero counter only in case the exception was a 397 * // TimeoutException. 398 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 399 * fetchCounterFuture, new FutureFallback<Integer>() { 400 * public ListenableFuture<Integer> create(Throwable t) { 401 * if (t instanceof TimeoutException) { 402 * return immediateFuture(0); 403 * } 404 * return immediateFailedFuture(t); 405 * } 406 * });}</pre> 407 * 408 * <p>This overload, which does not accept an executor, uses {@code 409 * directExecutor}, a dangerous choice in some cases. See the discussion in 410 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 411 * documentation. The documentation's warnings about "lightweight listeners" 412 * refer here to the work done during {@code FutureFallback.create}, not to 413 * any work done to complete the returned {@code Future}. 414 * 415 * @param input the primary input {@code Future} 416 * @param fallback the {@link FutureFallback} implementation to be called if 417 * {@code input} fails 418 * @since 14.0 419 * @deprecated Use {@link #catchingAsync(ListenableFuture, Class, 420 * AsyncFunction) catchingAsync(input, Throwable.class, 421 * fallbackImplementedAsAnAsyncFunction)}, usually replacing {@code 422 * Throwable.class} with the specific type you want to handle. This method 423 * will be removed in Guava release 20.0. 424 */ 425 @Deprecated 426 @CheckReturnValue 427 public static <V> ListenableFuture<V> withFallback( 428 ListenableFuture<? extends V> input, 429 FutureFallback<? extends V> fallback) { 430 return withFallback(input, fallback, directExecutor()); 431 } 432 433 /** 434 * Returns a {@code Future} whose result is taken from the given primary 435 * {@code input} or, if the primary input fails, from the {@code Future} 436 * provided by the {@code fallback}. {@link FutureFallback#create} is not 437 * invoked until the primary input has failed, so if the primary input 438 * succeeds, it is never invoked. If, during the invocation of {@code 439 * fallback}, an exception is thrown, this exception is used as the result of 440 * the output {@code Future}. 441 * 442 * <p>Below is an example of a fallback that returns a default value if an 443 * exception occurs: 444 * 445 * <pre> {@code 446 * ListenableFuture<Integer> fetchCounterFuture = ...; 447 * 448 * // Falling back to a zero counter in case an exception happens when 449 * // processing the RPC to fetch counters. 450 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 451 * fetchCounterFuture, new FutureFallback<Integer>() { 452 * public ListenableFuture<Integer> create(Throwable t) { 453 * // Returning "0" as the default for the counter when the 454 * // exception happens. 455 * return immediateFuture(0); 456 * } 457 * }, directExecutor());}</pre> 458 * 459 * <p>The fallback can also choose to propagate the original exception when 460 * desired: 461 * 462 * <pre> {@code 463 * ListenableFuture<Integer> fetchCounterFuture = ...; 464 * 465 * // Falling back to a zero counter only in case the exception was a 466 * // TimeoutException. 467 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 468 * fetchCounterFuture, new FutureFallback<Integer>() { 469 * public ListenableFuture<Integer> create(Throwable t) { 470 * if (t instanceof TimeoutException) { 471 * return immediateFuture(0); 472 * } 473 * return immediateFailedFuture(t); 474 * } 475 * }, directExecutor());}</pre> 476 * 477 * <p>When selecting an executor, note that {@code directExecutor} is 478 * dangerous in some cases. See the discussion in the {@link 479 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 480 * The documentation's warnings about "lightweight listeners" refer here to 481 * the work done during {@code FutureFallback.create}, not to any work done to 482 * complete the returned {@code Future}. 483 * 484 * @param input the primary input {@code Future} 485 * @param fallback the {@link FutureFallback} implementation to be called if 486 * {@code input} fails 487 * @param executor the executor that runs {@code fallback} if {@code input} 488 * fails 489 * @since 14.0 490 * @deprecated Use {@link #catchingAsync(ListenableFuture, Class, 491 * AsyncFunction, Executor) catchingAsync(input, Throwable.class, 492 * fallbackImplementedAsAnAsyncFunction, executor)}, usually replacing 493 * {@code Throwable.class} with the specific type you want to handle. This method 494 * will be removed in Guava release 20.0. 495 */ 496 @Deprecated 497 @CheckReturnValue 498 public static <V> ListenableFuture<V> withFallback( 499 ListenableFuture<? extends V> input, 500 FutureFallback<? extends V> fallback, Executor executor) { 501 return catchingAsync( 502 input, Throwable.class, asAsyncFunction(fallback), executor); 503 } 504 505 /** 506 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 507 * primary input fails with the given {@code exceptionType}, from the result provided by the 508 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 509 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 510 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 511 * Future}. 512 * 513 * <p>Usage example: 514 * 515 * <pre> {@code 516 * ListenableFuture<Integer> fetchCounterFuture = ...; 517 * 518 * // Falling back to a zero counter in case an exception happens when 519 * // processing the RPC to fetch counters. 520 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 521 * fetchCounterFuture, FetchException.class, 522 * new Function<FetchException, Integer>() { 523 * public Integer apply(FetchException e) { 524 * return 0; 525 * } 526 * });}</pre> 527 * 528 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 529 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 530 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 531 * listeners" refer here to the work done during {@code Function.apply}. 532 * 533 * @param input the primary input {@code Future} 534 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 535 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 536 * {@code Throwable.class} in particular. 537 * @param fallback the {@link Function} implementation to be called if {@code input} fails with 538 * the expected exception type 539 * @since 19.0 540 */ 541 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 542 @CheckReturnValue 543 public static <V, X extends Throwable> ListenableFuture<V> catching( 544 ListenableFuture<? extends V> input, Class<X> exceptionType, 545 Function<? super X, ? extends V> fallback) { 546 CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback); 547 input.addListener(future, directExecutor()); 548 return future; 549 } 550 551 /** 552 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 553 * primary input fails with the given {@code exceptionType}, from the result provided by the 554 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 555 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 556 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 557 * Future}. 558 * 559 * <p>Usage example: 560 * 561 * <pre> {@code 562 * ListenableFuture<Integer> fetchCounterFuture = ...; 563 * 564 * // Falling back to a zero counter in case an exception happens when 565 * // processing the RPC to fetch counters. 566 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 567 * fetchCounterFuture, FetchException.class, 568 * new Function<FetchException, Integer>() { 569 * public Integer apply(FetchException e) { 570 * return 0; 571 * } 572 * }, directExecutor());}</pre> 573 * 574 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 575 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 576 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 577 * work done during {@code Function.apply}. 578 * 579 * @param input the primary input {@code Future} 580 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 581 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 582 * {@code Throwable.class} in particular. 583 * @param fallback the {@link Function} implementation to be called if {@code input} fails with 584 * the expected exception type 585 * @param executor the executor that runs {@code fallback} if {@code input} fails 586 * @since 19.0 587 */ 588 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 589 @CheckReturnValue 590 public static <V, X extends Throwable> ListenableFuture<V> catching( 591 ListenableFuture<? extends V> input, Class<X> exceptionType, 592 Function<? super X, ? extends V> fallback, Executor executor) { 593 CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback); 594 input.addListener(future, rejectionPropagatingExecutor(executor, future)); 595 return future; 596 } 597 598 /** 599 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 600 * primary input fails with the given {@code exceptionType}, from the result provided by the 601 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 602 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 603 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 604 * {@code Future}. 605 * 606 * <p>Usage examples: 607 * 608 * <pre> {@code 609 * ListenableFuture<Integer> fetchCounterFuture = ...; 610 * 611 * // Falling back to a zero counter in case an exception happens when 612 * // processing the RPC to fetch counters. 613 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 614 * fetchCounterFuture, FetchException.class, 615 * new AsyncFunction<FetchException, Integer>() { 616 * public ListenableFuture<Integer> apply(FetchException e) { 617 * return immediateFuture(0); 618 * } 619 * });}</pre> 620 * 621 * <p>The fallback can also choose to propagate the original exception when desired: 622 * 623 * <pre> {@code 624 * ListenableFuture<Integer> fetchCounterFuture = ...; 625 * 626 * // Falling back to a zero counter only in case the exception was a 627 * // TimeoutException. 628 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 629 * fetchCounterFuture, FetchException.class, 630 * new AsyncFunction<FetchException, Integer>() { 631 * public ListenableFuture<Integer> apply(FetchException e) 632 * throws FetchException { 633 * if (omitDataOnFetchFailure) { 634 * return immediateFuture(0); 635 * } 636 * throw e; 637 * } 638 * });}</pre> 639 * 640 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 641 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 642 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 643 * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done 644 * to complete the returned {@code Future}. 645 * 646 * @param input the primary input {@code Future} 647 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 648 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 649 * {@code Throwable.class} in particular. 650 * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails 651 * with the expected exception type 652 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 653 */ 654 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 655 // TODO(kak): @CheckReturnValue 656 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 657 ListenableFuture<? extends V> input, Class<X> exceptionType, 658 AsyncFunction<? super X, ? extends V> fallback) { 659 AsyncCatchingFuture<V, X> future = 660 new AsyncCatchingFuture<V, X>(input, exceptionType, fallback); 661 input.addListener(future, directExecutor()); 662 return future; 663 } 664 665 /** 666 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 667 * primary input fails with the given {@code exceptionType}, from the result provided by the 668 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 669 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 670 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 671 * {@code Future}. 672 * 673 * <p>Usage examples: 674 * 675 * <pre> {@code 676 * ListenableFuture<Integer> fetchCounterFuture = ...; 677 * 678 * // Falling back to a zero counter in case an exception happens when 679 * // processing the RPC to fetch counters. 680 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 681 * fetchCounterFuture, FetchException.class, 682 * new AsyncFunction<FetchException, Integer>() { 683 * public ListenableFuture<Integer> apply(FetchException e) { 684 * return immediateFuture(0); 685 * } 686 * }, directExecutor());}</pre> 687 * 688 * <p>The fallback can also choose to propagate the original exception when desired: 689 * 690 * <pre> {@code 691 * ListenableFuture<Integer> fetchCounterFuture = ...; 692 * 693 * // Falling back to a zero counter only in case the exception was a 694 * // TimeoutException. 695 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 696 * fetchCounterFuture, FetchException.class, 697 * new AsyncFunction<FetchException, Integer>() { 698 * public ListenableFuture<Integer> apply(FetchException e) 699 * throws FetchException { 700 * if (omitDataOnFetchFailure) { 701 * return immediateFuture(0); 702 * } 703 * throw e; 704 * } 705 * }, directExecutor());}</pre> 706 * 707 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 708 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 709 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 710 * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned 711 * {@code Future}. 712 * 713 * @param input the primary input {@code Future} 714 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 715 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 716 * {@code Throwable.class} in particular. 717 * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails 718 * with the expected exception type 719 * @param executor the executor that runs {@code fallback} if {@code input} fails 720 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 721 */ 722 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 723 // TODO(kak): @CheckReturnValue 724 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 725 ListenableFuture<? extends V> input, Class<X> exceptionType, 726 AsyncFunction<? super X, ? extends V> fallback, Executor executor) { 727 AsyncCatchingFuture<V, X> future = 728 new AsyncCatchingFuture<V, X>(input, exceptionType, fallback); 729 input.addListener(future, rejectionPropagatingExecutor(executor, future)); 730 return future; 731 } 732 733 @Deprecated 734 static <V> AsyncFunction<Throwable, V> asAsyncFunction(final FutureFallback<V> fallback) { 735 checkNotNull(fallback); 736 return new AsyncFunction<Throwable, V>() { 737 @Override 738 public ListenableFuture<V> apply(Throwable t) throws Exception { 739 return checkNotNull(fallback.create(t), "FutureFallback.create returned null instead of a " 740 + "Future. Did you mean to return immediateFuture(null)?"); 741 } 742 }; 743 } 744 745 private abstract static class AbstractCatchingFuture<V, X extends Throwable, F> 746 extends AbstractFuture.TrustedFuture<V> implements Runnable { 747 @Nullable ListenableFuture<? extends V> inputFuture; 748 @Nullable Class<X> exceptionType; 749 @Nullable F fallback; 750 751 AbstractCatchingFuture( 752 ListenableFuture<? extends V> inputFuture, Class<X> exceptionType, F fallback) { 753 this.inputFuture = checkNotNull(inputFuture); 754 this.exceptionType = checkNotNull(exceptionType); 755 this.fallback = checkNotNull(fallback); 756 } 757 758 @Override public final void run() { 759 ListenableFuture<? extends V> localInputFuture = inputFuture; 760 Class<X> localExceptionType = exceptionType; 761 F localFallback = fallback; 762 if (localInputFuture == null | localExceptionType == null | localFallback == null 763 | isCancelled()) { 764 return; 765 } 766 inputFuture = null; 767 exceptionType = null; 768 fallback = null; 769 770 Throwable throwable; 771 try { 772 set(getUninterruptibly(localInputFuture)); 773 return; 774 } catch (ExecutionException e) { 775 throwable = e.getCause(); 776 } catch (Throwable e) { // this includes cancellation exception 777 throwable = e; 778 } 779 try { 780 if (isInstanceOfThrowableClass(throwable, localExceptionType)) { 781 @SuppressWarnings("unchecked") // verified safe by isInstance 782 X castThrowable = (X) throwable; 783 doFallback(localFallback, castThrowable); 784 } else { 785 setException(throwable); 786 } 787 } catch (Throwable e) { 788 setException(e); 789 } 790 } 791 792 /** Template method for subtypes to actually run the fallback. */ 793 abstract void doFallback(F fallback, X throwable) throws Exception; 794 795 @Override final void done() { 796 maybePropagateCancellation(inputFuture); 797 this.inputFuture = null; 798 this.exceptionType = null; 799 this.fallback = null; 800 } 801 } 802 803 /** 804 * A {@link AbstractCatchingFuture} that delegates to an {@link AsyncFunction} 805 * and {@link #setFuture(ListenableFuture)} to implement {@link #doFallback} 806 */ 807 static final class AsyncCatchingFuture<V, X extends Throwable> 808 extends AbstractCatchingFuture<V, X, AsyncFunction<? super X, ? extends V>> { 809 810 AsyncCatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType, 811 AsyncFunction<? super X, ? extends V> fallback) { 812 super(input, exceptionType, fallback); 813 } 814 815 @Override void doFallback( 816 AsyncFunction<? super X, ? extends V> fallback, X cause) throws Exception { 817 ListenableFuture<? extends V> replacement = fallback.apply(cause); 818 checkNotNull(replacement, "AsyncFunction.apply returned null instead of a Future. " 819 + "Did you mean to return immediateFuture(null)?"); 820 setFuture(replacement); 821 } 822 } 823 824 /** 825 * A {@link AbstractCatchingFuture} that delegates to a {@link Function} 826 * and {@link #set(Object)} to implement {@link #doFallback} 827 */ 828 static final class CatchingFuture<V, X extends Throwable> 829 extends AbstractCatchingFuture<V, X, Function<? super X, ? extends V>> { 830 CatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType, 831 Function<? super X, ? extends V> fallback) { 832 super(input, exceptionType, fallback); 833 } 834 835 @Override void doFallback(Function<? super X, ? extends V> fallback, X cause) throws Exception { 836 V replacement = fallback.apply(cause); 837 set(replacement); 838 } 839 } 840 841 /** 842 * Returns a future that delegates to another but will finish early (via a 843 * {@link TimeoutException} wrapped in an {@link ExecutionException}) if the 844 * specified duration expires. 845 * 846 * <p>The delegate future is interrupted and cancelled if it times out. 847 * 848 * @param delegate The future to delegate to. 849 * @param time when to timeout the future 850 * @param unit the time unit of the time parameter 851 * @param scheduledExecutor The executor service to enforce the timeout. 852 * 853 * @since 19.0 854 */ 855 @GwtIncompatible("java.util.concurrent.ScheduledExecutorService") 856 @CheckReturnValue 857 public static <V> ListenableFuture<V> withTimeout(ListenableFuture<V> delegate, 858 long time, TimeUnit unit, ScheduledExecutorService scheduledExecutor) { 859 TimeoutFuture<V> result = new TimeoutFuture<V>(delegate); 860 TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result); 861 result.timer = scheduledExecutor.schedule(fire, time, unit); 862 delegate.addListener(fire, directExecutor()); 863 return result; 864 } 865 866 /** 867 * Future that delegates to another but will finish early (via a {@link 868 * TimeoutException} wrapped in an {@link ExecutionException}) if the 869 * specified duration expires. 870 * The delegate future is interrupted and cancelled if it times out. 871 */ 872 private static final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> { 873 // Memory visibility of these fields. 874 // There are two cases to consider. 875 // 1. visibility of the writes to these fields to Fire.run 876 // The initial write to delegateRef is made definitely visible via the semantics of 877 // addListener/SES.schedule. The later racy write in cancel() is not guaranteed to be 878 // observed, however that is fine since the correctness is based on the atomic state in 879 // our base class. 880 // The initial write to timer is never definitely visible to Fire.run since it is assigned 881 // after SES.schedule is called. Therefore Fire.run has to check for null. However, it 882 // should be visible if Fire.run is called by delegate.addListener since addListener is 883 // called after the assignment to timer, and importantly this is the main situation in which 884 // we need to be able to see the write. 885 // 2. visibility of the writes to cancel 886 // Since these fields are non-final that means that TimeoutFuture is not being 'safely 887 // published', thus a motivated caller may be able to expose the reference to another thread 888 // that would then call cancel() and be unable to cancel the delegate. 889 // There are a number of ways to solve this, none of which are very pretty, and it is 890 // currently believed to be a purely theoretical problem (since the other actions should 891 // supply sufficient write-barriers). 892 893 @Nullable ListenableFuture<V> delegateRef; 894 @Nullable Future<?> timer; 895 896 TimeoutFuture(ListenableFuture<V> delegate) { 897 this.delegateRef = Preconditions.checkNotNull(delegate); 898 } 899 900 /** A runnable that is called when the delegate or the timer completes. */ 901 private static final class Fire<V> implements Runnable { 902 @Nullable TimeoutFuture<V> timeoutFutureRef; 903 904 Fire(TimeoutFuture<V> timeoutFuture) { 905 this.timeoutFutureRef = timeoutFuture; 906 } 907 908 @Override public void run() { 909 // If either of these reads return null then we must be after a successful cancel 910 // or another call to this method. 911 TimeoutFuture<V> timeoutFuture = timeoutFutureRef; 912 if (timeoutFuture == null) { 913 return; 914 } 915 ListenableFuture<V> delegate = timeoutFuture.delegateRef; 916 if (delegate == null) { 917 return; 918 } 919 920 /* 921 * If we're about to complete the TimeoutFuture, we want to release our reference to it. 922 * Otherwise, we'll pin it (and its result) in memory until the timeout task is GCed. (The 923 * need to clear our reference to the TimeoutFuture is the reason we use a *static* nested 924 * class with a manual reference back to the "containing" class.) 925 * 926 * This has the nice-ish side effect of limiting reentrancy: run() calls 927 * timeoutFuture.setException() calls run(). That reentrancy would already be harmless, 928 * since timeoutFuture can be set (and delegate cancelled) only once. (And "set only once" 929 * is important for other reasons: run() can still be invoked concurrently in different 930 * threads, even with the above null checks.) 931 */ 932 timeoutFutureRef = null; 933 if (delegate.isDone()) { 934 timeoutFuture.setFuture(delegate); 935 } else { 936 try { 937 // TODO(lukes): this stack trace is particularly useless (all it does is point at the 938 // scheduledexecutorservice thread), consider eliminating it altogether? 939 timeoutFuture.setException(new TimeoutException("Future timed out: " + delegate)); 940 } finally { 941 delegate.cancel(true); 942 } 943 } 944 } 945 } 946 947 @Override void done() { 948 maybePropagateCancellation(delegateRef); 949 950 Future<?> localTimer = timer; 951 // Try to cancel the timer as an optimization 952 // timer may be null if this call to run was by the timer task since there is no 953 // happens-before edge between the assignment to timer and an execution of the timer task. 954 if (localTimer != null) { 955 localTimer.cancel(false); 956 } 957 958 delegateRef = null; 959 timer = null; 960 } 961 } 962 963 /** 964 * Returns a new {@code ListenableFuture} whose result is asynchronously 965 * derived from the result of the given {@code Future}. More precisely, the 966 * returned {@code Future} takes its result from a {@code Future} produced by 967 * applying the given {@code AsyncFunction} to the result of the original 968 * {@code Future}. Example: 969 * 970 * <pre> {@code 971 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 972 * AsyncFunction<RowKey, QueryResult> queryFunction = 973 * new AsyncFunction<RowKey, QueryResult>() { 974 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 975 * return dataService.read(rowKey); 976 * } 977 * }; 978 * ListenableFuture<QueryResult> queryFuture = 979 * transform(rowKeyFuture, queryFunction);}</pre> 980 * 981 * <p>This overload, which does not accept an executor, uses {@code 982 * directExecutor}, a dangerous choice in some cases. See the discussion in 983 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 984 * documentation. The documentation's warnings about "lightweight listeners" 985 * refer here to the work done during {@code AsyncFunction.apply}, not to any 986 * work done to complete the returned {@code Future}. 987 * 988 * <p>The returned {@code Future} attempts to keep its cancellation state in 989 * sync with that of the input future and that of the future returned by the 990 * function. That is, if the returned {@code Future} is cancelled, it will 991 * attempt to cancel the other two, and if either of the other two is 992 * cancelled, the returned {@code Future} will receive a callback in which it 993 * will attempt to cancel itself. 994 * 995 * @param input The future to transform 996 * @param function A function to transform the result of the input future 997 * to the result of the output future 998 * @return A future that holds result of the function (if the input succeeded) 999 * or the original input's failure (if not) 1000 * @since 11.0 1001 * @deprecated These {@code AsyncFunction} overloads of {@code transform} are 1002 * being renamed to {@code transformAsync}. (The {@code Function} 1003 * overloads are keeping the "transform" name.) This method will be removed in Guava release 1004 * 20.0. 1005 */ 1006 @Deprecated 1007 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1008 AsyncFunction<? super I, ? extends O> function) { 1009 return transformAsync(input, function); 1010 } 1011 1012 /** 1013 * Returns a new {@code ListenableFuture} whose result is asynchronously 1014 * derived from the result of the given {@code Future}. More precisely, the 1015 * returned {@code Future} takes its result from a {@code Future} produced by 1016 * applying the given {@code AsyncFunction} to the result of the original 1017 * {@code Future}. Example: 1018 * 1019 * <pre> {@code 1020 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 1021 * AsyncFunction<RowKey, QueryResult> queryFunction = 1022 * new AsyncFunction<RowKey, QueryResult>() { 1023 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 1024 * return dataService.read(rowKey); 1025 * } 1026 * }; 1027 * ListenableFuture<QueryResult> queryFuture = 1028 * transform(rowKeyFuture, queryFunction, executor);}</pre> 1029 * 1030 * <p>When selecting an executor, note that {@code directExecutor} is 1031 * dangerous in some cases. See the discussion in the {@link 1032 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 1033 * The documentation's warnings about "lightweight listeners" refer here to 1034 * the work done during {@code AsyncFunction.apply}, not to any work done to 1035 * complete the returned {@code Future}. 1036 * 1037 * <p>The returned {@code Future} attempts to keep its cancellation state in 1038 * sync with that of the input future and that of the future returned by the 1039 * chain function. That is, if the returned {@code Future} is cancelled, it 1040 * will attempt to cancel the other two, and if either of the other two is 1041 * cancelled, the returned {@code Future} will receive a callback in which it 1042 * will attempt to cancel itself. 1043 * 1044 * @param input The future to transform 1045 * @param function A function to transform the result of the input future 1046 * to the result of the output future 1047 * @param executor Executor to run the function in. 1048 * @return A future that holds result of the function (if the input succeeded) 1049 * or the original input's failure (if not) 1050 * @since 11.0 1051 * @deprecated These {@code AsyncFunction} overloads of {@code transform} are 1052 * being renamed to {@code transformAsync}. (The {@code Function} 1053 * overloads are keeping the "transform" name.) This method will be removed in Guava release 1054 * 20.0. 1055 */ 1056 @Deprecated 1057 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1058 AsyncFunction<? super I, ? extends O> function, 1059 Executor executor) { 1060 return transformAsync(input, function, executor); 1061 } 1062 1063 /** 1064 * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result 1065 * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from 1066 * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the 1067 * original {@code Future}. Example: 1068 * 1069 * <pre> {@code 1070 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 1071 * AsyncFunction<RowKey, QueryResult> queryFunction = 1072 * new AsyncFunction<RowKey, QueryResult>() { 1073 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 1074 * return dataService.read(rowKey); 1075 * } 1076 * }; 1077 * ListenableFuture<QueryResult> queryFuture = 1078 * transformAsync(rowKeyFuture, queryFunction);}</pre> 1079 * 1080 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 1081 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 1082 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 1083 * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done 1084 * to complete the returned {@code Future}. 1085 * 1086 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 1087 * input future and that of the future returned by the function. That is, if the returned {@code 1088 * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two 1089 * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to 1090 * cancel itself. 1091 * 1092 * @param input The future to transform 1093 * @param function A function to transform the result of the input future to the result of the 1094 * output future 1095 * @return A future that holds result of the function (if the input succeeded) or the original 1096 * input's failure (if not) 1097 * @since 19.0 (in 11.0 as {@code transform}) 1098 */ 1099 public static <I, O> ListenableFuture<O> transformAsync( 1100 ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { 1101 AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function); 1102 input.addListener(output, directExecutor()); 1103 return output; 1104 } 1105 1106 /** 1107 * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result 1108 * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from 1109 * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the 1110 * original {@code Future}. Example: 1111 * 1112 * <pre> {@code 1113 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 1114 * AsyncFunction<RowKey, QueryResult> queryFunction = 1115 * new AsyncFunction<RowKey, QueryResult>() { 1116 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 1117 * return dataService.read(rowKey); 1118 * } 1119 * }; 1120 * ListenableFuture<QueryResult> queryFuture = 1121 * transformAsync(rowKeyFuture, queryFunction, executor);}</pre> 1122 * 1123 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 1124 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1125 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 1126 * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned 1127 * {@code Future}. 1128 * 1129 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 1130 * input future and that of the future returned by the chain function. That is, if the returned 1131 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 1132 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 1133 * attempt to cancel itself. 1134 * 1135 * @param input The future to transform 1136 * @param function A function to transform the result of the input future to the result of the 1137 * output future 1138 * @param executor Executor to run the function in. 1139 * @return A future that holds result of the function (if the input succeeded) or the original 1140 * input's failure (if not) 1141 * @since 19.0 (in 11.0 as {@code transform}) 1142 */ 1143 public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input, 1144 AsyncFunction<? super I, ? extends O> function, Executor executor) { 1145 checkNotNull(executor); 1146 AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function); 1147 input.addListener(output, rejectionPropagatingExecutor(executor, output)); 1148 return output; 1149 } 1150 1151 /** 1152 * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate 1153 * executor to the given {@code future}. 1154 * 1155 * <p>Note, the returned executor can only be used once. 1156 */ 1157 private static Executor rejectionPropagatingExecutor( 1158 final Executor delegate, final AbstractFuture<?> future) { 1159 checkNotNull(delegate); 1160 if (delegate == directExecutor()) { 1161 // directExecutor() cannot throw RejectedExecutionException 1162 return delegate; 1163 } 1164 return new Executor() { 1165 volatile boolean thrownFromDelegate = true; 1166 @Override public void execute(final Runnable command) { 1167 try { 1168 delegate.execute(new Runnable() { 1169 @Override public void run() { 1170 thrownFromDelegate = false; 1171 command.run(); 1172 } 1173 }); 1174 } catch (RejectedExecutionException e) { 1175 if (thrownFromDelegate) { 1176 // wrap exception? 1177 future.setException(e); 1178 } 1179 // otherwise it must have been thrown from a transitive call and the delegate runnable 1180 // should have handled it. 1181 } 1182 } 1183 }; 1184 } 1185 1186 /** 1187 * Returns a new {@code ListenableFuture} whose result is the product of 1188 * applying the given {@code Function} to the result of the given {@code 1189 * Future}. Example: 1190 * 1191 * <pre> {@code 1192 * ListenableFuture<QueryResult> queryFuture = ...; 1193 * Function<QueryResult, List<Row>> rowsFunction = 1194 * new Function<QueryResult, List<Row>>() { 1195 * public List<Row> apply(QueryResult queryResult) { 1196 * return queryResult.getRows(); 1197 * } 1198 * }; 1199 * ListenableFuture<List<Row>> rowsFuture = 1200 * transform(queryFuture, rowsFunction);}</pre> 1201 * 1202 * <p>This overload, which does not accept an executor, uses {@code 1203 * directExecutor}, a dangerous choice in some cases. See the discussion in 1204 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 1205 * documentation. The documentation's warnings about "lightweight listeners" 1206 * refer here to the work done during {@code Function.apply}. 1207 * 1208 * <p>The returned {@code Future} attempts to keep its cancellation state in 1209 * sync with that of the input future. That is, if the returned {@code Future} 1210 * is cancelled, it will attempt to cancel the input, and if the input is 1211 * cancelled, the returned {@code Future} will receive a callback in which it 1212 * will attempt to cancel itself. 1213 * 1214 * <p>An example use of this method is to convert a serializable object 1215 * returned from an RPC into a POJO. 1216 * 1217 * @param input The future to transform 1218 * @param function A Function to transform the results of the provided future 1219 * to the results of the returned future. This will be run in the thread 1220 * that notifies input it is complete. 1221 * @return A future that holds result of the transformation. 1222 * @since 9.0 (in 1.0 as {@code compose}) 1223 */ 1224 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1225 final Function<? super I, ? extends O> function) { 1226 checkNotNull(function); 1227 ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function); 1228 input.addListener(output, directExecutor()); 1229 return output; 1230 } 1231 1232 /** 1233 * Returns a new {@code ListenableFuture} whose result is the product of 1234 * applying the given {@code Function} to the result of the given {@code 1235 * Future}. Example: 1236 * 1237 * <pre> {@code 1238 * ListenableFuture<QueryResult> queryFuture = ...; 1239 * Function<QueryResult, List<Row>> rowsFunction = 1240 * new Function<QueryResult, List<Row>>() { 1241 * public List<Row> apply(QueryResult queryResult) { 1242 * return queryResult.getRows(); 1243 * } 1244 * }; 1245 * ListenableFuture<List<Row>> rowsFuture = 1246 * transform(queryFuture, rowsFunction, executor);}</pre> 1247 * 1248 * <p>When selecting an executor, note that {@code directExecutor} is 1249 * dangerous in some cases. See the discussion in the {@link 1250 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 1251 * The documentation's warnings about "lightweight listeners" refer here to 1252 * the work done during {@code Function.apply}. 1253 * 1254 * <p>The returned {@code Future} attempts to keep its cancellation state in 1255 * sync with that of the input future. That is, if the returned {@code Future} 1256 * is cancelled, it will attempt to cancel the input, and if the input is 1257 * cancelled, the returned {@code Future} will receive a callback in which it 1258 * will attempt to cancel itself. 1259 * 1260 * <p>An example use of this method is to convert a serializable object 1261 * returned from an RPC into a POJO. 1262 * 1263 * @param input The future to transform 1264 * @param function A Function to transform the results of the provided future 1265 * to the results of the returned future. 1266 * @param executor Executor to run the function in. 1267 * @return A future that holds result of the transformation. 1268 * @since 9.0 (in 2.0 as {@code compose}) 1269 */ 1270 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1271 final Function<? super I, ? extends O> function, Executor executor) { 1272 checkNotNull(function); 1273 ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function); 1274 input.addListener(output, rejectionPropagatingExecutor(executor, output)); 1275 return output; 1276 } 1277 1278 /** 1279 * Like {@link #transform(ListenableFuture, Function)} except that the 1280 * transformation {@code function} is invoked on each call to 1281 * {@link Future#get() get()} on the returned future. 1282 * 1283 * <p>The returned {@code Future} reflects the input's cancellation 1284 * state directly, and any attempt to cancel the returned Future is likewise 1285 * passed through to the input Future. 1286 * 1287 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 1288 * only apply the timeout to the execution of the underlying {@code Future}, 1289 * <em>not</em> to the execution of the transformation function. 1290 * 1291 * <p>The primary audience of this method is callers of {@code transform} 1292 * who don't have a {@code ListenableFuture} available and 1293 * do not mind repeated, lazy function evaluation. 1294 * 1295 * @param input The future to transform 1296 * @param function A Function to transform the results of the provided future 1297 * to the results of the returned future. 1298 * @return A future that returns the result of the transformation. 1299 * @since 10.0 1300 */ 1301 @GwtIncompatible("TODO") 1302 @CheckReturnValue 1303 public static <I, O> Future<O> lazyTransform(final Future<I> input, 1304 final Function<? super I, ? extends O> function) { 1305 checkNotNull(input); 1306 checkNotNull(function); 1307 return new Future<O>() { 1308 1309 @Override 1310 public boolean cancel(boolean mayInterruptIfRunning) { 1311 return input.cancel(mayInterruptIfRunning); 1312 } 1313 1314 @Override 1315 public boolean isCancelled() { 1316 return input.isCancelled(); 1317 } 1318 1319 @Override 1320 public boolean isDone() { 1321 return input.isDone(); 1322 } 1323 1324 @Override 1325 public O get() throws InterruptedException, ExecutionException { 1326 return applyTransformation(input.get()); 1327 } 1328 1329 @Override 1330 public O get(long timeout, TimeUnit unit) 1331 throws InterruptedException, ExecutionException, TimeoutException { 1332 return applyTransformation(input.get(timeout, unit)); 1333 } 1334 1335 private O applyTransformation(I input) throws ExecutionException { 1336 try { 1337 return function.apply(input); 1338 } catch (Throwable t) { 1339 throw new ExecutionException(t); 1340 } 1341 } 1342 }; 1343 } 1344 1345 /** 1346 * An implementation of {@code ListenableFuture} that also implements 1347 * {@code Runnable} so that it can be used to nest ListenableFutures. 1348 * Once the passed-in {@code ListenableFuture} is complete, it calls the 1349 * passed-in {@code Function} to generate the result. 1350 * 1351 * <p>For historical reasons, this class has a special case in its exception 1352 * handling: If the given {@code AsyncFunction} throws an {@code 1353 * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it 1354 * and uses its <i>cause</i> as the output future's exception, rather than 1355 * using the {@code UndeclaredThrowableException} itself as it would for other 1356 * exception types. The reason for this is that {@code Futures.transform} used 1357 * to require a {@code Function}, whose {@code apply} method is not allowed to 1358 * throw checked exceptions. Nowadays, {@code Futures.transform} has an 1359 * overload that accepts an {@code AsyncFunction}, whose {@code apply} method 1360 * <i>is</i> allowed to throw checked exception. Users who wish to throw 1361 * checked exceptions should use that overload instead, and <a 1362 * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we 1363 * should remove the {@code UndeclaredThrowableException} special case</a>. 1364 */ 1365 private abstract static class AbstractChainingFuture<I, O, F> 1366 extends AbstractFuture.TrustedFuture<O> implements Runnable { 1367 // In theory, this field might not be visible to a cancel() call in certain circumstances. For 1368 // details, see the comments on the fields of TimeoutFuture. 1369 @Nullable ListenableFuture<? extends I> inputFuture; 1370 @Nullable F function; 1371 1372 AbstractChainingFuture(ListenableFuture<? extends I> inputFuture, F function) { 1373 this.inputFuture = checkNotNull(inputFuture); 1374 this.function = checkNotNull(function); 1375 } 1376 1377 @Override 1378 public final void run() { 1379 try { 1380 ListenableFuture<? extends I> localInputFuture = inputFuture; 1381 F localFunction = function; 1382 if (isCancelled() | localInputFuture == null | localFunction == null) { 1383 return; 1384 } 1385 inputFuture = null; 1386 function = null; 1387 1388 I sourceResult; 1389 try { 1390 sourceResult = getUninterruptibly(localInputFuture); 1391 } catch (CancellationException e) { 1392 // Cancel this future and return. 1393 // At this point, inputFuture is cancelled and outputFuture doesn't 1394 // exist, so the value of mayInterruptIfRunning is irrelevant. 1395 cancel(false); 1396 return; 1397 } catch (ExecutionException e) { 1398 // Set the cause of the exception as this future's exception 1399 setException(e.getCause()); 1400 return; 1401 } 1402 doTransform(localFunction, sourceResult); 1403 } catch (UndeclaredThrowableException e) { 1404 // Set the cause of the exception as this future's exception 1405 setException(e.getCause()); 1406 } catch (Throwable t) { 1407 // This exception is irrelevant in this thread, but useful for the 1408 // client 1409 setException(t); 1410 } 1411 } 1412 1413 /** Template method for subtypes to actually run the transform. */ 1414 abstract void doTransform(F function, I result) throws Exception; 1415 1416 @Override final void done() { 1417 maybePropagateCancellation(inputFuture); 1418 this.inputFuture = null; 1419 this.function = null; 1420 } 1421 } 1422 1423 /** 1424 * A {@link AbstractChainingFuture} that delegates to an {@link AsyncFunction} and 1425 * {@link #setFuture(ListenableFuture)} to implement {@link #doTransform}. 1426 */ 1427 private static final class AsyncChainingFuture<I, O> 1428 extends AbstractChainingFuture<I, O, AsyncFunction<? super I, ? extends O>> { 1429 AsyncChainingFuture(ListenableFuture<? extends I> inputFuture, 1430 AsyncFunction<? super I, ? extends O> function) { 1431 super(inputFuture, function); 1432 } 1433 1434 @Override 1435 void doTransform(AsyncFunction<? super I, ? extends O> function, I input) throws Exception { 1436 ListenableFuture<? extends O> outputFuture = function.apply(input); 1437 checkNotNull(outputFuture, "AsyncFunction.apply returned null instead of a Future. " 1438 + "Did you mean to return immediateFuture(null)?"); 1439 setFuture(outputFuture); 1440 } 1441 } 1442 1443 /** 1444 * A {@link AbstractChainingFuture} that delegates to a {@link Function} and 1445 * {@link #set(Object)} to implement {@link #doTransform}. 1446 */ 1447 private static final class ChainingFuture<I, O> 1448 extends AbstractChainingFuture<I, O, Function<? super I, ? extends O>> { 1449 1450 ChainingFuture(ListenableFuture<? extends I> inputFuture, 1451 Function<? super I, ? extends O> function) { 1452 super(inputFuture, function); 1453 } 1454 1455 @Override 1456 void doTransform(Function<? super I, ? extends O> function, I input) { 1457 // TODO(lukes): move the UndeclaredThrowable catch block here? 1458 set(function.apply(input)); 1459 } 1460 } 1461 1462 /** 1463 * Returns a new {@code ListenableFuture} whose result is the product of 1464 * calling {@code get()} on the {@code Future} nested within the given {@code 1465 * Future}, effectively chaining the futures one after the other. Example: 1466 * 1467 * <pre> {@code 1468 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 1469 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 1470 * 1471 * <p>This call has the same cancellation and execution semantics as {@link 1472 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 1473 * Future} attempts to keep its cancellation state in sync with both the 1474 * input {@code Future} and the nested {@code Future}. The transformation 1475 * is very lightweight and therefore takes place in the same thread (either 1476 * the thread that called {@code dereference}, or the thread in which the 1477 * dereferenced future completes). 1478 * 1479 * @param nested The nested future to transform. 1480 * @return A future that holds result of the inner future. 1481 * @since 13.0 1482 */ 1483 @SuppressWarnings({"rawtypes", "unchecked"}) 1484 @CheckReturnValue 1485 public static <V> ListenableFuture<V> dereference( 1486 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 1487 return transformAsync((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 1488 } 1489 1490 /** 1491 * Helper {@code Function} for {@link #dereference}. 1492 */ 1493 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 1494 new AsyncFunction<ListenableFuture<Object>, Object>() { 1495 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 1496 return input; 1497 } 1498 }; 1499 1500 /** 1501 * Creates a new {@code ListenableFuture} whose value is a list containing the 1502 * values of all its input futures, if all succeed. If any input fails, the 1503 * returned future fails immediately. 1504 * 1505 * <p>The list of results is in the same order as the input list. 1506 * 1507 * <p>Canceling this future will attempt to cancel all the component futures, 1508 * and if any of the provided futures fails or is canceled, this one is, 1509 * too. 1510 * 1511 * @param futures futures to combine 1512 * @return a future that provides a list of the results of the component 1513 * futures 1514 * @since 10.0 1515 */ 1516 @Beta 1517 @SafeVarargs 1518 @CheckReturnValue 1519 public static <V> ListenableFuture<List<V>> allAsList( 1520 ListenableFuture<? extends V>... futures) { 1521 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 1522 } 1523 1524 /** 1525 * Creates a new {@code ListenableFuture} whose value is a list containing the 1526 * values of all its input futures, if all succeed. If any input fails, the 1527 * returned future fails immediately. 1528 * 1529 * <p>The list of results is in the same order as the input list. 1530 * 1531 * <p>Canceling this future will attempt to cancel all the component futures, 1532 * and if any of the provided futures fails or is canceled, this one is, 1533 * too. 1534 * 1535 * @param futures futures to combine 1536 * @return a future that provides a list of the results of the component 1537 * futures 1538 * @since 10.0 1539 */ 1540 @Beta 1541 @CheckReturnValue 1542 public static <V> ListenableFuture<List<V>> allAsList( 1543 Iterable<? extends ListenableFuture<? extends V>> futures) { 1544 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 1545 } 1546 1547 /** 1548 * Creates a new {@code ListenableFuture} whose result is set from the 1549 * supplied future when it completes. Cancelling the supplied future 1550 * will also cancel the returned future, but cancelling the returned 1551 * future will have no effect on the supplied future. 1552 * 1553 * @since 15.0 1554 */ 1555 @GwtIncompatible("TODO") 1556 @CheckReturnValue 1557 public static <V> ListenableFuture<V> nonCancellationPropagating( 1558 ListenableFuture<V> future) { 1559 return new NonCancellationPropagatingFuture<V>(future); 1560 } 1561 1562 /** 1563 * A wrapped future that does not propagate cancellation to its delegate. 1564 */ 1565 @GwtIncompatible("TODO") 1566 private static final class NonCancellationPropagatingFuture<V> 1567 extends AbstractFuture.TrustedFuture<V> { 1568 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1569 delegate.addListener(new Runnable() { 1570 @Override public void run() { 1571 // This prevents cancellation from propagating because we don't assign delegate until 1572 // delegate is already done, so calling cancel() on it is a no-op. 1573 setFuture(delegate); 1574 } 1575 }, directExecutor()); 1576 } 1577 } 1578 1579 /** 1580 * Creates a new {@code ListenableFuture} whose value is a list containing the 1581 * values of all its successful input futures. The list of results is in the 1582 * same order as the input list, and if any of the provided futures fails or 1583 * is canceled, its corresponding position will contain {@code null} (which is 1584 * indistinguishable from the future having a successful value of 1585 * {@code null}). 1586 * 1587 * <p>Canceling this future will attempt to cancel all the component futures. 1588 * 1589 * @param futures futures to combine 1590 * @return a future that provides a list of the results of the component 1591 * futures 1592 * @since 10.0 1593 */ 1594 @Beta 1595 @SafeVarargs 1596 @CheckReturnValue 1597 public static <V> ListenableFuture<List<V>> successfulAsList( 1598 ListenableFuture<? extends V>... futures) { 1599 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1600 } 1601 1602 /** 1603 * Creates a new {@code ListenableFuture} whose value is a list containing the 1604 * values of all its successful input futures. The list of results is in the 1605 * same order as the input list, and if any of the provided futures fails or 1606 * is canceled, its corresponding position will contain {@code null} (which is 1607 * indistinguishable from the future having a successful value of 1608 * {@code null}). 1609 * 1610 * <p>Canceling this future will attempt to cancel all the component futures. 1611 * 1612 * @param futures futures to combine 1613 * @return a future that provides a list of the results of the component 1614 * futures 1615 * @since 10.0 1616 */ 1617 @Beta 1618 @CheckReturnValue 1619 public static <V> ListenableFuture<List<V>> successfulAsList( 1620 Iterable<? extends ListenableFuture<? extends V>> futures) { 1621 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1622 } 1623 1624 /** 1625 * Returns a list of delegate futures that correspond to the futures received in the order 1626 * that they complete. Delegate futures return the same value or throw the same exception 1627 * as the corresponding input future returns/throws. 1628 * 1629 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future 1630 * does not correspond to a specific input future until the appropriate number of input 1631 * futures have completed. At that point, it is too late to cancel the input future. 1632 * The input future's result, which cannot be stored into the cancelled delegate future, 1633 * is ignored. 1634 * 1635 * @since 17.0 1636 */ 1637 @Beta 1638 @GwtIncompatible("TODO") 1639 @CheckReturnValue 1640 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1641 Iterable<? extends ListenableFuture<? extends T>> futures) { 1642 // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an 1643 // ArrayDeque 1644 final ConcurrentLinkedQueue<SettableFuture<T>> delegates = 1645 Queues.newConcurrentLinkedQueue(); 1646 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder(); 1647 // Using SerializingExecutor here will ensure that each CompletionOrderListener executes 1648 // atomically and therefore that each returned future is guaranteed to be in completion order. 1649 // N.B. there are some cases where the use of this executor could have possibly surprising 1650 // effects when input futures finish at approximately the same time _and_ the output futures 1651 // have directExecutor listeners. In this situation, the listeners may end up running on a 1652 // different thread than if they were attached to the corresponding input future. We believe 1653 // this to be a negligible cost since: 1654 // 1. Using the directExecutor implies that your callback is safe to run on any thread. 1655 // 2. This would likely only be noticeable if you were doing something expensive or blocking on 1656 // a directExecutor listener on one of the output futures which is an antipattern anyway. 1657 SerializingExecutor executor = new SerializingExecutor(directExecutor()); 1658 for (final ListenableFuture<? extends T> future : futures) { 1659 SettableFuture<T> delegate = SettableFuture.create(); 1660 // Must make sure to add the delegate to the queue first in case the future is already done 1661 delegates.add(delegate); 1662 future.addListener(new Runnable() { 1663 @Override public void run() { 1664 delegates.remove().setFuture(future); 1665 } 1666 }, executor); 1667 listBuilder.add(delegate); 1668 } 1669 return listBuilder.build(); 1670 } 1671 1672 /** 1673 * Registers separate success and failure callbacks to be run when the {@code 1674 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1675 * complete} or, if the computation is already complete, immediately. 1676 * 1677 * <p>There is no guaranteed ordering of execution of callbacks, but any 1678 * callback added through this method is guaranteed to be called once the 1679 * computation is complete. 1680 * 1681 * Example: <pre> {@code 1682 * ListenableFuture<QueryResult> future = ...; 1683 * addCallback(future, 1684 * new FutureCallback<QueryResult> { 1685 * public void onSuccess(QueryResult result) { 1686 * storeInCache(result); 1687 * } 1688 * public void onFailure(Throwable t) { 1689 * reportError(t); 1690 * } 1691 * });}</pre> 1692 * 1693 * <p>This overload, which does not accept an executor, uses {@code 1694 * directExecutor}, a dangerous choice in some cases. See the discussion in 1695 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 1696 * documentation. 1697 * 1698 * <p>For a more general interface to attach a completion listener to a 1699 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1700 * 1701 * @param future The future attach the callback to. 1702 * @param callback The callback to invoke when {@code future} is completed. 1703 * @since 10.0 1704 */ 1705 public static <V> void addCallback(ListenableFuture<V> future, 1706 FutureCallback<? super V> callback) { 1707 addCallback(future, callback, directExecutor()); 1708 } 1709 1710 /** 1711 * Registers separate success and failure callbacks to be run when the {@code 1712 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1713 * complete} or, if the computation is already complete, immediately. 1714 * 1715 * <p>The callback is run in {@code executor}. 1716 * There is no guaranteed ordering of execution of callbacks, but any 1717 * callback added through this method is guaranteed to be called once the 1718 * computation is complete. 1719 * 1720 * Example: <pre> {@code 1721 * ListenableFuture<QueryResult> future = ...; 1722 * Executor e = ... 1723 * addCallback(future, 1724 * new FutureCallback<QueryResult> { 1725 * public void onSuccess(QueryResult result) { 1726 * storeInCache(result); 1727 * } 1728 * public void onFailure(Throwable t) { 1729 * reportError(t); 1730 * } 1731 * }, e);}</pre> 1732 * 1733 * <p>When selecting an executor, note that {@code directExecutor} is 1734 * dangerous in some cases. See the discussion in the {@link 1735 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 1736 * 1737 * <p>For a more general interface to attach a completion listener to a 1738 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1739 * 1740 * @param future The future attach the callback to. 1741 * @param callback The callback to invoke when {@code future} is completed. 1742 * @param executor The executor to run {@code callback} when the future 1743 * completes. 1744 * @since 10.0 1745 */ 1746 public static <V> void addCallback(final ListenableFuture<V> future, 1747 final FutureCallback<? super V> callback, Executor executor) { 1748 Preconditions.checkNotNull(callback); 1749 Runnable callbackListener = new Runnable() { 1750 @Override 1751 public void run() { 1752 final V value; 1753 try { 1754 // TODO(user): (Before Guava release), validate that this 1755 // is the thing for IE. 1756 value = getUninterruptibly(future); 1757 } catch (ExecutionException e) { 1758 callback.onFailure(e.getCause()); 1759 return; 1760 } catch (RuntimeException e) { 1761 callback.onFailure(e); 1762 return; 1763 } catch (Error e) { 1764 callback.onFailure(e); 1765 return; 1766 } 1767 callback.onSuccess(value); 1768 } 1769 }; 1770 future.addListener(callbackListener, executor); 1771 } 1772 1773 /** 1774 * Returns the result of {@link Future#get()}, converting most exceptions to a 1775 * new instance of the given checked exception type. This reduces boilerplate 1776 * for a common use of {@code Future} in which it is unnecessary to 1777 * programmatically distinguish between exception types or to extract other 1778 * information from the exception instance. 1779 * 1780 * <p>Exceptions from {@code Future.get} are treated as follows: 1781 * <ul> 1782 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1783 * {@code X} if the cause is a checked exception, an {@link 1784 * UncheckedExecutionException} if the cause is a {@code 1785 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1786 * {@code Error}. 1787 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1788 * restoring the interrupt). 1789 * <li>Any {@link CancellationException} is propagated untouched, as is any 1790 * other {@link RuntimeException} (though {@code get} implementations are 1791 * discouraged from throwing such exceptions). 1792 * </ul> 1793 * 1794 * <p>The overall principle is to continue to treat every checked exception as a 1795 * checked exception, every unchecked exception as an unchecked exception, and 1796 * every error as an error. In addition, the cause of any {@code 1797 * ExecutionException} is wrapped in order to ensure that the new stack trace 1798 * matches that of the current thread. 1799 * 1800 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1801 * public constructor that accepts zero or more arguments, all of type {@code 1802 * String} or {@code Throwable} (preferring constructors with at least one 1803 * {@code String}) and calling the constructor via reflection. If the 1804 * exception did not already have a cause, one is set by calling {@link 1805 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1806 * {@code IllegalArgumentException} is thrown. 1807 * 1808 * @throws X if {@code get} throws any checked exception except for an {@code 1809 * ExecutionException} whose cause is not itself a checked exception 1810 * @throws UncheckedExecutionException if {@code get} throws an {@code 1811 * ExecutionException} with a {@code RuntimeException} as its cause 1812 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1813 * with an {@code Error} as its cause 1814 * @throws CancellationException if {@code get} throws a {@code 1815 * CancellationException} 1816 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1817 * RuntimeException} or does not have a suitable constructor 1818 * @since 10.0 1819 * @deprecated Use {@link #getChecked(Future, Class)}. This method will be 1820 * removed in Guava release 20.0. 1821 */ 1822 @Deprecated 1823 @GwtIncompatible("reflection") 1824 public static <V, X extends Exception> V get( 1825 Future<V> future, Class<X> exceptionClass) throws X { 1826 return getChecked(future, exceptionClass); 1827 } 1828 1829 /** 1830 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1831 * exceptions to a new instance of the given checked exception type. This 1832 * reduces boilerplate for a common use of {@code Future} in which it is 1833 * unnecessary to programmatically distinguish between exception types or to 1834 * extract other information from the exception instance. 1835 * 1836 * <p>Exceptions from {@code Future.get} are treated as follows: 1837 * <ul> 1838 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1839 * {@code X} if the cause is a checked exception, an {@link 1840 * UncheckedExecutionException} if the cause is a {@code 1841 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1842 * {@code Error}. 1843 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1844 * restoring the interrupt). 1845 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1846 * <li>Any {@link CancellationException} is propagated untouched, as is any 1847 * other {@link RuntimeException} (though {@code get} implementations are 1848 * discouraged from throwing such exceptions). 1849 * </ul> 1850 * 1851 * <p>The overall principle is to continue to treat every checked exception as a 1852 * checked exception, every unchecked exception as an unchecked exception, and 1853 * every error as an error. In addition, the cause of any {@code 1854 * ExecutionException} is wrapped in order to ensure that the new stack trace 1855 * matches that of the current thread. 1856 * 1857 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1858 * public constructor that accepts zero or more arguments, all of type {@code 1859 * String} or {@code Throwable} (preferring constructors with at least one 1860 * {@code String}) and calling the constructor via reflection. If the 1861 * exception did not already have a cause, one is set by calling {@link 1862 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1863 * {@code IllegalArgumentException} is thrown. 1864 * 1865 * @throws X if {@code get} throws any checked exception except for an {@code 1866 * ExecutionException} whose cause is not itself a checked exception 1867 * @throws UncheckedExecutionException if {@code get} throws an {@code 1868 * ExecutionException} with a {@code RuntimeException} as its cause 1869 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1870 * with an {@code Error} as its cause 1871 * @throws CancellationException if {@code get} throws a {@code 1872 * CancellationException} 1873 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1874 * RuntimeException} or does not have a suitable constructor 1875 * @since 10.0 1876 * @deprecated Use {@link #getChecked(Future, Class, long, TimeUnit)}, noting 1877 * the change in parameter order. This method will be removed in Guava 1878 * release 20.0. 1879 */ 1880 @Deprecated 1881 @GwtIncompatible("reflection") 1882 public static <V, X extends Exception> V get( 1883 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1884 throws X { 1885 return getChecked(future, exceptionClass, timeout, unit); 1886 } 1887 1888 /** 1889 * Returns the result of {@link Future#get()}, converting most exceptions to a 1890 * new instance of the given checked exception type. This reduces boilerplate 1891 * for a common use of {@code Future} in which it is unnecessary to 1892 * programmatically distinguish between exception types or to extract other 1893 * information from the exception instance. 1894 * 1895 * <p>Exceptions from {@code Future.get} are treated as follows: 1896 * <ul> 1897 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1898 * {@code X} if the cause is a checked exception, an {@link 1899 * UncheckedExecutionException} if the cause is a {@code 1900 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1901 * {@code Error}. 1902 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1903 * restoring the interrupt). 1904 * <li>Any {@link CancellationException} is propagated untouched, as is any 1905 * other {@link RuntimeException} (though {@code get} implementations are 1906 * discouraged from throwing such exceptions). 1907 * </ul> 1908 * 1909 * <p>The overall principle is to continue to treat every checked exception as a 1910 * checked exception, every unchecked exception as an unchecked exception, and 1911 * every error as an error. In addition, the cause of any {@code 1912 * ExecutionException} is wrapped in order to ensure that the new stack trace 1913 * matches that of the current thread. 1914 * 1915 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1916 * public constructor that accepts zero or more arguments, all of type {@code 1917 * String} or {@code Throwable} (preferring constructors with at least one 1918 * {@code String}) and calling the constructor via reflection. If the 1919 * exception did not already have a cause, one is set by calling {@link 1920 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1921 * {@code IllegalArgumentException} is thrown. 1922 * 1923 * @throws X if {@code get} throws any checked exception except for an {@code 1924 * ExecutionException} whose cause is not itself a checked exception 1925 * @throws UncheckedExecutionException if {@code get} throws an {@code 1926 * ExecutionException} with a {@code RuntimeException} as its cause 1927 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1928 * with an {@code Error} as its cause 1929 * @throws CancellationException if {@code get} throws a {@code 1930 * CancellationException} 1931 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1932 * RuntimeException} or does not have a suitable constructor 1933 * @since 19.0 (in 10.0 as {@code get}) 1934 */ 1935 @GwtIncompatible("reflection") 1936 public static <V, X extends Exception> V getChecked( 1937 Future<V> future, Class<X> exceptionClass) throws X { 1938 return FuturesGetChecked.getChecked(future, exceptionClass); 1939 } 1940 1941 /** 1942 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1943 * exceptions to a new instance of the given checked exception type. This 1944 * reduces boilerplate for a common use of {@code Future} in which it is 1945 * unnecessary to programmatically distinguish between exception types or to 1946 * extract other information from the exception instance. 1947 * 1948 * <p>Exceptions from {@code Future.get} are treated as follows: 1949 * <ul> 1950 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1951 * {@code X} if the cause is a checked exception, an {@link 1952 * UncheckedExecutionException} if the cause is a {@code 1953 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1954 * {@code Error}. 1955 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1956 * restoring the interrupt). 1957 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1958 * <li>Any {@link CancellationException} is propagated untouched, as is any 1959 * other {@link RuntimeException} (though {@code get} implementations are 1960 * discouraged from throwing such exceptions). 1961 * </ul> 1962 * 1963 * <p>The overall principle is to continue to treat every checked exception as a 1964 * checked exception, every unchecked exception as an unchecked exception, and 1965 * every error as an error. In addition, the cause of any {@code 1966 * ExecutionException} is wrapped in order to ensure that the new stack trace 1967 * matches that of the current thread. 1968 * 1969 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1970 * public constructor that accepts zero or more arguments, all of type {@code 1971 * String} or {@code Throwable} (preferring constructors with at least one 1972 * {@code String}) and calling the constructor via reflection. If the 1973 * exception did not already have a cause, one is set by calling {@link 1974 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1975 * {@code IllegalArgumentException} is thrown. 1976 * 1977 * @throws X if {@code get} throws any checked exception except for an {@code 1978 * ExecutionException} whose cause is not itself a checked exception 1979 * @throws UncheckedExecutionException if {@code get} throws an {@code 1980 * ExecutionException} with a {@code RuntimeException} as its cause 1981 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1982 * with an {@code Error} as its cause 1983 * @throws CancellationException if {@code get} throws a {@code 1984 * CancellationException} 1985 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1986 * RuntimeException} or does not have a suitable constructor 1987 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1988 */ 1989 @GwtIncompatible("reflection") 1990 public static <V, X extends Exception> V getChecked( 1991 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) 1992 throws X { 1993 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 1994 } 1995 1996 /** 1997 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1998 * task known not to throw a checked exception. This makes {@code Future} more 1999 * suitable for lightweight, fast-running tasks that, barring bugs in the 2000 * code, will not fail. This gives it exception-handling behavior similar to 2001 * that of {@code ForkJoinTask.join}. 2002 * 2003 * <p>Exceptions from {@code Future.get} are treated as follows: 2004 * <ul> 2005 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 2006 * {@link UncheckedExecutionException} (if the cause is an {@code 2007 * Exception}) or {@link ExecutionError} (if the cause is an {@code 2008 * Error}). 2009 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 2010 * call. The interrupt is restored before {@code getUnchecked} returns. 2011 * <li>Any {@link CancellationException} is propagated untouched. So is any 2012 * other {@link RuntimeException} ({@code get} implementations are 2013 * discouraged from throwing such exceptions). 2014 * </ul> 2015 * 2016 * <p>The overall principle is to eliminate all checked exceptions: to loop to 2017 * avoid {@code InterruptedException}, to pass through {@code 2018 * CancellationException}, and to wrap any exception from the underlying 2019 * computation in an {@code UncheckedExecutionException} or {@code 2020 * ExecutionError}. 2021 * 2022 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 2023 * {@link Uninterruptibles#getUninterruptibly(Future)}. 2024 * 2025 * @throws UncheckedExecutionException if {@code get} throws an {@code 2026 * ExecutionException} with an {@code Exception} as its cause 2027 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 2028 * with an {@code Error} as its cause 2029 * @throws CancellationException if {@code get} throws a {@code 2030 * CancellationException} 2031 * @since 10.0 2032 */ 2033 @GwtIncompatible("TODO") 2034 public static <V> V getUnchecked(Future<V> future) { 2035 checkNotNull(future); 2036 try { 2037 return getUninterruptibly(future); 2038 } catch (ExecutionException e) { 2039 wrapAndThrowUnchecked(e.getCause()); 2040 throw new AssertionError(); 2041 } 2042 } 2043 2044 @GwtIncompatible("TODO") 2045 private static void wrapAndThrowUnchecked(Throwable cause) { 2046 if (cause instanceof Error) { 2047 throw new ExecutionError((Error) cause); 2048 } 2049 /* 2050 * It's a non-Error, non-Exception Throwable. From my survey of such 2051 * classes, I believe that most users intended to extend Exception, so we'll 2052 * treat it like an Exception. 2053 */ 2054 throw new UncheckedExecutionException(cause); 2055 } 2056 2057 /* 2058 * Arguably we don't need a timed getUnchecked because any operation slow 2059 * enough to require a timeout is heavyweight enough to throw a checked 2060 * exception and therefore be inappropriate to use with getUnchecked. Further, 2061 * it's not clear that converting the checked TimeoutException to a 2062 * RuntimeException -- especially to an UncheckedExecutionException, since it 2063 * wasn't thrown by the computation -- makes sense, and if we don't convert 2064 * it, the user still has to write a try-catch block. 2065 * 2066 * If you think you would use this method, let us know. You might also also 2067 * look into the Fork-Join framework: 2068 * http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 2069 */ 2070 2071 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 2072 private static final class ListFuture<V> extends CollectionFuture<V, List<V>> { 2073 ListFuture(ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 2074 boolean allMustSucceed) { 2075 init(new ListFutureRunningState(futures, allMustSucceed)); 2076 } 2077 2078 private final class ListFutureRunningState extends CollectionFutureRunningState { 2079 ListFutureRunningState(ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 2080 boolean allMustSucceed) { 2081 super(futures, allMustSucceed); 2082 } 2083 2084 @Override 2085 public List<V> combine(List<Optional<V>> values) { 2086 List<V> result = Lists.newArrayList(); 2087 for (Optional<V> element : values) { 2088 result.add(element != null ? element.orNull() : null); 2089 } 2090 return Collections.unmodifiableList(result); 2091 } 2092 } 2093 } 2094 2095 /** 2096 * A checked future that uses a function to map from exceptions to the 2097 * appropriate checked type. 2098 */ 2099 @GwtIncompatible("TODO") 2100 private static class MappingCheckedFuture<V, X extends Exception> extends 2101 AbstractCheckedFuture<V, X> { 2102 2103 final Function<? super Exception, X> mapper; 2104 2105 MappingCheckedFuture(ListenableFuture<V> delegate, 2106 Function<? super Exception, X> mapper) { 2107 super(delegate); 2108 2109 this.mapper = checkNotNull(mapper); 2110 } 2111 2112 @Override 2113 protected X mapException(Exception e) { 2114 return mapper.apply(e); 2115 } 2116 } 2117}