LCOV - code coverage report
Current view: top level - pgm/http/jetty - ProjectQoSFilter.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 116 132 87.9 %
Date: 2022-11-19 15:00:39 Functions: 24 26 92.3 %

          Line data    Source code
       1             : // Copyright (C) 2010 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.pgm.http.jetty;
      16             : 
      17             : import static com.google.gerrit.server.config.ConfigUtil.getTimeUnit;
      18             : import static java.util.concurrent.TimeUnit.MINUTES;
      19             : import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
      20             : 
      21             : import com.google.common.annotations.VisibleForTesting;
      22             : import com.google.gerrit.server.CurrentUser;
      23             : import com.google.gerrit.server.account.AccountLimits;
      24             : import com.google.gerrit.server.config.GerritServerConfig;
      25             : import com.google.gerrit.server.git.QueueProvider;
      26             : import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
      27             : import com.google.gerrit.sshd.CommandExecutorQueueProvider;
      28             : import com.google.inject.Inject;
      29             : import com.google.inject.Provider;
      30             : import com.google.inject.Singleton;
      31             : import com.google.inject.servlet.ServletModule;
      32             : import java.io.IOException;
      33             : import java.util.Optional;
      34             : import java.util.concurrent.Future;
      35             : import java.util.concurrent.ScheduledThreadPoolExecutor;
      36             : import java.util.regex.Matcher;
      37             : import java.util.regex.Pattern;
      38             : import javax.servlet.AsyncContext;
      39             : import javax.servlet.AsyncEvent;
      40             : import javax.servlet.AsyncListener;
      41             : import javax.servlet.DispatcherType;
      42             : import javax.servlet.Filter;
      43             : import javax.servlet.FilterChain;
      44             : import javax.servlet.FilterConfig;
      45             : import javax.servlet.ServletContext;
      46             : import javax.servlet.ServletException;
      47             : import javax.servlet.ServletRequest;
      48             : import javax.servlet.ServletResponse;
      49             : import javax.servlet.http.HttpServletRequest;
      50             : import javax.servlet.http.HttpServletResponse;
      51             : import org.eclipse.jgit.lib.Config;
      52             : 
      53             : /**
      54             :  * Use AsyncContexts to defer execution until threads are available.
      55             :  *
      56             :  * <p>We actually schedule a task into the same execution queue as the SSH daemon uses for command
      57             :  * execution, and then park the web request in an AsyncContext until an execution thread is
      58             :  * available. This ensures that the overall JVM process doesn't exceed the configured limit on
      59             :  * concurrent Git requests.
      60             :  *
      61             :  * <p>During Git request execution however we have to use the Jetty service thread, not the thread
      62             :  * from the SSH execution queue. Trying to complete the request on the SSH execution queue caused
      63             :  * Jetty's HTTP parser to crash, so we instead block the SSH execution queue thread and ask Jetty to
      64             :  * resume processing on the web service thread.
      65             :  */
      66             : @Singleton
      67             : public class ProjectQoSFilter implements Filter {
      68          12 :   private static final String ATT_SPACE = ProjectQoSFilter.class.getName() + "/";
                         // ATT_SPACE (above) is the class-name-based prefix shared by every request attribute this
                         // filter writes. TASK is the attribute under which doFilter() stores the request's TaskThunk.
      69          12 :   private static final String TASK = ATT_SPACE + "TASK";
      70             : 
                         // Servlet paths of the form /<path>/git-upload-pack or /<path>/git-receive-pack. The same
                         // expression selects which requests pass through this filter (ProjectQoSFilterModule) and
                         // is used by TaskThunk.generateName() to split the path into <path> and command.
      71             :   private static final String FILTER_RE = "^/(.*)/(git-upload-pack|git-receive-pack)$";
      72          12 :   private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE);
      73             : 
      74          11 :   public static class ProjectQoSFilterModule extends ServletModule {
                           // Servlet module that installs the filter: binds QueueProvider to
                           // CommandExecutorQueueProvider and routes every request whose path matches FILTER_RE
                           // through ProjectQoSFilter.
      75             :     @Override
      76             :     protected void configureServlets() {
      77          11 :       bind(QueueProvider.class).to(CommandExecutorQueueProvider.class);
      78          11 :       filterRegex(FILTER_RE).through(ProjectQoSFilter.class);
      79          11 :     }
      80             :   }
      81             : 
      82           3 :   public enum RequestState {
                           // State of a request with respect to this filter. The state is not stored as a single
                           // value: set() records a transition by writing boolean request attributes, and get()
                           // derives the state from those attributes plus the request's dispatcher type.
      83           3 :     INITIAL,
      84           3 :     SUSPENDED,
      85           3 :     RESUMED,
      86           3 :     CANCELED,
      87           3 :     UNEXPECTED;
      88             : 
                           // Attribute names: ATT_SPACE followed by the constant's name.
      89           3 :     private static final String CANCELED_ATT = ATT_SPACE + CANCELED;
      90           3 :     private static final String SUSPENDED_ATT = ATT_SPACE + SUSPENDED;
      91           3 :     private static final String RESUMED_ATT = ATT_SPACE + RESUMED;
      92             : 
                           // Records this state on the request. Only SUSPENDED, CANCELED and RESUMED write
                           // attributes; INITIAL and UNEXPECTED leave the request untouched.
      93             :     private void set(ServletRequest req) {
      94           2 :       switch (this) {
      95             :         case SUSPENDED:
                               // Explicit RESUMED_ATT=false is what get() later uses to recognise SUSPENDED.
      96           2 :           req.setAttribute(SUSPENDED_ATT, true);
      97           2 :           req.setAttribute(RESUMED_ATT, false);
      98           2 :           break;
      99             :         case CANCELED:
     100           0 :           req.setAttribute(CANCELED_ATT, true);
     101           0 :           break;
     102             :         case RESUMED:
                               // SUSPENDED_ATT is left as-is; RESUMED_ATT=true alone takes the request out of
                               // the SUSPENDED state as far as get() is concerned.
     103           2 :           req.setAttribute(RESUMED_ATT, true);
     104           2 :           break;
     105             :         case INITIAL:
     106             :         case UNEXPECTED:
     107             :         default:
     108             :           break;
     109             :       }
     110           2 :     }
     111             : 
                           // Derives the state of the request. The checks run in this order:
                           //   1. RESUMED_ATT is false and SUSPENDED_ATT is true  -> SUSPENDED
                           //   2. dispatcher type is not ASYNC                    -> INITIAL
                           //   3. RESUMED_ATT and CANCELED_ATT are both true      -> CANCELED
                           //   4. RESUMED_ATT is true                             -> RESUMED
                           //   5. anything else                                   -> UNEXPECTED
                           // An absent (null) attribute matches neither Boolean.TRUE nor Boolean.FALSE, so a
                           // request without these attributes skips check 1. CANCELED_ATT on its own (without
                           // RESUMED_ATT=true) is never reported as CANCELED.
     112             :     private static RequestState get(ServletRequest req) {
     113           3 :       if (Boolean.FALSE.equals(req.getAttribute(RESUMED_ATT))
     114           2 :           && Boolean.TRUE.equals(req.getAttribute(SUSPENDED_ATT))) {
     115           2 :         return SUSPENDED;
     116             :       }
     117             : 
     118           3 :       if (req.getDispatcherType() != DispatcherType.ASYNC) {
     119           3 :         return INITIAL;
     120             :       }
     121             : 
     122           2 :       if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT))
     123           2 :           && Boolean.TRUE.equals(req.getAttribute(CANCELED_ATT))) {
     124           0 :         return CANCELED;
     125             :       }
     126             : 
     127           2 :       if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT))) {
     128           2 :         return RESUMED;
     129             :       }
     130             : 
     131           0 :       return UNEXPECTED;
     132             :     }
     133             :   }
     134             : 
     135             :   private final AccountLimits.Factory limitsFactory;
                         // limitsFactory and user are combined in getExecutor() to pick the queue type for the
                         // current user; queue maps that type to the executor that tasks are submitted to.
     136             :   private final Provider<CurrentUser> user;
     137             :   private final QueueProvider queue;
                         // Used only for logging the unexpected-state case in doFilter().
     138             :   private final ServletContext context;
                         // Timeout, in milliseconds, applied to the AsyncContext of a suspended request. doFilter()
                         // only sets the timeout when this value is > 0.
     139             :   private final long maxWait;
     140             : 
     141             :   @Inject
                         // Stores the injected collaborators and computes maxWait: the "httpd" / "maxwait" setting is
                         // read through ConfigUtil.getTimeUnit with arguments (default 5, unit MINUTES) and the
                         // result is converted from minutes to milliseconds.
                         // NOTE(review): presumably getTimeUnit returns the configured value expressed in the
                         // requested unit (minutes) - confirm against ConfigUtil.
     142             :   ProjectQoSFilter(
     143             :       AccountLimits.Factory limitsFactory,
     144             :       Provider<CurrentUser> user,
     145             :       QueueProvider queue,
     146             :       ServletContext context,
     147          12 :       @GerritServerConfig Config cfg) {
     148          12 :     this.limitsFactory = limitsFactory;
     149          12 :     this.user = user;
     150          12 :     this.queue = queue;
     151          12 :     this.context = context;
     152          12 :     this.maxWait = MINUTES.toMillis(getTimeUnit(cfg, "httpd", null, "maxwait", 5, MINUTES));
     153          12 :   }
     154             : 
     155             :   @Override
                         // Handles one pass of a request through the filter, dispatching on RequestState.get(request):
                         //   INITIAL   - start async processing, submit a TaskThunk to the executor returned by
                         //               getExecutor(), and return without invoking the chain.
                         //   CANCELED  - answer with 503 Service Unavailable.
                         //   RESUMED   - register the current thread as the task's worker and run the rest of the
                         //               filter chain on it.
                         //   otherwise - log and answer with 503 Service Unavailable.
                         // The request and response are cast to their HTTP variants unconditionally.
     156             :   public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
     157             :       throws IOException, ServletException {
     158           2 :     final HttpServletRequest req = (HttpServletRequest) request;
     159           2 :     final HttpServletResponse rsp = (HttpServletResponse) response;
     160             : 
     161             :     final TaskThunk task;
     162             : 
     163           2 :     switch (RequestState.get(request)) {
     164             :       case INITIAL:
                             // suspend() starts async mode and marks the request SUSPENDED before the task exists.
     165           2 :         AsyncContext asyncContext = suspend(request);
     166           2 :         task = new TaskThunk(asyncContext, req);
     167           2 :         if (maxWait > 0) {
     168           2 :           asyncContext.setTimeout(maxWait);
     169             :         }
     170             : 
                             // Stored so the RESUMED pass can find the same task again.
     171           2 :         request.setAttribute(TASK, task);
     172             : 
                             // The listener ends the task on complete/timeout/error and cancels the Future on
                             // timeout. It is registered only after the task has been submitted.
     173           2 :         Future<?> f = getExecutor().submit(task);
     174           2 :         asyncContext.addListener(new Listener(f, task));
     175           2 :         break;
     176             :       case CANCELED:
     177           0 :         rsp.sendError(SC_SERVICE_UNAVAILABLE);
     178           0 :         break;
     179             :       case RESUMED:
                             // NOTE(review): assumes the TASK attribute set during the INITIAL pass is still
                             // present; a missing attribute would surface as a NullPointerException below.
     180           2 :         task = (TaskThunk) request.getAttribute(TASK);
     181             :         try {
     182           2 :           task.begin(Thread.currentThread());
     183           2 :           chain.doFilter(req, rsp);
     184             :         } finally {
                               // Thread.interrupted() clears the current thread's interrupt flag; the return
                               // value is deliberately ignored. TaskThunk.run() may have interrupted this thread
                               // via worker.interrupt() - presumably the flag is cleared here so it does not
                               // leak into later work on the same thread.
     185           2 :           Thread.interrupted();
     186             :         }
     187           2 :         break;
     188             :       case SUSPENDED:
     189             :       case UNEXPECTED:
     190             :       default:
     191           0 :         context.log("Unexpected QoS state, aborting request");
     192           0 :         rsp.sendError(SC_SERVICE_UNAVAILABLE);
     193             :         break;
     194             :     }
     195           2 :   }
     196             : 
     197             :   private AsyncContext suspend(ServletRequest request) {
                           // Puts the request into async mode, then marks it SUSPENDED (SUSPENDED_ATT=true,
                           // RESUMED_ATT=false) and hands back the AsyncContext for the caller to configure.
     198           2 :     AsyncContext asyncContext = request.startAsync();
     199           2 :     RequestState.SUSPENDED.set(request);
     200           2 :     return asyncContext;
     201             :   }
     202             : 
     203             :   private ScheduledThreadPoolExecutor getExecutor() {
                           // Looks up the queue type from the current user's account limits and returns the
                           // executor that the QueueProvider associates with that type.
     204           2 :     QueueProvider.QueueType qt = limitsFactory.create(user.get()).getQueueType();
     205           2 :     return queue.getQueue(qt);
     206             :   }
     207             : 
     208             :   @Override
                         // Intentionally empty: the filter does no work at init time and ignores the FilterConfig.
     209          11 :   public void init(FilterConfig config) {}
     210             : 
     211             :   @Override
                         // Intentionally empty: the filter does nothing on destroy.
     212          11 :   public void destroy() {}
     213             : 
     214             :   @VisibleForTesting
                         // AsyncListener that ties the lifetime of a TaskThunk to its AsyncContext: every terminal
                         // event (complete, timeout, error) ends the task, which lets TaskThunk.run() return.
     215             :   protected static final class Listener implements AsyncListener {
                           // Future returned when the task was submitted to the executor; cancelled on timeout.
     216             :     final Future<?> future;
     217             :     final TaskThunk task;
     218             : 
     219           3 :     Listener(Future<?> future, TaskThunk task) {
     220           3 :       this.future = future;
     221           3 :       this.task = task;
     222           3 :     }
     223             : 
     224             :     @Override
     225             :     public void onComplete(AsyncEvent event) throws IOException {
     226           3 :       task.end();
     227           3 :     }
     228             : 
     229             :     @Override
     230             :     public void onTimeout(AsyncEvent event) throws IOException {
                             // End the task first, then cancel the Future with mayInterruptIfRunning=true.
     231           1 :       task.end();
     232           1 :       future.cancel(true);
     233           1 :     }
     234             : 
     235             :     @Override
     236             :     public void onError(AsyncEvent event) throws IOException {
     237           1 :       task.end();
     238           1 :     }
     239             : 
     240             :     @Override
                           // No action is taken when a new async cycle starts.
     241           0 :     public void onStartAsync(AsyncEvent event) throws IOException {}
     242             :   }
     243             : 
     244             :   @VisibleForTesting
                         // Task submitted to the executor on behalf of one suspended request. When it runs it
                         // re-dispatches the request and then blocks until end() is called, so it keeps occupying
                         // the executor thread that runs it for as long as the request is being served.
     245             :   protected class TaskThunk implements CancelableRunnable {
     246             :     private final AsyncContext asyncContext;
                           // Display name computed once in the constructor; returned by toString().
     247             :     private final String name;
                           // Guards done and worker in run(), begin() and end().
     248           3 :     private final Object lock = new Object();
     249             :     private boolean done;
                           // Thread running the filter chain for this request; set by begin(), cleared by end().
     250             :     private Thread worker;
     251             : 
     252           3 :     TaskThunk(AsyncContext asyncContext, HttpServletRequest req) {
     253           3 :       this.asyncContext = asyncContext;
     254           3 :       this.name = generateName(req);
     255           3 :     }
     256             : 
     257             :     @Override
                           // Re-dispatches the request (if it is still SUSPENDED), then waits on the lock until
                           // end() sets done. If the waiting thread is interrupted while a worker is registered,
                           // the interrupt is forwarded to the worker and waiting continues; with no worker
                           // registered the wait is abandoned.
                           // NOTE(review): on the abandon path the interrupt status of this thread is not
                           // restored - confirm that is intended.
     258             :     public void run() {
     259           3 :       resume();
     260             : 
     261           3 :       synchronized (lock) {
     262           3 :         while (!done) {
     263             :           try {
     264           2 :             lock.wait();
     265           0 :           } catch (InterruptedException e) {
     266           0 :             if (worker != null) {
     267           0 :               worker.interrupt();
     268             :             } else {
     269           0 :               break;
     270             :             }
     271           2 :           }
     272             :         }
     273           3 :       }
     274           3 :     }
     275             : 
                           // Registers the thread that is about to run the filter chain for this request.
     276             :     void begin(Thread thread) {
     277           3 :       synchronized (lock) {
     278           3 :         worker = thread;
     279           3 :       }
     280           3 :     }
     281             : 
                           // Clears the worker, marks the task done and wakes up run().
     282             :     void end() {
     283           3 :       synchronized (lock) {
     284           3 :         worker = null;
     285           3 :         done = true;
     286           3 :         lock.notifyAll();
     287           3 :       }
     288           3 :     }
     289             : 
     290             :     @Override
                           // Flags the request as canceled, then re-dispatches it if it is still SUSPENDED. After
                           // such a dispatch both CANCELED_ATT and RESUMED_ATT are true, which RequestState.get()
                           // reports as CANCELED on an ASYNC dispatch.
     291             :     public void cancel() {
     292           0 :       RequestState.CANCELED.set(asyncContext.getRequest());
     293           0 :       resume();
     294           0 :     }
     295             : 
                           // Dispatches the request back to the container at most once: only a request currently
                           // in the SUSPENDED state is marked RESUMED and dispatched; otherwise this is a no-op.
                           // NOTE(review): the state check and the update are not performed under a lock - confirm
                           // that run() and cancel() cannot race here.
     296             :     private void resume() {
     297           3 :       ServletRequest req = asyncContext.getRequest();
     298           3 :       if (RequestState.SUSPENDED.equals(RequestState.get(req))) {
     299           2 :         RequestState.RESUMED.set(req);
     300           2 :         asyncContext.dispatch();
     301             :       }
     302           3 :     }
     303             : 
                           // NOTE(review): reads done without holding lock, unlike run() and end() - a caller on
                           // another thread may observe a stale value; confirm this is acceptable for its callers.
     304             :     public boolean isDone() {
     305           1 :       return done;
     306             :     }
     307             : 
     308             :     @Override
     309             :     public String toString() {
     310           1 :       return name;
     311             :     }
     312             : 
                           // Builds the display name. When the servlet path matches URI_PATTERN the result is
                           // "<command> <path><user>", otherwise "<HTTP method> <servlet path><user>". <user> is
                           // " (<username>)" for an identified user that has a username, and empty otherwise.
     313             :     private String generateName(HttpServletRequest req) {
     314           3 :       String userName = "";
     315             : 
     316           3 :       CurrentUser who = user.get();
     317           3 :       if (who.isIdentifiedUser()) {
     318           2 :         Optional<String> name = who.asIdentifiedUser().getUserName();
     319           2 :         if (name.isPresent()) {
     320           2 :           userName = " (" + name.get() + ")";
     321             :         }
     322             :       }
     323             : 
     324           3 :       String uri = req.getServletPath();
     325           3 :       Matcher m = URI_PATTERN.matcher(uri);
     326           3 :       if (m.matches()) {
                               // Group 1 is the part between the leading "/" and the command; group 2 the command.
     327           2 :         String path = m.group(1);
     328           2 :         String cmd = m.group(2);
     329           2 :         return cmd + " " + path + userName;
     330             :       }
     331             : 
     332           1 :       return req.getMethod() + " " + uri + userName;
     333             :     }
     334             :   }
     335             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750