Line data Source code
1 : // Copyright (C) 2012 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.server.git.receive;
16 :
17 : import static com.google.common.base.Preconditions.checkState;
18 : import static com.google.gerrit.server.quota.QuotaGroupDefinitions.REPOSITORY_SIZE_GROUP;
19 : import static java.util.concurrent.TimeUnit.NANOSECONDS;
20 :
21 : import com.google.common.flogger.FluentLogger;
22 : import com.google.common.util.concurrent.UncheckedExecutionException;
23 : import com.google.gerrit.common.Nullable;
24 : import com.google.gerrit.common.UsedAt;
25 : import com.google.gerrit.common.data.Capable;
26 : import com.google.gerrit.entities.Change;
27 : import com.google.gerrit.entities.Project;
28 : import com.google.gerrit.extensions.restapi.AuthException;
29 : import com.google.gerrit.metrics.Counter0;
30 : import com.google.gerrit.metrics.Description;
31 : import com.google.gerrit.metrics.Description.Units;
32 : import com.google.gerrit.metrics.Field;
33 : import com.google.gerrit.metrics.Histogram1;
34 : import com.google.gerrit.metrics.MetricMaker;
35 : import com.google.gerrit.metrics.Timer1;
36 : import com.google.gerrit.server.IdentifiedUser;
37 : import com.google.gerrit.server.PublishCommentsOp;
38 : import com.google.gerrit.server.cache.PerThreadCache;
39 : import com.google.gerrit.server.config.AllUsersName;
40 : import com.google.gerrit.server.config.ConfigUtil;
41 : import com.google.gerrit.server.config.GerritServerConfig;
42 : import com.google.gerrit.server.config.ReceiveCommitsExecutor;
43 : import com.google.gerrit.server.git.MultiProgressMonitor;
44 : import com.google.gerrit.server.git.MultiProgressMonitor.TaskKind;
45 : import com.google.gerrit.server.git.PermissionAwareRepositoryManager;
46 : import com.google.gerrit.server.git.ProjectRunnable;
47 : import com.google.gerrit.server.git.TransferConfig;
48 : import com.google.gerrit.server.git.UsersSelfAdvertiseRefsHook;
49 : import com.google.gerrit.server.logging.Metadata;
50 : import com.google.gerrit.server.permissions.PermissionBackend;
51 : import com.google.gerrit.server.permissions.PermissionBackendException;
52 : import com.google.gerrit.server.permissions.ProjectPermission;
53 : import com.google.gerrit.server.project.ContributorAgreementsChecker;
54 : import com.google.gerrit.server.project.ProjectState;
55 : import com.google.gerrit.server.query.change.InternalChangeQuery;
56 : import com.google.gerrit.server.quota.QuotaBackend;
57 : import com.google.gerrit.server.quota.QuotaException;
58 : import com.google.gerrit.server.quota.QuotaResponse;
59 : import com.google.gerrit.server.util.MagicBranch;
60 : import com.google.gerrit.server.util.RequestScopePropagator;
61 : import com.google.inject.Inject;
62 : import com.google.inject.PrivateModule;
63 : import com.google.inject.Provider;
64 : import com.google.inject.Provides;
65 : import com.google.inject.Singleton;
66 : import com.google.inject.assistedinject.Assisted;
67 : import com.google.inject.assistedinject.FactoryModuleBuilder;
68 : import com.google.inject.name.Named;
69 : import java.io.IOException;
70 : import java.io.OutputStream;
71 : import java.util.Collection;
72 : import java.util.Set;
73 : import java.util.concurrent.Callable;
74 : import java.util.concurrent.ExecutionException;
75 : import java.util.concurrent.ExecutorService;
76 : import java.util.concurrent.FutureTask;
77 : import java.util.concurrent.TimeUnit;
78 : import java.util.concurrent.TimeoutException;
79 : import org.eclipse.jgit.lib.Config;
80 : import org.eclipse.jgit.lib.Repository;
81 : import org.eclipse.jgit.transport.PreReceiveHook;
82 : import org.eclipse.jgit.transport.ReceiveCommand;
83 : import org.eclipse.jgit.transport.ReceiveCommand.Result;
84 : import org.eclipse.jgit.transport.ReceivePack;
85 :
86 : /**
87 : * Hook that delegates to {@link ReceiveCommits} in a worker thread.
88 : *
89 : * <p>Since the work that {@link ReceiveCommits} does may take a long, potentially unbounded amount
90 : * of time, it runs in the background so it can be monitored for timeouts and cancelled, and have
91 : * stalls reported to the user from the main thread.
92 : */
93 : public class AsyncReceiveCommits {
94 97 : private static final FluentLogger logger = FluentLogger.forEnclosingClass();
95 :
96 : private static final String RECEIVE_OVERALL_TIMEOUT_NAME = "ReceiveCommitsOverallTimeout";
97 : private static final String RECEIVE_CANCELLATION_TIMEOUT_NAME =
98 : "ReceiveCommitsCancellationTimeout";
99 :
100 : public interface Factory {
101 : AsyncReceiveCommits create(
102 : ProjectState projectState,
103 : IdentifiedUser user,
104 : Repository repository,
105 : @Nullable MessageSender messageSender);
106 : }
107 :
108 138 : public static class AsyncReceiveCommitsModule extends PrivateModule {
109 : @Override
110 : public void configure() {
111 138 : install(new FactoryModuleBuilder().build(LazyPostReceiveHookChain.Factory.class));
112 138 : install(new FactoryModuleBuilder().build(AsyncReceiveCommits.Factory.class));
113 138 : expose(AsyncReceiveCommits.Factory.class);
114 : // Don't expose the binding for ReceiveCommits.Factory. All callers should
115 : // be using AsyncReceiveCommits.Factory instead.
116 138 : install(new FactoryModuleBuilder().build(ReceiveCommits.Factory.class));
117 138 : install(new FactoryModuleBuilder().build(PublishCommentsOp.Factory.class));
118 138 : install(new FactoryModuleBuilder().build(BranchCommitValidator.Factory.class));
119 138 : }
120 :
121 : @Provides
122 : @Singleton
123 : @Named(RECEIVE_OVERALL_TIMEOUT_NAME)
124 : long getReceiveTimeoutMillis(@GerritServerConfig Config cfg) {
125 138 : return ConfigUtil.getTimeUnit(
126 138 : cfg, "receive", null, "timeout", TimeUnit.MINUTES.toMillis(4), TimeUnit.MILLISECONDS);
127 : }
128 :
129 : @Provides
130 : @Singleton
131 : @Named(RECEIVE_CANCELLATION_TIMEOUT_NAME)
132 : long getCancellationTimeoutMillis(@GerritServerConfig Config cfg) {
133 138 : return ConfigUtil.getTimeUnit(
134 : cfg,
135 : "receive",
136 : null,
137 : "cancellationTimeout",
138 138 : TimeUnit.SECONDS.toMillis(5),
139 : TimeUnit.MILLISECONDS);
140 : }
141 : }
142 :
143 : private static MultiProgressMonitor newMultiProgressMonitor(
144 : MultiProgressMonitor.Factory multiProgressMonitorFactory, MessageSender messageSender) {
145 96 : return multiProgressMonitorFactory.create(
146 96 : new OutputStream() {
147 : @Override
148 : public void write(int b) {
149 0 : messageSender.sendBytes(new byte[] {(byte) b});
150 0 : }
151 :
152 : @Override
153 : public void write(byte[] what, int off, int len) {
154 0 : messageSender.sendBytes(what, off, len);
155 0 : }
156 :
157 : @Override
158 : public void write(byte[] what) {
159 96 : messageSender.sendBytes(what);
160 96 : }
161 :
162 : @Override
163 : public void flush() {
164 96 : messageSender.flush();
165 96 : }
166 : },
167 : TaskKind.RECEIVE_COMMITS,
168 : "Processing changes");
169 : }
170 :
171 96 : private enum PushType {
172 96 : CREATE_REPLACE,
173 96 : NORMAL,
174 96 : AUTOCLOSE,
175 : }
176 :
177 : @Singleton
178 : private static class Metrics {
179 : private final Histogram1<PushType> changes;
180 : private final Timer1<PushType> latencyPerChange;
181 : private final Timer1<PushType> latencyPerPush;
182 : private final Counter0 timeouts;
183 :
184 : @Inject
185 97 : Metrics(MetricMaker metricMaker) {
186 : // For the changes metric the push type field is never set to PushType.NORMAL, hence it is not
187 : // mentioned in the field description.
188 97 : changes =
189 97 : metricMaker.newHistogram(
190 : "receivecommits/changes_per_push",
191 97 : new Description("number of changes uploaded in a single push.").setCumulative(),
192 97 : Field.ofEnum(PushType.class, "type", Metadata.Builder::pushType)
193 97 : .description("type of push (create/replace, autoclose)")
194 97 : .build());
195 :
196 97 : Field<PushType> pushTypeField =
197 97 : Field.ofEnum(PushType.class, "type", Metadata.Builder::pushType)
198 97 : .description("type of push (create/replace, autoclose, normal)")
199 97 : .build();
200 :
201 97 : latencyPerChange =
202 97 : metricMaker.newTimer(
203 : "receivecommits/latency_per_push_per_change",
204 : new Description(
205 : "Processing delay per push divided by the number of changes in said push. "
206 : + "(Only includes pushes which contain changes.)")
207 97 : .setUnit(Units.MILLISECONDS)
208 97 : .setCumulative(),
209 : pushTypeField);
210 :
211 97 : latencyPerPush =
212 97 : metricMaker.newTimer(
213 : "receivecommits/latency_per_push",
214 : new Description("processing delay for a processing single push")
215 97 : .setUnit(Units.MILLISECONDS)
216 97 : .setCumulative(),
217 : pushTypeField);
218 :
219 97 : timeouts =
220 97 : metricMaker.newCounter(
221 97 : "receivecommits/timeout", new Description("rate of push timeouts").setRate());
222 97 : }
223 : }
224 :
225 : private final MultiProgressMonitor.Factory multiProgressMonitorFactory;
226 : private final Metrics metrics;
227 : private final ReceiveCommits receiveCommits;
228 : private final PermissionBackend.ForProject perm;
229 : private final ReceivePack receivePack;
230 : private final ExecutorService executor;
231 : private final RequestScopePropagator scopePropagator;
232 : private final ReceiveConfig receiveConfig;
233 : private final ContributorAgreementsChecker contributorAgreements;
234 : private final long receiveTimeoutMillis;
235 : private final long cancellationTimeoutMillis;
236 : private final ProjectState projectState;
237 : private final IdentifiedUser user;
238 : private final Repository repo;
239 : private final AllRefsWatcher allRefsWatcher;
240 :
241 : @Inject
242 : AsyncReceiveCommits(
243 : MultiProgressMonitor.Factory multiProgressMonitorFactory,
244 : ReceiveCommits.Factory factory,
245 : PermissionBackend permissionBackend,
246 : Provider<InternalChangeQuery> queryProvider,
247 : @ReceiveCommitsExecutor ExecutorService executor,
248 : RequestScopePropagator scopePropagator,
249 : ReceiveConfig receiveConfig,
250 : TransferConfig transferConfig,
251 : LazyPostReceiveHookChain.Factory lazyPostReceive,
252 : ContributorAgreementsChecker contributorAgreements,
253 : Metrics metrics,
254 : QuotaBackend quotaBackend,
255 : UsersSelfAdvertiseRefsHook usersSelfAdvertiseRefsHook,
256 : AllUsersName allUsersName,
257 : @Named(RECEIVE_OVERALL_TIMEOUT_NAME) long receiveTimeoutMillis,
258 : @Named(RECEIVE_CANCELLATION_TIMEOUT_NAME) long cancellationTimeoutMillis,
259 : @Assisted ProjectState projectState,
260 : @Assisted IdentifiedUser user,
261 : @Assisted Repository repo,
262 : @Assisted @Nullable MessageSender messageSender)
263 97 : throws PermissionBackendException {
264 97 : this.multiProgressMonitorFactory = multiProgressMonitorFactory;
265 97 : this.executor = executor;
266 97 : this.scopePropagator = scopePropagator;
267 97 : this.receiveConfig = receiveConfig;
268 97 : this.contributorAgreements = contributorAgreements;
269 97 : this.receiveTimeoutMillis = receiveTimeoutMillis;
270 97 : this.cancellationTimeoutMillis = cancellationTimeoutMillis;
271 97 : this.projectState = projectState;
272 97 : this.user = user;
273 97 : this.repo = repo;
274 97 : this.metrics = metrics;
275 : // If the user lacks READ permission, some references may be filtered and hidden from view.
276 : // Check objects mentioned inside the incoming pack file are reachable from visible refs.
277 97 : Project.NameKey projectName = projectState.getNameKey();
278 97 : this.perm = permissionBackend.user(user).project(projectName);
279 :
280 97 : receivePack = new ReceivePack(PermissionAwareRepositoryManager.wrap(repo, perm));
281 97 : receivePack.setAllowCreates(true);
282 97 : receivePack.setAllowDeletes(true);
283 97 : receivePack.setAllowNonFastForwards(true);
284 97 : receivePack.setRefLogIdent(user.newRefLogIdent());
285 97 : receivePack.setTimeout(transferConfig.getTimeout());
286 97 : receivePack.setMaxObjectSizeLimit(projectState.getEffectiveMaxObjectSizeLimit().value);
287 97 : receivePack.setCheckReceivedObjects(projectState.getConfig().getCheckReceivedObjects());
288 97 : receivePack.setRefFilter(new ReceiveRefFilter());
289 97 : receivePack.setAllowPushOptions(true);
290 97 : receivePack.setPreReceiveHook(asHook());
291 97 : receivePack.setPostReceiveHook(lazyPostReceive.create(user, projectName));
292 :
293 97 : if (!projectState.statePermitsRead() || !this.perm.test(ProjectPermission.READ)) {
294 27 : receivePack.setCheckReferencedObjectsAreReachable(
295 : receiveConfig.checkReferencedObjectsAreReachable);
296 : }
297 :
298 97 : allRefsWatcher = new AllRefsWatcher();
299 97 : receivePack.setAdvertiseRefsHook(
300 97 : ReceiveCommitsAdvertiseRefsHookChain.create(
301 : allRefsWatcher,
302 : usersSelfAdvertiseRefsHook,
303 : allUsersName,
304 : queryProvider,
305 : projectName,
306 97 : user.getAccountId()));
307 97 : receiveCommits =
308 97 : factory.create(projectState, user, receivePack, repo, allRefsWatcher, messageSender);
309 97 : receiveCommits.init();
310 97 : QuotaResponse.Aggregated availableTokens =
311 97 : quotaBackend.user(user).project(projectName).availableTokens(REPOSITORY_SIZE_GROUP);
312 : try {
313 97 : availableTokens.throwOnError();
314 1 : } catch (QuotaException e) {
315 1 : logger.atWarning().withCause(e).log(
316 : "Quota %s availableTokens request failed for project %s",
317 : REPOSITORY_SIZE_GROUP, projectName);
318 1 : throw new RuntimeException(e);
319 97 : }
320 97 : availableTokens.availableTokens().ifPresent(receivePack::setMaxPackSizeLimit);
321 97 : }
322 :
323 : /** Determine if the user can upload commits. */
324 : public Capable canUpload() throws IOException, PermissionBackendException {
325 97 : if (!perm.test(ProjectPermission.PUSH_AT_LEAST_ONE_REF)) {
326 0 : return new Capable("Upload denied for project '" + projectState.getName() + "'");
327 : }
328 :
329 : try {
330 97 : contributorAgreements.check(projectState.getNameKey(), user);
331 0 : } catch (AuthException e) {
332 0 : return new Capable(e.getMessage());
333 97 : }
334 :
335 97 : if (receiveConfig.checkMagicRefs) {
336 97 : return MagicBranch.checkMagicBranchRefs(repo, projectState.getProject());
337 : }
338 0 : return Capable.OK;
339 : }
340 :
341 : /**
342 : * Returns a {@link PreReceiveHook} implementation that can be used directly by JGit when
343 : * processing a push.
344 : */
345 : public PreReceiveHook asHook() {
346 97 : return (rp, commands) -> {
347 96 : checkState(receivePack == rp, "can't perform PreReceive for a different receive pack");
348 96 : long startNanos = System.nanoTime();
349 : ReceiveCommitsResult result;
350 : try {
351 96 : result = preReceive(commands);
352 0 : } catch (TimeoutException e) {
353 0 : receivePack.sendError("timeout while processing changes");
354 0 : rejectCommandsNotAttempted(commands);
355 0 : return;
356 3 : } catch (Exception e) {
357 3 : logger.atSevere().withCause(e.getCause()).log("error while processing push");
358 3 : receivePack.sendError("internal error");
359 3 : rejectCommandsNotAttempted(commands);
360 3 : return;
361 : } finally {
362 : // Flush the messages queued up until now (if any).
363 96 : receiveCommits.sendMessages();
364 : }
365 96 : reportMetrics(result, System.nanoTime() - startNanos);
366 96 : };
367 : }
368 :
369 : /** Processes {@code commands}, applies them to Git storage and communicates back on the wire. */
370 : @UsedAt(UsedAt.Project.GOOGLE)
371 : public ReceiveCommitsResult preReceive(Collection<ReceiveCommand> commands)
372 : throws TimeoutException, UncheckedExecutionException {
373 96 : if (commands.stream().anyMatch(c -> c.getResult() != Result.NOT_ATTEMPTED)) {
374 : // Stop processing when command was already processed by previously invoked
375 : // pre-receive hooks
376 3 : return ReceiveCommitsResult.empty();
377 : }
378 96 : String currentThreadName = Thread.currentThread().getName();
379 96 : MultiProgressMonitor monitor =
380 96 : newMultiProgressMonitor(multiProgressMonitorFactory, receiveCommits.getMessageSender());
381 96 : Callable<ReceiveCommitsResult> callable =
382 : () -> {
383 96 : String oldName = Thread.currentThread().getName();
384 96 : Thread.currentThread().setName(oldName + "-for-" + currentThreadName);
385 96 : try (PerThreadCache threadLocalCache = PerThreadCache.create()) {
386 96 : return receiveCommits.processCommands(commands, monitor);
387 : } finally {
388 96 : Thread.currentThread().setName(oldName);
389 : }
390 : };
391 :
392 : try {
393 : // WorkQueue does not support Callable<T>, so we have to covert it here.
394 96 : FutureTask<ReceiveCommitsResult> runnable =
395 96 : ProjectRunnable.fromCallable(
396 96 : callable, receiveCommits.getProject().getNameKey(), "receive-commits", null, false);
397 96 : monitor.waitFor(
398 96 : executor.submit(scopePropagator.wrap(runnable)),
399 : receiveTimeoutMillis,
400 : TimeUnit.MILLISECONDS,
401 : cancellationTimeoutMillis,
402 : TimeUnit.MILLISECONDS);
403 96 : if (!runnable.isDone()) {
404 : // At this point we are either done or have thrown a TimeoutException and bailed out.
405 0 : throw new IllegalStateException("unable to get receive commits result");
406 : }
407 96 : return runnable.get();
408 0 : } catch (TimeoutException e) {
409 0 : metrics.timeouts.increment();
410 0 : logger.atWarning().withCause(e).log(
411 : "Timeout in ReceiveCommits while processing changes for project %s",
412 0 : projectState.getName());
413 0 : throw e;
414 3 : } catch (InterruptedException | ExecutionException e) {
415 3 : throw new UncheckedExecutionException(e);
416 : }
417 : }
418 :
419 : @UsedAt(UsedAt.Project.GOOGLE)
420 : public void reportMetrics(ReceiveCommitsResult result, long deltaNanos) {
421 : PushType pushType;
422 96 : int totalChanges = 0;
423 96 : if (result.magicPush()) {
424 89 : pushType = PushType.CREATE_REPLACE;
425 89 : Set<Change.Id> created = result.changes().get(ReceiveCommitsResult.ChangeStatus.CREATED);
426 89 : Set<Change.Id> replaced = result.changes().get(ReceiveCommitsResult.ChangeStatus.REPLACED);
427 89 : metrics.changes.record(pushType, created.size() + replaced.size());
428 89 : totalChanges = replaced.size() + created.size();
429 89 : } else {
430 50 : Set<Change.Id> autoclosed =
431 50 : result.changes().get(ReceiveCommitsResult.ChangeStatus.AUTOCLOSED);
432 50 : if (!autoclosed.isEmpty()) {
433 6 : pushType = PushType.AUTOCLOSE;
434 6 : metrics.changes.record(pushType, autoclosed.size());
435 6 : totalChanges = autoclosed.size();
436 : } else {
437 50 : pushType = PushType.NORMAL;
438 : }
439 : }
440 96 : if (totalChanges > 0) {
441 88 : metrics.latencyPerChange.record(pushType, deltaNanos / totalChanges, NANOSECONDS);
442 : }
443 96 : metrics.latencyPerPush.record(pushType, deltaNanos, NANOSECONDS);
444 96 : }
445 :
446 : /**
447 : * Sends all messages which have been collected while processing the push to the client.
448 : *
449 : * @see ReceiveCommits#sendMessages()
450 : */
451 : @UsedAt(UsedAt.Project.GOOGLE)
452 : public void sendMessages() {
453 0 : receiveCommits.sendMessages();
454 0 : }
455 :
456 : public ReceivePack getReceivePack() {
457 97 : return receivePack;
458 : }
459 :
460 : /**
461 : * Marks all commands that were not processed yet as {@link Result#REJECTED_OTHER_REASON}.
462 : * Intended to be used to finish up remaining commands when errors occur during processing.
463 : */
464 : private static void rejectCommandsNotAttempted(Collection<ReceiveCommand> commands) {
465 3 : for (ReceiveCommand c : commands) {
466 3 : if (c.getResult() == Result.NOT_ATTEMPTED) {
467 3 : c.setResult(Result.REJECTED_OTHER_REASON, "internal error");
468 : }
469 3 : }
470 3 : }
471 : }
|