LCOV - code coverage report
Current view: top level - server/git - WorkQueue.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 222 258 86.0 %
Date: 2022-11-19 15:00:39 Functions: 60 77 77.9 %

          Line data    Source code
       1             : // Copyright (C) 2009 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.server.git;
      16             : 
      17             : import static java.util.stream.Collectors.toList;
      18             : 
      19             : import com.google.common.base.CaseFormat;
      20             : import com.google.common.flogger.FluentLogger;
      21             : import com.google.gerrit.common.Nullable;
      22             : import com.google.gerrit.entities.Project;
      23             : import com.google.gerrit.extensions.events.LifecycleListener;
      24             : import com.google.gerrit.extensions.registration.DynamicMap;
      25             : import com.google.gerrit.lifecycle.LifecycleModule;
      26             : import com.google.gerrit.metrics.Description;
      27             : import com.google.gerrit.metrics.MetricMaker;
      28             : import com.google.gerrit.server.config.GerritServerConfig;
      29             : import com.google.gerrit.server.config.ScheduleConfig.Schedule;
      30             : import com.google.gerrit.server.logging.LoggingContext;
      31             : import com.google.gerrit.server.logging.LoggingContextAwareRunnable;
      32             : import com.google.gerrit.server.plugincontext.PluginMapContext;
      33             : import com.google.gerrit.server.util.IdGenerator;
      34             : import com.google.inject.Inject;
      35             : import com.google.inject.Singleton;
      36             : import java.lang.reflect.Field;
      37             : import java.time.Instant;
      38             : import java.util.ArrayList;
      39             : import java.util.Collection;
      40             : import java.util.List;
      41             : import java.util.concurrent.Callable;
      42             : import java.util.concurrent.ConcurrentHashMap;
      43             : import java.util.concurrent.CopyOnWriteArrayList;
      44             : import java.util.concurrent.Delayed;
      45             : import java.util.concurrent.ExecutionException;
      46             : import java.util.concurrent.Executors;
      47             : import java.util.concurrent.Future;
      48             : import java.util.concurrent.RunnableScheduledFuture;
      49             : import java.util.concurrent.ScheduledExecutorService;
      50             : import java.util.concurrent.ScheduledFuture;
      51             : import java.util.concurrent.ScheduledThreadPoolExecutor;
      52             : import java.util.concurrent.ThreadFactory;
      53             : import java.util.concurrent.TimeUnit;
      54             : import java.util.concurrent.TimeoutException;
      55             : import java.util.concurrent.atomic.AtomicInteger;
      56             : import java.util.concurrent.atomic.AtomicReference;
      57             : import org.eclipse.jgit.lib.Config;
      58             : 
      59             : /** Delayed execution of tasks using a background thread pool. */
      60             : @Singleton
      61             : public class WorkQueue {
      62         152 :   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
      63             : 
      64             :   /**
      65             :    * To register a TaskListener, which will be called directly before Tasks run, and directly after
      66             :    * they complete, bind the TaskListener like this:
      67             :    *
      68             :    * <p><code>
      69             :    *   bind(TaskListener.class)
      70             :    *       .annotatedWith(Exports.named("MyListener"))
      71             :    *       .to(MyListener.class);
      72             :    * </code>
      73             :    */
      74             :   public interface TaskListener {
      75           0 :     public static class NoOp implements TaskListener {
      76             :       @Override
      77           0 :       public void onStart(Task<?> task) {}
      78             : 
      79             :       @Override
      80           0 :       public void onStop(Task<?> task) {}
      81             :     }
      82             : 
      83             :     void onStart(Task<?> task);
      84             : 
      85             :     void onStop(Task<?> task);
      86             :   }
      87             : 
      88             :   public static class Lifecycle implements LifecycleListener {
      89             :     private final WorkQueue workQueue;
      90             : 
      91             :     @Inject
      92         151 :     Lifecycle(WorkQueue workQeueue) {
      93         151 :       this.workQueue = workQeueue;
      94         151 :     }
      95             : 
      96             :     @Override
      97         151 :     public void start() {}
      98             : 
      99             :     @Override
     100             :     public void stop() {
     101         151 :       workQueue.stop();
     102         151 :     }
     103             :   }
     104             : 
     105         152 :   public static class WorkQueueModule extends LifecycleModule {
     106             :     @Override
     107             :     protected void configure() {
     108         152 :       DynamicMap.mapOf(binder(), WorkQueue.TaskListener.class);
     109         152 :       bind(WorkQueue.class);
     110         152 :       listener().to(Lifecycle.class);
     111         152 :     }
     112             :   }
     113             : 
     114             :   private final ScheduledExecutorService defaultQueue;
     115             :   private final IdGenerator idGenerator;
     116             :   private final MetricMaker metrics;
     117             :   private final CopyOnWriteArrayList<Executor> queues;
     118             :   private final PluginMapContext<TaskListener> listeners;
     119             : 
     120             :   @Inject
     121             :   WorkQueue(
     122             :       IdGenerator idGenerator,
     123             :       @GerritServerConfig Config cfg,
     124             :       MetricMaker metrics,
     125             :       PluginMapContext<TaskListener> listeners) {
     126         152 :     this(
     127             :         idGenerator,
     128         152 :         Math.max(cfg.getInt("execution", "defaultThreadPoolSize", 2), 2),
     129             :         metrics,
     130             :         listeners);
     131         152 :   }
     132             : 
     133             :   /** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
     134             :   public WorkQueue(
     135             :       IdGenerator idGenerator,
     136             :       int defaultThreadPoolSize,
     137             :       MetricMaker metrics,
     138         152 :       PluginMapContext<TaskListener> listeners) {
     139         152 :     this.idGenerator = idGenerator;
     140         152 :     this.metrics = metrics;
     141         152 :     this.queues = new CopyOnWriteArrayList<>();
     142         152 :     this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true);
     143         152 :     this.listeners = listeners;
     144         152 :   }
     145             : 
     146             :   /** Get the default work queue, for miscellaneous tasks. */
     147             :   public ScheduledExecutorService getDefaultQueue() {
     148         138 :     return defaultQueue;
     149             :   }
     150             : 
     151             :   /**
     152             :    * Create a new executor queue.
     153             :    *
     154             :    * <p>Creates a new executor queue without associated metrics. This method is suitable for use by
     155             :    * plugins.
     156             :    *
     157             :    * <p>If metrics are needed, use {@link #createQueue(int, String, int, boolean)} instead.
     158             :    *
     159             :    * @param poolsize the size of the pool.
     160             :    * @param queueName the name of the queue.
     161             :    */
     162             :   public ScheduledExecutorService createQueue(int poolsize, String queueName) {
     163         146 :     return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, false);
     164             :   }
     165             : 
     166             :   /**
     167             :    * Create a new executor queue, with default priority, optionally with metrics.
     168             :    *
     169             :    * <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
     170             :    * requested for queues created by plugins.
     171             :    *
     172             :    * @param poolsize the size of the pool.
     173             :    * @param queueName the name of the queue.
     174             :    * @param withMetrics whether to create metrics.
     175             :    */
     176             :   public ScheduledThreadPoolExecutor createQueue(
     177             :       int poolsize, String queueName, boolean withMetrics) {
     178         152 :     return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, withMetrics);
     179             :   }
     180             : 
     181             :   /**
     182             :    * Create a new executor queue, optionally with metrics.
     183             :    *
     184             :    * <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
     185             :    * requested for queues created by plugins.
     186             :    *
     187             :    * @param poolsize the size of the pool.
     188             :    * @param queueName the name of the queue.
     189             :    * @param threadPriority thread priority.
     190             :    * @param withMetrics whether to create metrics.
     191             :    */
     192             :   @SuppressWarnings("ThreadPriorityCheck")
     193             :   public ScheduledThreadPoolExecutor createQueue(
     194             :       int poolsize, String queueName, int threadPriority, boolean withMetrics) {
     195         152 :     Executor executor = new Executor(poolsize, queueName);
     196         152 :     if (withMetrics) {
     197         152 :       logger.atInfo().log("Adding metrics for '%s' queue", queueName);
     198         152 :       executor.buildMetrics(queueName);
     199             :     }
     200         152 :     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
     201         152 :     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
     202         152 :     queues.add(executor);
     203         152 :     if (threadPriority != Thread.NORM_PRIORITY) {
     204          17 :       ThreadFactory parent = executor.getThreadFactory();
     205          17 :       executor.setThreadFactory(
     206             :           task -> {
     207          10 :             Thread t = parent.newThread(task);
     208          10 :             t.setPriority(threadPriority);
     209          10 :             return t;
     210             :           });
     211             :     }
     212             : 
     213         152 :     return executor;
     214             :   }
     215             : 
     216             :   /** Executes a periodic command at a fixed schedule on the default queue. */
     217             :   public void scheduleAtFixedRate(Runnable command, Schedule schedule) {
     218             :     @SuppressWarnings("unused")
     219           4 :     Future<?> possiblyIgnoredError =
     220           4 :         getDefaultQueue()
     221           4 :             .scheduleAtFixedRate(
     222           4 :                 command, schedule.initialDelay(), schedule.interval(), TimeUnit.MILLISECONDS);
     223           4 :   }
     224             : 
     225             :   /** Get all of the tasks currently scheduled in any work queue. */
     226             :   public List<Task<?>> getTasks() {
     227         132 :     final List<Task<?>> r = new ArrayList<>();
     228         132 :     for (Executor e : queues) {
     229         132 :       e.addAllTo(r);
     230         132 :     }
     231         132 :     return r;
     232             :   }
     233             : 
     234             :   public <T> List<T> getTaskInfos(TaskInfoFactory<T> factory) {
     235           3 :     List<T> taskInfos = new ArrayList<>();
     236           3 :     for (Executor exe : queues) {
     237           3 :       for (Task<?> task : exe.getTasks()) {
     238           3 :         taskInfos.add(factory.getTaskInfo(task));
     239           3 :       }
     240           3 :     }
     241           3 :     return taskInfos;
     242             :   }
     243             : 
     244             :   /** Locate a task by its unique id, null if no task matches. */
     245             :   @Nullable
     246             :   public Task<?> getTask(int id) {
     247           2 :     Task<?> result = null;
     248           2 :     for (Executor e : queues) {
     249           2 :       final Task<?> t = e.getTask(id);
     250           2 :       if (t != null) {
     251           2 :         if (result != null) {
     252             :           // Don't return the task if we have a duplicate. Lie instead.
     253           0 :           return null;
     254             :         }
     255           2 :         result = t;
     256             :       }
     257           2 :     }
     258           2 :     return result;
     259             :   }
     260             : 
     261             :   @Nullable
     262             :   public ScheduledThreadPoolExecutor getExecutor(String queueName) {
     263           0 :     for (Executor e : queues) {
     264           0 :       if (e.queueName.equals(queueName)) {
     265           0 :         return e;
     266             :       }
     267           0 :     }
     268           0 :     return null;
     269             :   }
     270             : 
     271             :   private void stop() {
     272         151 :     for (Executor p : queues) {
     273         151 :       p.shutdown();
     274             :       boolean isTerminated;
     275             :       do {
     276             :         try {
     277         151 :           isTerminated = p.awaitTermination(10, TimeUnit.SECONDS);
     278           0 :         } catch (InterruptedException ie) {
     279           0 :           isTerminated = false;
     280         151 :         }
     281         151 :       } while (!isTerminated);
     282         151 :     }
     283         151 :     queues.clear();
     284         151 :   }
     285             : 
     286             :   /** An isolated queue. */
     287             :   private class Executor extends ScheduledThreadPoolExecutor {
     288             :     private final ConcurrentHashMap<Integer, Task<?>> all;
     289             :     private final String queueName;
     290             : 
     291         152 :     Executor(int corePoolSize, final String queueName) {
     292         152 :       super(
     293             :           corePoolSize,
     294         152 :           new ThreadFactory() {
     295         152 :             private final ThreadFactory parent = Executors.defaultThreadFactory();
     296         152 :             private final AtomicInteger tid = new AtomicInteger(1);
     297             : 
     298             :             @Override
     299             :             public Thread newThread(Runnable task) {
     300         146 :               final Thread t = parent.newThread(task);
     301         146 :               t.setName(queueName + "-" + tid.getAndIncrement());
     302         146 :               t.setUncaughtExceptionHandler(WorkQueue::logUncaughtException);
     303         146 :               return t;
     304             :             }
     305             :           });
     306             : 
     307         152 :       all =
     308             :           new ConcurrentHashMap<>( //
     309             :               corePoolSize << 1, // table size
     310             :               0.75f, // load factor
     311             :               corePoolSize + 4 // concurrency level
     312             :               );
     313         152 :       this.queueName = queueName;
     314         152 :     }
     315             : 
     316             :     @Override
     317             :     public void execute(Runnable command) {
     318         146 :       super.execute(LoggingContext.copy(command));
     319         146 :     }
     320             : 
     321             :     @Override
     322             :     public <T> Future<T> submit(Callable<T> task) {
     323           0 :       return super.submit(LoggingContext.copy(task));
     324             :     }
     325             : 
     326             :     @Override
     327             :     public <T> Future<T> submit(Runnable task, T result) {
     328           0 :       return super.submit(LoggingContext.copy(task), result);
     329             :     }
     330             : 
     331             :     @Override
     332             :     public Future<?> submit(Runnable task) {
     333         103 :       return super.submit(LoggingContext.copy(task));
     334             :     }
     335             : 
     336             :     @Override
     337             :     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     338             :         throws InterruptedException {
     339           0 :       return super.invokeAll(tasks.stream().map(LoggingContext::copy).collect(toList()));
     340             :     }
     341             : 
     342             :     @Override
     343             :     public <T> List<Future<T>> invokeAll(
     344             :         Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
     345             :         throws InterruptedException {
     346           2 :       return super.invokeAll(
     347           2 :           tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
     348             :     }
     349             : 
     350             :     @Override
     351             :     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     352             :         throws InterruptedException, ExecutionException {
     353           0 :       return super.invokeAny(tasks.stream().map(LoggingContext::copy).collect(toList()));
     354             :     }
     355             : 
     356             :     @Override
     357             :     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
     358             :         throws InterruptedException, ExecutionException, TimeoutException {
     359           0 :       return super.invokeAny(
     360           0 :           tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
     361             :     }
     362             : 
     363             :     @Override
     364             :     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
     365         146 :       return super.schedule(LoggingContext.copy(command), delay, unit);
     366             :     }
     367             : 
     368             :     @Override
     369             :     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
     370           0 :       return super.schedule(LoggingContext.copy(callable), delay, unit);
     371             :     }
     372             : 
     373             :     @Override
     374             :     public ScheduledFuture<?> scheduleAtFixedRate(
     375             :         Runnable command, long initialDelay, long period, TimeUnit unit) {
     376         138 :       return super.scheduleAtFixedRate(LoggingContext.copy(command), initialDelay, period, unit);
     377             :     }
     378             : 
     379             :     @Override
     380             :     public ScheduledFuture<?> scheduleWithFixedDelay(
     381             :         Runnable command, long initialDelay, long delay, TimeUnit unit) {
     382           0 :       return super.scheduleWithFixedDelay(LoggingContext.copy(command), initialDelay, delay, unit);
     383             :     }
     384             : 
     385             :     @Override
     386             :     protected void terminated() {
     387         151 :       super.terminated();
     388         151 :       queues.remove(this);
     389         151 :     }
     390             : 
     391             :     private void buildMetrics(String queueName) {
     392         152 :       metrics.newCallbackMetric(
     393         152 :           getMetricName(queueName, "max_pool_size"),
     394             :           Long.class,
     395             :           new Description("Maximum allowed number of threads in the pool")
     396         152 :               .setGauge()
     397         152 :               .setUnit("threads"),
     398          15 :           () -> (long) getMaximumPoolSize());
     399         152 :       metrics.newCallbackMetric(
     400         152 :           getMetricName(queueName, "pool_size"),
     401             :           Long.class,
     402         152 :           new Description("Current number of threads in the pool").setGauge().setUnit("threads"),
     403          15 :           () -> (long) getPoolSize());
     404         152 :       metrics.newCallbackMetric(
     405         152 :           getMetricName(queueName, "active_threads"),
     406             :           Long.class,
     407             :           new Description("Number number of threads that are actively executing tasks")
     408         152 :               .setGauge()
     409         152 :               .setUnit("threads"),
     410          15 :           () -> (long) getActiveCount());
     411         152 :       metrics.newCallbackMetric(
     412         152 :           getMetricName(queueName, "scheduled_tasks"),
     413             :           Integer.class,
     414         152 :           new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"),
     415          15 :           () -> getQueue().size());
     416         152 :       metrics.newCallbackMetric(
     417         152 :           getMetricName(queueName, "total_scheduled_tasks_count"),
     418             :           Long.class,
     419             :           new Description("Total number of tasks that have been scheduled for execution")
     420         152 :               .setCumulative()
     421         152 :               .setUnit("tasks"),
     422             :           this::getTaskCount);
     423         152 :       metrics.newCallbackMetric(
     424         152 :           getMetricName(queueName, "total_completed_tasks_count"),
     425             :           Long.class,
     426             :           new Description("Total number of tasks that have completed execution")
     427         152 :               .setCumulative()
     428         152 :               .setUnit("tasks"),
     429             :           this::getCompletedTaskCount);
     430         152 :     }
     431             : 
     432             :     private String getMetricName(String queueName, String metricName) {
     433         152 :       String name =
     434         152 :           CaseFormat.UPPER_CAMEL.to(
     435         152 :               CaseFormat.LOWER_UNDERSCORE, queueName.replaceFirst("SSH", "Ssh").replace("-", ""));
     436         152 :       return metrics.sanitizeMetricName(String.format("queue/%s/%s", name, metricName));
     437             :     }
     438             : 
     439             :     @Override
     440             :     protected <V> RunnableScheduledFuture<V> decorateTask(
     441             :         Runnable runnable, RunnableScheduledFuture<V> r) {
     442         146 :       r = super.decorateTask(runnable, r);
     443             :       for (; ; ) {
     444         146 :         final int id = idGenerator.next();
     445             : 
     446             :         Task<V> task;
     447             : 
     448         146 :         if (runnable instanceof LoggingContextAwareRunnable) {
     449         146 :           runnable = ((LoggingContextAwareRunnable) runnable).unwrap();
     450             :         }
     451             : 
     452         146 :         if (runnable instanceof ProjectRunnable) {
     453          99 :           task = new ProjectTask<>((ProjectRunnable) runnable, r, this, id);
     454             :         } else {
     455         146 :           task = new Task<>(runnable, r, this, id);
     456             :         }
     457             : 
     458         146 :         if (all.putIfAbsent(task.getTaskId(), task) == null) {
     459         146 :           return task;
     460             :         }
     461           0 :       }
     462             :     }
     463             : 
     464             :     @Override
     465             :     protected <V> RunnableScheduledFuture<V> decorateTask(
     466             :         Callable<V> callable, RunnableScheduledFuture<V> task) {
     467           0 :       throw new UnsupportedOperationException("Callable not implemented");
     468             :     }
     469             : 
     470             :     void remove(Task<?> task) {
     471         146 :       all.remove(task.getTaskId(), task);
     472         146 :     }
     473             : 
     474             :     Task<?> getTask(int id) {
     475           2 :       return all.get(id);
     476             :     }
     477             : 
     478             :     void addAllTo(List<Task<?>> list) {
     479         132 :       list.addAll(all.values()); // iterator is thread safe
     480         132 :     }
     481             : 
     482             :     Collection<Task<?>> getTasks() {
     483           3 :       return all.values();
     484             :     }
     485             : 
     486             :     public void onStart(Task<?> task) {
     487         146 :       listeners.runEach(extension -> extension.getProvider().get().onStart(task));
     488         146 :     }
     489             : 
     490             :     public void onStop(Task<?> task) {
     491         146 :       listeners.runEach(extension -> extension.getProvider().get().onStop(task));
     492         146 :     }
     493             :   }
     494             : 
     495             :   private static void logUncaughtException(Thread t, Throwable e) {
     496           0 :     logger.atSevere().withCause(e).log("WorkQueue thread %s threw exception", t.getName());
     497           0 :   }
     498             : 
     499             :   /**
     500             :    * Runnable needing to know it was canceled. Note that cancel is called only in case the task is
     501             :    * not in progress already.
     502             :    */
     503             :   public interface CancelableRunnable extends Runnable {
     504             :     /** Notifies the runnable it was canceled. */
     505             :     void cancel();
     506             :   }
     507             : 
     508             :   /**
     509             :    * Base interface handles the case when task was canceled before actual execution and in case it
     510             :    * was started cancel method is not called yet the task itself will be destroyed anyway (it will
     511             :    * result in resource opening errors). This interface gives a chance to implementing classes for
     512             :    * handling such scenario and act accordingly.
     513             :    */
     514             :   public interface CanceledWhileRunning extends CancelableRunnable {
     515             :     /** Notifies the runnable it was canceled during execution. * */
     516             :     void setCanceledWhileRunning();
     517             :   }
     518             : 
     519             :   /** A wrapper around a scheduled Runnable, as maintained in the queue. */
     520             :   public static class Task<V> implements RunnableScheduledFuture<V> {
     521             :     /**
     522             :      * Summarized status of a single task.
     523             :      *
     524             :      * <p>Tasks have the following state flow:
     525             :      *
     526             :      * <ol>
     527             :      *   <li>{@link #SLEEPING}: if scheduled with a non-zero delay.
     528             :      *   <li>{@link #READY}: waiting for an available worker thread.
     529             :      *   <li>{@link #STARTING}: onStart() actively executing on a worker thread.
     530             :      *   <li>{@link #RUNNING}: actively executing on a worker thread.
     531             :      *   <li>{@link #STOPPING}: onStop() actively executing on a worker thread.
     532             :      *   <li>{@link #DONE}: finished executing, if not periodic.
     533             :      * </ol>
     534             :      */
     535         146 :     public enum State {
     536             :       // Ordered like this so ordinal matches the order we would
     537             :       // prefer to see tasks sorted in: done before running,
     538             :       // stopping before running, running before starting,
     539             :       // starting before ready, ready before sleeping.
     540             :       //
     541         146 :       DONE,
     542         146 :       CANCELLED,
     543         146 :       STOPPING,
     544         146 :       RUNNING,
     545         146 :       STARTING,
     546         146 :       READY,
     547         146 :       SLEEPING,
     548         146 :       OTHER
     549             :     }
     550             : 
     551             :     private final Runnable runnable;
     552             :     private final RunnableScheduledFuture<V> task;
     553             :     private final Executor executor;
     554             :     private final int taskId;
     555             :     private final Instant startTime;
     556             : 
     557             :     // runningState is non-null when listener or task code is running in an executor thread
     558         146 :     private final AtomicReference<State> runningState = new AtomicReference<>();
     559             : 
     560         146 :     Task(Runnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) {
     561         146 :       this.runnable = runnable;
     562         146 :       this.task = task;
     563         146 :       this.executor = executor;
     564         146 :       this.taskId = taskId;
     565         146 :       this.startTime = Instant.now();
     566         146 :     }
     567             : 
     568             :     public int getTaskId() {
     569         146 :       return taskId;
     570             :     }
     571             : 
     572             :     public State getState() {
     573           4 :       if (isCancelled()) {
     574           0 :         return State.CANCELLED;
     575             :       }
     576             : 
     577           4 :       State r = runningState.get();
     578           4 :       if (r != null) {
     579           1 :         return r;
     580           3 :       } else if (isDone() && !isPeriodic()) {
     581           0 :         return State.DONE;
     582             :       }
     583             : 
     584           3 :       final long delay = getDelay(TimeUnit.MILLISECONDS);
     585           3 :       if (delay <= 0) {
     586           0 :         return State.READY;
     587             :       }
     588           3 :       return State.SLEEPING;
     589             :     }
     590             : 
     591             :     public Instant getStartTime() {
     592           3 :       return startTime;
     593             :     }
     594             : 
     595             :     public String getQueueName() {
     596           3 :       return executor.queueName;
     597             :     }
     598             : 
     599             :     @Override
     600             :     public boolean cancel(boolean mayInterruptIfRunning) {
     601         138 :       if (task.cancel(mayInterruptIfRunning)) {
     602             :         // Tiny abuse of runningState: if the task needs to know it
     603             :         // was canceled (to clean up resources) and it hasn't started
     604             :         // yet the task's run method won't execute. So we tag it
     605             :         // as running and allow it to clean up. This ensures we do
     606             :         // not invoke cancel twice.
     607             :         //
     608         138 :         if (runnable instanceof CancelableRunnable) {
     609           7 :           if (runningState.compareAndSet(null, State.RUNNING)) {
     610           0 :             ((CancelableRunnable) runnable).cancel();
     611           7 :           } else if (runnable instanceof CanceledWhileRunning) {
     612           0 :             ((CanceledWhileRunning) runnable).setCanceledWhileRunning();
     613             :           }
     614             :         }
     615         138 :         if (runnable instanceof Future<?>) {
     616             :           // Creating new futures eventually passes through
     617             :           // AbstractExecutorService#schedule, which will convert the Guava
     618             :           // Future to a Runnable, thereby making it impossible for the
     619             :           // cancellation to propagate from ScheduledThreadPool's task back to
     620             :           // the Guava future, so kludge it here.
     621           0 :           ((Future<?>) runnable).cancel(mayInterruptIfRunning);
     622             :         }
     623             : 
     624         138 :         executor.remove(this);
     625         138 :         executor.purge();
     626         138 :         return true;
     627             :       }
     628           9 :       return false;
     629             :     }
     630             : 
     631             :     @Override
     632             :     public int compareTo(Delayed o) {
     633          84 :       return task.compareTo(o);
     634             :     }
     635             : 
     636             :     @Override
     637             :     public V get() throws InterruptedException, ExecutionException {
     638           1 :       return task.get();
     639             :     }
     640             : 
     641             :     @Override
     642             :     public V get(long timeout, TimeUnit unit)
     643             :         throws InterruptedException, ExecutionException, TimeoutException {
     644          96 :       return task.get(timeout, unit);
     645             :     }
     646             : 
     647             :     @Override
     648             :     public long getDelay(TimeUnit unit) {
     649         146 :       return task.getDelay(unit);
     650             :     }
     651             : 
     652             :     @Override
     653             :     public boolean isCancelled() {
     654           8 :       return task.isCancelled();
     655             :     }
     656             : 
     657             :     @Override
     658             :     public boolean isDone() {
     659          99 :       return task.isDone();
     660             :     }
     661             : 
     662             :     @Override
     663             :     public boolean isPeriodic() {
     664         146 :       return task.isPeriodic();
     665             :     }
     666             : 
     667             :     @Override
     668             :     public void run() {
     669         146 :       if (runningState.compareAndSet(null, State.STARTING)) {
     670         146 :         String oldThreadName = Thread.currentThread().getName();
     671             :         try {
     672         146 :           executor.onStart(this);
     673         146 :           runningState.set(State.RUNNING);
     674         146 :           Thread.currentThread().setName(oldThreadName + "[" + task.toString() + "]");
     675         146 :           task.run();
     676             :         } finally {
     677         146 :           Thread.currentThread().setName(oldThreadName);
     678         146 :           runningState.set(State.STOPPING);
     679         146 :           executor.onStop(this);
     680         146 :           if (isPeriodic()) {
     681           0 :             runningState.set(null);
     682             :           } else {
     683         146 :             runningState.set(State.DONE);
     684         146 :             executor.remove(this);
     685             :           }
     686             :         }
     687             :       }
     688         146 :     }
     689             : 
     690             :     @Override
     691             :     public String toString() {
     692             :       // This is a workaround to be able to print a proper name when the task
     693             :       // is wrapped into a TrustedListenableFutureTask.
     694             :       try {
     695         132 :         if (runnable
     696         132 :             .getClass()
     697         132 :             .isAssignableFrom(
     698         132 :                 Class.forName("com.google.common.util.concurrent.TrustedListenableFutureTask"))) {
     699          28 :           Class<?> trustedFutureInterruptibleTask =
     700          28 :               Class.forName(
     701             :                   "com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask");
     702          28 :           for (Field field : runnable.getClass().getDeclaredFields()) {
     703          28 :             if (field.getType().isAssignableFrom(trustedFutureInterruptibleTask)) {
     704          28 :               field.setAccessible(true);
     705          28 :               Object innerObj = field.get(runnable);
     706          28 :               if (innerObj != null) {
     707          28 :                 for (Field innerField : innerObj.getClass().getDeclaredFields()) {
     708          28 :                   if (innerField.getType().isAssignableFrom(Callable.class)) {
     709          28 :                     innerField.setAccessible(true);
     710          28 :                     return innerField.get(innerObj).toString();
     711             :                   }
     712             :                 }
     713             :               }
     714             :             }
     715             :           }
     716             :         }
     717           0 :       } catch (ClassNotFoundException | IllegalArgumentException | IllegalAccessException e) {
     718           0 :         logger.atFine().log(
     719           0 :             "Cannot get a proper name for TrustedListenableFutureTask: %s", e.getMessage());
     720         132 :       }
     721         132 :       return runnable.toString();
     722             :     }
     723             :   }
     724             : 
     725             :   /**
     726             :    * Same as Task class, but with a reference to ProjectRunnable, used to retrieve the project name
     727             :    * from the operation queued
     728             :    */
     729             :   public static class ProjectTask<V> extends Task<V> implements ProjectRunnable {
     730             : 
     731             :     private final ProjectRunnable runnable;
     732             : 
     733             :     ProjectTask(
     734             :         ProjectRunnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) {
     735          99 :       super(runnable, task, executor, taskId);
     736          99 :       this.runnable = runnable;
     737          99 :     }
     738             : 
     739             :     @Override
     740             :     public Project.NameKey getProjectNameKey() {
     741           0 :       return runnable.getProjectNameKey();
     742             :     }
     743             : 
     744             :     @Override
     745             :     public String getRemoteName() {
     746           0 :       return runnable.getRemoteName();
     747             :     }
     748             : 
     749             :     @Override
     750             :     public boolean hasCustomizedPrint() {
     751           0 :       return runnable.hasCustomizedPrint();
     752             :     }
     753             : 
     754             :     @Override
     755             :     public String toString() {
     756          10 :       return runnable.toString();
     757             :     }
     758             :   }
     759             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750