Line data Source code
1 : // Copyright (C) 2017 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.server.index;
16 :
17 : import static com.google.common.base.Preconditions.checkArgument;
18 : import static com.google.common.base.Preconditions.checkState;
19 :
20 : import com.google.common.collect.ImmutableList;
21 : import com.google.common.collect.Lists;
22 : import com.google.common.collect.Maps;
23 : import com.google.gerrit.extensions.events.LifecycleListener;
24 : import com.google.gerrit.index.Index;
25 : import com.google.gerrit.index.IndexCollection;
26 : import com.google.gerrit.index.IndexDefinition;
27 : import com.google.gerrit.index.IndexDefinition.IndexFactory;
28 : import com.google.gerrit.index.Schema;
29 : import com.google.gerrit.server.config.SitePaths;
30 : import com.google.gerrit.server.plugincontext.PluginSetContext;
31 : import com.google.inject.ProvisionException;
32 : import java.io.IOException;
33 : import java.util.Collection;
34 : import java.util.List;
35 : import java.util.Map;
36 : import java.util.TreeMap;
37 : import org.eclipse.jgit.errors.ConfigInvalidException;
38 : import org.eclipse.jgit.lib.Config;
39 :
40 : /** Trigger for online reindexing in case the index version in use is not the latest. */
41 : public abstract class VersionManager implements LifecycleListener {
42 : public static boolean getOnlineUpgrade(Config cfg) {
43 138 : return cfg.getBoolean("index", null, "onlineUpgrade", true);
44 : }
45 :
46 : public static class Version<V> {
47 : public final Schema<V> schema;
48 : public final int version;
49 : public final boolean exists;
50 : public final boolean ready;
51 :
52 138 : public Version(Schema<V> schema, int version, boolean exists, boolean ready) {
53 138 : checkArgument(schema == null || schema.getVersion() == version);
54 138 : this.schema = schema;
55 138 : this.version = version;
56 138 : this.exists = exists;
57 138 : this.ready = ready;
58 138 : }
59 : }
60 :
61 : protected final boolean onlineUpgrade;
62 : protected final String runReindexMsg;
63 : protected final SitePaths sitePaths;
64 :
65 : private final PluginSetContext<OnlineUpgradeListener> listeners;
66 :
67 : // The following fields must be accessed synchronized on this.
68 : protected final Map<String, IndexDefinition<?, ?, ?>> defs;
69 : protected final Map<String, OnlineReindexer<?, ?, ?>> reindexers;
70 :
71 : protected VersionManager(
72 : SitePaths sitePaths,
73 : PluginSetContext<OnlineUpgradeListener> listeners,
74 : Collection<IndexDefinition<?, ?, ?>> defs,
75 138 : boolean onlineUpgrade) {
76 138 : this.sitePaths = sitePaths;
77 138 : this.listeners = listeners;
78 138 : this.defs = Maps.newHashMapWithExpectedSize(defs.size());
79 138 : for (IndexDefinition<?, ?, ?> def : defs) {
80 138 : this.defs.put(def.getName(), def);
81 138 : }
82 :
83 138 : this.reindexers = Maps.newHashMapWithExpectedSize(defs.size());
84 138 : this.onlineUpgrade = onlineUpgrade;
85 138 : this.runReindexMsg =
86 : "No index versions for index '%s' ready; run java -jar "
87 138 : + sitePaths.gerrit_war.toAbsolutePath()
88 : + " reindex --index %s";
89 138 : }
90 :
91 : @Override
92 : public void start() {
93 138 : GerritIndexStatus cfg = createIndexStatus();
94 138 : for (IndexDefinition<?, ?, ?> def : defs.values()) {
95 138 : initIndex(def, cfg);
96 138 : }
97 138 : }
98 :
99 : @Override
100 : public void stop() {
101 : // Do nothing; indexes are closed on demand by IndexCollection.
102 138 : }
103 :
104 : /**
105 : * Start the online reindexer if the current index is not already the latest.
106 : *
107 : * @param name index name
108 : * @param force start re-index
109 : * @return true if started, otherwise false.
110 : */
111 : public synchronized boolean startReindexer(String name, boolean force)
112 : throws ReindexerAlreadyRunningException {
113 1 : OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
114 1 : validateReindexerNotRunning(reindexer);
115 1 : if (force || !isLatestIndexVersion(name, reindexer)) {
116 1 : reindexer.start();
117 1 : return true;
118 : }
119 0 : return false;
120 : }
121 :
122 : /**
123 : * Activate the latest index if the current index is not already the latest.
124 : *
125 : * @param name index name
126 : * @return true if index was activated, otherwise false.
127 : */
128 : public synchronized boolean activateLatestIndex(String name)
129 : throws ReindexerAlreadyRunningException {
130 0 : OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
131 0 : validateReindexerNotRunning(reindexer);
132 0 : if (!isLatestIndexVersion(name, reindexer)) {
133 0 : reindexer.activateIndex();
134 0 : return true;
135 : }
136 0 : return false;
137 : }
138 :
139 : /**
140 : * Tells if an index with this name is currently known or not.
141 : *
142 : * @param name index name
143 : * @return true if index is known and can be used, otherwise false.
144 : */
145 : public boolean isKnownIndex(String name) {
146 0 : return defs.get(name) != null;
147 : }
148 :
149 : protected <K, V, I extends Index<K, V>> void initIndex(
150 : IndexDefinition<K, V, I> def, GerritIndexStatus cfg) {
151 138 : TreeMap<Integer, Version<V>> versions = scanVersions(def, cfg);
152 : // Search from the most recent ready version.
153 : // Write to the most recent ready version and the most recent version.
154 138 : Version<V> search = null;
155 138 : List<Version<V>> write = Lists.newArrayListWithCapacity(2);
156 138 : for (Version<V> v : versions.descendingMap().values()) {
157 138 : if (v.schema == null) {
158 0 : continue;
159 : }
160 138 : if (write.isEmpty() && onlineUpgrade) {
161 15 : write.add(v);
162 : }
163 138 : if (v.ready) {
164 138 : search = v;
165 138 : if (!write.contains(v)) {
166 132 : write.add(v);
167 : }
168 : break;
169 : }
170 1 : }
171 138 : if (search == null) {
172 1 : throw new ProvisionException(String.format(runReindexMsg, def.getName(), def.getName()));
173 : }
174 :
175 138 : IndexFactory<K, V, I> factory = def.getIndexFactory();
176 138 : I searchIndex = factory.create(search.schema);
177 138 : IndexCollection<K, V, I> indexes = def.getIndexCollection();
178 138 : indexes.setSearchIndex(searchIndex);
179 138 : for (Version<V> v : write) {
180 138 : if (v.version != search.version) {
181 1 : indexes.addWriteIndex(factory.create(v.schema));
182 : } else {
183 138 : indexes.addWriteIndex(searchIndex);
184 : }
185 138 : }
186 :
187 138 : markNotReady(def.getName(), versions.values(), write);
188 :
189 138 : synchronized (this) {
190 138 : if (!reindexers.containsKey(def.getName())) {
191 138 : int latest = write.get(0).version;
192 138 : OnlineReindexer<K, V, I> reindexer =
193 : new OnlineReindexer<>(def, search.version, latest, listeners);
194 138 : reindexers.put(def.getName(), reindexer);
195 : }
196 138 : }
197 138 : }
198 :
199 : synchronized void startOnlineUpgrade() {
200 15 : checkState(onlineUpgrade, "online upgrade not enabled");
201 15 : for (IndexDefinition<?, ?, ?> def : defs.values()) {
202 15 : String name = def.getName();
203 15 : IndexCollection<?, ?, ?> indexes = def.getIndexCollection();
204 15 : Index<?, ?> search = indexes.getSearchIndex();
205 15 : checkState(
206 : search != null, "no search index ready for %s; should have failed at startup", name);
207 15 : int searchVersion = search.getSchema().getVersion();
208 :
209 15 : List<Index<?, ?>> write = ImmutableList.copyOf(indexes.getWriteIndexes());
210 15 : checkState(
211 15 : !write.isEmpty(),
212 : "no write indexes set for %s; should have been initialized at startup",
213 : name);
214 15 : int latestWriteVersion = write.get(0).getSchema().getVersion();
215 :
216 15 : if (latestWriteVersion != searchVersion) {
217 1 : OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
218 1 : checkState(
219 : reindexer != null,
220 : "no reindexer found for %s; should have been initialized at startup",
221 : name);
222 1 : reindexer.start();
223 : }
224 15 : }
225 15 : }
226 :
227 : protected GerritIndexStatus createIndexStatus() {
228 : try {
229 138 : return new GerritIndexStatus(sitePaths);
230 0 : } catch (ConfigInvalidException | IOException e) {
231 0 : throw fail(e);
232 : }
233 : }
234 :
235 : protected abstract <K, V, I extends Index<K, V>> TreeMap<Integer, Version<V>> scanVersions(
236 : IndexDefinition<K, V, I> def, GerritIndexStatus cfg);
237 :
238 : private <V> boolean isDirty(Collection<Version<V>> inUse, Version<V> v) {
239 138 : return !inUse.contains(v) && v.exists;
240 : }
241 :
242 : private boolean isLatestIndexVersion(String name, OnlineReindexer<?, ?, ?> reindexer) {
243 0 : int readVersion = defs.get(name).getIndexCollection().getSearchIndex().getSchema().getVersion();
244 0 : return reindexer == null || reindexer.getVersion() == readVersion;
245 : }
246 :
247 : private static void validateReindexerNotRunning(OnlineReindexer<?, ?, ?> reindexer)
248 : throws ReindexerAlreadyRunningException {
249 1 : if (reindexer != null && reindexer.isRunning()) {
250 0 : throw new ReindexerAlreadyRunningException();
251 : }
252 1 : }
253 :
254 : private <V> void markNotReady(
255 : String name, Iterable<Version<V>> versions, Collection<Version<V>> inUse) {
256 138 : GerritIndexStatus cfg = createIndexStatus();
257 138 : boolean dirty = false;
258 138 : for (Version<V> v : versions) {
259 138 : if (isDirty(inUse, v)) {
260 1 : cfg.setReady(name, v.version, false);
261 1 : dirty = true;
262 : }
263 138 : }
264 138 : if (dirty) {
265 : try {
266 1 : cfg.save();
267 0 : } catch (IOException e) {
268 0 : throw fail(e);
269 1 : }
270 : }
271 138 : }
272 :
273 : private ProvisionException fail(Throwable t) {
274 0 : return new ProvisionException("Error scanning indexes", t);
275 : }
276 : }
|