LCOV - code coverage report
Current view: top level - server/notedb - RepoSequence.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 80 115 69.6 %
Date: 2022-11-19 15:00:39 Functions: 12 15 80.0 %

          Line data    Source code
       1             : // Copyright (C) 2016 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.server.notedb;
      16             : 
      17             : import static com.google.common.base.Preconditions.checkArgument;
      18             : import static com.google.gerrit.entities.RefNames.REFS;
      19             : import static com.google.gerrit.entities.RefNames.REFS_SEQUENCES;
      20             : import static java.nio.charset.StandardCharsets.UTF_8;
      21             : import static java.util.Objects.requireNonNull;
      22             : import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
      23             : 
      24             : import com.github.rholder.retry.RetryException;
      25             : import com.github.rholder.retry.Retryer;
      26             : import com.github.rholder.retry.RetryerBuilder;
      27             : import com.github.rholder.retry.StopStrategies;
      28             : import com.github.rholder.retry.WaitStrategies;
      29             : import com.google.common.annotations.VisibleForTesting;
      30             : import com.google.common.base.Throwables;
      31             : import com.google.common.collect.ImmutableList;
      32             : import com.google.common.collect.Iterables;
      33             : import com.google.common.flogger.FluentLogger;
      34             : import com.google.common.util.concurrent.Runnables;
      35             : import com.google.gerrit.entities.Project;
      36             : import com.google.gerrit.entities.RefNames;
      37             : import com.google.gerrit.exceptions.StorageException;
      38             : import com.google.gerrit.git.LockFailureException;
      39             : import com.google.gerrit.git.RefUpdateUtil;
      40             : import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
      41             : import com.google.gerrit.server.git.GitRepositoryManager;
      42             : import java.io.IOException;
      43             : import java.util.ArrayList;
      44             : import java.util.List;
      45             : import java.util.Optional;
      46             : import java.util.concurrent.ExecutionException;
      47             : import java.util.concurrent.TimeUnit;
      48             : import java.util.concurrent.locks.Lock;
      49             : import java.util.concurrent.locks.ReentrantLock;
      50             : import org.eclipse.jgit.lib.ObjectId;
      51             : import org.eclipse.jgit.lib.ObjectInserter;
      52             : import org.eclipse.jgit.lib.RefUpdate;
      53             : import org.eclipse.jgit.lib.Repository;
      54             : import org.eclipse.jgit.revwalk.RevWalk;
      55             : import org.eclipse.jgit.transport.ReceiveCommand;
      56             : 
      57             : /**
      58             :  * Class for managing an incrementing sequence backed by a git repository.
      59             :  *
      60             :  * <p>The current sequence number is stored as UTF-8 text in a blob pointed to by a ref in the
      61             :  * {@code refs/sequences/*} namespace. Multiple processes can share the same sequence by
      62             :  * incrementing the counter using normal git ref updates. To amortize the cost of these ref updates,
      63             :  * processes can increment the counter by a larger number and hand out numbers from that range in
      64             :  * memory until they run out. This means concurrent processes will hand out somewhat non-monotonic
      65             :  * numbers.
      66             :  */
      67             : public class RepoSequence {
      68         151 :   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
      69             : 
      70             :   @FunctionalInterface
      71             :   public interface Seed {
      72             :     int get();
      73             :   }
      74             : 
      75             :   @VisibleForTesting
      76             :   static RetryerBuilder<ImmutableList<Integer>> retryerBuilder() {
      77         151 :     return RetryerBuilder.<ImmutableList<Integer>>newBuilder()
      78         151 :         .retryIfException(
      79             :             t ->
      80           1 :                 t instanceof StorageException
      81           1 :                     && ((StorageException) t).getCause() instanceof LockFailureException)
      82         151 :         .withWaitStrategy(
      83         151 :             WaitStrategies.join(
      84         151 :                 WaitStrategies.exponentialWait(5, TimeUnit.SECONDS),
      85         151 :                 WaitStrategies.randomWait(50, TimeUnit.MILLISECONDS)))
      86         151 :         .withStopStrategy(StopStrategies.stopAfterDelay(30, TimeUnit.SECONDS));
      87             :   }
      88             : 
      89         151 :   private static final Retryer<ImmutableList<Integer>> RETRYER = retryerBuilder().build();
      90             : 
      91             :   private final GitRepositoryManager repoManager;
      92             :   private final GitReferenceUpdated gitRefUpdated;
      93             :   private final Project.NameKey projectName;
      94             :   private final String refName;
      95             :   private final Seed seed;
      96             :   private final int floor;
      97             :   private final int batchSize;
      98             :   private final Runnable afterReadRef;
      99             :   private final Retryer<ImmutableList<Integer>> retryer;
     100             : 
     101             :   // Protects all non-final fields.
     102             :   private final Lock counterLock;
     103             : 
     104             :   private int limit;
     105             :   private int counter;
     106             : 
     107             :   @VisibleForTesting int acquireCount;
     108             : 
     109             :   public RepoSequence(
     110             :       GitRepositoryManager repoManager,
     111             :       GitReferenceUpdated gitRefUpdated,
     112             :       Project.NameKey projectName,
     113             :       String name,
     114             :       Seed seed,
     115             :       int batchSize) {
     116         151 :     this(
     117             :         repoManager,
     118             :         gitRefUpdated,
     119             :         projectName,
     120             :         name,
     121             :         seed,
     122             :         batchSize,
     123         151 :         Runnables.doNothing(),
     124             :         RETRYER,
     125             :         0);
     126         151 :   }
     127             : 
     128             :   public RepoSequence(
     129             :       GitRepositoryManager repoManager,
     130             :       GitReferenceUpdated gitRefUpdated,
     131             :       Project.NameKey projectName,
     132             :       String name,
     133             :       Seed seed,
     134             :       int batchSize,
     135             :       int floor) {
     136           0 :     this(
     137             :         repoManager,
     138             :         gitRefUpdated,
     139             :         projectName,
     140             :         name,
     141             :         seed,
     142             :         batchSize,
     143           0 :         Runnables.doNothing(),
     144             :         RETRYER,
     145             :         floor);
     146           0 :   }
     147             : 
     148             :   @VisibleForTesting
     149             :   RepoSequence(
     150             :       GitRepositoryManager repoManager,
     151             :       GitReferenceUpdated gitRefUpdated,
     152             :       Project.NameKey projectName,
     153             :       String name,
     154             :       Seed seed,
     155             :       int batchSize,
     156             :       Runnable afterReadRef,
     157             :       Retryer<ImmutableList<Integer>> retryer) {
     158           1 :     this(repoManager, gitRefUpdated, projectName, name, seed, batchSize, afterReadRef, retryer, 0);
     159           1 :   }
     160             : 
     161             :   RepoSequence(
     162             :       GitRepositoryManager repoManager,
     163             :       GitReferenceUpdated gitRefUpdated,
     164             :       Project.NameKey projectName,
     165             :       String name,
     166             :       Seed seed,
     167             :       int batchSize,
     168             :       Runnable afterReadRef,
     169             :       Retryer<ImmutableList<Integer>> retryer,
     170         151 :       int floor) {
     171         151 :     this.repoManager = requireNonNull(repoManager, "repoManager");
     172         151 :     this.gitRefUpdated = requireNonNull(gitRefUpdated, "gitRefUpdated");
     173         151 :     this.projectName = requireNonNull(projectName, "projectName");
     174             : 
     175         151 :     checkArgument(
     176             :         name != null
     177         151 :             && !name.startsWith(REFS)
     178         151 :             && !name.startsWith(REFS_SEQUENCES.substring(REFS.length())),
     179             :         "name should be a suffix to follow \"refs/sequences/\", got: %s",
     180             :         name);
     181         151 :     this.refName = RefNames.REFS_SEQUENCES + name;
     182             : 
     183         151 :     this.seed = requireNonNull(seed, "seed");
     184         151 :     this.floor = floor;
     185             : 
     186         151 :     checkArgument(batchSize > 0, "expected batchSize > 0, got: %s", batchSize);
     187         151 :     this.batchSize = batchSize;
     188         151 :     this.afterReadRef = requireNonNull(afterReadRef, "afterReadRef");
     189         151 :     this.retryer = requireNonNull(retryer, "retryer");
     190             : 
     191         151 :     logger.atFine().log("sequence batch size for %s is %s", name, batchSize);
     192         151 :     counterLock = new ReentrantLock(true);
     193         151 :   }
     194             : 
     195             :   /**
     196             :    * Retrieves the next available sequence number.
     197             :    *
     198             :    * <p>This method is thread-safe.
     199             :    *
     200             :    * @return the next available sequence number
     201             :    */
     202             :   public int next() {
     203         151 :     return Iterables.getOnlyElement(next(1));
     204             :   }
     205             : 
     206             :   /**
     207             :    * Retrieves the next N available sequence number.
     208             :    *
     209             :    * <p>This method is thread-safe.
     210             :    *
     211             :    * @param count the number of sequence numbers which should be returned
     212             :    * @return the next N available sequence numbers
     213             :    */
     214             :   public ImmutableList<Integer> next(int count) {
     215         151 :     if (count == 0) {
     216          38 :       return ImmutableList.of();
     217             :     }
     218         151 :     checkArgument(count > 0, "count is negative: %s", count);
     219             : 
     220             :     try {
     221         151 :       return retryer.call(
     222             :           () -> {
     223         151 :             counterLock.lock();
     224             :             try {
     225         151 :               if (count == 1) {
     226         151 :                 if (counter >= limit) {
     227         151 :                   acquire(batchSize);
     228             :                 }
     229         151 :                 return ImmutableList.of(counter++);
     230             :               }
     231             : 
     232          17 :               List<Integer> ids = new ArrayList<>(count);
     233          17 :               while (counter < limit) {
     234          16 :                 ids.add(counter++);
     235          16 :                 if (ids.size() == count) {
     236          16 :                   return ImmutableList.copyOf(ids);
     237             :                 }
     238             :               }
     239           7 :               acquire(Math.max(count - ids.size(), batchSize));
     240           7 :               while (ids.size() < count) {
     241           7 :                 ids.add(counter++);
     242             :               }
     243           7 :               return ImmutableList.copyOf(ids);
     244             :             } finally {
     245         151 :               counterLock.unlock();
     246             :             }
     247             :           });
     248           1 :     } catch (ExecutionException | RetryException e) {
     249           1 :       if (e.getCause() != null) {
     250           0 :         Throwables.throwIfInstanceOf(e.getCause(), StorageException.class);
     251             :       }
     252           0 :       throw new StorageException(e);
     253             :     }
     254             :   }
     255             : 
     256             :   /**
     257             :    * Updates the next available sequence number in NoteDb in order to have a batch of sequence
     258             :    * numbers available that can be handed out. {@link #counter} stores the next sequence number that
     259             :    * can be handed out. When {@link #limit} is reached a new batch of sequence numbers needs to be
     260             :    * retrieved by calling this method.
     261             :    *
     262             :    * <p><strong>Note:</strong> Callers are required to acquire the {@link #counterLock} before
     263             :    * calling this method.
     264             :    *
     265             :    * @param count the number of sequence numbers which should be retrieved
     266             :    */
     267             :   private void acquire(int count) {
     268         151 :     try (Repository repo = repoManager.openRepository(projectName);
     269         151 :         RevWalk rw = new RevWalk(repo)) {
     270         151 :       logger.atFine().log("acquire %d ids on %s in %s", count, refName, projectName);
     271         151 :       Optional<IntBlob> blob = IntBlob.parse(repo, refName, rw);
     272         151 :       afterReadRef.run();
     273             :       ObjectId oldId;
     274             :       int next;
     275         151 :       if (!blob.isPresent()) {
     276         151 :         oldId = ObjectId.zeroId();
     277         151 :         next = seed.get();
     278             :       } else {
     279         151 :         oldId = blob.get().id();
     280         151 :         next = blob.get().value();
     281             :       }
     282         151 :       next = Math.max(floor, next);
     283         151 :       RefUpdate refUpdate =
     284         151 :           IntBlob.tryStore(repo, rw, projectName, refName, oldId, next + count, gitRefUpdated);
     285         151 :       RefUpdateUtil.checkResult(refUpdate);
     286         151 :       counter = next;
     287         151 :       limit = counter + count;
     288         151 :       acquireCount++;
     289           1 :     } catch (IOException e) {
     290           1 :       throw new StorageException(e);
     291         151 :     }
     292         151 :   }
     293             : 
     294             :   public static ReceiveCommand storeNew(ObjectInserter ins, String name, int val)
     295             :       throws IOException {
     296         151 :     ObjectId newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8));
     297         151 :     return new ReceiveCommand(ObjectId.zeroId(), newId, RefNames.REFS_SEQUENCES + name);
     298             :   }
     299             : 
     300             :   public void storeNew(int value) {
     301           0 :     counterLock.lock();
     302           0 :     try (Repository repo = repoManager.openRepository(projectName);
     303           0 :         RevWalk rw = new RevWalk(repo)) {
     304           0 :       Optional<IntBlob> blob = IntBlob.parse(repo, refName, rw);
     305           0 :       afterReadRef.run();
     306             :       ObjectId oldId;
     307           0 :       if (!blob.isPresent()) {
     308           0 :         oldId = ObjectId.zeroId();
     309             :       } else {
     310           0 :         oldId = blob.get().id();
     311             :       }
     312           0 :       RefUpdate refUpdate =
     313           0 :           IntBlob.tryStore(repo, rw, projectName, refName, oldId, value, gitRefUpdated);
     314           0 :       RefUpdateUtil.checkResult(refUpdate);
     315           0 :       counter = value;
     316           0 :       limit = counter + batchSize;
     317           0 :       acquireCount++;
     318           0 :     } catch (IOException e) {
     319           0 :       throw new StorageException(e);
     320             :     } finally {
     321           0 :       counterLock.unlock();
     322             :     }
     323           0 :   }
     324             : 
     325             :   public int current() {
     326           0 :     counterLock.lock();
     327           0 :     try (Repository repo = repoManager.openRepository(projectName);
     328           0 :         RevWalk rw = new RevWalk(repo)) {
     329           0 :       Optional<IntBlob> blob = IntBlob.parse(repo, refName, rw);
     330             :       int current;
     331           0 :       if (!blob.isPresent()) {
     332           0 :         current = seed.get();
     333             :       } else {
     334           0 :         current = blob.get().value();
     335             :       }
     336           0 :       return current;
     337           0 :     } catch (IOException e) {
     338           0 :       throw new StorageException(e);
     339             :     } finally {
     340           0 :       counterLock.unlock();
     341             :     }
     342             :   }
     343             : 
     344             :   /**
     345             :    * Retrieves the last returned sequence number.
     346             :    *
     347             :    * <p>Explicitly calls {@link #next()} if this instance didn't return sequence number until now.
     348             :    */
     349             :   public int last() {
     350           1 :     if (counter == 0) {
     351           0 :       next();
     352             :     }
     353           1 :     return counter - 1;
     354             :   }
     355             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750