Line data Source code
1 : // Copyright (C) 2016 The Android Open Source Project 2 : // 3 : // Licensed under the Apache License, Version 2.0 (the "License"); 4 : // you may not use this file except in compliance with the License. 5 : // You may obtain a copy of the License at 6 : // 7 : // http://www.apache.org/licenses/LICENSE-2.0 8 : // 9 : // Unless required by applicable law or agreed to in writing, software 10 : // distributed under the License is distributed on an "AS IS" BASIS, 11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 : // See the License for the specific language governing permissions and 13 : // limitations under the License. 14 : 15 : package com.google.gerrit.server.index.account; 16 : 17 : import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; 18 : 19 : import com.google.common.base.Stopwatch; 20 : import com.google.common.flogger.FluentLogger; 21 : import com.google.common.util.concurrent.Futures; 22 : import com.google.common.util.concurrent.ListenableFuture; 23 : import com.google.common.util.concurrent.ListeningExecutorService; 24 : import com.google.gerrit.entities.Account; 25 : import com.google.gerrit.index.SiteIndexer; 26 : import com.google.gerrit.server.account.AccountCache; 27 : import com.google.gerrit.server.account.AccountState; 28 : import com.google.gerrit.server.account.Accounts; 29 : import com.google.gerrit.server.index.IndexExecutor; 30 : import com.google.gerrit.server.index.options.IsFirstInsertForEntry; 31 : import com.google.inject.Inject; 32 : import com.google.inject.Singleton; 33 : import java.io.IOException; 34 : import java.util.ArrayList; 35 : import java.util.List; 36 : import java.util.Optional; 37 : import java.util.concurrent.ExecutionException; 38 : import java.util.concurrent.atomic.AtomicBoolean; 39 : import java.util.concurrent.atomic.AtomicInteger; 40 : import org.eclipse.jgit.lib.ProgressMonitor; 41 : import org.eclipse.jgit.lib.TextProgressMonitor; 42 : 43 : /** 44 : * Implementation that can index all accounts on a host. Used by Gerrit's initialization and upgrade 45 : * programs as well as by REST API endpoints that offer this functionality. 46 : */ 47 : @Singleton 48 : public class AllAccountsIndexer extends SiteIndexer<Account.Id, AccountState, AccountIndex> { 49 138 : private static final FluentLogger logger = FluentLogger.forEnclosingClass(); 50 : 51 : private final ListeningExecutorService executor; 52 : private final Accounts accounts; 53 : private final AccountCache accountCache; 54 : private final IsFirstInsertForEntry isFirstInsertForEntry; 55 : 56 : @Inject 57 : AllAccountsIndexer( 58 : @IndexExecutor(BATCH) ListeningExecutorService executor, 59 : Accounts accounts, 60 : AccountCache accountCache, 61 138 : IsFirstInsertForEntry isFirstInsertForEntry) { 62 138 : this.executor = executor; 63 138 : this.accounts = accounts; 64 138 : this.accountCache = accountCache; 65 138 : this.isFirstInsertForEntry = isFirstInsertForEntry; 66 138 : } 67 : 68 : @Override 69 : public SiteIndexer.Result indexAll(AccountIndex index) { 70 16 : ProgressMonitor progress = new TextProgressMonitor(newPrintWriter(progressOut)); 71 16 : progress.start(2); 72 16 : Stopwatch sw = Stopwatch.createStarted(); 73 : List<Account.Id> ids; 74 : try { 75 16 : ids = collectAccounts(progress); 76 0 : } catch (IOException e) { 77 0 : logger.atSevere().withCause(e).log("Error collecting accounts"); 78 0 : return SiteIndexer.Result.create(sw, false, 0, 0); 79 16 : } 80 16 : return reindexAccounts(index, ids, progress); 81 : } 82 : 83 : private SiteIndexer.Result reindexAccounts( 84 : AccountIndex index, List<Account.Id> ids, ProgressMonitor progress) { 85 16 : progress.beginTask("Reindexing accounts", ids.size()); 86 16 : List<ListenableFuture<?>> futures = new ArrayList<>(ids.size()); 87 16 : AtomicBoolean ok = new AtomicBoolean(true); 88 16 : AtomicInteger done = new AtomicInteger(); 89 16 : AtomicInteger failed = new AtomicInteger(); 90 16 : Stopwatch sw = Stopwatch.createStarted(); 91 16 : for (Account.Id id : ids) { 92 2 : String desc = "account " + id; 93 2 : ListenableFuture<?> future = 94 2 : executor.submit( 95 : () -> { 96 : try { 97 2 : Optional<AccountState> a = accountCache.get(id); 98 2 : if (a.isPresent()) { 99 2 : if (isFirstInsertForEntry.equals(IsFirstInsertForEntry.YES)) { 100 1 : index.insert(a.get()); 101 : } else { 102 1 : index.replace(a.get()); 103 : } 104 : } else { 105 0 : index.delete(id); 106 : } 107 2 : verboseWriter.println("Reindexed " + desc); 108 2 : done.incrementAndGet(); 109 0 : } catch (Exception e) { 110 0 : failed.incrementAndGet(); 111 0 : throw e; 112 2 : } 113 2 : return null; 114 : }); 115 2 : addErrorListener(future, desc, progress, ok); 116 2 : futures.add(future); 117 2 : } 118 : 119 : try { 120 16 : Futures.successfulAsList(futures).get(); 121 0 : } catch (ExecutionException | InterruptedException e) { 122 0 : logger.atSevere().withCause(e).log("Error waiting on account futures"); 123 0 : return SiteIndexer.Result.create(sw, false, 0, 0); 124 16 : } 125 : 126 16 : progress.endTask(); 127 16 : return SiteIndexer.Result.create(sw, ok.get(), done.get(), failed.get()); 128 : } 129 : 130 : private List<Account.Id> collectAccounts(ProgressMonitor progress) throws IOException { 131 16 : progress.beginTask("Collecting accounts", ProgressMonitor.UNKNOWN); 132 16 : List<Account.Id> ids = new ArrayList<>(); 133 16 : for (Account.Id accountId : accounts.allIds()) { 134 2 : ids.add(accountId); 135 2 : progress.update(1); 136 2 : } 137 16 : progress.endTask(); 138 16 : return ids; 139 : } 140 : }