001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.Service.State.FAILED;
023import static com.google.common.util.concurrent.Service.State.NEW;
024import static com.google.common.util.concurrent.Service.State.RUNNING;
025import static com.google.common.util.concurrent.Service.State.STARTING;
026import static com.google.common.util.concurrent.Service.State.STOPPING;
027import static com.google.common.util.concurrent.Service.State.TERMINATED;
028
029import com.google.common.annotations.Beta;
030import com.google.common.util.concurrent.ListenerCallQueue.Callback;
031import com.google.common.util.concurrent.Monitor.Guard;
032import com.google.common.util.concurrent.Service.State; // javadoc needs this
033import com.google.j2objc.annotations.WeakOuter;
034
035import java.util.ArrayList;
036import java.util.Collections;
037import java.util.List;
038import java.util.concurrent.Executor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041
042import javax.annotation.Nullable;
043import javax.annotation.concurrent.GuardedBy;
044import javax.annotation.concurrent.Immutable;
045
046/**
047 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
048 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
049 * callbacks. Its subclasses must manage threads manually; consider
050 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
051 *
052 * @author Jesse Wilson
053 * @author Luke Sandberg
054 * @since 1.0
055 */
056@Beta
057public abstract class AbstractService implements Service {
058  private static final Callback<Listener> STARTING_CALLBACK = 
059      new Callback<Listener>("starting()") {
060        @Override void call(Listener listener) {
061          listener.starting();
062        }
063      };
064  private static final Callback<Listener> RUNNING_CALLBACK = 
065      new Callback<Listener>("running()") {
066        @Override void call(Listener listener) {
067          listener.running();
068        }
069      };
070  private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK = 
071      stoppingCallback(STARTING);
072  private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK = 
073      stoppingCallback(RUNNING);
074  
075  private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = 
076      terminatedCallback(NEW);
077  private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK = 
078      terminatedCallback(RUNNING);
079  private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK = 
080      terminatedCallback(STOPPING);
081
082  private static Callback<Listener> terminatedCallback(final State from) {
083    return new Callback<Listener>("terminated({from = " + from + "})") {
084      @Override void call(Listener listener) {
085        listener.terminated(from);
086      }
087    };
088  }
089  
090  private static Callback<Listener> stoppingCallback(final State from) {
091    return new Callback<Listener>("stopping({from = " + from + "})") {
092      @Override void call(Listener listener) {
093        listener.stopping(from);
094      }
095    };
096  }
097  
098  private final Monitor monitor = new Monitor();
099
100  private final Guard isStartable = new IsStartableGuard();
101
102  @WeakOuter
103  private final class IsStartableGuard extends Guard {
104    IsStartableGuard() {
105      super(AbstractService.this.monitor);
106    }
107
108    @Override public boolean isSatisfied() {
109      return state() == NEW;
110    }
111  }
112
113  private final Guard isStoppable = new IsStoppableGuard();
114
115  @WeakOuter
116  private final class IsStoppableGuard extends Guard {
117    IsStoppableGuard() {
118      super(AbstractService.this.monitor);
119    }
120
121    @Override public boolean isSatisfied() {
122      return state().compareTo(RUNNING) <= 0;
123    }
124  }
125
126  private final Guard hasReachedRunning = new HasReachedRunningGuard();
127
128  @WeakOuter
129  private final class HasReachedRunningGuard extends Guard {
130    HasReachedRunningGuard() {
131      super(AbstractService.this.monitor);
132    }
133
134    @Override public boolean isSatisfied() {
135      return state().compareTo(RUNNING) >= 0;
136    }
137  }
138
139  private final Guard isStopped = new IsStoppedGuard();
140
141  @WeakOuter
142  private final class IsStoppedGuard extends Guard {
143    IsStoppedGuard() {
144      super(AbstractService.this.monitor);
145    }
146
147    @Override public boolean isSatisfied() {
148      return state().isTerminal();
149    }
150  }
151
152  /**
153   * The listeners to notify during a state transition.
154   */
155  @GuardedBy("monitor")
156  private final List<ListenerCallQueue<Listener>> listeners = 
157      Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
158  
159  /**
160   * The current state of the service.  This should be written with the lock held but can be read
161   * without it because it is an immutable object in a volatile field.  This is desirable so that
162   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
163   * without grabbing the lock.  
164   * 
165   * <p>To update this field correctly the lock must be held to guarantee that the state is 
166   * consistent.
167   */
168  @GuardedBy("monitor")
169  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
170
171  /** Constructor for use by subclasses. */
172  protected AbstractService() {}
173  
174  /**
175   * This method is called by {@link #startAsync} to initiate service startup. The invocation of 
176   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
177   * or after it has returned. If startup fails, the invocation should cause a call to
178   * {@link #notifyFailed(Throwable)} instead.
179   *
180   * <p>This method should return promptly; prefer to do work on a different thread where it is
181   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 
182   * called multiple times.
183   */
184  protected abstract void doStart();
185
186  /**
187   * This method should be used to initiate service shutdown. The invocation of this method should
188   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
189   * returned. If shutdown fails, the invocation should cause a call to
190   * {@link #notifyFailed(Throwable)} instead.
191   *
192   * <p> This method should return promptly; prefer to do work on a different thread where it is
193   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 
194   * called multiple times.
195   */
196  protected abstract void doStop();
197
198  @Override public final Service startAsync() {
199    if (monitor.enterIf(isStartable)) {
200      try {
201        snapshot = new StateSnapshot(STARTING);
202        starting();
203        doStart();
204      } catch (Throwable startupFailure) {
205        notifyFailed(startupFailure);
206      } finally {
207        monitor.leave();
208        executeListeners();
209      }
210    } else {
211      throw new IllegalStateException("Service " + this + " has already been started");
212    }
213    return this;
214  }
215
216  @Override public final Service stopAsync() {
217    if (monitor.enterIf(isStoppable)) {
218      try {
219        State previous = state();
220        switch (previous) {
221          case NEW:
222            snapshot = new StateSnapshot(TERMINATED);
223            terminated(NEW);
224            break;
225          case STARTING:
226            snapshot = new StateSnapshot(STARTING, true, null);
227            stopping(STARTING);
228            break;
229          case RUNNING:
230            snapshot = new StateSnapshot(STOPPING);
231            stopping(RUNNING);
232            doStop();
233            break;
234          case STOPPING:
235          case TERMINATED:
236          case FAILED:
237            // These cases are impossible due to the if statement above.
238            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
239          default:
240            throw new AssertionError("Unexpected state: " + previous);
241        }
242      } catch (Throwable shutdownFailure) {
243        notifyFailed(shutdownFailure);
244      } finally {
245        monitor.leave();
246        executeListeners();
247      }
248    }
249    return this;
250  }
251
252  @Override public final void awaitRunning() {
253    monitor.enterWhenUninterruptibly(hasReachedRunning);
254    try {
255      checkCurrentState(RUNNING);
256    } finally {
257      monitor.leave();
258    }
259  }
260  
261  @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
262    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
263      try {
264        checkCurrentState(RUNNING);
265      } finally {
266        monitor.leave();
267      }
268    } else {
269      // It is possible due to races the we are currently in the expected state even though we 
270      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
271      // even check the guard.  I don't think we care too much about this use case but it could lead
272      // to a confusing error message.
273      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
274    }
275  }
276
277  @Override public final void awaitTerminated() {
278    monitor.enterWhenUninterruptibly(isStopped);
279    try {
280      checkCurrentState(TERMINATED);
281    } finally {
282      monitor.leave();
283    }
284  }
285  
286  @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
287    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
288      try {
289        checkCurrentState(TERMINATED);
290      } finally {
291        monitor.leave();
292      }
293    } else {
294      // It is possible due to races the we are currently in the expected state even though we 
295      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
296      // even check the guard.  I don't think we care too much about this use case but it could lead
297      // to a confusing error message.
298      throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. "
299          + "Current state: " + state());
300    }
301  }
302  
303  /** Checks that the current state is equal to the expected state. */
304  @GuardedBy("monitor")
305  private void checkCurrentState(State expected) {
306    State actual = state();
307    if (actual != expected) {
308      if (actual == FAILED) {
309        // Handle this specially so that we can include the failureCause, if there is one.
310        throw new IllegalStateException("Expected the service to be " + expected 
311            + ", but the service has FAILED", failureCause());
312      }
313      throw new IllegalStateException("Expected the service to be " + expected + ", but was " 
314          + actual);
315    }
316  }
317
318  /**
319   * Implementing classes should invoke this method once their service has started. It will cause
320   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
321   *
322   * @throws IllegalStateException if the service is not {@link State#STARTING}.
323   */
324  protected final void notifyStarted() {
325    monitor.enter();
326    try {
327      // We have to examine the internal state of the snapshot here to properly handle the stop 
328      // while starting case.
329      if (snapshot.state != STARTING) {
330        IllegalStateException failure = new IllegalStateException(
331            "Cannot notifyStarted() when the service is " + snapshot.state);
332        notifyFailed(failure);
333        throw failure;
334      }
335
336      if (snapshot.shutdownWhenStartupFinishes) {
337        snapshot = new StateSnapshot(STOPPING);
338        // We don't call listeners here because we already did that when we set the 
339        // shutdownWhenStartupFinishes flag.
340        doStop();
341      } else {
342        snapshot = new StateSnapshot(RUNNING);
343        running();
344      }
345    } finally {
346      monitor.leave();
347      executeListeners();
348    }
349  }
350
351  /**
352   * Implementing classes should invoke this method once their service has stopped. It will cause
353   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
354   *
355   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
356   *         {@link State#RUNNING}.
357   */
358  protected final void notifyStopped() {
359    monitor.enter();
360    try {
361      // We check the internal state of the snapshot instead of state() directly so we don't allow
362      // notifyStopped() to be called while STARTING, even if stop() has already been called.
363      State previous = snapshot.state;
364      if (previous != STOPPING && previous != RUNNING) {
365        IllegalStateException failure = new IllegalStateException(
366            "Cannot notifyStopped() when the service is " + previous);
367        notifyFailed(failure);
368        throw failure;
369      }
370      snapshot = new StateSnapshot(TERMINATED);
371      terminated(previous);
372    } finally {
373      monitor.leave();
374      executeListeners();
375    }
376  }
377
378  /**
379   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
380   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
381   * or otherwise cannot be started nor stopped.
382   */
383  protected final void notifyFailed(Throwable cause) {
384    checkNotNull(cause);
385
386    monitor.enter();
387    try {
388      State previous = state();
389      switch (previous) {
390        case NEW:
391        case TERMINATED:
392          throw new IllegalStateException("Failed while in state:" + previous, cause);
393        case RUNNING:
394        case STARTING:
395        case STOPPING:
396          snapshot = new StateSnapshot(FAILED, false, cause);
397          failed(previous, cause);
398          break;
399        case FAILED:
400          // Do nothing
401          break;
402        default:
403          throw new AssertionError("Unexpected state: " + previous);
404      }
405    } finally {
406      monitor.leave();
407      executeListeners();
408    }
409  }
410
411  @Override
412  public final boolean isRunning() {
413    return state() == RUNNING;
414  }
415
416  @Override
417  public final State state() {
418    return snapshot.externalState();
419  }
420  
421  /**
422   * @since 14.0
423   */
424  @Override
425  public final Throwable failureCause() {
426    return snapshot.failureCause();
427  }
428  
429  /**
430   * @since 13.0
431   */
432  @Override
433  public final void addListener(Listener listener, Executor executor) {
434    checkNotNull(listener, "listener");
435    checkNotNull(executor, "executor");
436    monitor.enter();
437    try {
438      if (!state().isTerminal()) {
439        listeners.add(new ListenerCallQueue<Listener>(listener, executor));
440      }
441    } finally {
442      monitor.leave();
443    }
444  }
445
446  @Override public String toString() {
447    return getClass().getSimpleName() + " [" + state() + "]";
448  }
449
450  /** 
451   * Attempts to execute all the listeners in {@link #listeners} while not holding the
452   * {@link #monitor}.
453   */
454  private void executeListeners() {
455    if (!monitor.isOccupiedByCurrentThread()) {
456      // iterate by index to avoid concurrent modification exceptions
457      for (int i = 0; i < listeners.size(); i++) {
458        listeners.get(i).execute();
459      }
460    }
461  }
462
463  @GuardedBy("monitor")
464  private void starting() {
465    STARTING_CALLBACK.enqueueOn(listeners);
466  }
467
468  @GuardedBy("monitor")
469  private void running() {
470    RUNNING_CALLBACK.enqueueOn(listeners);
471  }
472
473  @GuardedBy("monitor")
474  private void stopping(final State from) {
475    if (from == State.STARTING) {
476      STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners);
477    } else if (from == State.RUNNING) {
478      STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
479    } else {
480      throw new AssertionError();
481    }
482  }
483
484  @GuardedBy("monitor")
485  private void terminated(final State from) {
486    switch(from) {
487      case NEW:
488        TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners);
489        break;
490      case RUNNING:
491        TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
492        break;
493      case STOPPING:
494        TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners);
495        break;
496      case STARTING:
497      case TERMINATED:
498      case FAILED:
499      default:
500        throw new AssertionError();
501    }
502  }
503
504  @GuardedBy("monitor")
505  private void failed(final State from, final Throwable cause) {
506    // can't memoize this one due to the exception
507    new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") {
508      @Override void call(Listener listener) {
509        listener.failed(from, cause);
510      }
511    }.enqueueOn(listeners);
512  }
513  
514  /**
515   * An immutable snapshot of the current state of the service. This class represents a consistent
516   * snapshot of the state and therefore it can be used to answer simple queries without needing to
517   * grab a lock.
518   */
519  @Immutable
520  private static final class StateSnapshot {
521    /**
522     * The internal state, which equals external state unless
523     * shutdownWhenStartupFinishes is true.
524     */
525    final State state;
526
527    /**
528     * If true, the user requested a shutdown while the service was still starting
529     * up.
530     */
531    final boolean shutdownWhenStartupFinishes;
532    
533    /**
534     * The exception that caused this service to fail.  This will be {@code null}
535     * unless the service has failed.
536     */
537    @Nullable
538    final Throwable failure;
539    
540    StateSnapshot(State internalState) {
541      this(internalState, false, null);
542    }
543    
544    StateSnapshot(
545        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
546      checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING, 
547          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 
548          internalState);
549      checkArgument(!(failure != null ^ internalState == FAILED),
550          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
551          + "instead.", internalState, failure);
552      this.state = internalState;
553      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
554      this.failure = failure;
555    }
556    
557    /** @see Service#state() */
558    State externalState() {
559      if (shutdownWhenStartupFinishes && state == STARTING) {
560        return STOPPING;
561      } else {
562        return state;
563      }
564    }
565    
566    /** @see Service#failureCause() */
567    Throwable failureCause() {
568      checkState(state == FAILED, 
569          "failureCause() is only valid if the service has failed, service is %s", state);
570      return failure;
571    }
572  }
573}