LCOV - code coverage report
Current view: top level - server/git - MultiProgressMonitor.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 168 201 83.6 %
Date: 2022-11-19 15:00:39 Functions: 30 34 88.2 %

          Line data    Source code
       1             : // Copyright (C) 2012 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.server.git;
      16             : 
      17             : import static com.google.gerrit.server.DeadlineChecker.getTimeoutFormatter;
      18             : import static java.util.concurrent.TimeUnit.MILLISECONDS;
      19             : import static java.util.concurrent.TimeUnit.MINUTES;
      20             : import static java.util.concurrent.TimeUnit.NANOSECONDS;
      21             : 
      22             : import com.google.common.base.CharMatcher;
      23             : import com.google.common.base.Strings;
      24             : import com.google.common.base.Ticker;
      25             : import com.google.common.flogger.FluentLogger;
      26             : import com.google.common.util.concurrent.UncheckedExecutionException;
      27             : import com.google.gerrit.server.CancellationMetrics;
      28             : import com.google.gerrit.server.cancellation.RequestStateProvider;
      29             : import com.google.inject.assistedinject.Assisted;
      30             : import com.google.inject.assistedinject.AssistedInject;
      31             : import java.io.IOException;
      32             : import java.io.OutputStream;
      33             : import java.util.List;
      34             : import java.util.Optional;
      35             : import java.util.concurrent.CancellationException;
      36             : import java.util.concurrent.CopyOnWriteArrayList;
      37             : import java.util.concurrent.ExecutionException;
      38             : import java.util.concurrent.Future;
      39             : import java.util.concurrent.TimeUnit;
      40             : import java.util.concurrent.TimeoutException;
      41             : import java.util.concurrent.atomic.AtomicBoolean;
      42             : import java.util.concurrent.atomic.AtomicInteger;
      43             : import org.eclipse.jgit.lib.Constants;
      44             : import org.eclipse.jgit.lib.ProgressMonitor;
      45             : 
      46             : /**
      47             :  * Progress reporting interface that multiplexes multiple sub-tasks.
      48             :  *
      49             :  * <p>Output is of the format:
      50             :  *
      51             :  * <pre>
      52             :  *   Task: subA: 1, subB: 75% (3/4) (-)\r
      53             :  *   Task: subA: 2, subB: 75% (3/4), subC: 1 (\)\r
      54             :  *   Task: subA: 2, subB: 100% (4/4), subC: 1 (|)\r
      55             :  *   Task: subA: 4, subB: 100% (4/4), subC: 4, done    \n
      56             :  * </pre>
      57             :  *
      58             :  * <p>Callers should try to keep task and sub-task descriptions short, since the output should fit
      59             :  * on one terminal line. (Note that git clients do not accept terminal control characters, so true
      60             :  * multi-line progress messages would be impossible.)
      61             :  *
      62             :  * <p>Whether the client is disconnected or the deadline is exceeded can be checked by {@link
      63             :  * #checkIfCancelled(RequestStateProvider.OnCancelled)}. This allows the worker thread to react to
      64             :  * cancellations and abort its execution and finish gracefully. After a cancellation has been
      65             :  * signaled the worker thread has 10 * {@link #maxIntervalNanos} to react to the cancellation and
      66             :  * finish gracefully. If the worker thread doesn't finish gracefully in time after the cancellation
      67             :  * has been signaled, the future executing the task is forcefully cancelled which means that the
      68             :  * worker thread gets interrupted and an internal error is returned to the client. To react to
      69             :  * cancellations it is recommended that the task opens a {@link
      70             :  * com.google.gerrit.server.cancellation.RequestStateContext} in a try-with-resources block to
      71             :  * register the {@link MultiProgressMonitor} as a {@link RequestStateProvider}. This way the worker
      72             :  * thread gets aborted by a {@link com.google.gerrit.server.cancellation.RequestCancelledException}
      73             :  * when the request is cancelled which allows the worker thread to handle the cancellation
      74             :  * gracefully by catching this exception (e.g. to return a proper error message). {@link
      75             :  * com.google.gerrit.server.cancellation.RequestCancelledException} is only thrown when the worker
      76             :  * thread checks for cancellation via {@link
      77             :  * com.google.gerrit.server.cancellation.RequestStateContext#abortIfCancelled()}. E.g. this is done
      78             :  * whenever {@link com.google.gerrit.server.logging.TraceContext.TraceTimer} is opened/closed.
      79             :  */
      80             : public class MultiProgressMonitor implements RequestStateProvider {
      81         105 :   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
      82             : 
      83             :   /** Constant indicating the total work units cannot be predicted. */
      84             :   public static final int UNKNOWN = 0;
      85             : 
      86         105 :   private static final char[] SPINNER_STATES = new char[] {'-', '\\', '|', '/'};
      87             :   private static final char NO_SPINNER = ' ';
      88             : 
      89         152 :   public enum TaskKind {
      90         152 :     INDEXING,
      91         152 :     RECEIVE_COMMITS;
      92             :   }
      93             : 
      94             :   /** Handle for a sub-task. */
      95             :   public class Task implements ProgressMonitor {
      96             :     private final String name;
      97             :     private final int total;
      98             :     private int count;
      99             :     private int lastPercent;
     100             : 
     101         105 :     Task(String subTaskName, int totalWork) {
     102         105 :       this.name = subTaskName;
     103         105 :       this.total = totalWork;
     104         105 :     }
     105             : 
     106             :     /**
     107             :      * Indicate that work has been completed on this sub-task.
     108             :      *
     109             :      * <p>Must be called from a worker thread.
     110             :      *
     111             :      * @param completed number of work units completed.
     112             :      */
     113             :     @Override
     114             :     public void update(int completed) {
     115         105 :       boolean w = false;
     116         105 :       synchronized (MultiProgressMonitor.this) {
     117         105 :         count += completed;
     118         105 :         if (total != UNKNOWN) {
     119          15 :           int percent = count * 100 / total;
     120          15 :           if (percent > lastPercent) {
     121          15 :             lastPercent = percent;
     122          15 :             w = true;
     123             :           }
     124             :         }
     125         105 :       }
     126         105 :       if (w) {
     127          15 :         wakeUp();
     128             :       }
     129         105 :     }
     130             : 
     131             :     /**
     132             :      * Indicate that this sub-task is finished.
     133             :      *
     134             :      * <p>Must be called from a worker thread.
     135             :      */
     136             :     public void end() {
     137          96 :       if (total == UNKNOWN && getCount() > 0) {
     138          96 :         wakeUp();
     139             :       }
     140          96 :     }
     141             : 
     142             :     @Override
     143           0 :     public void start(int totalTasks) {}
     144             : 
     145             :     @Override
     146           0 :     public void beginTask(String title, int totalWork) {}
     147             : 
     148             :     @Override
     149          15 :     public void endTask() {}
     150             : 
     151             :     @Override
     152             :     public boolean isCancelled() {
     153           0 :       return false;
     154             :     }
     155             : 
     156             :     public int getCount() {
     157         105 :       synchronized (MultiProgressMonitor.this) {
     158         105 :         return count;
     159             :       }
     160             :     }
     161             : 
     162             :     public int getTotal() {
     163         105 :       return total;
     164             :     }
     165             : 
     166             :     public String getName() {
     167           0 :       return name;
     168             :     }
     169             : 
     170             :     public String getTotalDisplay(int total) {
     171          15 :       return String.valueOf(total);
     172             :     }
     173             :   }
     174             : 
     175             :   /** Handle for a sub-task whose total work can be updated while the task is in progress. */
     176             :   public class VolatileTask extends Task {
     177             :     protected AtomicInteger volatileTotal;
     178          15 :     protected AtomicBoolean isTotalFinalized = new AtomicBoolean(false);
     179             : 
     180          15 :     public VolatileTask(String subTaskName) {
     181          15 :       super(subTaskName, UNKNOWN);
     182          15 :       volatileTotal = new AtomicInteger(UNKNOWN);
     183          15 :     }
     184             : 
     185             :     /**
     186             :      * Update the total work for this sub-task.
     187             :      *
     188             :      * <p>Intended to be called from a worker thread.
     189             :      *
     190             :      * @param workUnits number of work units to be added to existing total work.
     191             :      */
     192             :     public void updateTotal(int workUnits) {
     193           1 :       if (!isTotalFinalized.get()) {
     194           1 :         volatileTotal.addAndGet(workUnits);
     195             :       } else {
     196           0 :         logger.atWarning().log(
     197           0 :             "Total work has been finalized on sub-task %s and cannot be updated", getName());
     198             :       }
     199           1 :     }
     200             : 
     201             :     /**
     202             :      * Mark the total on this sub-task as unmodifiable.
     203             :      *
     204             :      * <p>Intended to be called from a worker thread.
     205             :      */
     206             :     public void finalizeTotal() {
     207          15 :       isTotalFinalized.set(true);
     208          15 :     }
     209             : 
     210             :     @Override
     211             :     public int getTotal() {
     212          15 :       return volatileTotal.get();
     213             :     }
     214             : 
     215             :     @Override
     216             :     public String getTotalDisplay(int total) {
     217           1 :       return super.getTotalDisplay(total) + (isTotalFinalized.get() ? "" : "+");
     218             :     }
     219             :   }
     220             : 
     221             :   public interface Factory {
     222             :     MultiProgressMonitor create(OutputStream out, TaskKind taskKind, String taskName);
     223             : 
     224             :     MultiProgressMonitor create(
     225             :         OutputStream out,
     226             :         TaskKind taskKind,
     227             :         String taskName,
     228             :         long maxIntervalTime,
     229             :         TimeUnit maxIntervalUnit);
     230             :   }
     231             : 
     232             :   private final CancellationMetrics cancellationMetrics;
     233             :   private final OutputStream out;
     234             :   private final TaskKind taskKind;
     235             :   private final String taskName;
     236         105 :   private final List<Task> tasks = new CopyOnWriteArrayList<>();
     237             :   private int spinnerIndex;
     238         105 :   private char spinnerState = NO_SPINNER;
     239             :   private boolean done;
     240             :   private boolean clientDisconnected;
     241             :   private boolean deadlineExceeded;
     242             :   private boolean forcefulTermination;
     243         105 :   private Optional<Long> timeout = Optional.empty();
     244             : 
     245             :   private final long maxIntervalNanos;
     246             :   private final Ticker ticker;
     247             : 
     248             :   /**
     249             :    * Create a new progress monitor for multiple sub-tasks.
     250             :    *
     251             :    * @param out stream for writing progress messages.
     252             :    * @param taskName name of the overall task.
     253             :    */
     254             :   @SuppressWarnings("UnusedMethod")
     255             :   @AssistedInject
     256             :   private MultiProgressMonitor(
     257             :       CancellationMetrics cancellationMetrics,
     258             :       Ticker ticker,
     259             :       @Assisted OutputStream out,
     260             :       @Assisted TaskKind taskKind,
     261             :       @Assisted String taskName) {
     262         105 :     this(cancellationMetrics, ticker, out, taskKind, taskName, 500, MILLISECONDS);
     263         105 :   }
     264             : 
     265             :   /**
     266             :    * Create a new progress monitor for multiple sub-tasks.
     267             :    *
     268             :    * @param out stream for writing progress messages.
     269             :    * @param taskName name of the overall task.
     270             :    * @param maxIntervalTime maximum interval between progress messages.
     271             :    * @param maxIntervalUnit time unit for progress interval.
     272             :    */
     273             :   @AssistedInject
     274             :   private MultiProgressMonitor(
     275             :       CancellationMetrics cancellationMetrics,
     276             :       Ticker ticker,
     277             :       @Assisted OutputStream out,
     278             :       @Assisted TaskKind taskKind,
     279             :       @Assisted String taskName,
     280             :       @Assisted long maxIntervalTime,
     281         105 :       @Assisted TimeUnit maxIntervalUnit) {
     282         105 :     this.cancellationMetrics = cancellationMetrics;
     283         105 :     this.ticker = ticker;
     284         105 :     this.out = out;
     285         105 :     this.taskKind = taskKind;
     286         105 :     this.taskName = taskName;
     287         105 :     maxIntervalNanos = NANOSECONDS.convert(maxIntervalTime, maxIntervalUnit);
     288         105 :   }
     289             : 
     290             :   /**
     291             :    * Wait for a task managed by a {@link Future}, with no timeout.
     292             :    *
     293             :    * @see #waitFor(Future, long, TimeUnit, long, TimeUnit)
     294             :    */
     295             :   public <T> T waitFor(Future<T> workerFuture) {
     296             :     try {
     297          15 :       return waitFor(
     298             :           workerFuture,
     299             :           /* taskTimeoutTime= */ 0,
     300             :           /* taskTimeoutUnit= */ null,
     301             :           /* cancellationTimeoutTime= */ 0,
     302             :           /* cancellationTimeoutUnit= */ null);
     303           0 :     } catch (TimeoutException e) {
     304           0 :       throw new IllegalStateException("timout exception without setting a timeout", e);
     305             :     }
     306             :   }
     307             : 
     308             :   /**
     309             :    * Wait for a task managed by a {@link Future}.
     310             :    *
     311             :    * <p>Must be called from the main thread, <em>not</em> a worker thread. Once a worker thread
     312             :    * calls {@link #end()}, the future has an additional {@code maxInterval} to finish before it is
     313             :    * forcefully cancelled and {@link ExecutionException} is thrown.
     314             :    *
     315             :    * @see #waitForNonFinalTask(Future, long, TimeUnit, long, TimeUnit)
     316             :    * @param workerFuture a future that returns when worker threads are finished.
     317             :    * @param taskTimeoutTime overall timeout for the task; the future gets a cancellation signal
     318             :    *     after this timeout is exceeded; non-positive values indicate no timeout.
     319             :    * @param taskTimeoutUnit unit for overall task timeout.
     320             :    * @param cancellationTimeoutTime timeout for the task to react to the cancellation signal; if the
     321             :    *     task doesn't terminate within this time it is forcefully cancelled; non-positive values
     322             :    *     indicate no timeout.
     323             :    * @param cancellationTimeoutUnit unit for the cancellation timeout.
     324             :    * @throws TimeoutException if this thread or a worker thread was interrupted, the worker was
     325             :    *     cancelled, or timed out waiting for a worker to call {@link #end()}.
     326             :    */
     327             :   public <T> T waitFor(
     328             :       Future<T> workerFuture,
     329             :       long taskTimeoutTime,
     330             :       TimeUnit taskTimeoutUnit,
     331             :       long cancellationTimeoutTime,
     332             :       TimeUnit cancellationTimeoutUnit)
     333             :       throws TimeoutException {
     334         105 :     T t =
     335         105 :         waitForNonFinalTask(
     336             :             workerFuture,
     337             :             taskTimeoutTime,
     338             :             taskTimeoutUnit,
     339             :             cancellationTimeoutTime,
     340             :             cancellationTimeoutUnit);
     341         105 :     synchronized (this) {
     342         105 :       if (!done) {
     343             :         // The worker may not have called end() explicitly, which is likely a
     344             :         // programming error.
     345           3 :         logger.atWarning().log("MultiProgressMonitor worker did not call end() before returning");
     346           3 :         end();
     347             :       }
     348         105 :     }
     349         105 :     sendDone();
     350         105 :     return t;
     351             :   }
     352             : 
     353             :   /**
     354             :    * Wait for a non-final task managed by a {@link Future}, with no timeout.
     355             :    *
     356             :    * @see #waitForNonFinalTask(Future, long, TimeUnit, long, TimeUnit)
     357             :    */
     358             :   public <T> T waitForNonFinalTask(Future<T> workerFuture) {
     359             :     try {
     360          15 :       return waitForNonFinalTask(workerFuture, 0, null, 0, null);
     361           0 :     } catch (TimeoutException e) {
     362           0 :       throw new IllegalStateException("timout exception without setting a timeout", e);
     363             :     }
     364             :   }
     365             : 
     366             :   /**
     367             :    * Wait for a task managed by a {@link Future}. This call does not expect the worker thread to
     368             :    * call {@link #end()}. It is intended to be used to track a non-final task.
     369             :    *
     370             :    * @param workerFuture a future that returns when worker threads are finished.
     371             :    * @param taskTimeoutTime overall timeout for the task; the future is forcefully cancelled if the
     372             :    *     task exceeds the timeout. Non-positive values indicate no timeout.
     373             :    * @param taskTimeoutUnit unit for overall task timeout.
     374             :    * @param cancellationTimeoutTime timeout for the task to react to the cancellation signal; if the
     375             :    *     task doesn't terminate within this time it is forcefully cancelled; non-positive values
     376             :    *     indicate no timeout.
     377             :    * @param cancellationTimeoutUnit unit for the cancellation timeout.
     378             :    * @throws TimeoutException if this thread or a worker thread was interrupted, the worker was
     379             :    *     cancelled, or timed out waiting for a worker to call {@link #end()}.
     380             :    */
     381             :   public <T> T waitForNonFinalTask(
     382             :       Future<T> workerFuture,
     383             :       long taskTimeoutTime,
     384             :       TimeUnit taskTimeoutUnit,
     385             :       long cancellationTimeoutTime,
     386             :       TimeUnit cancellationTimeoutUnit)
     387             :       throws TimeoutException {
     388         105 :     long overallStart = ticker.read();
     389             :     long cancellationNanos =
     390         105 :         cancellationTimeoutTime > 0
     391          96 :             ? NANOSECONDS.convert(cancellationTimeoutTime, cancellationTimeoutUnit)
     392         105 :             : 0;
     393             :     long deadline;
     394         105 :     if (taskTimeoutTime > 0) {
     395          96 :       timeout = Optional.of(NANOSECONDS.convert(taskTimeoutTime, taskTimeoutUnit));
     396          96 :       deadline = overallStart + timeout.get();
     397             :     } else {
     398          15 :       deadline = 0;
     399             :     }
     400             : 
     401         105 :     synchronized (this) {
     402         105 :       long left = maxIntervalNanos;
     403         105 :       while (!workerFuture.isDone() && !done) {
     404         105 :         long start = ticker.read();
     405             :         try {
     406             :           // Conditions below gives better granularity for timeouts.
     407             :           // Originally, code always used fixed interval:
     408             :           // NANOSECONDS.timedWait(this, maxIntervalNanos);
     409             :           // As a result, the actual check for timeouts happened only every maxIntervalNanos
     410             :           // (default value 500ms); so even if timout was set to 1ms, the actual timeout was 500ms.
     411             :           // This is not a big issue, however it made our tests for timeouts flaky. For example,
     412             :           // some tests in the CancellationIT set timeout to 1ms and expect that server returns
     413             :           // timeout. However, server often returned OK result, because a request takes less than
     414             :           // 500ms.
     415         105 :           if (deadlineExceeded || deadline == 0) {
     416             :             // We want to set deadlineExceeded flag as earliest as possible. If it is already
     417             :             // set - there is no reason to wait less than maxIntervalNanos
     418          15 :             NANOSECONDS.timedWait(this, maxIntervalNanos);
     419          96 :           } else if (start <= deadline) {
     420             :             // if deadlineExceeded is not set, then we should wait until deadline, but no longer
     421             :             // than maxIntervalNanos (because we want to report a progress every maxIntervalNanos).
     422          96 :             NANOSECONDS.timedWait(this, Math.min(deadline - start + 1, maxIntervalNanos));
     423             :           }
     424           0 :         } catch (InterruptedException e) {
     425           0 :           throw new UncheckedExecutionException(e);
     426         105 :         }
     427             : 
     428             :         // Send an update on every wakeup (manual or spurious), but only move
     429             :         // the spinner every maxInterval.
     430         105 :         long now = ticker.read();
     431             : 
     432         105 :         if (deadline > 0 && now > deadline) {
     433           1 :           if (!deadlineExceeded) {
     434           1 :             logger.atFine().log(
     435             :                 "deadline exceeded after %sms, signaling cancellation (timeout=%sms, task=%s(%s))",
     436           1 :                 MILLISECONDS.convert(now - overallStart, NANOSECONDS),
     437           1 :                 MILLISECONDS.convert(now - deadline, NANOSECONDS),
     438             :                 taskKind,
     439             :                 taskName);
     440             :           }
     441           1 :           deadlineExceeded = true;
     442             : 
     443             :           // After setting deadlineExceeded = true give the cancellationNanos to react to the
     444             :           // cancellation and return gracefully.
     445           1 :           if (now > deadline + cancellationNanos) {
     446             :             // The worker didn't react to the cancellation, cancel it forcefully by an interrupt.
     447           0 :             workerFuture.cancel(true);
     448           0 :             forcefulTermination = true;
     449           0 :             if (workerFuture.isCancelled()) {
     450           0 :               logger.atWarning().log(
     451             :                   "MultiProgressMonitor worker killed after %sms, cancelled (timeout=%sms, task=%s(%s))",
     452           0 :                   MILLISECONDS.convert(now - overallStart, NANOSECONDS),
     453           0 :                   MILLISECONDS.convert(now - deadline, NANOSECONDS),
     454             :                   taskKind,
     455             :                   taskName);
     456           0 :               if (taskKind == TaskKind.RECEIVE_COMMITS) {
     457           0 :                 cancellationMetrics.countForcefulReceiveTimeout();
     458             :               }
     459             :             }
     460             :             break;
     461             :           }
     462             :         }
     463             : 
     464         105 :         left -= now - start;
     465         105 :         if (left <= 0) {
     466          83 :           moveSpinner();
     467          83 :           left = maxIntervalNanos;
     468             :         }
     469         105 :         sendUpdate();
     470         105 :       }
     471         105 :       if (deadlineExceeded && !forcefulTermination && taskKind == TaskKind.RECEIVE_COMMITS) {
     472           1 :         cancellationMetrics.countGracefulReceiveTimeout();
     473             :       }
     474         105 :       wakeUp();
     475         105 :     }
     476             : 
     477             :     // The loop exits as soon as the worker calls end(), but we give it another
     478             :     // 2 x maxIntervalNanos to finish up and return.
     479             :     try {
     480         105 :       return workerFuture.get(2 * maxIntervalNanos, NANOSECONDS);
     481           0 :     } catch (InterruptedException | CancellationException e) {
     482           0 :       logger.atWarning().withCause(e).log(
     483             :           "unable to finish processing (task=%s(%s))", taskKind, taskName);
     484           0 :       throw new UncheckedExecutionException(e);
     485           0 :     } catch (TimeoutException e) {
     486           0 :       workerFuture.cancel(true);
     487           0 :       throw e;
     488           0 :     } catch (ExecutionException e) {
     489           0 :       throw new UncheckedExecutionException(e);
     490             :     }
     491             :   }
     492             : 
     493             :   private synchronized void wakeUp() {
     494         105 :     notifyAll();
     495         105 :   }
     496             : 
     497             :   /**
     498             :    * Begin a sub-task.
     499             :    *
     500             :    * @param subTask sub-task name.
     501             :    * @param subTaskWork total work units in sub-task, or {@link #UNKNOWN}.
     502             :    * @return sub-task handle.
     503             :    */
     504             :   public Task beginSubTask(String subTask, int subTaskWork) {
     505         105 :     Task task = new Task(subTask, subTaskWork);
     506         105 :     tasks.add(task);
     507         105 :     return task;
     508             :   }
     509             : 
     510             :   /**
     511             :    * Begin a sub-task whose total work can be updated.
     512             :    *
     513             :    * @param subTask sub-task name.
     514             :    * @return sub-task handle.
     515             :    */
     516             :   public VolatileTask beginVolatileSubTask(String subTask) {
     517          15 :     VolatileTask task = new VolatileTask(subTask);
     518          15 :     tasks.add(task);
     519          15 :     return task;
     520             :   }
     521             : 
     522             :   /**
     523             :    * End the overall task.
     524             :    *
     525             :    * <p>Must be called from a worker thread.
     526             :    */
     527             :   public synchronized void end() {
     528         105 :     done = true;
     529         105 :     wakeUp();
     530         105 :   }
     531             : 
     532             :   private void sendDone() {
     533         105 :     spinnerState = NO_SPINNER;
     534         105 :     StringBuilder s = format();
     535         105 :     boolean any = false;
     536         105 :     for (Task t : tasks) {
     537         105 :       if (t.count != 0) {
     538         105 :         any = true;
     539         105 :         break;
     540             :       }
     541          17 :     }
     542         105 :     if (any) {
     543         105 :       s.append(",");
     544             :     }
     545         105 :     s.append(" done    \n");
     546         105 :     send(s);
     547         105 :   }
     548             : 
     549             :   private void moveSpinner() {
     550          83 :     spinnerIndex = (spinnerIndex + 1) % SPINNER_STATES.length;
     551          83 :     spinnerState = SPINNER_STATES[spinnerIndex];
     552          83 :   }
     553             : 
     554             :   private void sendUpdate() {
     555         105 :     send(format());
     556         105 :   }
     557             : 
     558             :   private StringBuilder format() {
     559         105 :     StringBuilder s = new StringBuilder().append("\r").append(taskName).append(':');
     560             : 
     561         105 :     if (!tasks.isEmpty()) {
     562         105 :       boolean first = true;
     563         105 :       for (Task t : tasks) {
     564         105 :         int count = t.getCount();
     565         105 :         int total = t.getTotal();
     566         105 :         if (count == 0) {
     567         104 :           continue;
     568             :         }
     569             : 
     570         105 :         if (!first) {
     571          89 :           s.append(',');
     572             :         } else {
     573         105 :           first = false;
     574             :         }
     575             : 
     576         105 :         s.append(' ');
     577         105 :         if (!Strings.isNullOrEmpty(t.name)) {
     578         105 :           s.append(t.name).append(": ");
     579             :         }
     580         105 :         if (total == UNKNOWN) {
     581          96 :           s.append(count);
     582             :         } else {
     583          15 :           s.append(
     584          15 :               String.format("%d%% (%d/%s)", count * 100 / total, count, t.getTotalDisplay(total)));
     585             :         }
     586         105 :       }
     587             :     }
     588             : 
     589         105 :     if (spinnerState != NO_SPINNER) {
     590             :       // Don't output a spinner until the alarm fires for the first time.
     591          83 :       s.append(" (").append(spinnerState).append(')');
     592             :     }
     593         105 :     return s;
     594             :   }
     595             : 
     596             :   private void send(StringBuilder s) {
     597         105 :     String progress = s.toString();
     598         105 :     logger.atInfo().atMostEvery(1, MINUTES).log(
     599         105 :         "%s", CharMatcher.javaIsoControl().removeFrom(progress));
     600         105 :     if (!clientDisconnected) {
     601             :       try {
     602         105 :         out.write(Constants.encode(progress));
     603         105 :         out.flush();
     604           0 :       } catch (IOException e) {
     605           0 :         logger.atWarning().withCause(e).log(
     606             :             "Sending progress to client failed. Stop sending updates for task %s(%s)",
     607             :             taskKind, taskName);
     608           0 :         clientDisconnected = true;
     609         105 :       }
     610             :     }
     611         105 :   }
     612             : 
     613             :   @Override
     614             :   public void checkIfCancelled(OnCancelled onCancelled) {
     615          96 :     if (clientDisconnected) {
     616           0 :       onCancelled.onCancel(RequestStateProvider.Reason.CLIENT_CLOSED_REQUEST, /* message= */ null);
     617          96 :     } else if (deadlineExceeded) {
     618           1 :       onCancelled.onCancel(
     619             :           RequestStateProvider.Reason.SERVER_DEADLINE_EXCEEDED,
     620             :           timeout
     621           1 :               .map(
     622           1 :                   taskKind == TaskKind.RECEIVE_COMMITS
     623           1 :                       ? getTimeoutFormatter("receive.timeout")
     624           0 :                       : getTimeoutFormatter("timeout"))
     625           1 :               .orElse(null));
     626             :     }
     627          96 :   }
     628             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750