Line data Source code
1 : // Copyright (C) 2013 The Android Open Source Project 2 : // 3 : // Licensed under the Apache License, Version 2.0 (the "License"); 4 : // you may not use this file except in compliance with the License. 5 : // You may obtain a copy of the License at 6 : // 7 : // http://www.apache.org/licenses/LICENSE-2.0 8 : // 9 : // Unless required by applicable law or agreed to in writing, software 10 : // distributed under the License is distributed on an "AS IS" BASIS, 11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 : // See the License for the specific language governing permissions and 13 : // limitations under the License. 14 : 15 : package com.google.gerrit.server.index; 16 : 17 : import static java.util.Objects.requireNonNull; 18 : 19 : import com.google.common.collect.Lists; 20 : import com.google.common.flogger.FluentLogger; 21 : import com.google.gerrit.index.Index; 22 : import com.google.gerrit.index.IndexCollection; 23 : import com.google.gerrit.index.IndexDefinition; 24 : import com.google.gerrit.index.SiteIndexer; 25 : import com.google.gerrit.server.plugincontext.PluginSetContext; 26 : import java.util.List; 27 : import java.util.concurrent.atomic.AtomicBoolean; 28 : 29 : /** 30 : * Background thread for running an index schema upgrade by reindexing all documents in an index 31 : * using the new version. Intended to be run while Gerrit is serving traffic to prepare for a 32 : * near-zero downtime upgrade. 33 : */ 34 : public class OnlineReindexer<K, V, I extends Index<K, V>> { 35 138 : private static final FluentLogger logger = FluentLogger.forEnclosingClass(); 36 : 37 : private final String name; 38 : private final IndexCollection<K, V, I> indexes; 39 : private final SiteIndexer<K, V, I> batchIndexer; 40 : private final int oldVersion; 41 : private final int newVersion; 42 : private final PluginSetContext<OnlineUpgradeListener> listeners; 43 : private I index; 44 138 : private final AtomicBoolean running = new AtomicBoolean(); 45 : 46 : public OnlineReindexer( 47 : IndexDefinition<K, V, I> def, 48 : int oldVersion, 49 : int newVersion, 50 138 : PluginSetContext<OnlineUpgradeListener> listeners) { 51 138 : this.name = def.getName(); 52 138 : this.indexes = def.getIndexCollection(); 53 138 : this.batchIndexer = def.getSiteIndexer(); 54 138 : this.oldVersion = oldVersion; 55 138 : this.newVersion = newVersion; 56 138 : this.listeners = listeners; 57 138 : } 58 : 59 : /** Starts the background process. */ 60 : public void start() { 61 2 : if (running.compareAndSet(false, true)) { 62 2 : Thread t = 63 : new Thread( 64 : () -> { 65 2 : boolean ok = false; 66 : try { 67 2 : reindex(); 68 2 : ok = true; 69 1 : } catch (RuntimeException e) { 70 1 : logger.atSevere().withCause(e).log( 71 1 : "Online reindex of %s schema version %s failed", name, version(index)); 72 : } finally { 73 2 : running.set(false); 74 2 : if (!ok) { 75 1 : listeners.runEach(listener -> listener.onFailure(name, oldVersion, newVersion)); 76 : } 77 : } 78 2 : }); 79 2 : t.setName( 80 2 : String.format("Reindex %s v%d-v%d", name, version(indexes.getSearchIndex()), newVersion)); 81 2 : t.start(); 82 : } 83 2 : } 84 : 85 : /** Returns {@code true} if the background indexer is currently running. */ 86 : public boolean isRunning() { 87 1 : return running.get(); 88 : } 89 : 90 : /** Returns the index version that this indexer is creating documents for. */ 91 : public int getVersion() { 92 0 : return newVersion; 93 : } 94 : 95 : private static int version(Index<?, ?> i) { 96 2 : return i.getSchema().getVersion(); 97 : } 98 : 99 : private void reindex() { 100 2 : listeners.runEach(listener -> listener.onStart(name, oldVersion, newVersion)); 101 2 : index = 102 2 : requireNonNull( 103 2 : indexes.getWriteIndex(newVersion), 104 0 : () -> String.format("not an active write schema version: %s %s", name, newVersion)); 105 2 : logger.atInfo().log( 106 : "Starting online reindex of %s from schema version %s to %s", 107 2 : name, version(indexes.getSearchIndex()), version(index)); 108 : 109 2 : if (oldVersion != newVersion) { 110 1 : index.deleteAll(); 111 : } 112 2 : SiteIndexer.Result result = batchIndexer.indexAll(index); 113 2 : if (!result.success()) { 114 0 : logger.atSevere().log( 115 : "Online reindex of %s schema version %s failed. Successfully" 116 : + " indexed %s, failed to index %s", 117 0 : name, version(index), result.doneCount(), result.failedCount()); 118 0 : return; 119 : } 120 2 : logger.atInfo().log("Reindex %s to version %s complete", name, version(index)); 121 2 : activateIndex(); 122 2 : listeners.runEach(listener -> listener.onSuccess(name, oldVersion, newVersion)); 123 2 : } 124 : 125 : /** 126 : * Switches the search index from the old version to the new version. This method should be called 127 : * when the new version is fully ready. 128 : */ 129 : public void activateIndex() { 130 2 : indexes.setSearchIndex(index); 131 2 : logger.atInfo().log("Using %s schema version %s", name, version(index)); 132 2 : index.markReady(true); 133 : 134 2 : List<I> toRemove = Lists.newArrayListWithExpectedSize(1); 135 2 : for (I i : indexes.getWriteIndexes()) { 136 2 : if (version(i) != version(index)) { 137 1 : toRemove.add(i); 138 : } 139 2 : } 140 2 : for (I i : toRemove) { 141 1 : i.markReady(false); 142 1 : indexes.removeWriteIndex(version(i)); 143 1 : } 144 2 : } 145 : }