Line data Source code
1 : // Copyright (C) 2010 The Android Open Source Project 2 : // 3 : // Licensed under the Apache License, Version 2.0 (the "License"); 4 : // you may not use this file except in compliance with the License. 5 : // You may obtain a copy of the License at 6 : // 7 : // http://www.apache.org/licenses/LICENSE-2.0 8 : // 9 : // Unless required by applicable law or agreed to in writing, software 10 : // distributed under the License is distributed on an "AS IS" BASIS, 11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 : // See the License for the specific language governing permissions and 13 : // limitations under the License. 14 : 15 : package com.google.gerrit.pgm.http.jetty; 16 : 17 : import static com.google.gerrit.server.config.ConfigUtil.getTimeUnit; 18 : import static java.util.concurrent.TimeUnit.MINUTES; 19 : import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE; 20 : 21 : import com.google.common.annotations.VisibleForTesting; 22 : import com.google.gerrit.server.CurrentUser; 23 : import com.google.gerrit.server.account.AccountLimits; 24 : import com.google.gerrit.server.config.GerritServerConfig; 25 : import com.google.gerrit.server.git.QueueProvider; 26 : import com.google.gerrit.server.git.WorkQueue.CancelableRunnable; 27 : import com.google.gerrit.sshd.CommandExecutorQueueProvider; 28 : import com.google.inject.Inject; 29 : import com.google.inject.Provider; 30 : import com.google.inject.Singleton; 31 : import com.google.inject.servlet.ServletModule; 32 : import java.io.IOException; 33 : import java.util.Optional; 34 : import java.util.concurrent.Future; 35 : import java.util.concurrent.ScheduledThreadPoolExecutor; 36 : import java.util.regex.Matcher; 37 : import java.util.regex.Pattern; 38 : import javax.servlet.AsyncContext; 39 : import javax.servlet.AsyncEvent; 40 : import javax.servlet.AsyncListener; 41 : import javax.servlet.DispatcherType; 42 : import javax.servlet.Filter; 43 : import javax.servlet.FilterChain; 44 : import javax.servlet.FilterConfig; 45 : import javax.servlet.ServletContext; 46 : import javax.servlet.ServletException; 47 : import javax.servlet.ServletRequest; 48 : import javax.servlet.ServletResponse; 49 : import javax.servlet.http.HttpServletRequest; 50 : import javax.servlet.http.HttpServletResponse; 51 : import org.eclipse.jgit.lib.Config; 52 : 53 : /** 54 : * Use AsyncContexts to defer execution until threads are available. 55 : * 56 : * <p>We actually schedule a task into the same execution queue as the SSH daemon uses for command 57 : * execution, and then park the web request in an AsyncContext until an execution thread is 58 : * available. This ensures that the overall JVM process doesn't exceed the configured limit on 59 : * concurrent Git requests. 60 : * 61 : * <p>During Git request execution however we have to use the Jetty service thread, not the thread 62 : * from the SSH execution queue. Trying to complete the request on the SSH execution queue caused 63 : * Jetty's HTTP parser to crash, so we instead block the SSH execution queue thread and ask Jetty to 64 : * resume processing on the web service thread. 65 : */ 66 : @Singleton 67 : public class ProjectQoSFilter implements Filter { 68 12 : private static final String ATT_SPACE = ProjectQoSFilter.class.getName() + "/"; 69 12 : private static final String TASK = ATT_SPACE + "TASK"; 70 : 71 : private static final String FILTER_RE = "^/(.*)/(git-upload-pack|git-receive-pack)$"; 72 12 : private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE); 73 : 74 11 : public static class ProjectQoSFilterModule extends ServletModule { 75 : @Override 76 : protected void configureServlets() { 77 11 : bind(QueueProvider.class).to(CommandExecutorQueueProvider.class); 78 11 : filterRegex(FILTER_RE).through(ProjectQoSFilter.class); 79 11 : } 80 : } 81 : 82 3 : public enum RequestState { 83 3 : INITIAL, 84 3 : SUSPENDED, 85 3 : RESUMED, 86 3 : CANCELED, 87 3 : UNEXPECTED; 88 : 89 3 : private static final String CANCELED_ATT = ATT_SPACE + CANCELED; 90 3 : private static final String SUSPENDED_ATT = ATT_SPACE + SUSPENDED; 91 3 : private static final String RESUMED_ATT = ATT_SPACE + RESUMED; 92 : 93 : private void set(ServletRequest req) { 94 2 : switch (this) { 95 : case SUSPENDED: 96 2 : req.setAttribute(SUSPENDED_ATT, true); 97 2 : req.setAttribute(RESUMED_ATT, false); 98 2 : break; 99 : case CANCELED: 100 0 : req.setAttribute(CANCELED_ATT, true); 101 0 : break; 102 : case RESUMED: 103 2 : req.setAttribute(RESUMED_ATT, true); 104 2 : break; 105 : case INITIAL: 106 : case UNEXPECTED: 107 : default: 108 : break; 109 : } 110 2 : } 111 : 112 : private static RequestState get(ServletRequest req) { 113 3 : if (Boolean.FALSE.equals(req.getAttribute(RESUMED_ATT)) 114 2 : && Boolean.TRUE.equals(req.getAttribute(SUSPENDED_ATT))) { 115 2 : return SUSPENDED; 116 : } 117 : 118 3 : if (req.getDispatcherType() != DispatcherType.ASYNC) { 119 3 : return INITIAL; 120 : } 121 : 122 2 : if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT)) 123 2 : && Boolean.TRUE.equals(req.getAttribute(CANCELED_ATT))) { 124 0 : return CANCELED; 125 : } 126 : 127 2 : if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT))) { 128 2 : return RESUMED; 129 : } 130 : 131 0 : return UNEXPECTED; 132 : } 133 : } 134 : 135 : private final AccountLimits.Factory limitsFactory; 136 : private final Provider<CurrentUser> user; 137 : private final QueueProvider queue; 138 : private final ServletContext context; 139 : private final long maxWait; 140 : 141 : @Inject 142 : ProjectQoSFilter( 143 : AccountLimits.Factory limitsFactory, 144 : Provider<CurrentUser> user, 145 : QueueProvider queue, 146 : ServletContext context, 147 12 : @GerritServerConfig Config cfg) { 148 12 : this.limitsFactory = limitsFactory; 149 12 : this.user = user; 150 12 : this.queue = queue; 151 12 : this.context = context; 152 12 : this.maxWait = MINUTES.toMillis(getTimeUnit(cfg, "httpd", null, "maxwait", 5, MINUTES)); 153 12 : } 154 : 155 : @Override 156 : public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 157 : throws IOException, ServletException { 158 2 : final HttpServletRequest req = (HttpServletRequest) request; 159 2 : final HttpServletResponse rsp = (HttpServletResponse) response; 160 : 161 : final TaskThunk task; 162 : 163 2 : switch (RequestState.get(request)) { 164 : case INITIAL: 165 2 : AsyncContext asyncContext = suspend(request); 166 2 : task = new TaskThunk(asyncContext, req); 167 2 : if (maxWait > 0) { 168 2 : asyncContext.setTimeout(maxWait); 169 : } 170 : 171 2 : request.setAttribute(TASK, task); 172 : 173 2 : Future<?> f = getExecutor().submit(task); 174 2 : asyncContext.addListener(new Listener(f, task)); 175 2 : break; 176 : case CANCELED: 177 0 : rsp.sendError(SC_SERVICE_UNAVAILABLE); 178 0 : break; 179 : case RESUMED: 180 2 : task = (TaskThunk) request.getAttribute(TASK); 181 : try { 182 2 : task.begin(Thread.currentThread()); 183 2 : chain.doFilter(req, rsp); 184 : } finally { 185 2 : Thread.interrupted(); 186 : } 187 2 : break; 188 : case SUSPENDED: 189 : case UNEXPECTED: 190 : default: 191 0 : context.log("Unexpected QoS state, aborting request"); 192 0 : rsp.sendError(SC_SERVICE_UNAVAILABLE); 193 : break; 194 : } 195 2 : } 196 : 197 : private AsyncContext suspend(ServletRequest request) { 198 2 : AsyncContext asyncContext = request.startAsync(); 199 2 : RequestState.SUSPENDED.set(request); 200 2 : return asyncContext; 201 : } 202 : 203 : private ScheduledThreadPoolExecutor getExecutor() { 204 2 : QueueProvider.QueueType qt = limitsFactory.create(user.get()).getQueueType(); 205 2 : return queue.getQueue(qt); 206 : } 207 : 208 : @Override 209 11 : public void init(FilterConfig config) {} 210 : 211 : @Override 212 11 : public void destroy() {} 213 : 214 : @VisibleForTesting 215 : protected static final class Listener implements AsyncListener { 216 : final Future<?> future; 217 : final TaskThunk task; 218 : 219 3 : Listener(Future<?> future, TaskThunk task) { 220 3 : this.future = future; 221 3 : this.task = task; 222 3 : } 223 : 224 : @Override 225 : public void onComplete(AsyncEvent event) throws IOException { 226 3 : task.end(); 227 3 : } 228 : 229 : @Override 230 : public void onTimeout(AsyncEvent event) throws IOException { 231 1 : task.end(); 232 1 : future.cancel(true); 233 1 : } 234 : 235 : @Override 236 : public void onError(AsyncEvent event) throws IOException { 237 1 : task.end(); 238 1 : } 239 : 240 : @Override 241 0 : public void onStartAsync(AsyncEvent event) throws IOException {} 242 : } 243 : 244 : @VisibleForTesting 245 : protected class TaskThunk implements CancelableRunnable { 246 : private final AsyncContext asyncContext; 247 : private final String name; 248 3 : private final Object lock = new Object(); 249 : private boolean done; 250 : private Thread worker; 251 : 252 3 : TaskThunk(AsyncContext asyncContext, HttpServletRequest req) { 253 3 : this.asyncContext = asyncContext; 254 3 : this.name = generateName(req); 255 3 : } 256 : 257 : @Override 258 : public void run() { 259 3 : resume(); 260 : 261 3 : synchronized (lock) { 262 3 : while (!done) { 263 : try { 264 2 : lock.wait(); 265 0 : } catch (InterruptedException e) { 266 0 : if (worker != null) { 267 0 : worker.interrupt(); 268 : } else { 269 0 : break; 270 : } 271 2 : } 272 : } 273 3 : } 274 3 : } 275 : 276 : void begin(Thread thread) { 277 3 : synchronized (lock) { 278 3 : worker = thread; 279 3 : } 280 3 : } 281 : 282 : void end() { 283 3 : synchronized (lock) { 284 3 : worker = null; 285 3 : done = true; 286 3 : lock.notifyAll(); 287 3 : } 288 3 : } 289 : 290 : @Override 291 : public void cancel() { 292 0 : RequestState.CANCELED.set(asyncContext.getRequest()); 293 0 : resume(); 294 0 : } 295 : 296 : private void resume() { 297 3 : ServletRequest req = asyncContext.getRequest(); 298 3 : if (RequestState.SUSPENDED.equals(RequestState.get(req))) { 299 2 : RequestState.RESUMED.set(req); 300 2 : asyncContext.dispatch(); 301 : } 302 3 : } 303 : 304 : public boolean isDone() { 305 1 : return done; 306 : } 307 : 308 : @Override 309 : public String toString() { 310 1 : return name; 311 : } 312 : 313 : private String generateName(HttpServletRequest req) { 314 3 : String userName = ""; 315 : 316 3 : CurrentUser who = user.get(); 317 3 : if (who.isIdentifiedUser()) { 318 2 : Optional<String> name = who.asIdentifiedUser().getUserName(); 319 2 : if (name.isPresent()) { 320 2 : userName = " (" + name.get() + ")"; 321 : } 322 : } 323 : 324 3 : String uri = req.getServletPath(); 325 3 : Matcher m = URI_PATTERN.matcher(uri); 326 3 : if (m.matches()) { 327 2 : String path = m.group(1); 328 2 : String cmd = m.group(2); 329 2 : return cmd + " " + path + userName; 330 : } 331 : 332 1 : return req.getMethod() + " " + uri + userName; 333 : } 334 : } 335 : }