Line data Source code
1 : // Copyright (C) 2016 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.server.notedb;
16 :
17 : import static com.google.common.base.Preconditions.checkArgument;
18 : import static com.google.gerrit.entities.RefNames.REFS;
19 : import static com.google.gerrit.entities.RefNames.REFS_SEQUENCES;
20 : import static java.nio.charset.StandardCharsets.UTF_8;
21 : import static java.util.Objects.requireNonNull;
22 : import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
23 :
24 : import com.github.rholder.retry.RetryException;
25 : import com.github.rholder.retry.Retryer;
26 : import com.github.rholder.retry.RetryerBuilder;
27 : import com.github.rholder.retry.StopStrategies;
28 : import com.github.rholder.retry.WaitStrategies;
29 : import com.google.common.annotations.VisibleForTesting;
30 : import com.google.common.base.Throwables;
31 : import com.google.common.collect.ImmutableList;
32 : import com.google.common.collect.Iterables;
33 : import com.google.common.flogger.FluentLogger;
34 : import com.google.common.util.concurrent.Runnables;
35 : import com.google.gerrit.entities.Project;
36 : import com.google.gerrit.entities.RefNames;
37 : import com.google.gerrit.exceptions.StorageException;
38 : import com.google.gerrit.git.LockFailureException;
39 : import com.google.gerrit.git.RefUpdateUtil;
40 : import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
41 : import com.google.gerrit.server.git.GitRepositoryManager;
42 : import java.io.IOException;
43 : import java.util.ArrayList;
44 : import java.util.List;
45 : import java.util.Optional;
46 : import java.util.concurrent.ExecutionException;
47 : import java.util.concurrent.TimeUnit;
48 : import java.util.concurrent.locks.Lock;
49 : import java.util.concurrent.locks.ReentrantLock;
50 : import org.eclipse.jgit.lib.ObjectId;
51 : import org.eclipse.jgit.lib.ObjectInserter;
52 : import org.eclipse.jgit.lib.RefUpdate;
53 : import org.eclipse.jgit.lib.Repository;
54 : import org.eclipse.jgit.revwalk.RevWalk;
55 : import org.eclipse.jgit.transport.ReceiveCommand;
56 :
57 : /**
58 : * Class for managing an incrementing sequence backed by a git repository.
59 : *
60 : * <p>The current sequence number is stored as UTF-8 text in a blob pointed to by a ref in the
61 : * {@code refs/sequences/*} namespace. Multiple processes can share the same sequence by
62 : * incrementing the counter using normal git ref updates. To amortize the cost of these ref updates,
63 : * processes can increment the counter by a larger number and hand out numbers from that range in
64 : * memory until they run out. This means concurrent processes will hand out somewhat non-monotonic
65 : * numbers.
66 : */
67 : public class RepoSequence {
68 151 : private static final FluentLogger logger = FluentLogger.forEnclosingClass();
69 :
70 : @FunctionalInterface
71 : public interface Seed {
72 : int get();
73 : }
74 :
75 : @VisibleForTesting
76 : static RetryerBuilder<ImmutableList<Integer>> retryerBuilder() {
77 151 : return RetryerBuilder.<ImmutableList<Integer>>newBuilder()
78 151 : .retryIfException(
79 : t ->
80 1 : t instanceof StorageException
81 1 : && ((StorageException) t).getCause() instanceof LockFailureException)
82 151 : .withWaitStrategy(
83 151 : WaitStrategies.join(
84 151 : WaitStrategies.exponentialWait(5, TimeUnit.SECONDS),
85 151 : WaitStrategies.randomWait(50, TimeUnit.MILLISECONDS)))
86 151 : .withStopStrategy(StopStrategies.stopAfterDelay(30, TimeUnit.SECONDS));
87 : }
88 :
89 151 : private static final Retryer<ImmutableList<Integer>> RETRYER = retryerBuilder().build();
90 :
91 : private final GitRepositoryManager repoManager;
92 : private final GitReferenceUpdated gitRefUpdated;
93 : private final Project.NameKey projectName;
94 : private final String refName;
95 : private final Seed seed;
96 : private final int floor;
97 : private final int batchSize;
98 : private final Runnable afterReadRef;
99 : private final Retryer<ImmutableList<Integer>> retryer;
100 :
101 : // Protects all non-final fields.
102 : private final Lock counterLock;
103 :
104 : private int limit;
105 : private int counter;
106 :
107 : @VisibleForTesting int acquireCount;
108 :
109 : public RepoSequence(
110 : GitRepositoryManager repoManager,
111 : GitReferenceUpdated gitRefUpdated,
112 : Project.NameKey projectName,
113 : String name,
114 : Seed seed,
115 : int batchSize) {
116 151 : this(
117 : repoManager,
118 : gitRefUpdated,
119 : projectName,
120 : name,
121 : seed,
122 : batchSize,
123 151 : Runnables.doNothing(),
124 : RETRYER,
125 : 0);
126 151 : }
127 :
128 : public RepoSequence(
129 : GitRepositoryManager repoManager,
130 : GitReferenceUpdated gitRefUpdated,
131 : Project.NameKey projectName,
132 : String name,
133 : Seed seed,
134 : int batchSize,
135 : int floor) {
136 0 : this(
137 : repoManager,
138 : gitRefUpdated,
139 : projectName,
140 : name,
141 : seed,
142 : batchSize,
143 0 : Runnables.doNothing(),
144 : RETRYER,
145 : floor);
146 0 : }
147 :
148 : @VisibleForTesting
149 : RepoSequence(
150 : GitRepositoryManager repoManager,
151 : GitReferenceUpdated gitRefUpdated,
152 : Project.NameKey projectName,
153 : String name,
154 : Seed seed,
155 : int batchSize,
156 : Runnable afterReadRef,
157 : Retryer<ImmutableList<Integer>> retryer) {
158 1 : this(repoManager, gitRefUpdated, projectName, name, seed, batchSize, afterReadRef, retryer, 0);
159 1 : }
160 :
161 : RepoSequence(
162 : GitRepositoryManager repoManager,
163 : GitReferenceUpdated gitRefUpdated,
164 : Project.NameKey projectName,
165 : String name,
166 : Seed seed,
167 : int batchSize,
168 : Runnable afterReadRef,
169 : Retryer<ImmutableList<Integer>> retryer,
170 151 : int floor) {
171 151 : this.repoManager = requireNonNull(repoManager, "repoManager");
172 151 : this.gitRefUpdated = requireNonNull(gitRefUpdated, "gitRefUpdated");
173 151 : this.projectName = requireNonNull(projectName, "projectName");
174 :
175 151 : checkArgument(
176 : name != null
177 151 : && !name.startsWith(REFS)
178 151 : && !name.startsWith(REFS_SEQUENCES.substring(REFS.length())),
179 : "name should be a suffix to follow \"refs/sequences/\", got: %s",
180 : name);
181 151 : this.refName = RefNames.REFS_SEQUENCES + name;
182 :
183 151 : this.seed = requireNonNull(seed, "seed");
184 151 : this.floor = floor;
185 :
186 151 : checkArgument(batchSize > 0, "expected batchSize > 0, got: %s", batchSize);
187 151 : this.batchSize = batchSize;
188 151 : this.afterReadRef = requireNonNull(afterReadRef, "afterReadRef");
189 151 : this.retryer = requireNonNull(retryer, "retryer");
190 :
191 151 : logger.atFine().log("sequence batch size for %s is %s", name, batchSize);
192 151 : counterLock = new ReentrantLock(true);
193 151 : }
194 :
195 : /**
196 : * Retrieves the next available sequence number.
197 : *
198 : * <p>This method is thread-safe.
199 : *
200 : * @return the next available sequence number
201 : */
202 : public int next() {
203 151 : return Iterables.getOnlyElement(next(1));
204 : }
205 :
206 : /**
207 : * Retrieves the next N available sequence number.
208 : *
209 : * <p>This method is thread-safe.
210 : *
211 : * @param count the number of sequence numbers which should be returned
212 : * @return the next N available sequence numbers
213 : */
214 : public ImmutableList<Integer> next(int count) {
215 151 : if (count == 0) {
216 38 : return ImmutableList.of();
217 : }
218 151 : checkArgument(count > 0, "count is negative: %s", count);
219 :
220 : try {
221 151 : return retryer.call(
222 : () -> {
223 151 : counterLock.lock();
224 : try {
225 151 : if (count == 1) {
226 151 : if (counter >= limit) {
227 151 : acquire(batchSize);
228 : }
229 151 : return ImmutableList.of(counter++);
230 : }
231 :
232 17 : List<Integer> ids = new ArrayList<>(count);
233 17 : while (counter < limit) {
234 16 : ids.add(counter++);
235 16 : if (ids.size() == count) {
236 16 : return ImmutableList.copyOf(ids);
237 : }
238 : }
239 7 : acquire(Math.max(count - ids.size(), batchSize));
240 7 : while (ids.size() < count) {
241 7 : ids.add(counter++);
242 : }
243 7 : return ImmutableList.copyOf(ids);
244 : } finally {
245 151 : counterLock.unlock();
246 : }
247 : });
248 1 : } catch (ExecutionException | RetryException e) {
249 1 : if (e.getCause() != null) {
250 0 : Throwables.throwIfInstanceOf(e.getCause(), StorageException.class);
251 : }
252 0 : throw new StorageException(e);
253 : }
254 : }
255 :
256 : /**
257 : * Updates the next available sequence number in NoteDb in order to have a batch of sequence
258 : * numbers available that can be handed out. {@link #counter} stores the next sequence number that
259 : * can be handed out. When {@link #limit} is reached a new batch of sequence numbers needs to be
260 : * retrieved by calling this method.
261 : *
262 : * <p><strong>Note:</strong> Callers are required to acquire the {@link #counterLock} before
263 : * calling this method.
264 : *
265 : * @param count the number of sequence numbers which should be retrieved
266 : */
267 : private void acquire(int count) {
268 151 : try (Repository repo = repoManager.openRepository(projectName);
269 151 : RevWalk rw = new RevWalk(repo)) {
270 151 : logger.atFine().log("acquire %d ids on %s in %s", count, refName, projectName);
271 151 : Optional<IntBlob> blob = IntBlob.parse(repo, refName, rw);
272 151 : afterReadRef.run();
273 : ObjectId oldId;
274 : int next;
275 151 : if (!blob.isPresent()) {
276 151 : oldId = ObjectId.zeroId();
277 151 : next = seed.get();
278 : } else {
279 151 : oldId = blob.get().id();
280 151 : next = blob.get().value();
281 : }
282 151 : next = Math.max(floor, next);
283 151 : RefUpdate refUpdate =
284 151 : IntBlob.tryStore(repo, rw, projectName, refName, oldId, next + count, gitRefUpdated);
285 151 : RefUpdateUtil.checkResult(refUpdate);
286 151 : counter = next;
287 151 : limit = counter + count;
288 151 : acquireCount++;
289 1 : } catch (IOException e) {
290 1 : throw new StorageException(e);
291 151 : }
292 151 : }
293 :
294 : public static ReceiveCommand storeNew(ObjectInserter ins, String name, int val)
295 : throws IOException {
296 151 : ObjectId newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8));
297 151 : return new ReceiveCommand(ObjectId.zeroId(), newId, RefNames.REFS_SEQUENCES + name);
298 : }
299 :
300 : public void storeNew(int value) {
301 0 : counterLock.lock();
302 0 : try (Repository repo = repoManager.openRepository(projectName);
303 0 : RevWalk rw = new RevWalk(repo)) {
304 0 : Optional<IntBlob> blob = IntBlob.parse(repo, refName, rw);
305 0 : afterReadRef.run();
306 : ObjectId oldId;
307 0 : if (!blob.isPresent()) {
308 0 : oldId = ObjectId.zeroId();
309 : } else {
310 0 : oldId = blob.get().id();
311 : }
312 0 : RefUpdate refUpdate =
313 0 : IntBlob.tryStore(repo, rw, projectName, refName, oldId, value, gitRefUpdated);
314 0 : RefUpdateUtil.checkResult(refUpdate);
315 0 : counter = value;
316 0 : limit = counter + batchSize;
317 0 : acquireCount++;
318 0 : } catch (IOException e) {
319 0 : throw new StorageException(e);
320 : } finally {
321 0 : counterLock.unlock();
322 : }
323 0 : }
324 :
325 : public int current() {
326 0 : counterLock.lock();
327 0 : try (Repository repo = repoManager.openRepository(projectName);
328 0 : RevWalk rw = new RevWalk(repo)) {
329 0 : Optional<IntBlob> blob = IntBlob.parse(repo, refName, rw);
330 : int current;
331 0 : if (!blob.isPresent()) {
332 0 : current = seed.get();
333 : } else {
334 0 : current = blob.get().value();
335 : }
336 0 : return current;
337 0 : } catch (IOException e) {
338 0 : throw new StorageException(e);
339 : } finally {
340 0 : counterLock.unlock();
341 : }
342 : }
343 :
344 : /**
345 : * Retrieves the last returned sequence number.
346 : *
347 : * <p>Explicitly calls {@link #next()} if this instance didn't return sequence number until now.
348 : */
349 : public int last() {
350 1 : if (counter == 0) {
351 0 : next();
352 : }
353 1 : return counter - 1;
354 : }
355 : }
|