001/*
002 * Copyright (C) 2011 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static java.util.concurrent.TimeUnit.NANOSECONDS;
018
019import com.google.common.annotations.Beta;
020import com.google.common.annotations.GwtCompatible;
021import com.google.common.annotations.GwtIncompatible;
022import com.google.common.base.Preconditions;
023import com.google.errorprone.annotations.CanIgnoreReturnValue;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Future;
029import java.util.concurrent.Semaphore;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032
033/**
034 * Utilities for treating interruptible operations as uninterruptible. In all cases, if a thread is
035 * interrupted during such a call, the call continues to block until the result is available or the
036 * timeout elapses, and only then re-interrupts the thread.
037 *
038 * @author Anthony Zana
039 * @since 10.0
040 */
041@Beta
042@GwtCompatible(emulated = true)
043public final class Uninterruptibles {
044
045  // Implementation Note: As of 3-7-11, the logic for each blocking/timeout
046  // methods is identical, save for method being invoked.
047
048  /**
049   * Invokes {@code latch.}{@link CountDownLatch#await() await()} uninterruptibly.
050   */
051  @GwtIncompatible // concurrency
052  public static void awaitUninterruptibly(CountDownLatch latch) {
053    boolean interrupted = false;
054    try {
055      while (true) {
056        try {
057          latch.await();
058          return;
059        } catch (InterruptedException e) {
060          interrupted = true;
061        }
062      }
063    } finally {
064      if (interrupted) {
065        Thread.currentThread().interrupt();
066      }
067    }
068  }
069
070  /**
071   * Invokes {@code latch.}{@link CountDownLatch#await(long, TimeUnit) await(timeout, unit)}
072   * uninterruptibly.
073   */
074  @CanIgnoreReturnValue // TODO(cpovirk): Consider being more strict.
075  @GwtIncompatible // concurrency
076  public static boolean awaitUninterruptibly(CountDownLatch latch, long timeout, TimeUnit unit) {
077    boolean interrupted = false;
078    try {
079      long remainingNanos = unit.toNanos(timeout);
080      long end = System.nanoTime() + remainingNanos;
081
082      while (true) {
083        try {
084          // CountDownLatch treats negative timeouts just like zero.
085          return latch.await(remainingNanos, NANOSECONDS);
086        } catch (InterruptedException e) {
087          interrupted = true;
088          remainingNanos = end - System.nanoTime();
089        }
090      }
091    } finally {
092      if (interrupted) {
093        Thread.currentThread().interrupt();
094      }
095    }
096  }
097
098  /**
099   * Invokes {@code toJoin.}{@link Thread#join() join()} uninterruptibly.
100   */
101  @GwtIncompatible // concurrency
102  public static void joinUninterruptibly(Thread toJoin) {
103    boolean interrupted = false;
104    try {
105      while (true) {
106        try {
107          toJoin.join();
108          return;
109        } catch (InterruptedException e) {
110          interrupted = true;
111        }
112      }
113    } finally {
114      if (interrupted) {
115        Thread.currentThread().interrupt();
116      }
117    }
118  }
119
120  /**
121   * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
122   *
123   * <p>Similar methods:
124   *
125   * <ul>
126   * <li>To retrieve a result from a {@code Future} that is already done, use
127   *     {@link Futures#getDone Futures.getDone}.
128   * <li>To treat {@link InterruptedException} uniformly with other exceptions, use
129   *     {@link Futures#getChecked(Future, Class) Futures.getChecked}.
130   * <li>To get uninterruptibility and remove checked exceptions, use {@link Futures#getUnchecked}.
131   * </ul>
132   *
133   * @throws ExecutionException if the computation threw an exception
134   * @throws CancellationException if the computation was cancelled
135   */
136  @CanIgnoreReturnValue
137  public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
138    boolean interrupted = false;
139    try {
140      while (true) {
141        try {
142          return future.get();
143        } catch (InterruptedException e) {
144          interrupted = true;
145        }
146      }
147    } finally {
148      if (interrupted) {
149        Thread.currentThread().interrupt();
150      }
151    }
152  }
153
154  /**
155   * Invokes {@code future.}{@link Future#get(long, TimeUnit) get(timeout, unit)} uninterruptibly.
156   *
157   * <p>Similar methods:
158   *
159   * <ul>
160   * <li>To retrieve a result from a {@code Future} that is already done, use
161   *     {@link Futures#getDone Futures.getDone}.
162   * <li>To treat {@link InterruptedException} uniformly with other exceptions, use
163   *     {@link Futures#getChecked(Future, Class, long, TimeUnit) Futures.getChecked}.
164   * <li>To get uninterruptibility and remove checked exceptions, use {@link Futures#getUnchecked}.
165   * </ul>
166   *
167   * @throws ExecutionException if the computation threw an exception
168   * @throws CancellationException if the computation was cancelled
169   * @throws TimeoutException if the wait timed out
170   */
171  @CanIgnoreReturnValue
172  @GwtIncompatible // TODO
173  public static <V> V getUninterruptibly(Future<V> future, long timeout, TimeUnit unit)
174      throws ExecutionException, TimeoutException {
175    boolean interrupted = false;
176    try {
177      long remainingNanos = unit.toNanos(timeout);
178      long end = System.nanoTime() + remainingNanos;
179
180      while (true) {
181        try {
182          // Future treats negative timeouts just like zero.
183          return future.get(remainingNanos, NANOSECONDS);
184        } catch (InterruptedException e) {
185          interrupted = true;
186          remainingNanos = end - System.nanoTime();
187        }
188      }
189    } finally {
190      if (interrupted) {
191        Thread.currentThread().interrupt();
192      }
193    }
194  }
195
196  /**
197   * Invokes {@code unit.}{@link TimeUnit#timedJoin(Thread, long) timedJoin(toJoin, timeout)}
198   * uninterruptibly.
199   */
200  @GwtIncompatible // concurrency
201  public static void joinUninterruptibly(Thread toJoin, long timeout, TimeUnit unit) {
202    Preconditions.checkNotNull(toJoin);
203    boolean interrupted = false;
204    try {
205      long remainingNanos = unit.toNanos(timeout);
206      long end = System.nanoTime() + remainingNanos;
207      while (true) {
208        try {
209          // TimeUnit.timedJoin() treats negative timeouts just like zero.
210          NANOSECONDS.timedJoin(toJoin, remainingNanos);
211          return;
212        } catch (InterruptedException e) {
213          interrupted = true;
214          remainingNanos = end - System.nanoTime();
215        }
216      }
217    } finally {
218      if (interrupted) {
219        Thread.currentThread().interrupt();
220      }
221    }
222  }
223
224  /**
225   * Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
226   */
227  @GwtIncompatible // concurrency
228  public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
229    boolean interrupted = false;
230    try {
231      while (true) {
232        try {
233          return queue.take();
234        } catch (InterruptedException e) {
235          interrupted = true;
236        }
237      }
238    } finally {
239      if (interrupted) {
240        Thread.currentThread().interrupt();
241      }
242    }
243  }
244
245  /**
246   * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)} uninterruptibly.
247   *
248   * @throws ClassCastException if the class of the specified element prevents it from being added
249   *     to the given queue
250   * @throws IllegalArgumentException if some property of the specified element prevents it from
251   *     being added to the given queue
252   */
253  @GwtIncompatible // concurrency
254  public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
255    boolean interrupted = false;
256    try {
257      while (true) {
258        try {
259          queue.put(element);
260          return;
261        } catch (InterruptedException e) {
262          interrupted = true;
263        }
264      }
265    } finally {
266      if (interrupted) {
267        Thread.currentThread().interrupt();
268      }
269    }
270  }
271
272  // TODO(user): Support Sleeper somehow (wrapper or interface method)?
273  /**
274   * Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)} uninterruptibly.
275   */
276  @GwtIncompatible // concurrency
277  public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
278    boolean interrupted = false;
279    try {
280      long remainingNanos = unit.toNanos(sleepFor);
281      long end = System.nanoTime() + remainingNanos;
282      while (true) {
283        try {
284          // TimeUnit.sleep() treats negative timeouts just like zero.
285          NANOSECONDS.sleep(remainingNanos);
286          return;
287        } catch (InterruptedException e) {
288          interrupted = true;
289          remainingNanos = end - System.nanoTime();
290        }
291      }
292    } finally {
293      if (interrupted) {
294        Thread.currentThread().interrupt();
295      }
296    }
297  }
298
299  /**
300   * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit) tryAcquire(1,
301   * timeout, unit)} uninterruptibly.
302   *
303   * @since 18.0
304   */
305  @GwtIncompatible // concurrency
306  public static boolean tryAcquireUninterruptibly(
307      Semaphore semaphore, long timeout, TimeUnit unit) {
308    return tryAcquireUninterruptibly(semaphore, 1, timeout, unit);
309  }
310
311  /**
312   * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit) tryAcquire(permits,
313   * timeout, unit)} uninterruptibly.
314   *
315   * @since 18.0
316   */
317  @GwtIncompatible // concurrency
318  public static boolean tryAcquireUninterruptibly(
319      Semaphore semaphore, int permits, long timeout, TimeUnit unit) {
320    boolean interrupted = false;
321    try {
322      long remainingNanos = unit.toNanos(timeout);
323      long end = System.nanoTime() + remainingNanos;
324
325      while (true) {
326        try {
327          // Semaphore treats negative timeouts just like zero.
328          return semaphore.tryAcquire(permits, remainingNanos, NANOSECONDS);
329        } catch (InterruptedException e) {
330          interrupted = true;
331          remainingNanos = end - System.nanoTime();
332        }
333      }
334    } finally {
335      if (interrupted) {
336        Thread.currentThread().interrupt();
337      }
338    }
339  }
340
341  // TODO(user): Add support for waitUninterruptibly.
342
343  private Uninterruptibles() {}
344}