Line data Source code
1 : // Copyright (C) 2009 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.server.git;
16 :
17 : import static java.util.stream.Collectors.toList;
18 :
19 : import com.google.common.base.CaseFormat;
20 : import com.google.common.flogger.FluentLogger;
21 : import com.google.gerrit.common.Nullable;
22 : import com.google.gerrit.entities.Project;
23 : import com.google.gerrit.extensions.events.LifecycleListener;
24 : import com.google.gerrit.extensions.registration.DynamicMap;
25 : import com.google.gerrit.lifecycle.LifecycleModule;
26 : import com.google.gerrit.metrics.Description;
27 : import com.google.gerrit.metrics.MetricMaker;
28 : import com.google.gerrit.server.config.GerritServerConfig;
29 : import com.google.gerrit.server.config.ScheduleConfig.Schedule;
30 : import com.google.gerrit.server.logging.LoggingContext;
31 : import com.google.gerrit.server.logging.LoggingContextAwareRunnable;
32 : import com.google.gerrit.server.plugincontext.PluginMapContext;
33 : import com.google.gerrit.server.util.IdGenerator;
34 : import com.google.inject.Inject;
35 : import com.google.inject.Singleton;
36 : import java.lang.reflect.Field;
37 : import java.time.Instant;
38 : import java.util.ArrayList;
39 : import java.util.Collection;
40 : import java.util.List;
41 : import java.util.concurrent.Callable;
42 : import java.util.concurrent.ConcurrentHashMap;
43 : import java.util.concurrent.CopyOnWriteArrayList;
44 : import java.util.concurrent.Delayed;
45 : import java.util.concurrent.ExecutionException;
46 : import java.util.concurrent.Executors;
47 : import java.util.concurrent.Future;
48 : import java.util.concurrent.RunnableScheduledFuture;
49 : import java.util.concurrent.ScheduledExecutorService;
50 : import java.util.concurrent.ScheduledFuture;
51 : import java.util.concurrent.ScheduledThreadPoolExecutor;
52 : import java.util.concurrent.ThreadFactory;
53 : import java.util.concurrent.TimeUnit;
54 : import java.util.concurrent.TimeoutException;
55 : import java.util.concurrent.atomic.AtomicInteger;
56 : import java.util.concurrent.atomic.AtomicReference;
57 : import org.eclipse.jgit.lib.Config;
58 :
59 : /** Delayed execution of tasks using a background thread pool. */
60 : @Singleton
61 : public class WorkQueue {
62 152 : private static final FluentLogger logger = FluentLogger.forEnclosingClass();
63 :
64 : /**
65 : * To register a TaskListener, which will be called directly before Tasks run, and directly after
66 : * they complete, bind the TaskListener like this:
67 : *
68 : * <p><code>
69 : * bind(TaskListener.class)
70 : * .annotatedWith(Exports.named("MyListener"))
71 : * .to(MyListener.class);
72 : * </code>
73 : */
74 : public interface TaskListener {
75 0 : public static class NoOp implements TaskListener {
76 : @Override
77 0 : public void onStart(Task<?> task) {}
78 :
79 : @Override
80 0 : public void onStop(Task<?> task) {}
81 : }
82 :
83 : void onStart(Task<?> task);
84 :
85 : void onStop(Task<?> task);
86 : }
87 :
88 : public static class Lifecycle implements LifecycleListener {
89 : private final WorkQueue workQueue;
90 :
91 : @Inject
92 151 : Lifecycle(WorkQueue workQeueue) {
93 151 : this.workQueue = workQeueue;
94 151 : }
95 :
96 : @Override
97 151 : public void start() {}
98 :
99 : @Override
100 : public void stop() {
101 151 : workQueue.stop();
102 151 : }
103 : }
104 :
105 152 : public static class WorkQueueModule extends LifecycleModule {
106 : @Override
107 : protected void configure() {
108 152 : DynamicMap.mapOf(binder(), WorkQueue.TaskListener.class);
109 152 : bind(WorkQueue.class);
110 152 : listener().to(Lifecycle.class);
111 152 : }
112 : }
113 :
114 : private final ScheduledExecutorService defaultQueue;
115 : private final IdGenerator idGenerator;
116 : private final MetricMaker metrics;
117 : private final CopyOnWriteArrayList<Executor> queues;
118 : private final PluginMapContext<TaskListener> listeners;
119 :
120 : @Inject
121 : WorkQueue(
122 : IdGenerator idGenerator,
123 : @GerritServerConfig Config cfg,
124 : MetricMaker metrics,
125 : PluginMapContext<TaskListener> listeners) {
126 152 : this(
127 : idGenerator,
128 152 : Math.max(cfg.getInt("execution", "defaultThreadPoolSize", 2), 2),
129 : metrics,
130 : listeners);
131 152 : }
132 :
133 : /** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
134 : public WorkQueue(
135 : IdGenerator idGenerator,
136 : int defaultThreadPoolSize,
137 : MetricMaker metrics,
138 152 : PluginMapContext<TaskListener> listeners) {
139 152 : this.idGenerator = idGenerator;
140 152 : this.metrics = metrics;
141 152 : this.queues = new CopyOnWriteArrayList<>();
142 152 : this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true);
143 152 : this.listeners = listeners;
144 152 : }
145 :
146 : /** Get the default work queue, for miscellaneous tasks. */
147 : public ScheduledExecutorService getDefaultQueue() {
148 138 : return defaultQueue;
149 : }
150 :
151 : /**
152 : * Create a new executor queue.
153 : *
154 : * <p>Creates a new executor queue without associated metrics. This method is suitable for use by
155 : * plugins.
156 : *
157 : * <p>If metrics are needed, use {@link #createQueue(int, String, int, boolean)} instead.
158 : *
159 : * @param poolsize the size of the pool.
160 : * @param queueName the name of the queue.
161 : */
162 : public ScheduledExecutorService createQueue(int poolsize, String queueName) {
163 146 : return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, false);
164 : }
165 :
166 : /**
167 : * Create a new executor queue, with default priority, optionally with metrics.
168 : *
169 : * <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
170 : * requested for queues created by plugins.
171 : *
172 : * @param poolsize the size of the pool.
173 : * @param queueName the name of the queue.
174 : * @param withMetrics whether to create metrics.
175 : */
176 : public ScheduledThreadPoolExecutor createQueue(
177 : int poolsize, String queueName, boolean withMetrics) {
178 152 : return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, withMetrics);
179 : }
180 :
181 : /**
182 : * Create a new executor queue, optionally with metrics.
183 : *
184 : * <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
185 : * requested for queues created by plugins.
186 : *
187 : * @param poolsize the size of the pool.
188 : * @param queueName the name of the queue.
189 : * @param threadPriority thread priority.
190 : * @param withMetrics whether to create metrics.
191 : */
192 : @SuppressWarnings("ThreadPriorityCheck")
193 : public ScheduledThreadPoolExecutor createQueue(
194 : int poolsize, String queueName, int threadPriority, boolean withMetrics) {
195 152 : Executor executor = new Executor(poolsize, queueName);
196 152 : if (withMetrics) {
197 152 : logger.atInfo().log("Adding metrics for '%s' queue", queueName);
198 152 : executor.buildMetrics(queueName);
199 : }
200 152 : executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
201 152 : executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
202 152 : queues.add(executor);
203 152 : if (threadPriority != Thread.NORM_PRIORITY) {
204 17 : ThreadFactory parent = executor.getThreadFactory();
205 17 : executor.setThreadFactory(
206 : task -> {
207 10 : Thread t = parent.newThread(task);
208 10 : t.setPriority(threadPriority);
209 10 : return t;
210 : });
211 : }
212 :
213 152 : return executor;
214 : }
215 :
216 : /** Executes a periodic command at a fixed schedule on the default queue. */
217 : public void scheduleAtFixedRate(Runnable command, Schedule schedule) {
218 : @SuppressWarnings("unused")
219 4 : Future<?> possiblyIgnoredError =
220 4 : getDefaultQueue()
221 4 : .scheduleAtFixedRate(
222 4 : command, schedule.initialDelay(), schedule.interval(), TimeUnit.MILLISECONDS);
223 4 : }
224 :
225 : /** Get all of the tasks currently scheduled in any work queue. */
226 : public List<Task<?>> getTasks() {
227 132 : final List<Task<?>> r = new ArrayList<>();
228 132 : for (Executor e : queues) {
229 132 : e.addAllTo(r);
230 132 : }
231 132 : return r;
232 : }
233 :
234 : public <T> List<T> getTaskInfos(TaskInfoFactory<T> factory) {
235 3 : List<T> taskInfos = new ArrayList<>();
236 3 : for (Executor exe : queues) {
237 3 : for (Task<?> task : exe.getTasks()) {
238 3 : taskInfos.add(factory.getTaskInfo(task));
239 3 : }
240 3 : }
241 3 : return taskInfos;
242 : }
243 :
244 : /** Locate a task by its unique id, null if no task matches. */
245 : @Nullable
246 : public Task<?> getTask(int id) {
247 2 : Task<?> result = null;
248 2 : for (Executor e : queues) {
249 2 : final Task<?> t = e.getTask(id);
250 2 : if (t != null) {
251 2 : if (result != null) {
252 : // Don't return the task if we have a duplicate. Lie instead.
253 0 : return null;
254 : }
255 2 : result = t;
256 : }
257 2 : }
258 2 : return result;
259 : }
260 :
261 : @Nullable
262 : public ScheduledThreadPoolExecutor getExecutor(String queueName) {
263 0 : for (Executor e : queues) {
264 0 : if (e.queueName.equals(queueName)) {
265 0 : return e;
266 : }
267 0 : }
268 0 : return null;
269 : }
270 :
271 : private void stop() {
272 151 : for (Executor p : queues) {
273 151 : p.shutdown();
274 : boolean isTerminated;
275 : do {
276 : try {
277 151 : isTerminated = p.awaitTermination(10, TimeUnit.SECONDS);
278 0 : } catch (InterruptedException ie) {
279 0 : isTerminated = false;
280 151 : }
281 151 : } while (!isTerminated);
282 151 : }
283 151 : queues.clear();
284 151 : }
285 :
286 : /** An isolated queue. */
287 : private class Executor extends ScheduledThreadPoolExecutor {
288 : private final ConcurrentHashMap<Integer, Task<?>> all;
289 : private final String queueName;
290 :
291 152 : Executor(int corePoolSize, final String queueName) {
292 152 : super(
293 : corePoolSize,
294 152 : new ThreadFactory() {
295 152 : private final ThreadFactory parent = Executors.defaultThreadFactory();
296 152 : private final AtomicInteger tid = new AtomicInteger(1);
297 :
298 : @Override
299 : public Thread newThread(Runnable task) {
300 146 : final Thread t = parent.newThread(task);
301 146 : t.setName(queueName + "-" + tid.getAndIncrement());
302 146 : t.setUncaughtExceptionHandler(WorkQueue::logUncaughtException);
303 146 : return t;
304 : }
305 : });
306 :
307 152 : all =
308 : new ConcurrentHashMap<>( //
309 : corePoolSize << 1, // table size
310 : 0.75f, // load factor
311 : corePoolSize + 4 // concurrency level
312 : );
313 152 : this.queueName = queueName;
314 152 : }
315 :
316 : @Override
317 : public void execute(Runnable command) {
318 146 : super.execute(LoggingContext.copy(command));
319 146 : }
320 :
321 : @Override
322 : public <T> Future<T> submit(Callable<T> task) {
323 0 : return super.submit(LoggingContext.copy(task));
324 : }
325 :
326 : @Override
327 : public <T> Future<T> submit(Runnable task, T result) {
328 0 : return super.submit(LoggingContext.copy(task), result);
329 : }
330 :
331 : @Override
332 : public Future<?> submit(Runnable task) {
333 103 : return super.submit(LoggingContext.copy(task));
334 : }
335 :
336 : @Override
337 : public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
338 : throws InterruptedException {
339 0 : return super.invokeAll(tasks.stream().map(LoggingContext::copy).collect(toList()));
340 : }
341 :
342 : @Override
343 : public <T> List<Future<T>> invokeAll(
344 : Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
345 : throws InterruptedException {
346 2 : return super.invokeAll(
347 2 : tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
348 : }
349 :
350 : @Override
351 : public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
352 : throws InterruptedException, ExecutionException {
353 0 : return super.invokeAny(tasks.stream().map(LoggingContext::copy).collect(toList()));
354 : }
355 :
356 : @Override
357 : public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
358 : throws InterruptedException, ExecutionException, TimeoutException {
359 0 : return super.invokeAny(
360 0 : tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
361 : }
362 :
363 : @Override
364 : public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
365 146 : return super.schedule(LoggingContext.copy(command), delay, unit);
366 : }
367 :
368 : @Override
369 : public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
370 0 : return super.schedule(LoggingContext.copy(callable), delay, unit);
371 : }
372 :
373 : @Override
374 : public ScheduledFuture<?> scheduleAtFixedRate(
375 : Runnable command, long initialDelay, long period, TimeUnit unit) {
376 138 : return super.scheduleAtFixedRate(LoggingContext.copy(command), initialDelay, period, unit);
377 : }
378 :
379 : @Override
380 : public ScheduledFuture<?> scheduleWithFixedDelay(
381 : Runnable command, long initialDelay, long delay, TimeUnit unit) {
382 0 : return super.scheduleWithFixedDelay(LoggingContext.copy(command), initialDelay, delay, unit);
383 : }
384 :
385 : @Override
386 : protected void terminated() {
387 151 : super.terminated();
388 151 : queues.remove(this);
389 151 : }
390 :
391 : private void buildMetrics(String queueName) {
392 152 : metrics.newCallbackMetric(
393 152 : getMetricName(queueName, "max_pool_size"),
394 : Long.class,
395 : new Description("Maximum allowed number of threads in the pool")
396 152 : .setGauge()
397 152 : .setUnit("threads"),
398 15 : () -> (long) getMaximumPoolSize());
399 152 : metrics.newCallbackMetric(
400 152 : getMetricName(queueName, "pool_size"),
401 : Long.class,
402 152 : new Description("Current number of threads in the pool").setGauge().setUnit("threads"),
403 15 : () -> (long) getPoolSize());
404 152 : metrics.newCallbackMetric(
405 152 : getMetricName(queueName, "active_threads"),
406 : Long.class,
407 : new Description("Number number of threads that are actively executing tasks")
408 152 : .setGauge()
409 152 : .setUnit("threads"),
410 15 : () -> (long) getActiveCount());
411 152 : metrics.newCallbackMetric(
412 152 : getMetricName(queueName, "scheduled_tasks"),
413 : Integer.class,
414 152 : new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"),
415 15 : () -> getQueue().size());
416 152 : metrics.newCallbackMetric(
417 152 : getMetricName(queueName, "total_scheduled_tasks_count"),
418 : Long.class,
419 : new Description("Total number of tasks that have been scheduled for execution")
420 152 : .setCumulative()
421 152 : .setUnit("tasks"),
422 : this::getTaskCount);
423 152 : metrics.newCallbackMetric(
424 152 : getMetricName(queueName, "total_completed_tasks_count"),
425 : Long.class,
426 : new Description("Total number of tasks that have completed execution")
427 152 : .setCumulative()
428 152 : .setUnit("tasks"),
429 : this::getCompletedTaskCount);
430 152 : }
431 :
432 : private String getMetricName(String queueName, String metricName) {
433 152 : String name =
434 152 : CaseFormat.UPPER_CAMEL.to(
435 152 : CaseFormat.LOWER_UNDERSCORE, queueName.replaceFirst("SSH", "Ssh").replace("-", ""));
436 152 : return metrics.sanitizeMetricName(String.format("queue/%s/%s", name, metricName));
437 : }
438 :
439 : @Override
440 : protected <V> RunnableScheduledFuture<V> decorateTask(
441 : Runnable runnable, RunnableScheduledFuture<V> r) {
442 146 : r = super.decorateTask(runnable, r);
443 : for (; ; ) {
444 146 : final int id = idGenerator.next();
445 :
446 : Task<V> task;
447 :
448 146 : if (runnable instanceof LoggingContextAwareRunnable) {
449 146 : runnable = ((LoggingContextAwareRunnable) runnable).unwrap();
450 : }
451 :
452 146 : if (runnable instanceof ProjectRunnable) {
453 99 : task = new ProjectTask<>((ProjectRunnable) runnable, r, this, id);
454 : } else {
455 146 : task = new Task<>(runnable, r, this, id);
456 : }
457 :
458 146 : if (all.putIfAbsent(task.getTaskId(), task) == null) {
459 146 : return task;
460 : }
461 0 : }
462 : }
463 :
464 : @Override
465 : protected <V> RunnableScheduledFuture<V> decorateTask(
466 : Callable<V> callable, RunnableScheduledFuture<V> task) {
467 0 : throw new UnsupportedOperationException("Callable not implemented");
468 : }
469 :
470 : void remove(Task<?> task) {
471 146 : all.remove(task.getTaskId(), task);
472 146 : }
473 :
474 : Task<?> getTask(int id) {
475 2 : return all.get(id);
476 : }
477 :
478 : void addAllTo(List<Task<?>> list) {
479 132 : list.addAll(all.values()); // iterator is thread safe
480 132 : }
481 :
482 : Collection<Task<?>> getTasks() {
483 3 : return all.values();
484 : }
485 :
486 : public void onStart(Task<?> task) {
487 146 : listeners.runEach(extension -> extension.getProvider().get().onStart(task));
488 146 : }
489 :
490 : public void onStop(Task<?> task) {
491 146 : listeners.runEach(extension -> extension.getProvider().get().onStop(task));
492 146 : }
493 : }
494 :
495 : private static void logUncaughtException(Thread t, Throwable e) {
496 0 : logger.atSevere().withCause(e).log("WorkQueue thread %s threw exception", t.getName());
497 0 : }
498 :
499 : /**
500 : * Runnable needing to know it was canceled. Note that cancel is called only in case the task is
501 : * not in progress already.
502 : */
503 : public interface CancelableRunnable extends Runnable {
504 : /** Notifies the runnable it was canceled. */
505 : void cancel();
506 : }
507 :
508 : /**
509 : * Base interface handles the case when task was canceled before actual execution and in case it
510 : * was started cancel method is not called yet the task itself will be destroyed anyway (it will
511 : * result in resource opening errors). This interface gives a chance to implementing classes for
512 : * handling such scenario and act accordingly.
513 : */
514 : public interface CanceledWhileRunning extends CancelableRunnable {
515 : /** Notifies the runnable it was canceled during execution. * */
516 : void setCanceledWhileRunning();
517 : }
518 :
519 : /** A wrapper around a scheduled Runnable, as maintained in the queue. */
520 : public static class Task<V> implements RunnableScheduledFuture<V> {
521 : /**
522 : * Summarized status of a single task.
523 : *
524 : * <p>Tasks have the following state flow:
525 : *
526 : * <ol>
527 : * <li>{@link #SLEEPING}: if scheduled with a non-zero delay.
528 : * <li>{@link #READY}: waiting for an available worker thread.
529 : * <li>{@link #STARTING}: onStart() actively executing on a worker thread.
530 : * <li>{@link #RUNNING}: actively executing on a worker thread.
531 : * <li>{@link #STOPPING}: onStop() actively executing on a worker thread.
532 : * <li>{@link #DONE}: finished executing, if not periodic.
533 : * </ol>
534 : */
535 146 : public enum State {
536 : // Ordered like this so ordinal matches the order we would
537 : // prefer to see tasks sorted in: done before running,
538 : // stopping before running, running before starting,
539 : // starting before ready, ready before sleeping.
540 : //
541 146 : DONE,
542 146 : CANCELLED,
543 146 : STOPPING,
544 146 : RUNNING,
545 146 : STARTING,
546 146 : READY,
547 146 : SLEEPING,
548 146 : OTHER
549 : }
550 :
551 : private final Runnable runnable;
552 : private final RunnableScheduledFuture<V> task;
553 : private final Executor executor;
554 : private final int taskId;
555 : private final Instant startTime;
556 :
557 : // runningState is non-null when listener or task code is running in an executor thread
558 146 : private final AtomicReference<State> runningState = new AtomicReference<>();
559 :
560 146 : Task(Runnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) {
561 146 : this.runnable = runnable;
562 146 : this.task = task;
563 146 : this.executor = executor;
564 146 : this.taskId = taskId;
565 146 : this.startTime = Instant.now();
566 146 : }
567 :
568 : public int getTaskId() {
569 146 : return taskId;
570 : }
571 :
572 : public State getState() {
573 4 : if (isCancelled()) {
574 0 : return State.CANCELLED;
575 : }
576 :
577 4 : State r = runningState.get();
578 4 : if (r != null) {
579 1 : return r;
580 3 : } else if (isDone() && !isPeriodic()) {
581 0 : return State.DONE;
582 : }
583 :
584 3 : final long delay = getDelay(TimeUnit.MILLISECONDS);
585 3 : if (delay <= 0) {
586 0 : return State.READY;
587 : }
588 3 : return State.SLEEPING;
589 : }
590 :
591 : public Instant getStartTime() {
592 3 : return startTime;
593 : }
594 :
595 : public String getQueueName() {
596 3 : return executor.queueName;
597 : }
598 :
599 : @Override
600 : public boolean cancel(boolean mayInterruptIfRunning) {
601 138 : if (task.cancel(mayInterruptIfRunning)) {
602 : // Tiny abuse of runningState: if the task needs to know it
603 : // was canceled (to clean up resources) and it hasn't started
604 : // yet the task's run method won't execute. So we tag it
605 : // as running and allow it to clean up. This ensures we do
606 : // not invoke cancel twice.
607 : //
608 138 : if (runnable instanceof CancelableRunnable) {
609 7 : if (runningState.compareAndSet(null, State.RUNNING)) {
610 0 : ((CancelableRunnable) runnable).cancel();
611 7 : } else if (runnable instanceof CanceledWhileRunning) {
612 0 : ((CanceledWhileRunning) runnable).setCanceledWhileRunning();
613 : }
614 : }
615 138 : if (runnable instanceof Future<?>) {
616 : // Creating new futures eventually passes through
617 : // AbstractExecutorService#schedule, which will convert the Guava
618 : // Future to a Runnable, thereby making it impossible for the
619 : // cancellation to propagate from ScheduledThreadPool's task back to
620 : // the Guava future, so kludge it here.
621 0 : ((Future<?>) runnable).cancel(mayInterruptIfRunning);
622 : }
623 :
624 138 : executor.remove(this);
625 138 : executor.purge();
626 138 : return true;
627 : }
628 9 : return false;
629 : }
630 :
631 : @Override
632 : public int compareTo(Delayed o) {
633 84 : return task.compareTo(o);
634 : }
635 :
636 : @Override
637 : public V get() throws InterruptedException, ExecutionException {
638 1 : return task.get();
639 : }
640 :
641 : @Override
642 : public V get(long timeout, TimeUnit unit)
643 : throws InterruptedException, ExecutionException, TimeoutException {
644 96 : return task.get(timeout, unit);
645 : }
646 :
647 : @Override
648 : public long getDelay(TimeUnit unit) {
649 146 : return task.getDelay(unit);
650 : }
651 :
652 : @Override
653 : public boolean isCancelled() {
654 8 : return task.isCancelled();
655 : }
656 :
657 : @Override
658 : public boolean isDone() {
659 99 : return task.isDone();
660 : }
661 :
662 : @Override
663 : public boolean isPeriodic() {
664 146 : return task.isPeriodic();
665 : }
666 :
667 : @Override
668 : public void run() {
669 146 : if (runningState.compareAndSet(null, State.STARTING)) {
670 146 : String oldThreadName = Thread.currentThread().getName();
671 : try {
672 146 : executor.onStart(this);
673 146 : runningState.set(State.RUNNING);
674 146 : Thread.currentThread().setName(oldThreadName + "[" + task.toString() + "]");
675 146 : task.run();
676 : } finally {
677 146 : Thread.currentThread().setName(oldThreadName);
678 146 : runningState.set(State.STOPPING);
679 146 : executor.onStop(this);
680 146 : if (isPeriodic()) {
681 0 : runningState.set(null);
682 : } else {
683 146 : runningState.set(State.DONE);
684 146 : executor.remove(this);
685 : }
686 : }
687 : }
688 146 : }
689 :
690 : @Override
691 : public String toString() {
692 : // This is a workaround to be able to print a proper name when the task
693 : // is wrapped into a TrustedListenableFutureTask.
694 : try {
695 132 : if (runnable
696 132 : .getClass()
697 132 : .isAssignableFrom(
698 132 : Class.forName("com.google.common.util.concurrent.TrustedListenableFutureTask"))) {
699 28 : Class<?> trustedFutureInterruptibleTask =
700 28 : Class.forName(
701 : "com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask");
702 28 : for (Field field : runnable.getClass().getDeclaredFields()) {
703 28 : if (field.getType().isAssignableFrom(trustedFutureInterruptibleTask)) {
704 28 : field.setAccessible(true);
705 28 : Object innerObj = field.get(runnable);
706 28 : if (innerObj != null) {
707 28 : for (Field innerField : innerObj.getClass().getDeclaredFields()) {
708 28 : if (innerField.getType().isAssignableFrom(Callable.class)) {
709 28 : innerField.setAccessible(true);
710 28 : return innerField.get(innerObj).toString();
711 : }
712 : }
713 : }
714 : }
715 : }
716 : }
717 0 : } catch (ClassNotFoundException | IllegalArgumentException | IllegalAccessException e) {
718 0 : logger.atFine().log(
719 0 : "Cannot get a proper name for TrustedListenableFutureTask: %s", e.getMessage());
720 132 : }
721 132 : return runnable.toString();
722 : }
723 : }
724 :
725 : /**
726 : * Same as Task class, but with a reference to ProjectRunnable, used to retrieve the project name
727 : * from the operation queued
728 : */
729 : public static class ProjectTask<V> extends Task<V> implements ProjectRunnable {
730 :
731 : private final ProjectRunnable runnable;
732 :
733 : ProjectTask(
734 : ProjectRunnable runnable, RunnableScheduledFuture<V> task, Executor executor, int taskId) {
735 99 : super(runnable, task, executor, taskId);
736 99 : this.runnable = runnable;
737 99 : }
738 :
739 : @Override
740 : public Project.NameKey getProjectNameKey() {
741 0 : return runnable.getProjectNameKey();
742 : }
743 :
744 : @Override
745 : public String getRemoteName() {
746 0 : return runnable.getRemoteName();
747 : }
748 :
749 : @Override
750 : public boolean hasCustomizedPrint() {
751 0 : return runnable.hasCustomizedPrint();
752 : }
753 :
754 : @Override
755 : public String toString() {
756 10 : return runnable.toString();
757 : }
758 : }
759 : }
|