Line data Source code
1 : // Copyright (C) 2010 The Android Open Source Project 2 : // 3 : // Licensed under the Apache License, Version 2.0 (the "License"); 4 : // you may not use this file except in compliance with the License. 5 : // You may obtain a copy of the License at 6 : // 7 : // http://www.apache.org/licenses/LICENSE-2.0 8 : // 9 : // Unless required by applicable law or agreed to in writing, software 10 : // distributed under the License is distributed on an "AS IS" BASIS, 11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 : // See the License for the specific language governing permissions and 13 : // limitations under the License. 14 : 15 : package com.google.gerrit.sshd.commands; 16 : 17 : import static java.nio.charset.StandardCharsets.UTF_8; 18 : 19 : import com.google.common.flogger.FluentLogger; 20 : import com.google.gerrit.common.data.GlobalCapability; 21 : import com.google.gerrit.extensions.annotations.RequiresCapability; 22 : import com.google.gerrit.extensions.registration.DynamicSet; 23 : import com.google.gerrit.extensions.registration.RegistrationHandle; 24 : import com.google.gerrit.server.CurrentUser; 25 : import com.google.gerrit.server.DynamicOptions; 26 : import com.google.gerrit.server.IdentifiedUser; 27 : import com.google.gerrit.server.events.Event; 28 : import com.google.gerrit.server.events.EventGson; 29 : import com.google.gerrit.server.events.EventTypes; 30 : import com.google.gerrit.server.events.UserScopedEventListener; 31 : import com.google.gerrit.server.git.WorkQueue.CancelableRunnable; 32 : import com.google.gerrit.sshd.BaseCommand; 33 : import com.google.gerrit.sshd.CommandMetaData; 34 : import com.google.gerrit.sshd.StreamCommandExecutor; 35 : import com.google.gson.Gson; 36 : import com.google.inject.Inject; 37 : import java.io.IOException; 38 : import java.io.PrintWriter; 39 : import java.util.ArrayList; 40 : import java.util.List; 41 : import java.util.concurrent.Future; 42 : import java.util.concurrent.LinkedBlockingQueue; 43 : import java.util.concurrent.ScheduledThreadPoolExecutor; 44 : import org.apache.sshd.server.Environment; 45 : import org.apache.sshd.server.channel.ChannelSession; 46 : import org.kohsuke.args4j.Option; 47 : 48 : @RequiresCapability(GlobalCapability.STREAM_EVENTS) 49 : @CommandMetaData(name = "stream-events", description = "Monitor events occurring in real time") 50 1 : public final class StreamEvents extends BaseCommand { 51 1 : private static final FluentLogger logger = FluentLogger.forEnclosingClass(); 52 : 53 : /** Maximum number of events that may be queued up for each connection. */ 54 : private static final int MAX_EVENTS = 128; 55 : 56 : /** Number of events to write before yielding off the thread. */ 57 : private static final int BATCH_SIZE = 32; 58 : 59 1 : @Option( 60 : name = "--subscribe", 61 : aliases = {"-s"}, 62 : metaVar = "SUBSCRIBE", 63 : usage = "subscribe to specific stream-events") 64 : private List<String> subscribedToEvents = new ArrayList<>(); 65 : 66 : @Inject private IdentifiedUser currentUser; 67 : 68 : @Inject private DynamicSet<UserScopedEventListener> eventListeners; 69 : 70 : @Inject @StreamCommandExecutor private ScheduledThreadPoolExecutor pool; 71 : 72 : @Inject @EventGson private Gson gson; 73 : 74 : /** Queue of events to stream to the connected user. */ 75 1 : private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS); 76 : 77 : private RegistrationHandle eventListenerRegistration; 78 : 79 : /** Special event to notify clients they missed other events. */ 80 : private static final class DroppedOutputEvent extends Event { 81 : private static final String TYPE = "dropped-output"; 82 : 83 : DroppedOutputEvent() { 84 0 : super(TYPE); 85 0 : } 86 : } 87 : 88 : static { 89 1 : EventTypes.register(DroppedOutputEvent.TYPE, DroppedOutputEvent.class); 90 1 : } 91 : 92 : /** True if {@link DroppedOutputEvent} needs to be sent. */ 93 : private volatile boolean dropped; 94 : 95 : /** Lock to protect {@link #queue}, {@link #task}, {@link #done}. */ 96 1 : private final Object taskLock = new Object(); 97 : 98 : /** True if no more messages should be sent to the output. */ 99 : private boolean done; 100 : 101 : /** 102 : * Currently scheduled task to spin out {@link #queue}. 103 : * 104 : * <p>This field is usually {@code null}, unless there is at least one object present inside of 105 : * {@link #queue} ready for delivery. Tasks are only started when there are events to be sent. 106 : */ 107 : private Future<?> task; 108 : 109 : @Override 110 : public void start(ChannelSession channel, Environment env) throws IOException { 111 1 : try (DynamicOptions pluginOptions = new DynamicOptions(injector, dynamicBeans)) { 112 : try { 113 1 : parseCommandLine(pluginOptions); 114 1 : } catch (UnloggedFailure e) { 115 1 : String msg = e.getMessage(); 116 1 : if (!msg.endsWith("\n")) { 117 0 : msg += "\n"; 118 : } 119 1 : err.write(msg.getBytes(UTF_8)); 120 1 : err.flush(); 121 1 : onExit(1); 122 1 : return; 123 1 : } 124 : 125 1 : PrintWriter stdout = toPrintWriter(out); 126 1 : CancelableRunnable writer = 127 1 : new CancelableRunnable() { 128 : @Override 129 : public void run() { 130 1 : writeEvents(this, stdout); 131 1 : } 132 : 133 : @Override 134 : public void cancel() { 135 0 : onExit(0); 136 0 : } 137 : 138 : @Override 139 : public String toString() { 140 0 : StringBuilder b = new StringBuilder(); 141 0 : b.append("Stream Events"); 142 0 : if (currentUser.getUserName().isPresent()) { 143 0 : b.append(" (").append(currentUser.getUserName().get()).append(")"); 144 : } 145 0 : return b.toString(); 146 : } 147 : }; 148 : 149 1 : eventListenerRegistration = 150 1 : eventListeners.add( 151 : "gerrit", 152 1 : new UserScopedEventListener() { 153 : @Override 154 : public void onEvent(Event event) { 155 1 : if (subscribedToEvents.isEmpty() 156 0 : || subscribedToEvents.contains(event.getType())) { 157 1 : offer(writer, event); 158 : } 159 1 : } 160 : 161 : @Override 162 : public CurrentUser getUser() { 163 1 : return currentUser; 164 : } 165 : }); 166 1 : } 167 1 : } 168 : 169 : private void removeEventListenerRegistration() { 170 1 : if (eventListenerRegistration != null) { 171 1 : eventListenerRegistration.remove(); 172 : } 173 1 : } 174 : 175 : @Override 176 : protected void onExit(int rc) { 177 1 : removeEventListenerRegistration(); 178 : 179 1 : synchronized (taskLock) { 180 1 : done = true; 181 1 : } 182 : 183 1 : super.onExit(rc); 184 1 : } 185 : 186 : @Override 187 : public void destroy(ChannelSession channel) { 188 1 : removeEventListenerRegistration(); 189 : 190 : final boolean exit; 191 1 : synchronized (taskLock) { 192 1 : if (task != null) { 193 0 : task.cancel(true); 194 0 : exit = false; // onExit will be invoked by the task cancellation. 195 : } else { 196 1 : exit = !done; 197 : } 198 1 : done = true; 199 1 : } 200 1 : if (exit) { 201 1 : onExit(0); 202 : } 203 1 : } 204 : 205 : private void offer(CancelableRunnable writer, Event event) { 206 1 : synchronized (taskLock) { 207 1 : if (!queue.offer(event)) { 208 0 : dropped = true; 209 : } 210 : 211 1 : if (task == null && !done) { 212 1 : task = pool.submit(writer); 213 : } 214 1 : } 215 1 : } 216 : 217 : private Event poll() { 218 1 : synchronized (taskLock) { 219 1 : Event event = queue.poll(); 220 1 : if (event == null) { 221 1 : task = null; 222 : } 223 1 : return event; 224 : } 225 : } 226 : 227 : private void writeEvents(CancelableRunnable writer, PrintWriter stdout) { 228 1 : int processed = 0; 229 : 230 1 : while (processed < BATCH_SIZE) { 231 1 : if (Thread.interrupted() || stdout.checkError()) { 232 : // The other side either requested a shutdown by calling our 233 : // destroy() above, or it closed the stream and is no longer 234 : // accepting output. Either way terminate this instance. 235 : // 236 0 : removeEventListenerRegistration(); 237 0 : flush(stdout); 238 0 : onExit(0); 239 0 : return; 240 : } 241 : 242 1 : if (dropped) { 243 0 : write(stdout, new DroppedOutputEvent()); 244 0 : dropped = false; 245 : } 246 : 247 1 : final Event event = poll(); 248 1 : if (event == null) { 249 1 : break; 250 : } 251 : 252 1 : write(stdout, event); 253 1 : processed++; 254 1 : } 255 : 256 1 : flush(stdout); 257 : 258 1 : if (BATCH_SIZE <= processed) { 259 : // We processed the limit, but more might remain in the queue. 260 : // Schedule the write task again so we will come back here and 261 : // can process more events. 262 : // 263 0 : synchronized (taskLock) { 264 0 : task = pool.submit(writer); 265 0 : } 266 : } 267 1 : } 268 : 269 : private void write(PrintWriter stdout, Object message) { 270 1 : String msg = null; 271 : try { 272 1 : msg = gson.toJson(message) + "\n"; 273 0 : } catch (Exception e) { 274 0 : logger.atWarning().withCause(e).log("Could not deserialize the msg"); 275 1 : } 276 1 : if (msg != null) { 277 1 : synchronized (stdout) { 278 1 : stdout.print(msg); 279 1 : } 280 : } 281 1 : } 282 : 283 : private void flush(PrintWriter stdout) { 284 1 : synchronized (stdout) { 285 1 : stdout.flush(); 286 1 : } 287 1 : } 288 : }