LCOV - code coverage report
Current view: top level - server/index - OnlineReindexer.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 55 60 91.7 %
Date: 2022-11-19 15:00:39 Functions: 10 13 76.9 %

          Line data    Source code
       1             : // Copyright (C) 2013 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.server.index;
      16             : 
      17             : import static java.util.Objects.requireNonNull;
      18             : 
      19             : import com.google.common.collect.Lists;
      20             : import com.google.common.flogger.FluentLogger;
      21             : import com.google.gerrit.index.Index;
      22             : import com.google.gerrit.index.IndexCollection;
      23             : import com.google.gerrit.index.IndexDefinition;
      24             : import com.google.gerrit.index.SiteIndexer;
      25             : import com.google.gerrit.server.plugincontext.PluginSetContext;
      26             : import java.util.List;
      27             : import java.util.concurrent.atomic.AtomicBoolean;
      28             : 
      29             : /**
      30             :  * Background thread for running an index schema upgrade by reindexing all documents in an index
      31             :  * using the new version. Intended to be run while Gerrit is serving traffic to prepare for a
      32             :  * near-zero downtime upgrade.
      33             :  */
      34             : public class OnlineReindexer<K, V, I extends Index<K, V>> {
      35         138 :   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
      36             : 
      37             :   private final String name;
      38             :   private final IndexCollection<K, V, I> indexes;
      39             :   private final SiteIndexer<K, V, I> batchIndexer;
      40             :   private final int oldVersion;
      41             :   private final int newVersion;
      42             :   private final PluginSetContext<OnlineUpgradeListener> listeners;
      43             :   private I index;
      44         138 :   private final AtomicBoolean running = new AtomicBoolean();
      45             : 
      46             :   public OnlineReindexer(
      47             :       IndexDefinition<K, V, I> def,
      48             :       int oldVersion,
      49             :       int newVersion,
      50         138 :       PluginSetContext<OnlineUpgradeListener> listeners) {
      51         138 :     this.name = def.getName();
      52         138 :     this.indexes = def.getIndexCollection();
      53         138 :     this.batchIndexer = def.getSiteIndexer();
      54         138 :     this.oldVersion = oldVersion;
      55         138 :     this.newVersion = newVersion;
      56         138 :     this.listeners = listeners;
      57         138 :   }
      58             : 
      59             :   /** Starts the background process. */
      60             :   public void start() {
      61           2 :     if (running.compareAndSet(false, true)) {
      62           2 :       Thread t =
      63             :           new Thread(
      64             :               () -> {
      65           2 :                 boolean ok = false;
      66             :                 try {
      67           2 :                   reindex();
      68           2 :                   ok = true;
      69           1 :                 } catch (RuntimeException e) {
      70           1 :                   logger.atSevere().withCause(e).log(
      71           1 :                       "Online reindex of %s schema version %s failed", name, version(index));
      72             :                 } finally {
      73           2 :                   running.set(false);
      74           2 :                   if (!ok) {
      75           1 :                     listeners.runEach(listener -> listener.onFailure(name, oldVersion, newVersion));
      76             :                   }
      77             :                 }
      78           2 :               });
      79           2 :       t.setName(
      80           2 :           String.format("Reindex %s v%d-v%d", name, version(indexes.getSearchIndex()), newVersion));
      81           2 :       t.start();
      82             :     }
      83           2 :   }
      84             : 
      85             :   /** Returns {@code true} if the background indexer is currently running. */
      86             :   public boolean isRunning() {
      87           1 :     return running.get();
      88             :   }
      89             : 
      90             :   /** Returns the index version that this indexer is creating documents for. */
      91             :   public int getVersion() {
      92           0 :     return newVersion;
      93             :   }
      94             : 
      95             :   private static int version(Index<?, ?> i) {
      96           2 :     return i.getSchema().getVersion();
      97             :   }
      98             : 
      99             :   private void reindex() {
     100           2 :     listeners.runEach(listener -> listener.onStart(name, oldVersion, newVersion));
     101           2 :     index =
     102           2 :         requireNonNull(
     103           2 :             indexes.getWriteIndex(newVersion),
     104           0 :             () -> String.format("not an active write schema version: %s %s", name, newVersion));
     105           2 :     logger.atInfo().log(
     106             :         "Starting online reindex of %s from schema version %s to %s",
     107           2 :         name, version(indexes.getSearchIndex()), version(index));
     108             : 
     109           2 :     if (oldVersion != newVersion) {
     110           1 :       index.deleteAll();
     111             :     }
     112           2 :     SiteIndexer.Result result = batchIndexer.indexAll(index);
     113           2 :     if (!result.success()) {
     114           0 :       logger.atSevere().log(
     115             :           "Online reindex of %s schema version %s failed. Successfully"
     116             :               + " indexed %s, failed to index %s",
     117           0 :           name, version(index), result.doneCount(), result.failedCount());
     118           0 :       return;
     119             :     }
     120           2 :     logger.atInfo().log("Reindex %s to version %s complete", name, version(index));
     121           2 :     activateIndex();
     122           2 :     listeners.runEach(listener -> listener.onSuccess(name, oldVersion, newVersion));
     123           2 :   }
     124             : 
     125             :   /**
     126             :    * Switches the search index from the old version to the new version. This method should be called
     127             :    * when the new version is fully ready.
     128             :    */
     129             :   public void activateIndex() {
     130           2 :     indexes.setSearchIndex(index);
     131           2 :     logger.atInfo().log("Using %s schema version %s", name, version(index));
     132           2 :     index.markReady(true);
     133             : 
     134           2 :     List<I> toRemove = Lists.newArrayListWithExpectedSize(1);
     135           2 :     for (I i : indexes.getWriteIndexes()) {
     136           2 :       if (version(i) != version(index)) {
     137           1 :         toRemove.add(i);
     138             :       }
     139           2 :     }
     140           2 :     for (I i : toRemove) {
     141           1 :       i.markReady(false);
     142           1 :       indexes.removeWriteIndex(version(i));
     143           1 :     }
     144           2 :   }
     145             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750