/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar
 * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and
 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
 * to lock different stripes and proceed concurrently, instead of creating contention for a single
 * lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming
 * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is
 * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that
 * {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the
 * same lock. The lower the number of stripes, the higher the probability of this happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and
 * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
 * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>},
 * {@linkplain #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak}
 * {@code Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and
 * {@linkplain #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means
 * that all stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless
 * {@code Striped} itself is reclaimable. <i>Weak</i> means that locks/semaphores are created
 * lazily, and they are allowed to be reclaimed if nobody is holding on to them. This is useful,
 * for example, if one wants to create a {@code Striped<Lock>} of many locks, but worries that in
 * most cases only a small portion of these would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for
 * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing
 * either of these extremes, {@code Striped} allows the user to trade between required concurrency
 * and memory footprint. For example, if a set of tasks is CPU-bound, one could easily create a
 * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of
 * possibly thousands of locks which could be created in a {@code Map<K, Lock>} structure.
 *
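 * <p>As a quick illustration of the intended usage (a minimal sketch; {@code key} and
 * {@code doWork} are hypothetical placeholders, not part of this API):
 *
 * <pre>{@code
 * Striped<Lock> stripedLock = Striped.lock(Runtime.getRuntime().availableProcessors() * 4);
 *
 * Lock lock = stripedLock.get(key);
 * lock.lock();
 * try {
 *   doWork();
 * } finally {
 *   lock.unlock();
 * }
 * }</pre>
 *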
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
@GwtIncompatible
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
   * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusive, to {@code size()},
   * exclusive.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that
   * {@code getAt(indexFor(key)) == get(key)}.
   */
  abstract int indexFor(Object key);

  /**
   * Returns the total number of stripes in this instance.
   */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per
   * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this
   * method are guaranteed to not deadlock each other.
   *
   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
   * {@code bulkGet(keys)} with a relatively large number of keys can cause an excessive number of
   * shared stripes (much like the birthday paradox, where far fewer people than anticipated are
   * needed before two of them share a birthday). Please consider carefully the implications of the
   * number of stripes, the intended concurrency level, and the typical number of keys used in a
   * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
   * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
   *
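   * <p>A deadlock-free acquire/release pattern might look like this (a minimal sketch;
   * {@code stripedLock}, {@code keys}, and {@code operation} are hypothetical placeholders):
   *
   * <pre>{@code
   * Iterable<Lock> locks = stripedLock.bulkGet(keys);
   * for (Lock lock : locks) {
   *   lock.lock();
   * }
   * try {
   *   operation();
   * } finally {
   *   for (Lock lock : locks) {
   *     lock.unlock();
   *   }
   * }
   * }</pre>
   *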
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one for each object, derived by delegating
   *     to {@link #get(Object)}; may contain duplicates), in increasing index order.
   */
  public Iterable<L> bulkGet(Iterable<?> keys) {
    // Initially using the array to store the keys, then reusing it to store the respective L's
    final Object[] array = Iterables.toArray(keys, Object.class);
    if (array.length == 0) {
      return ImmutableList.of();
    }
    int[] stripes = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      stripes[i] = indexFor(array[i]);
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    array[0] = getAt(previousStripe);
    for (int i = 1; i < array.length; i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        array[i] = array[i - 1];
      } else {
        array[i] = getAt(currentStripe);
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks might
     * be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asList = (List<L>) Arrays.asList(array);
    return Collections.unmodifiableList(asList);
  }

  // Static factories

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock
   * is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return new CompactStriped<Lock>(
        stripes,
        new Supplier<Lock>() {
          @Override
          public Lock get() {
            return new PaddedLock();
          }
        });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is
   * reentrant.
   *
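   * <p>This variant suits key spaces that are large but sparsely contended (a minimal sketch; the
   * stripe count is illustrative only):
   *
   * <pre>{@code
   * // Potentially millions of distinct keys, but only a few locked at any given moment:
   * Striped<Lock> stripedLock = Striped.lazyWeakLock(1024);
   * }</pre>
   *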
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(
        stripes,
        new Supplier<Lock>() {
          @Override
          public Lock get() {
            return new ReentrantLock(false);
          }
        });
  }

  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
   *
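   * <p>For example, one could bound the number of concurrent holders per key (a minimal sketch;
   * {@code key} and {@code access} are hypothetical placeholders):
   *
   * <pre>{@code
   * Striped<Semaphore> striped = Striped.semaphore(64, 2); // at most 2 holders per stripe
   *
   * Semaphore semaphore = striped.get(key);
   * semaphore.acquire();
   * try {
   *   access();
   * } finally {
   *   semaphore.release();
   * }
   * }</pre>
   *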
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return new CompactStriped<Semaphore>(
        stripes,
        new Supplier<Semaphore>() {
          @Override
          public Semaphore get() {
            return new PaddedSemaphore(permits);
          }
        });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(
        stripes,
        new Supplier<Semaphore>() {
          @Override
          public Semaphore get() {
            return new Semaphore(permits, false);
          }
        });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
   *
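   * <p>Each stripe behaves like a plain {@link ReentrantReadWriteLock} (a minimal sketch;
   * {@code key} and {@code read} are hypothetical placeholders):
   *
   * <pre>{@code
   * Striped<ReadWriteLock> striped = Striped.readWriteLock(32);
   *
   * Lock readLock = striped.get(key).readLock();
   * readLock.lock();
   * try {
   *   read();
   * } finally {
   *   readLock.unlock();
   * }
   * }</pre>
   *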
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write
   * locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  // ReentrantReadWriteLock is large enough to make padding probably unnecessary
  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override
        public ReadWriteLock get() {
          return new ReentrantReadWriteLock();
        }
      };

  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
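      // If stripes exceeds the largest power of two representable in an int, use a mask with all
      // bits set; every (smeared) hash is then its own stripe index, and the lazy implementations
      // report Integer.MAX_VALUE as their size.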
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override
    final int indexFor(Object key) {
      int hash = smear(key.hashCode());
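      // mask is 2^k - 1, so this is equivalent to (hash mod 2^k) for a non-negative hash; e.g.
      // with 8 stripes, mask == 7.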
      return hash & mask;
    }

    @Override
    public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override
    public L getAt(int index) {
      return (L) array[index];
    }

    @Override
    public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size);
      this.supplier = supplier;
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array. We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override
    public int size() {
      return size;
    }

    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      existing = locks.putIfAbsent(index, created);
      return MoreObjects.firstNonNull(existing, created);
    }

    @Override
    public int size() {
      return size;
    }
  }

  /**
   * A bit mask where all bits are set.
   */
  private static final int ALL_SET = ~0;

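  // Smallest power of two that is >= x; e.g. ceilToPowerOfTwo(5) == 8. Requires x > 0.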
  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group
   * and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's
   * java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
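  // Spreads higher-order bits of the hash downward, so that hashCodes differing only in their
  // upper bits still land in different stripes after masking.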
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth
     * long here, to minimize chance of interference between consecutive locks, but I couldn't
     * observe any benefit from that.
     */
    long unused1;
    long unused2;
    long unused3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See the PaddedLock comment
    long unused1;
    long unused2;
    long unused3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}