Line data Source code
1 : // Copyright (C) 2013 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.server.index.change;
16 :
17 : import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
18 :
19 : import com.google.common.base.Objects;
20 : import com.google.common.flogger.FluentLogger;
21 : import com.google.common.util.concurrent.Futures;
22 : import com.google.common.util.concurrent.ListenableFuture;
23 : import com.google.common.util.concurrent.ListeningExecutorService;
24 : import com.google.gerrit.common.Nullable;
25 : import com.google.gerrit.entities.Change;
26 : import com.google.gerrit.entities.Project;
27 : import com.google.gerrit.exceptions.StorageException;
28 : import com.google.gerrit.extensions.events.ChangeIndexedListener;
29 : import com.google.gerrit.index.Index;
30 : import com.google.gerrit.server.config.GerritServerConfig;
31 : import com.google.gerrit.server.index.IndexExecutor;
32 : import com.google.gerrit.server.index.StalenessCheckResult;
33 : import com.google.gerrit.server.index.options.IsFirstInsertForEntry;
34 : import com.google.gerrit.server.logging.Metadata;
35 : import com.google.gerrit.server.logging.TraceContext;
36 : import com.google.gerrit.server.logging.TraceContext.TraceTimer;
37 : import com.google.gerrit.server.notedb.ChangeNotes;
38 : import com.google.gerrit.server.plugincontext.PluginSetContext;
39 : import com.google.gerrit.server.project.NoSuchChangeException;
40 : import com.google.gerrit.server.query.change.ChangeData;
41 : import com.google.gerrit.server.util.RequestContext;
42 : import com.google.gerrit.server.util.ThreadLocalRequestContext;
43 : import com.google.inject.OutOfScopeException;
44 : import com.google.inject.assistedinject.Assisted;
45 : import com.google.inject.assistedinject.AssistedInject;
46 : import java.util.Collection;
47 : import java.util.Collections;
48 : import java.util.Map;
49 : import java.util.Set;
50 : import java.util.concurrent.Callable;
51 : import java.util.concurrent.ConcurrentHashMap;
52 : import java.util.concurrent.Future;
53 : import org.eclipse.jgit.errors.RepositoryNotFoundException;
54 : import org.eclipse.jgit.lib.Config;
55 :
56 : /**
57 : * Helper for (re)indexing a change document.
58 : *
59 : * <p>Indexing is run in the background, as it may require substantial work to compute some of the
60 : * fields and/or update the index.
61 : */
62 : public class ChangeIndexer {
63 148 : private static final FluentLogger logger = FluentLogger.forEnclosingClass();
64 :
65 : public interface Factory {
66 : ChangeIndexer create(ListeningExecutorService executor, ChangeIndex index);
67 :
68 : ChangeIndexer create(ListeningExecutorService executor, ChangeIndexCollection indexes);
69 : }
70 :
71 : @Nullable private final ChangeIndexCollection indexes;
72 : @Nullable private final ChangeIndex index;
73 : private final ChangeData.Factory changeDataFactory;
74 : private final ChangeNotes.Factory notesFactory;
75 : private final ThreadLocalRequestContext context;
76 : private final ListeningExecutorService batchExecutor;
77 : private final ListeningExecutorService executor;
78 : private final PluginSetContext<ChangeIndexedListener> indexedListeners;
79 : private final StalenessChecker stalenessChecker;
80 : private final boolean autoReindexIfStale;
81 : private final IsFirstInsertForEntry isFirstInsertForEntry;
82 :
83 148 : private final Map<Change.Id, IndexTask> queuedIndexTasks = new ConcurrentHashMap<>();
84 148 : private final Set<ReindexIfStaleTask> queuedReindexIfStaleTasks =
85 148 : Collections.newSetFromMap(new ConcurrentHashMap<>());
86 :
/**
 * Creates an indexer that writes to exactly one index.
 *
 * <p>The {@code indexes} collection is left {@code null}; {@code index} is the only write target.
 * Whether stale changes are reindexed automatically is read from {@code index.autoReindexIfStale}
 * in the server config (default {@code false}).
 */
@AssistedInject
ChangeIndexer(
    @GerritServerConfig Config cfg,
    ChangeData.Factory changeDataFactory,
    ChangeNotes.Factory notesFactory,
    ThreadLocalRequestContext context,
    PluginSetContext<ChangeIndexedListener> indexedListeners,
    StalenessChecker stalenessChecker,
    @IndexExecutor(BATCH) ListeningExecutorService batchExecutor,
    @Assisted ListeningExecutorService executor,
    @Assisted ChangeIndex index,
    IsFirstInsertForEntry isFirstInsertForEntry) {
  // Index target: a single index, no collection.
  this.index = index;
  this.indexes = null;

  // Executors.
  this.executor = executor;
  this.batchExecutor = batchExecutor;

  // Collaborators.
  this.changeDataFactory = changeDataFactory;
  this.notesFactory = notesFactory;
  this.context = context;
  this.indexedListeners = indexedListeners;
  this.stalenessChecker = stalenessChecker;

  // Behaviour switches.
  this.autoReindexIfStale = autoReindexIfStale(cfg);
  this.isFirstInsertForEntry = isFirstInsertForEntry;
}
111 :
/**
 * Creates an indexer that writes to the write indexes of an index collection.
 *
 * <p>The single {@code index} field is left {@code null}; write targets are looked up from
 * {@code indexes} on every operation. Whether stale changes are reindexed automatically is read
 * from {@code index.autoReindexIfStale} in the server config (default {@code false}).
 */
@AssistedInject
ChangeIndexer(
    @GerritServerConfig Config cfg,
    ChangeData.Factory changeDataFactory,
    ChangeNotes.Factory notesFactory,
    ThreadLocalRequestContext context,
    PluginSetContext<ChangeIndexedListener> indexedListeners,
    StalenessChecker stalenessChecker,
    @IndexExecutor(BATCH) ListeningExecutorService batchExecutor,
    @Assisted ListeningExecutorService executor,
    @Assisted ChangeIndexCollection indexes,
    IsFirstInsertForEntry isFirstInsertForEntry) {
  // Index target: a collection, no single index.
  this.indexes = indexes;
  this.index = null;

  // Executors.
  this.executor = executor;
  this.batchExecutor = batchExecutor;

  // Collaborators.
  this.changeDataFactory = changeDataFactory;
  this.notesFactory = notesFactory;
  this.context = context;
  this.indexedListeners = indexedListeners;
  this.stalenessChecker = stalenessChecker;

  // Behaviour switches.
  this.autoReindexIfStale = autoReindexIfStale(cfg);
  this.isFirstInsertForEntry = isFirstInsertForEntry;
}
136 :
137 : private static boolean autoReindexIfStale(Config cfg) {
138 148 : return cfg.getBoolean("index", null, "autoReindexIfStale", false);
139 : }
140 :
141 : /**
142 : * Start indexing a change.
143 : *
144 : * @param changeId change to index.
145 : * @return future for the indexing task.
146 : */
147 : public ListenableFuture<ChangeData> indexAsync(Project.NameKey project, Change.Id changeId) {
148 : // If the change was already scheduled for indexing, we do not need to schedule it again. Change
149 : // updates that happened after the change was scheduled for indexing will automatically be taken
150 : // into account when the index task is executed (as it reads the current change state).
151 : // To skip duplicate index requests, queuedIndexTasks keeps track of the scheduled index tasks.
152 : // Here we check if the change has already been scheduled for indexing, and only if not we
153 : // create a new index task for the change.
154 : // By using computeIfAbsent we ensure that the lookup and the insertion of a new task happens
155 : // atomically. Some attempted update operations on this map by other threads may be blocked
156 : // while the computation is in progress (but not all as ConcurrentHashMap doesn't lock the
157 : // entire table on write, but only segments of the table).
158 103 : IndexTask task =
159 103 : queuedIndexTasks.computeIfAbsent(
160 : changeId,
161 : id -> {
162 103 : fireChangeScheduledForIndexingEvent(project.get(), id.get());
163 103 : return new IndexTask(project, id);
164 : });
165 : // Submitting the task to the executor must not happen from within the computeIfAbsent callback,
166 : // as this could result in the task being executed before the computeIfAbsent method has
167 : // finished (e.g. if a direct executor is used, but also if starting the task asynchronously is
168 : // faster than finishing the computeIfAbsent method). This could lead to failures and unexpected
169 : // behavior:
170 : // * The first thing that IndexTask does is to remove itself from queuedIndexTasks.
171 : // This is done so that index requests which are received while an index task for the same
172 : // change is in progress, are not dropped but added to the queue. This is important since
173 : // the change state that is written to the index is read at the beginning of the index task
174 : // and change updates that happen after this read will not be considered when updating the
175 : // index.
176 : // * Trying to remove the IndexTask from queuedIndexTasks at the beginning of the task doesn't
177 : // work if the computeIfAbsent method hasn't finished yet. Either the queuedIndexTasks doesn't
178 : // contain the new entry yet and the removal has no effect as it is done before the entry is
179 : // added to the map, or the removal fails with {@link IllegalStateException} as recursive
180 : // updates from within the computeIfAbsent callback are not allowed.
181 103 : return task.submitIfNeeded();
182 : }
183 :
184 : /**
185 : * Synchronously index a change, then check if the index is stale due to a race condition.
186 : *
187 : * @param cd change to index.
188 : */
189 : public void index(ChangeData cd) {
190 29 : fireChangeScheduledForIndexingEvent(cd.project().get(), cd.getId().get());
191 29 : doIndex(cd);
192 29 : }
193 :
194 : private void doIndex(ChangeData cd) {
195 103 : indexImpl(cd);
196 :
197 : // Always double-check whether the change might be stale immediately after
198 : // interactively indexing it. This fixes up the case where two writers write
199 : // to the primary storage in one order, and the corresponding index writes
200 : // happen in the opposite order:
201 : // 1. Writer A writes to primary storage.
202 : // 2. Writer B writes to primary storage.
203 : // 3. Writer B updates index.
204 : // 4. Writer A updates index.
205 : //
206 : // Without the extra reindexIfStale step, A has no way of knowing that it's
207 : // about to overwrite the index document with stale data. It doesn't work to
208 : // have A check for staleness before attempting its index update, because
209 : // B's index update might not have happened when it does the check.
210 : //
211 : // With the extra reindexIfStale step after (3)/(4), we are able to detect
212 : // and fix the staleness. It doesn't matter which order the two
213 : // reindexIfStale calls actually execute in; we are guaranteed that at least
214 : // one of them will execute after the second index write, (4).
215 103 : autoReindexIfStale(cd);
216 103 : }
217 :
218 : private void indexImpl(ChangeData cd) {
219 103 : logger.atFine().log("Reindex change %d in index.", cd.getId().get());
220 103 : for (Index<?, ChangeData> i : getWriteIndexes()) {
221 103 : try (TraceTimer traceTimer =
222 103 : TraceContext.newTimer(
223 : "Reindexing change in index",
224 103 : Metadata.builder()
225 103 : .changeId(cd.getId().get())
226 103 : .patchSetId(cd.currentPatchSet().number())
227 103 : .indexVersion(i.getSchema().getVersion())
228 103 : .build())) {
229 103 : if (isFirstInsertForEntry.equals(IsFirstInsertForEntry.YES)) {
230 1 : i.insert(cd);
231 : } else {
232 103 : i.replace(cd);
233 : }
234 3 : } catch (RuntimeException e) {
235 3 : throw new StorageException(
236 3 : String.format(
237 : "Failed to reindex change %d in index version %d (current patch set = %d)",
238 3 : cd.getId().get(), i.getSchema().getVersion(), cd.currentPatchSet().number()),
239 : e);
240 103 : }
241 103 : }
242 103 : fireChangeIndexedEvent(cd.project().get(), cd.getId().get());
243 103 : }
244 :
245 : private void fireChangeScheduledForIndexingEvent(String projectName, int id) {
246 103 : indexedListeners.runEach(l -> l.onChangeScheduledForIndexing(projectName, id));
247 103 : }
248 :
249 : private void fireChangeIndexedEvent(String projectName, int id) {
250 103 : indexedListeners.runEach(l -> l.onChangeIndexed(projectName, id));
251 103 : }
252 :
253 : private void fireChangeScheduledForDeletionFromIndexEvent(int id) {
254 14 : indexedListeners.runEach(l -> l.onChangeScheduledForDeletionFromIndex(id));
255 14 : }
256 :
257 : private void fireChangeDeletedFromIndexEvent(int id) {
258 14 : indexedListeners.runEach(l -> l.onChangeDeleted(id));
259 14 : }
260 :
261 : /**
262 : * Synchronously index a change.
263 : *
264 : * @param change change to index.
265 : */
266 : public void index(Change change) {
267 27 : index(changeDataFactory.create(change));
268 27 : }
269 :
270 : /**
271 : * Synchronously index a change.
272 : *
273 : * @param project the project to which the change belongs.
274 : * @param changeId ID of the change to index.
275 : */
276 : public void index(Project.NameKey project, Change.Id changeId) {
277 0 : index(changeDataFactory.create(project, changeId));
278 0 : }
279 :
280 : /**
281 : * Start deleting a change.
282 : *
283 : * @param id change to delete.
284 : * @return future for the deleting task, the result of the future is always {@code null}
285 : */
286 : public ListenableFuture<ChangeData> deleteAsync(Change.Id id) {
287 11 : fireChangeScheduledForDeletionFromIndexEvent(id.get());
288 11 : return submit(new DeleteTask(id));
289 : }
290 :
291 : /**
292 : * Synchronously delete a change.
293 : *
294 : * @param id change ID to delete.
295 : */
296 : public void delete(Change.Id id) {
297 3 : fireChangeScheduledForDeletionFromIndexEvent(id.get());
298 3 : doDelete(id);
299 3 : }
300 :
301 : private void doDelete(Change.Id id) {
302 3 : new DeleteTask(id).call();
303 3 : }
304 :
305 : /**
306 : * Asynchronously check if a change is stale, and reindex if it is.
307 : *
308 : * <p>Always run on the batch executor, even if this indexer instance is configured to use a
309 : * different executor.
310 : *
311 : * @param project the project to which the change belongs.
312 : * @param id ID of the change to index.
313 : * @return future for reindexing the change; returns true if the change was stale.
314 : */
315 : public ListenableFuture<Boolean> reindexIfStale(Project.NameKey project, Change.Id id) {
316 4 : ReindexIfStaleTask task = new ReindexIfStaleTask(project, id);
317 4 : if (queuedReindexIfStaleTasks.add(task)) {
318 4 : return submit(task, batchExecutor);
319 : }
320 0 : return Futures.immediateFuture(false);
321 : }
322 :
323 : private void autoReindexIfStale(ChangeData cd) {
324 103 : autoReindexIfStale(cd.project(), cd.getId());
325 103 : }
326 :
327 : private void autoReindexIfStale(Project.NameKey project, Change.Id id) {
328 103 : if (autoReindexIfStale) {
329 : // Don't retry indefinitely; if this fails the change will be stale.
330 : @SuppressWarnings("unused")
331 0 : Future<?> possiblyIgnoredError = reindexIfStale(project, id);
332 : }
333 103 : }
334 :
335 : private Collection<ChangeIndex> getWriteIndexes() {
336 103 : return indexes != null ? indexes.getWriteIndexes() : Collections.singleton(index);
337 : }
338 :
339 : private <T> ListenableFuture<T> submit(Callable<T> task) {
340 103 : return submit(task, executor);
341 : }
342 :
343 : private static <T> ListenableFuture<T> submit(
344 : Callable<T> task, ListeningExecutorService executor) {
345 103 : return Futures.nonCancellationPropagating(executor.submit(task));
346 : }
347 :
348 : private abstract class AbstractIndexTask<T> implements Callable<T> {
349 : protected final Project.NameKey project;
350 : protected final Change.Id id;
351 :
352 103 : protected AbstractIndexTask(Project.NameKey project, Change.Id id) {
353 103 : this.project = project;
354 103 : this.id = id;
355 103 : }
356 :
357 : protected abstract T callImpl() throws Exception;
358 :
359 : protected abstract void remove();
360 :
361 : @Override
362 : public abstract String toString();
363 :
364 : @Override
365 : public final T call() throws Exception {
366 : try {
367 103 : RequestContext newCtx =
368 : () -> {
369 0 : throw new OutOfScopeException("No user during ChangeIndexer");
370 : };
371 103 : RequestContext oldCtx = context.setContext(newCtx);
372 : try {
373 103 : return callImpl();
374 : } finally {
375 103 : context.setContext(oldCtx);
376 : }
377 3 : } catch (Exception e) {
378 3 : logger.atSevere().withCause(e).log("Failed to execute %s", this);
379 3 : throw e;
380 : }
381 : }
382 : }
383 :
384 : private class IndexTask extends AbstractIndexTask<ChangeData> {
385 : ListenableFuture<ChangeData> future;
386 :
387 103 : private IndexTask(Project.NameKey project, Change.Id id) {
388 103 : super(project, id);
389 103 : }
390 :
391 : /**
392 : * Submits this task to be executed, if it wasn't submitted yet.
393 : *
394 : * <p>Submits this task to the executor if it hasn't been submitted yet. The future is cached so
395 : * that it can be returned if this method is called again.
396 : *
397 : * <p>This method must be synchronized so that concurrent calls do not submit this task to the
398 : * executor multiple times.
399 : *
400 : * @return future from which the result of the index task (the {@link ChangeData} instance) can
401 : * be retrieved.
402 : */
403 : private synchronized ListenableFuture<ChangeData> submitIfNeeded() {
404 103 : if (future == null) {
405 103 : future = submit(this);
406 : }
407 103 : return future;
408 : }
409 :
410 : @Nullable
411 : @Override
412 : public ChangeData callImpl() throws Exception {
413 : // Remove this task from queuedIndexTasks. This is done right at the beginning of this task so
414 : // that index requests which are received for the same change while this index task is in
415 : // progress, are not dropped but added to the queue. This is important since change updates
416 : // that happen after reading the change notes below will not be considered when updating the
417 : // index.
418 103 : remove();
419 :
420 : try {
421 103 : ChangeNotes changeNotes = notesFactory.createChecked(project, id);
422 103 : ChangeData changeData = changeDataFactory.create(changeNotes);
423 103 : doIndex(changeData);
424 103 : return changeData;
425 0 : } catch (NoSuchChangeException e) {
426 0 : doDelete(id);
427 : }
428 0 : return null;
429 : }
430 :
431 : @Override
432 : public int hashCode() {
433 0 : return Objects.hashCode(IndexTask.class, id.get());
434 : }
435 :
436 : @Override
437 : public boolean equals(Object obj) {
438 0 : if (!(obj instanceof IndexTask)) {
439 0 : return false;
440 : }
441 0 : IndexTask other = (IndexTask) obj;
442 0 : return id.get() == other.id.get();
443 : }
444 :
445 : @Override
446 : public String toString() {
447 3 : return "index-change-" + id;
448 : }
449 :
450 : @Override
451 : protected void remove() {
452 103 : queuedIndexTasks.remove(id);
453 103 : }
454 : }
455 :
456 : // Not AbstractIndexTask as it doesn't need a request context.
457 : private class DeleteTask implements Callable<ChangeData> {
458 : private final Change.Id id;
459 :
460 14 : private DeleteTask(Change.Id id) {
461 14 : this.id = id;
462 14 : }
463 :
464 : @Nullable
465 : @Override
466 : public ChangeData call() {
467 14 : logger.atFine().log("Delete change %d from index.", id.get());
468 : // Don't bother setting a RequestContext to provide the DB.
469 : // Implementations should not need to access the DB in order to delete a
470 : // change ID.
471 14 : for (ChangeIndex i : getWriteIndexes()) {
472 14 : try (TraceTimer traceTimer =
473 14 : TraceContext.newTimer(
474 : "Deleting change in index",
475 14 : Metadata.builder()
476 14 : .changeId(id.get())
477 14 : .indexVersion(i.getSchema().getVersion())
478 14 : .build())) {
479 14 : i.delete(id);
480 0 : } catch (RuntimeException e) {
481 0 : throw new StorageException(
482 0 : String.format(
483 : "Failed to delete change %d from index version %d",
484 0 : id.get(), i.getSchema().getVersion()),
485 : e);
486 14 : }
487 14 : }
488 14 : fireChangeDeletedFromIndexEvent(id.get());
489 14 : return null;
490 : }
491 : }
492 :
493 : private class ReindexIfStaleTask extends AbstractIndexTask<Boolean> {
494 4 : private ReindexIfStaleTask(Project.NameKey project, Change.Id id) {
495 4 : super(project, id);
496 4 : }
497 :
498 : @Override
499 : public Boolean callImpl() throws Exception {
500 4 : remove();
501 : try {
502 4 : StalenessCheckResult stalenessCheckResult = stalenessChecker.check(id);
503 4 : if (stalenessCheckResult.isStale()) {
504 4 : logger.atInfo().log("Reindexing stale document %s", stalenessCheckResult);
505 4 : indexImpl(changeDataFactory.create(project, id));
506 4 : return true;
507 : }
508 0 : } catch (Exception e) {
509 0 : if (!isCausedByRepositoryNotFoundException(e)) {
510 0 : throw e;
511 : }
512 0 : logger.atFine().log(
513 : "Change %s belongs to deleted project %s, aborting reindexing the change.",
514 0 : id.get(), project.get());
515 4 : }
516 4 : return false;
517 : }
518 :
519 : @Override
520 : public int hashCode() {
521 4 : return Objects.hashCode(ReindexIfStaleTask.class, id.get());
522 : }
523 :
524 : @Override
525 : public boolean equals(Object obj) {
526 0 : if (!(obj instanceof ReindexIfStaleTask)) {
527 0 : return false;
528 : }
529 0 : ReindexIfStaleTask other = (ReindexIfStaleTask) obj;
530 0 : return id.get() == other.id.get();
531 : }
532 :
533 : @Override
534 : public String toString() {
535 0 : return "reindex-if-stale-change-" + id;
536 : }
537 :
538 : @Override
539 : protected void remove() {
540 4 : queuedReindexIfStaleTasks.remove(this);
541 4 : }
542 : }
543 :
544 : private boolean isCausedByRepositoryNotFoundException(Throwable throwable) {
545 0 : while (throwable != null) {
546 0 : if (throwable instanceof RepositoryNotFoundException) {
547 0 : return true;
548 : }
549 0 : throwable = throwable.getCause();
550 : }
551 0 : return false;
552 : }
553 : }
|