LCOV - code coverage report
Current view: top level - server/index/change - AllChangesIndexer.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 111 148 75.0 %
Date: 2022-11-19 15:00:39 Functions: 19 23 82.6 %

          Line data    Source code
       1             : // Copyright (C) 2013 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.server.index.change;
      16             : 
      17             : import static com.google.common.util.concurrent.Futures.successfulAsList;
      18             : import static com.google.common.util.concurrent.Futures.transform;
      19             : import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
      20             : import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
      21             : 
      22             : import com.google.auto.value.AutoValue;
      23             : import com.google.common.base.Stopwatch;
      24             : import com.google.common.collect.ImmutableSortedSet;
      25             : import com.google.common.flogger.FluentLogger;
      26             : import com.google.common.util.concurrent.ListenableFuture;
      27             : import com.google.common.util.concurrent.ListeningExecutorService;
      28             : import com.google.common.util.concurrent.UncheckedExecutionException;
      29             : import com.google.gerrit.common.Nullable;
      30             : import com.google.gerrit.entities.Change;
      31             : import com.google.gerrit.entities.Project;
      32             : import com.google.gerrit.index.SiteIndexer;
      33             : import com.google.gerrit.server.git.GitRepositoryManager;
      34             : import com.google.gerrit.server.git.MultiProgressMonitor;
      35             : import com.google.gerrit.server.git.MultiProgressMonitor.Task;
      36             : import com.google.gerrit.server.git.MultiProgressMonitor.TaskKind;
      37             : import com.google.gerrit.server.git.MultiProgressMonitor.VolatileTask;
      38             : import com.google.gerrit.server.index.IndexExecutor;
      39             : import com.google.gerrit.server.index.OnlineReindexMode;
      40             : import com.google.gerrit.server.notedb.ChangeNotes;
      41             : import com.google.gerrit.server.notedb.ChangeNotes.Factory.ChangeNotesResult;
      42             : import com.google.gerrit.server.notedb.ChangeNotes.Factory.ScanResult;
      43             : import com.google.gerrit.server.project.ProjectCache;
      44             : import com.google.gerrit.server.query.change.ChangeData;
      45             : import com.google.inject.Inject;
      46             : import java.io.IOException;
      47             : import java.util.ArrayList;
      48             : import java.util.List;
      49             : import java.util.concurrent.Callable;
      50             : import java.util.concurrent.RejectedExecutionException;
      51             : import java.util.concurrent.atomic.AtomicBoolean;
      52             : import java.util.concurrent.atomic.AtomicInteger;
      53             : import org.eclipse.jgit.lib.ProgressMonitor;
      54             : import org.eclipse.jgit.lib.Repository;
      55             : 
      56             : /**
      57             :  * Implementation that can index all changes on a host or within a project. Used by Gerrit's
      58             :  * initialization and upgrade programs as well as by REST API endpoints that offer this
      59             :  * functionality.
      60             :  */
      61             : public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
      62         138 :   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
      63             :   private MultiProgressMonitor mpm;
      64             :   private VolatileTask doneTask;
      65             :   private Task failedTask;
      66             :   private static final int PROJECT_SLICE_MAX_REFS = 1000;
      67             : 
      68             :   private final MultiProgressMonitor.Factory multiProgressMonitorFactory;
      69             : 
      70             :   private static class ProjectsCollectionFailure extends Exception {
      71             :     private static final long serialVersionUID = 1L;
      72             : 
      73             :     public ProjectsCollectionFailure(String message) {
      74           0 :       super(message);
      75           0 :     }
      76             :   }
      77             : 
      78             :   private final ChangeData.Factory changeDataFactory;
      79             :   private final GitRepositoryManager repoManager;
      80             :   private final ListeningExecutorService executor;
      81             :   private final ChangeIndexer.Factory indexerFactory;
      82             :   private final ChangeNotes.Factory notesFactory;
      83             :   private final ProjectCache projectCache;
      84             : 
      85             :   @Inject
      86             :   AllChangesIndexer(
      87             :       MultiProgressMonitor.Factory multiProgressMonitorFactory,
      88             :       ChangeData.Factory changeDataFactory,
      89             :       GitRepositoryManager repoManager,
      90             :       @IndexExecutor(BATCH) ListeningExecutorService executor,
      91             :       ChangeIndexer.Factory indexerFactory,
      92             :       ChangeNotes.Factory notesFactory,
      93         138 :       ProjectCache projectCache) {
      94         138 :     this.multiProgressMonitorFactory = multiProgressMonitorFactory;
      95         138 :     this.changeDataFactory = changeDataFactory;
      96         138 :     this.repoManager = repoManager;
      97         138 :     this.executor = executor;
      98         138 :     this.indexerFactory = indexerFactory;
      99         138 :     this.notesFactory = notesFactory;
     100         138 :     this.projectCache = projectCache;
     101         138 :   }
     102             : 
     103             :   @AutoValue
     104           3 :   public abstract static class ProjectSlice {
     105             :     public abstract Project.NameKey name();
     106             : 
     107             :     public abstract int slice();
     108             : 
     109             :     public abstract int slices();
     110             : 
     111             :     public abstract ScanResult scanResult();
     112             : 
     113             :     private static ProjectSlice create(Project.NameKey name, int slice, int slices, ScanResult sr) {
     114           1 :       return new AutoValue_AllChangesIndexer_ProjectSlice(name, slice, slices, sr);
     115             :     }
     116             : 
     117             :     private static ProjectSlice oneSlice(Project.NameKey name, ScanResult sr) {
     118           2 :       return new AutoValue_AllChangesIndexer_ProjectSlice(name, 0, 1, sr);
     119             :     }
     120             :   }
     121             : 
     122             :   @Override
     123             :   public Result indexAll(ChangeIndex index) {
     124             :     // The simplest approach to distribute indexing would be to let each thread grab a project
     125             :     // and index it fully. But if a site has one big project and 100s of small projects, then
     126             :     // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
     127             :     // projects have been reindexed, and only the thread that reindexes the big project is
     128             :     // still working. The other threads would idle. Reindexing the big project on a single
     129             :     // thread becomes the critical path. Bringing in more CPUs would not speed up things.
     130             :     //
     131             :     // To avoid such situations, we split big repos into smaller parts and let
     132             :     // the thread pool index these smaller parts. This splitting introduces an overhead in the
     133             :     // workload setup and there might be additional slow-downs from multiple threads
     134             :     // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
     135             :     // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
     136             :     // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
     137             :     // in 2020.
     138             : 
     139          15 :     Stopwatch sw = Stopwatch.createStarted();
     140          15 :     AtomicBoolean ok = new AtomicBoolean(true);
     141          15 :     mpm = multiProgressMonitorFactory.create(progressOut, TaskKind.INDEXING, "Reindexing changes");
     142          15 :     doneTask = mpm.beginVolatileSubTask("changes");
     143          15 :     failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
     144             :     List<ListenableFuture<?>> futures;
     145             :     try {
     146          15 :       futures = new SliceScheduler(index, ok).schedule();
     147           0 :     } catch (ProjectsCollectionFailure e) {
     148           0 :       logger.atSevere().log("%s", e.getMessage());
     149           0 :       return Result.create(sw, false, 0, 0);
     150          15 :     }
     151             : 
     152             :     try {
     153          15 :       mpm.waitFor(
     154          15 :           transform(
     155          15 :               successfulAsList(futures),
     156             :               x -> {
     157          15 :                 mpm.end();
     158          15 :                 return null;
     159             :               },
     160          15 :               directExecutor()));
     161           0 :     } catch (UncheckedExecutionException e) {
     162           0 :       logger.atSevere().withCause(e).log("Error in batch indexer");
     163           0 :       ok.set(false);
     164          15 :     }
     165             :     // If too many changes failed, maybe there was a bug in the indexer. Don't
     166             :     // trust the results. This is not an exact percentage since we bump the same
     167             :     // failure counter if a project can't be read, but close enough.
     168          15 :     int nFailed = failedTask.getCount();
     169          15 :     int nDone = doneTask.getCount();
     170          15 :     int nTotal = nFailed + nDone;
     171          15 :     double pctFailed = ((double) nFailed) / nTotal * 100;
     172          15 :     if (pctFailed > 10) {
     173           0 :       logger.atSevere().log(
     174             :           "Failed %s/%s changes (%s%%); not marking new index as ready",
     175           0 :           nFailed, nTotal, Math.round(pctFailed));
     176           0 :       ok.set(false);
     177          15 :     } else if (nFailed > 0) {
     178           0 :       logger.atWarning().log("Failed %s/%s changes", nFailed, nTotal);
     179             :     }
     180          15 :     return Result.create(sw, ok.get(), nDone, nFailed);
     181             :   }
     182             : 
     183             :   @Nullable
     184             :   public Callable<Void> reindexProject(
     185             :       ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) {
     186           2 :     try (Repository repo = repoManager.openRepository(project)) {
     187           2 :       return reindexProjectSlice(
     188             :           indexer,
     189           2 :           ProjectSlice.oneSlice(project, ChangeNotes.Factory.scanChangeIds(repo)),
     190             :           done,
     191             :           failed);
     192           0 :     } catch (IOException e) {
     193           0 :       logger.atSevere().log("%s", e.getMessage());
     194           0 :       return null;
     195             :     }
     196             :   }
     197             : 
     198             :   public Callable<Void> reindexProjectSlice(
     199             :       ChangeIndexer indexer, ProjectSlice projectSlice, Task done, Task failed) {
     200           3 :     return new ProjectSliceIndexer(indexer, projectSlice, done, failed);
     201             :   }
     202             : 
     203             :   private class ProjectSliceIndexer implements Callable<Void> {
     204             :     private final ChangeIndexer indexer;
     205             :     private final ProjectSlice projectSlice;
     206             :     private final ProgressMonitor done;
     207             :     private final ProgressMonitor failed;
     208             : 
     209             :     private ProjectSliceIndexer(
     210             :         ChangeIndexer indexer,
     211             :         ProjectSlice projectSlice,
     212             :         ProgressMonitor done,
     213           3 :         ProgressMonitor failed) {
     214           3 :       this.indexer = indexer;
     215           3 :       this.projectSlice = projectSlice;
     216           3 :       this.done = done;
     217           3 :       this.failed = failed;
     218           3 :     }
     219             : 
     220             :     @Override
     221             :     public Void call() throws Exception {
     222           3 :       OnlineReindexMode.begin();
     223             :       // Order of scanning changes is undefined. This is ok if we assume that packfile locality is
     224             :       // not important for indexing, since sites should have a fully populated DiffSummary cache.
     225             :       // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
     226             :       // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
     227             :       // we don't have concrete proof that improving packfile locality would help.
     228           3 :       notesFactory
     229           3 :           .scan(
     230           3 :               projectSlice.scanResult(),
     231           3 :               projectSlice.name(),
     232           3 :               id -> (id.get() % projectSlice.slices()) == projectSlice.slice())
     233           3 :           .forEach(r -> index(r));
     234           3 :       OnlineReindexMode.end();
     235           3 :       return null;
     236             :     }
     237             : 
     238             :     private void index(ChangeNotesResult r) {
     239           3 :       if (r.error().isPresent()) {
     240           0 :         fail("Failed to read change " + r.id() + " for indexing", true, r.error().get());
     241           0 :         return;
     242             :       }
     243             :       try {
     244           3 :         indexer.index(changeDataFactory.create(r.notes()));
     245           3 :         done.update(1);
     246           3 :         verboseWriter.format(
     247           3 :             "Reindexed change %d (project: %s)\n", r.id().get(), r.notes().getProjectName().get());
     248           0 :       } catch (RejectedExecutionException e) {
     249             :         // Server shutdown, don't spam the logs.
     250           0 :         failSilently();
     251           0 :       } catch (Exception e) {
     252           0 :         fail("Failed to index change " + r.id(), true, e);
     253           3 :       }
     254           3 :     }
     255             : 
     256             :     private void fail(String error, boolean failed, Throwable e) {
     257           0 :       if (failed) {
     258           0 :         this.failed.update(1);
     259             :       }
     260             : 
     261           0 :       logger.atWarning().withCause(e).log("%s", error);
     262           0 :       verboseWriter.println(error);
     263           0 :     }
     264             : 
     265             :     private void failSilently() {
     266           0 :       this.failed.update(1);
     267           0 :     }
     268             : 
     269             :     @Override
     270             :     public String toString() {
     271           0 :       return "Index project slice " + projectSlice;
     272             :     }
     273             :   }
     274             : 
     275             :   private class SliceScheduler {
     276             :     final ChangeIndex index;
     277             :     final AtomicBoolean ok;
     278          15 :     final AtomicInteger changeCount = new AtomicInteger(0);
     279          15 :     final AtomicInteger projectsFailed = new AtomicInteger(0);
     280          15 :     final List<ListenableFuture<?>> sliceIndexerFutures = new ArrayList<>();
     281          15 :     final List<ListenableFuture<?>> sliceCreationFutures = new ArrayList<>();
     282          15 :     VolatileTask projTask = mpm.beginVolatileSubTask("project-slices");
     283             :     Task slicingProjects;
     284             : 
     285          15 :     public SliceScheduler(ChangeIndex index, AtomicBoolean ok) {
     286          15 :       this.index = index;
     287          15 :       this.ok = ok;
     288          15 :     }
     289             : 
     290             :     private List<ListenableFuture<?>> schedule() throws ProjectsCollectionFailure {
     291          15 :       ImmutableSortedSet<Project.NameKey> projects = projectCache.all();
     292          15 :       int projectCount = projects.size();
     293          15 :       slicingProjects = mpm.beginSubTask("Slicing projects", projectCount);
     294          15 :       for (Project.NameKey name : projects) {
     295          15 :         sliceCreationFutures.add(executor.submit(new ProjectSliceCreator(name)));
     296          15 :       }
     297             : 
     298             :       try {
     299          15 :         mpm.waitForNonFinalTask(
     300          15 :             transform(
     301          15 :                 successfulAsList(sliceCreationFutures),
     302             :                 x -> {
     303          15 :                   projTask.finalizeTotal();
     304          15 :                   doneTask.finalizeTotal();
     305          15 :                   return null;
     306             :                 },
     307          15 :                 directExecutor()));
     308           0 :       } catch (UncheckedExecutionException e) {
     309           0 :         logger.atSevere().withCause(e).log("Error project slice creation");
     310           0 :         ok.set(false);
     311          15 :       }
     312             : 
     313          15 :       if (projectsFailed.get() > projectCount / 2) {
     314           0 :         throw new ProjectsCollectionFailure(
     315             :             "Over 50%% of the projects could not be collected: aborted");
     316             :       }
     317             : 
     318          15 :       slicingProjects.endTask();
     319          15 :       setTotalWork(changeCount.get());
     320             : 
     321          15 :       return sliceIndexerFutures;
     322             :     }
     323             : 
     324             :     private class ProjectSliceCreator implements Callable<Void> {
     325             :       final Project.NameKey name;
     326             : 
     327          15 :       public ProjectSliceCreator(Project.NameKey name) {
     328          15 :         this.name = name;
     329          15 :       }
     330             : 
     331             :       @Override
     332             :       public Void call() throws IOException {
     333          15 :         try (Repository repo = repoManager.openRepository(name)) {
     334          15 :           ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo);
     335          15 :           int size = sr.all().size();
     336          15 :           if (size > 0) {
     337           1 :             changeCount.addAndGet(size);
     338           1 :             int slices = 1 + (size - 1) / PROJECT_SLICE_MAX_REFS;
     339           1 :             if (slices > 1) {
     340           0 :               verboseWriter.println(
     341             :                   "Submitting " + name + " for indexing in " + slices + " slices");
     342             :             }
     343             : 
     344           1 :             doneTask.updateTotal(size);
     345           1 :             projTask.updateTotal(slices);
     346             : 
     347           1 :             for (int slice = 0; slice < slices; slice++) {
     348           1 :               ProjectSlice projectSlice = ProjectSlice.create(name, slice, slices, sr);
     349           1 :               ListenableFuture<?> future =
     350           1 :                   executor.submit(
     351           1 :                       reindexProjectSlice(
     352           1 :                           indexerFactory.create(executor, index),
     353             :                           projectSlice,
     354             :                           doneTask,
     355             :                           failedTask));
     356           1 :               String description = "project " + name + " (" + slice + "/" + slices + ")";
     357           1 :               addErrorListener(future, description, projTask, ok);
     358           1 :               sliceIndexerFutures.add(future);
     359             :             }
     360             :           }
     361           0 :         } catch (IOException e) {
     362           0 :           logger.atSevere().withCause(e).log("Error collecting project %s", name);
     363           0 :           projectsFailed.incrementAndGet();
     364          15 :         }
     365          15 :         slicingProjects.update(1);
     366          15 :         return null;
     367             :       }
     368             :     }
     369             :   }
     370             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750