Line data Source code
1 : // Copyright (C) 2013 The Android Open Source Project 2 : // 3 : // Licensed under the Apache License, Version 2.0 (the "License"); 4 : // you may not use this file except in compliance with the License. 5 : // You may obtain a copy of the License at 6 : // 7 : // http://www.apache.org/licenses/LICENSE-2.0 8 : // 9 : // Unless required by applicable law or agreed to in writing, software 10 : // distributed under the License is distributed on an "AS IS" BASIS, 11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 : // See the License for the specific language governing permissions and 13 : // limitations under the License. 14 : 15 : package com.google.gerrit.server.index; 16 : 17 : import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; 18 : import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE; 19 : 20 : import com.google.common.base.Ticker; 21 : import com.google.common.collect.FluentIterable; 22 : import com.google.common.collect.ImmutableCollection; 23 : import com.google.common.collect.ImmutableList; 24 : import com.google.common.util.concurrent.ListeningExecutorService; 25 : import com.google.common.util.concurrent.MoreExecutors; 26 : import com.google.gerrit.extensions.events.LifecycleListener; 27 : import com.google.gerrit.extensions.registration.DynamicSet; 28 : import com.google.gerrit.index.IndexDefinition; 29 : import com.google.gerrit.index.IndexType; 30 : import com.google.gerrit.index.SchemaDefinitions; 31 : import com.google.gerrit.index.project.ProjectIndexCollection; 32 : import com.google.gerrit.index.project.ProjectIndexRewriter; 33 : import com.google.gerrit.index.project.ProjectIndexer; 34 : import com.google.gerrit.index.project.ProjectSchemaDefinitions; 35 : import com.google.gerrit.lifecycle.LifecycleModule; 36 : import com.google.gerrit.server.config.GerritServerConfig; 37 : import com.google.gerrit.server.git.MultiProgressMonitor; 38 : import com.google.gerrit.server.git.WorkQueue; 39 : import com.google.gerrit.server.index.account.AccountIndexCollection; 40 : import com.google.gerrit.server.index.account.AccountIndexDefinition; 41 : import com.google.gerrit.server.index.account.AccountIndexRewriter; 42 : import com.google.gerrit.server.index.account.AccountIndexer; 43 : import com.google.gerrit.server.index.account.AccountIndexerImpl; 44 : import com.google.gerrit.server.index.account.AccountSchemaDefinitions; 45 : import com.google.gerrit.server.index.change.ChangeIndexCollection; 46 : import com.google.gerrit.server.index.change.ChangeIndexDefinition; 47 : import com.google.gerrit.server.index.change.ChangeIndexRewriter; 48 : import com.google.gerrit.server.index.change.ChangeIndexer; 49 : import com.google.gerrit.server.index.change.ChangeSchemaDefinitions; 50 : import com.google.gerrit.server.index.group.GroupIndexCollection; 51 : import com.google.gerrit.server.index.group.GroupIndexDefinition; 52 : import com.google.gerrit.server.index.group.GroupIndexRewriter; 53 : import com.google.gerrit.server.index.group.GroupIndexer; 54 : import com.google.gerrit.server.index.group.GroupIndexerImpl; 55 : import com.google.gerrit.server.index.group.GroupSchemaDefinitions; 56 : import com.google.gerrit.server.index.options.IsFirstInsertForEntry; 57 : import com.google.gerrit.server.index.project.ProjectIndexDefinition; 58 : import com.google.gerrit.server.index.project.ProjectIndexerImpl; 59 : import com.google.inject.Inject; 60 : import com.google.inject.Injector; 61 : import com.google.inject.Key; 62 : import com.google.inject.Provides; 63 : import com.google.inject.ProvisionException; 64 : import com.google.inject.Singleton; 65 : import com.google.inject.multibindings.OptionalBinder; 66 : import java.util.Collection; 67 : import java.util.Set; 68 : import java.util.concurrent.TimeUnit; 69 : import org.eclipse.jgit.lib.Config; 70 : 71 : /** 72 : * Module for non-indexer-specific secondary index setup. 73 : * 74 : * <p>This module should not be used directly except by specific secondary indexer implementations 75 : * (e.g. Lucene). 76 : */ 77 : public class IndexModule extends LifecycleModule { 78 152 : public static final ImmutableCollection<SchemaDefinitions<?>> ALL_SCHEMA_DEFS = 79 152 : ImmutableList.of( 80 : AccountSchemaDefinitions.INSTANCE, 81 : ChangeSchemaDefinitions.INSTANCE, 82 : GroupSchemaDefinitions.INSTANCE, 83 : ProjectSchemaDefinitions.INSTANCE); 84 : 85 : /** Type of secondary index. */ 86 : public static IndexType getIndexType(Injector injector) { 87 138 : Config cfg = injector.getInstance(Key.get(Config.class, GerritServerConfig.class)); 88 138 : String configValue = cfg != null ? cfg.getString("index", null, "type") : null; 89 138 : return new IndexType(configValue); 90 : } 91 : 92 : private final int threads; 93 : private final ListeningExecutorService interactiveExecutor; 94 : private final ListeningExecutorService batchExecutor; 95 : private final boolean closeExecutorsOnShutdown; 96 : private final boolean slave; 97 : 98 152 : public IndexModule(int threads, boolean slave) { 99 152 : this.threads = threads; 100 152 : this.slave = slave; 101 152 : this.interactiveExecutor = null; 102 152 : this.batchExecutor = null; 103 152 : this.closeExecutorsOnShutdown = true; 104 152 : } 105 : 106 : public IndexModule( 107 0 : ListeningExecutorService interactiveExecutor, ListeningExecutorService batchExecutor) { 108 0 : this.threads = 0; 109 0 : this.interactiveExecutor = interactiveExecutor; 110 0 : this.batchExecutor = batchExecutor; 111 0 : this.closeExecutorsOnShutdown = false; 112 0 : slave = false; 113 0 : } 114 : 115 : @Override 116 : protected void configure() { 117 152 : factory(MultiProgressMonitor.Factory.class); 118 152 : OptionalBinder.newOptionalBinder(binder(), Ticker.class) 119 152 : .setDefault() 120 152 : .toInstance(Ticker.systemTicker()); 121 : 122 152 : bind(AccountIndexRewriter.class); 123 152 : bind(AccountIndexCollection.class); 124 152 : listener().to(AccountIndexCollection.class); 125 152 : factory(AccountIndexerImpl.Factory.class); 126 : 127 152 : bind(ChangeIndexRewriter.class); 128 152 : bind(ChangeIndexCollection.class); 129 152 : listener().to(ChangeIndexCollection.class); 130 152 : factory(ChangeIndexer.Factory.class); 131 : 132 152 : bind(GroupIndexRewriter.class); 133 : // GroupIndexCollection is already bound very high up in SchemaModule. 134 152 : listener().to(GroupIndexCollection.class); 135 152 : factory(GroupIndexerImpl.Factory.class); 136 : 137 152 : bind(ProjectIndexRewriter.class); 138 152 : bind(ProjectIndexCollection.class); 139 152 : listener().to(ProjectIndexCollection.class); 140 152 : factory(ProjectIndexerImpl.Factory.class); 141 : 142 152 : if (closeExecutorsOnShutdown) { 143 : // The executors must be shutdown _before_ closing the indexes. 144 : // On Gerrit start the LifecycleListeners are invoked in the order in which they are 145 : // registered, but on shutdown of Gerrit the order is reversed. This means the 146 : // LifecycleListener to shutdown the executors must be registered _after_ the 147 : // LifecycleListeners that close the indexes. The closing of the indexes is done by 148 : // *IndexCollection which have been registered as LifecycleListener above. The 149 : // registration of the ShutdownIndexExecutors LifecycleListener must happen afterwards. 150 152 : listener().to(ShutdownIndexExecutors.class); 151 : } 152 : 153 152 : DynamicSet.setOf(binder(), OnlineUpgradeListener.class); 154 152 : OptionalBinder.newOptionalBinder(binder(), IsFirstInsertForEntry.class) 155 152 : .setDefault() 156 152 : .toInstance(IsFirstInsertForEntry.NO); 157 152 : } 158 : 159 : @Provides 160 : Collection<IndexDefinition<?, ?, ?>> getIndexDefinitions( 161 : AccountIndexDefinition accounts, 162 : ChangeIndexDefinition changes, 163 : GroupIndexDefinition groups, 164 : ProjectIndexDefinition projects) { 165 151 : if (slave) { 166 : // In slave mode, we only have the group index. 167 1 : return ImmutableList.of(groups); 168 : } 169 : 170 151 : Collection<IndexDefinition<?, ?, ?>> result = 171 151 : ImmutableList.of(accounts, groups, changes, projects); 172 151 : Set<String> expected = 173 151 : FluentIterable.from(ALL_SCHEMA_DEFS).transform(SchemaDefinitions::getName).toSet(); 174 151 : Set<String> actual = FluentIterable.from(result).transform(IndexDefinition::getName).toSet(); 175 151 : if (!expected.equals(actual)) { 176 0 : throw new ProvisionException( 177 : "need index definitions for all schemas: " + expected + " != " + actual); 178 : } 179 151 : return result; 180 : } 181 : 182 : @Provides 183 : @Singleton 184 : AccountIndexer getAccountIndexer( 185 : AccountIndexerImpl.Factory factory, AccountIndexCollection indexes) { 186 151 : return factory.create(indexes); 187 : } 188 : 189 : @Provides 190 : @Singleton 191 : ChangeIndexer getChangeIndexer( 192 : @IndexExecutor(INTERACTIVE) ListeningExecutorService executor, 193 : ChangeIndexer.Factory factory, 194 : ChangeIndexCollection indexes) { 195 : // Bind default indexer to interactive executor; callers who need a 196 : // different executor can use the factory directly. 197 148 : return factory.create(executor, indexes); 198 : } 199 : 200 : @Provides 201 : @Singleton 202 : GroupIndexer getGroupIndexer(GroupIndexerImpl.Factory factory, GroupIndexCollection indexes) { 203 151 : return factory.create(indexes); 204 : } 205 : 206 : @Provides 207 : @Singleton 208 : ProjectIndexer getProjectIndexer( 209 : ProjectIndexerImpl.Factory factory, ProjectIndexCollection indexes) { 210 147 : return factory.create(indexes); 211 : } 212 : 213 : @Provides 214 : @Singleton 215 : @IndexExecutor(INTERACTIVE) 216 : ListeningExecutorService getInteractiveIndexExecutor( 217 : @GerritServerConfig Config config, WorkQueue workQueue) { 218 151 : if (interactiveExecutor != null) { 219 0 : return interactiveExecutor; 220 : } 221 151 : int threads = this.threads; 222 151 : if (threads == 0) { 223 17 : threads = 224 17 : config.getInt( 225 17 : "index", null, "threads", Runtime.getRuntime().availableProcessors() / 2 + 1); 226 : } 227 151 : if (threads < 0) { 228 138 : return MoreExecutors.newDirectExecutorService(); 229 : } 230 31 : return MoreExecutors.listeningDecorator( 231 31 : workQueue.createQueue(threads, "Index-Interactive", true)); 232 : } 233 : 234 : @Provides 235 : @Singleton 236 : @IndexExecutor(BATCH) 237 : ListeningExecutorService getBatchIndexExecutor( 238 : @GerritServerConfig Config config, WorkQueue workQueue) { 239 151 : if (batchExecutor != null) { 240 0 : return batchExecutor; 241 : } 242 151 : int threads = this.threads; 243 151 : if (threads == 0) { 244 17 : threads = 245 17 : config.getInt("index", null, "batchThreads", Runtime.getRuntime().availableProcessors()); 246 : } 247 151 : if (threads < 0) { 248 138 : return MoreExecutors.newDirectExecutorService(); 249 : } 250 30 : return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch", true)); 251 : } 252 : 253 : @Singleton 254 : private static class ShutdownIndexExecutors implements LifecycleListener { 255 : private final ListeningExecutorService interactiveExecutor; 256 : private final ListeningExecutorService batchExecutor; 257 : 258 : @Inject 259 : ShutdownIndexExecutors( 260 : @IndexExecutor(INTERACTIVE) ListeningExecutorService interactiveExecutor, 261 151 : @IndexExecutor(BATCH) ListeningExecutorService batchExecutor) { 262 151 : this.interactiveExecutor = interactiveExecutor; 263 151 : this.batchExecutor = batchExecutor; 264 151 : } 265 : 266 : @Override 267 151 : public void start() {} 268 : 269 : @Override 270 : public void stop() { 271 151 : MoreExecutors.shutdownAndAwaitTermination( 272 : interactiveExecutor, Long.MAX_VALUE, TimeUnit.SECONDS); 273 151 : MoreExecutors.shutdownAndAwaitTermination(batchExecutor, Long.MAX_VALUE, TimeUnit.SECONDS); 274 151 : } 275 : } 276 : }