LCOV - code coverage report
Current view: top level - server/cache/h2 - H2CacheImpl.java (source / functions) Hit Total Coverage
Test: _coverage_report.dat Lines: 289 378 76.5 %
Date: 2022-11-19 15:00:39 Functions: 48 57 84.2 %

          Line data    Source code
       1             : // Copyright (C) 2012 The Android Open Source Project
       2             : //
       3             : // Licensed under the Apache License, Version 2.0 (the "License");
       4             : // you may not use this file except in compliance with the License.
       5             : // You may obtain a copy of the License at
       6             : //
       7             : // http://www.apache.org/licenses/LICENSE-2.0
       8             : //
       9             : // Unless required by applicable law or agreed to in writing, software
      10             : // distributed under the License is distributed on an "AS IS" BASIS,
      11             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12             : // See the License for the specific language governing permissions and
      13             : // limitations under the License.
      14             : 
      15             : package com.google.gerrit.server.cache.h2;
      16             : 
      17             : import com.google.common.base.Throwables;
      18             : import com.google.common.cache.AbstractLoadingCache;
      19             : import com.google.common.cache.Cache;
      20             : import com.google.common.cache.CacheLoader;
      21             : import com.google.common.cache.CacheStats;
      22             : import com.google.common.cache.LoadingCache;
      23             : import com.google.common.collect.ImmutableMap;
      24             : import com.google.common.collect.ImmutableSet;
      25             : import com.google.common.flogger.FluentLogger;
      26             : import com.google.common.hash.BloomFilter;
      27             : import com.google.common.util.concurrent.FutureCallback;
      28             : import com.google.common.util.concurrent.Futures;
      29             : import com.google.common.util.concurrent.ListenableFuture;
      30             : import com.google.gerrit.common.Nullable;
      31             : import com.google.gerrit.server.cache.PersistentCache;
      32             : import com.google.gerrit.server.cache.serialize.CacheSerializer;
      33             : import com.google.gerrit.server.logging.Metadata;
      34             : import com.google.gerrit.server.logging.TraceContext;
      35             : import com.google.gerrit.server.logging.TraceContext.TraceTimer;
      36             : import com.google.gerrit.server.util.time.TimeUtil;
      37             : import com.google.inject.TypeLiteral;
      38             : import java.io.IOException;
      39             : import java.io.InvalidClassException;
      40             : import java.sql.Connection;
      41             : import java.sql.PreparedStatement;
      42             : import java.sql.ResultSet;
      43             : import java.sql.SQLException;
      44             : import java.sql.Statement;
      45             : import java.sql.Timestamp;
      46             : import java.time.Duration;
      47             : import java.time.Instant;
      48             : import java.util.ArrayList;
      49             : import java.util.Calendar;
      50             : import java.util.HashMap;
      51             : import java.util.List;
      52             : import java.util.Map;
      53             : import java.util.concurrent.ArrayBlockingQueue;
      54             : import java.util.concurrent.BlockingQueue;
      55             : import java.util.concurrent.Callable;
      56             : import java.util.concurrent.ExecutionException;
      57             : import java.util.concurrent.Executor;
      58             : import java.util.concurrent.Future;
      59             : import java.util.concurrent.ScheduledExecutorService;
      60             : import java.util.concurrent.TimeUnit;
      61             : import java.util.concurrent.atomic.AtomicLong;
      62             : 
      63             : /**
      64             :  * Hybrid in-memory and database backed cache built on H2.
      65             :  *
      66             :  * <p>This cache can be used as either a recall cache, or a loading cache if a CacheLoader was
      67             :  * supplied to its constructor at build time. Before creating an entry the in-memory cache is
      68             :  * checked for the item, then the database is checked, and finally the CacheLoader is used to
      69             :  * construct the item. This is mostly useful for CacheLoaders that are computationally intensive,
      70             :  * such as the PatchListCache.
      71             :  *
      72             :  * <p>Cache stores and invalidations are performed on a background thread, hiding the latency
      73             :  * associated with serializing the key and value pairs and writing them to the database log.
      74             :  *
      75             :  * <p>A BloomFilter is used around the database to reduce the number of SELECTs issued against the
      76             :  * database for new cache items that have not been seen before, a common operation for the
      77             :  * PatchListCache. The BloomFilter is sized when the cache starts to be 64,000 entries or double the
      78             :  * number of items currently in the database table.
      79             :  *
      80             :  * <p>This cache does not export its items as a ConcurrentMap.
      81             :  *
      82             :  * @see H2CacheFactory
      83             :  */
      84             : public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements PersistentCache {
      85          16 :   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
      86             : 
      87          16 :   private static final ImmutableSet<String> OLD_CLASS_NAMES =
      88          16 :       ImmutableSet.of("com.google.gerrit.server.change.ChangeKind");
      89             : 
      90             :   private final Executor executor;
      91             :   private final SqlStore<K, V> store;
      92             :   private final TypeLiteral<K> keyType;
      93             :   private final Cache<K, ValueHolder<V>> mem;
      94             : 
      95             :   H2CacheImpl(
      96             :       Executor executor,
      97             :       SqlStore<K, V> store,
      98             :       TypeLiteral<K> keyType,
      99          16 :       Cache<K, ValueHolder<V>> mem) {
     100          16 :     this.executor = executor;
     101          16 :     this.store = store;
     102          16 :     this.keyType = keyType;
     103          16 :     this.mem = mem;
     104          16 :   }
     105             : 
     106             :   @Nullable
     107             :   @Override
     108             :   public V getIfPresent(Object objKey) {
     109           6 :     if (!keyType.getRawType().isInstance(objKey)) {
     110           0 :       return null;
     111             :     }
     112             : 
     113             :     @SuppressWarnings("unchecked")
     114           6 :     K key = (K) objKey;
     115             : 
     116           6 :     ValueHolder<V> h = mem.getIfPresent(key);
     117           6 :     if (h != null) {
     118           1 :       return h.value;
     119             :     }
     120             : 
     121           6 :     if (store.mightContain(key)) {
     122           1 :       h = store.getIfPresent(key);
     123           1 :       if (h != null) {
     124           1 :         mem.put(key, h);
     125           1 :         return h.value;
     126             :       }
     127             :     }
     128           6 :     return null;
     129             :   }
     130             : 
     131             :   @Override
     132             :   public V get(K key) throws ExecutionException {
     133          16 :     if (mem instanceof LoadingCache) {
     134          16 :       LoadingCache<K, ValueHolder<V>> asLoadingCache = (LoadingCache<K, ValueHolder<V>>) mem;
     135          16 :       ValueHolder<V> valueHolder = asLoadingCache.get(key);
     136          16 :       if (store.needsRefresh(valueHolder.created)) {
     137           1 :         asLoadingCache.refresh(key);
     138             :       }
     139          16 :       return valueHolder.value;
     140             :     }
     141           0 :     throw new UnsupportedOperationException();
     142             :   }
     143             : 
     144             :   @Override
     145             :   public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
     146          16 :     if (mem instanceof LoadingCache) {
     147          16 :       ImmutableMap.Builder<K, V> result = ImmutableMap.builder();
     148          16 :       LoadingCache<K, ValueHolder<V>> asLoadingCache = (LoadingCache<K, ValueHolder<V>>) mem;
     149          16 :       ImmutableMap<K, ValueHolder<V>> values = asLoadingCache.getAll(keys);
     150          16 :       for (Map.Entry<K, ValueHolder<V>> entry : values.entrySet()) {
     151          16 :         result.put(entry.getKey(), entry.getValue().value);
     152          16 :         if (store.needsRefresh(entry.getValue().created)) {
     153           0 :           asLoadingCache.refresh(entry.getKey());
     154             :         }
     155          16 :       }
     156          16 :       return result.build();
     157             :     }
     158           0 :     throw new UnsupportedOperationException();
     159             :   }
     160             : 
     161             :   @Override
     162             :   public V get(K key, Callable<? extends V> valueLoader) throws ExecutionException {
     163           7 :     return mem.get(
     164             :             key,
     165             :             () -> {
     166           7 :               if (store.mightContain(key)) {
     167           1 :                 ValueHolder<V> h = store.getIfPresent(key);
     168           1 :                 if (h != null) {
     169           1 :                   return h;
     170             :                 }
     171             :               }
     172             : 
     173           7 :               ValueHolder<V> h =
     174           7 :                   new ValueHolder<>(valueLoader.call(), Instant.ofEpochMilli(TimeUtil.nowMs()));
     175           7 :               executor.execute(() -> store.put(key, h));
     176           7 :               return h;
     177             :             })
     178             :         .value;
     179             :   }
     180             : 
     181             :   @Override
     182             :   public void put(K key, V val) {
     183           1 :     final ValueHolder<V> h = new ValueHolder<>(val, Instant.ofEpochMilli(TimeUtil.nowMs()));
     184           1 :     mem.put(key, h);
     185           1 :     executor.execute(() -> store.put(key, h));
     186           1 :   }
     187             : 
     188             :   @SuppressWarnings("unchecked")
     189             :   @Override
     190             :   public void invalidate(Object key) {
     191           0 :     if (keyType.getRawType().isInstance(key) && store.mightContain((K) key)) {
     192           0 :       executor.execute(() -> store.invalidate((K) key));
     193             :     }
     194           0 :     mem.invalidate(key);
     195           0 :   }
     196             : 
     197             :   @Override
     198             :   public void invalidateAll() {
     199           0 :     store.invalidateAll();
     200           0 :     mem.invalidateAll();
     201           0 :   }
     202             : 
     203             :   @Override
     204             :   public long size() {
     205          15 :     return mem.size();
     206             :   }
     207             : 
     208             :   @Override
     209             :   public CacheStats stats() {
     210          15 :     return mem.stats();
     211             :   }
     212             : 
     213             :   @Override
     214             :   public DiskStats diskStats() {
     215          16 :     return store.diskStats();
     216             :   }
     217             : 
     218             :   void start() {
     219          15 :     store.open();
     220          15 :   }
     221             : 
     222             :   void stop() {
     223          15 :     for (Map.Entry<K, ValueHolder<V>> e : mem.asMap().entrySet()) {
     224          15 :       ValueHolder<V> h = e.getValue();
     225          15 :       if (!h.clean) {
     226           6 :         store.put(e.getKey(), h);
     227             :       }
     228          15 :     }
     229          15 :     store.close();
     230          15 :   }
     231             : 
     232             :   void prune(ScheduledExecutorService service) {
     233           1 :     store.prune(mem);
     234             : 
     235           1 :     Calendar cal = Calendar.getInstance();
     236           1 :     cal.set(Calendar.HOUR_OF_DAY, 01);
     237           1 :     cal.set(Calendar.MINUTE, 0);
     238           1 :     cal.set(Calendar.SECOND, 0);
     239           1 :     cal.set(Calendar.MILLISECOND, 0);
     240           1 :     cal.add(Calendar.DAY_OF_MONTH, 1);
     241             : 
     242           1 :     long delay = cal.getTimeInMillis() - TimeUtil.nowMs();
     243             :     @SuppressWarnings("unused")
     244           1 :     Future<?> possiblyIgnoredError =
     245           1 :         service.schedule(() -> prune(service), delay, TimeUnit.MILLISECONDS);
     246           1 :   }
     247             : 
     248             :   static class ValueHolder<V> {
     249             :     final V value;
     250             :     final Instant created;
     251             :     volatile boolean clean;
     252             : 
     253          16 :     ValueHolder(V value, Instant created) {
     254          16 :       this.value = value;
     255          16 :       this.created = created;
     256          16 :     }
     257             :   }
     258             : 
     259             :   static class Loader<K, V> extends CacheLoader<K, ValueHolder<V>> {
     260             :     private final Executor executor;
     261             :     private final SqlStore<K, V> store;
     262             :     private final CacheLoader<K, V> loader;
     263             : 
     264          16 :     Loader(Executor executor, SqlStore<K, V> store, CacheLoader<K, V> loader) {
     265          16 :       this.executor = executor;
     266          16 :       this.store = store;
     267          16 :       this.loader = loader;
     268          16 :     }
     269             : 
     270             :     @Override
     271             :     public ValueHolder<V> load(K key) throws Exception {
     272          16 :       try (TraceTimer timer =
     273          16 :           TraceContext.newTimer(
     274          16 :               "Loading value from cache", Metadata.builder().cacheKey(key.toString()).build())) {
     275          16 :         if (store.mightContain(key)) {
     276          15 :           ValueHolder<V> h = store.getIfPresent(key);
     277          15 :           if (h != null) {
     278          15 :             return h;
     279             :           }
     280             :         }
     281             : 
     282          16 :         final ValueHolder<V> h =
     283          16 :             new ValueHolder<>(loader.load(key), Instant.ofEpochMilli(TimeUtil.nowMs()));
     284          16 :         executor.execute(() -> store.put(key, h));
     285          16 :         return h;
     286          15 :       }
     287             :     }
     288             : 
     289             :     @Override
     290             :     public Map<K, ValueHolder<V>> loadAll(Iterable<? extends K> keys) throws Exception {
     291          15 :       try (TraceTimer timer = TraceContext.newTimer("Loading multiple values from cache")) {
     292          15 :         List<K> notInMemory = new ArrayList<>();
     293          15 :         Map<K, ValueHolder<V>> result = new HashMap<>();
     294          15 :         for (K key : keys) {
     295          15 :           if (!store.mightContain(key)) {
     296          15 :             notInMemory.add(key);
     297          15 :             continue;
     298             :           }
     299          15 :           ValueHolder<V> h = store.getIfPresent(key);
     300          15 :           if (h != null) {
     301          15 :             result.put(key, h);
     302             :           } else {
     303           0 :             notInMemory.add(key);
     304             :           }
     305          15 :         }
     306             :         try {
     307           6 :           Map<K, V> remaining = loader.loadAll(notInMemory);
     308           6 :           Instant instant = Instant.ofEpochMilli(TimeUtil.nowMs());
     309           6 :           storeInDatabase(remaining, instant);
     310           6 :           remaining
     311           6 :               .entrySet()
     312           6 :               .forEach(e -> result.put(e.getKey(), new ValueHolder<>(e.getValue(), instant)));
     313          15 :         } catch (UnsupportedLoadingOperationException e) {
     314             :           // Fallback to the default load() if loadAll() is not implemented
     315          15 :           for (K k : notInMemory) {
     316          15 :             result.put(k, load(k)); // No need to storeInDatabase here; load(k) does that.
     317          15 :           }
     318           6 :         }
     319          15 :         return result;
     320             :       }
     321             :     }
     322             : 
     323             :     @Override
     324             :     public ListenableFuture<ValueHolder<V>> reload(K key, ValueHolder<V> oldValue)
     325             :         throws Exception {
     326           1 :       ListenableFuture<V> reloadedValue = loader.reload(key, oldValue.value);
     327           1 :       Futures.addCallback(
     328             :           reloadedValue,
     329           1 :           new FutureCallback<V>() {
     330             :             @Override
     331             :             public void onSuccess(V result) {
     332           1 :               store.put(key, new ValueHolder<>(result, TimeUtil.now()));
     333           1 :             }
     334             : 
     335             :             @Override
     336             :             public void onFailure(Throwable t) {
     337           0 :               logger.atWarning().withCause(t).log("Unable to reload cache value");
     338           0 :             }
     339             :           },
     340             :           executor);
     341             : 
     342           1 :       return Futures.transform(reloadedValue, v -> new ValueHolder<>(v, TimeUtil.now()), executor);
     343             :     }
     344             : 
     345             :     private void storeInDatabase(Map<K, V> entries, Instant instant) {
     346           6 :       executor.execute(
     347             :           () -> {
     348           6 :             for (Map.Entry<K, V> entry : entries.entrySet()) {
     349           6 :               store.put(entry.getKey(), new ValueHolder<>(entry.getValue(), instant));
     350           6 :             }
     351           6 :           });
     352           6 :     }
     353             :   }
     354             : 
     355             :   static class SqlStore<K, V> {
     356             :     private final String url;
     357             :     private final KeyType<K> keyType;
     358             :     private final CacheSerializer<V> valueSerializer;
     359             :     private final int version;
     360             :     private final long maxSize;
     361             :     @Nullable private final Duration expireAfterWrite;
     362             :     @Nullable private final Duration refreshAfterWrite;
     363             :     private final BlockingQueue<SqlHandle> handles;
     364          16 :     private final AtomicLong hitCount = new AtomicLong();
     365          16 :     private final AtomicLong missCount = new AtomicLong();
     366             :     private volatile BloomFilter<K> bloomFilter;
     367             :     private int estimatedSize;
     368             : 
     369             :     SqlStore(
     370             :         String jdbcUrl,
     371             :         TypeLiteral<K> keyType,
     372             :         CacheSerializer<K> keySerializer,
     373             :         CacheSerializer<V> valueSerializer,
     374             :         int version,
     375             :         long maxSize,
     376             :         @Nullable Duration expireAfterWrite,
     377          16 :         @Nullable Duration refreshAfterWrite) {
     378          16 :       this.url = jdbcUrl;
     379          16 :       this.keyType = createKeyType(keyType, keySerializer);
     380          16 :       this.valueSerializer = valueSerializer;
     381          16 :       this.version = version;
     382          16 :       this.maxSize = maxSize;
     383          16 :       this.expireAfterWrite = expireAfterWrite;
     384          16 :       this.refreshAfterWrite = refreshAfterWrite;
     385             : 
     386          16 :       int cores = Runtime.getRuntime().availableProcessors();
     387          16 :       int keep = Math.min(cores, 16);
     388          16 :       this.handles = new ArrayBlockingQueue<>(keep);
     389          16 :     }
     390             : 
     391             :     @SuppressWarnings("unchecked")
     392             :     private static <T> KeyType<T> createKeyType(
     393             :         TypeLiteral<T> type, CacheSerializer<T> serializer) {
     394          16 :       if (type.getRawType() == String.class) {
     395          16 :         return (KeyType<T>) StringKeyTypeImpl.INSTANCE;
     396             :       }
     397          15 :       return new ObjectKeyTypeImpl<>(serializer);
     398             :     }
     399             : 
     400             :     synchronized void open() {
     401          15 :       if (bloomFilter == null) {
     402          15 :         bloomFilter = buildBloomFilter();
     403             :       }
     404          15 :     }
     405             : 
     406             :     void close() {
     407             :       SqlHandle h;
     408          15 :       while ((h = handles.poll()) != null) {
     409          15 :         h.close();
     410             :       }
     411          15 :     }
     412             : 
     413             :     boolean mightContain(K key) {
     414          16 :       BloomFilter<K> b = bloomFilter;
     415          16 :       if (b == null) {
     416          16 :         synchronized (this) {
     417          16 :           b = bloomFilter;
     418          16 :           if (b == null) {
     419          16 :             b = buildBloomFilter();
     420          16 :             bloomFilter = b;
     421             :           }
     422          16 :         }
     423             :       }
     424          16 :       return b == null || b.mightContain(key);
     425             :     }
     426             : 
     427             :     @Nullable
     428             :     private BloomFilter<K> buildBloomFilter() {
     429          16 :       SqlHandle c = null;
     430             :       try {
     431          16 :         c = acquire();
     432          16 :         if (estimatedSize <= 0) {
     433          16 :           try (PreparedStatement ps =
     434          16 :               c.conn.prepareStatement("SELECT COUNT(*) FROM data WHERE version=?")) {
     435          16 :             ps.setInt(1, version);
     436          16 :             try (ResultSet r = ps.executeQuery()) {
     437          16 :               estimatedSize = r.next() ? r.getInt(1) : 0;
     438             :             }
     439             :           }
     440             :         }
     441             : 
     442          16 :         BloomFilter<K> b = newBloomFilter();
     443          16 :         try (PreparedStatement ps = c.conn.prepareStatement("SELECT k FROM data WHERE version=?")) {
     444          16 :           ps.setInt(1, version);
     445          16 :           try (ResultSet r = ps.executeQuery()) {
     446          16 :             while (r.next()) {
     447          16 :               b.put(keyType.get(r, 1));
     448             :             }
     449             :           }
     450           0 :         } catch (Exception e) {
     451           0 :           if (Throwables.getCausalChain(e).stream()
     452           0 :               .anyMatch(InvalidClassException.class::isInstance)) {
     453             :             // If deserialization failed using default Java serialization, this means we are using
     454             :             // the old serialVersionUID-based invalidation strategy. In that case, authors are
     455             :             // most likely bumping serialVersionUID rather than using the new versioning in the
     456             :             // CacheBinding.  That's ok; we'll continue to support both for now.
     457             :             // TODO(dborowitz): Remove this case when Java serialization is no longer used.
     458           0 :             logger.atWarning().log(
     459             :                 "Entries cached for %s have an incompatible class and can't be deserialized. "
     460             :                     + "Cache is flushed.",
     461             :                 url);
     462           0 :             invalidateAll();
     463             :           } else {
     464           0 :             throw e;
     465             :           }
     466          16 :         }
     467          16 :         return b;
     468           3 :       } catch (IOException | SQLException e) {
     469           3 :         logger.atWarning().log("Cannot build BloomFilter for %s: %s", url, e.getMessage());
     470           3 :         c = close(c);
     471           3 :         return null;
     472             :       } finally {
     473          16 :         release(c);
     474             :       }
     475             :     }
     476             : 
     477             :     @Nullable
     478             :     ValueHolder<V> getIfPresent(K key) {
     479          16 :       SqlHandle c = null;
     480             :       try {
     481          16 :         c = acquire();
     482          16 :         if (c.get == null) {
     483          16 :           c.get = c.conn.prepareStatement("SELECT v, created FROM data WHERE k=? AND version=?");
     484             :         }
     485          16 :         keyType.set(c.get, 1, key);
     486             : 
     487             :         // Silently no results when the only value in the database is an older version. This will
     488             :         // result in put overwriting the stored value with the new version, which is intended.
     489          16 :         c.get.setInt(2, version);
     490             : 
     491          16 :         try (ResultSet r = c.get.executeQuery()) {
     492          16 :           if (!r.next()) {
     493           1 :             missCount.incrementAndGet();
     494           1 :             return null;
     495             :           }
     496             : 
     497          16 :           Timestamp created = r.getTimestamp(2);
     498          16 :           if (expired(created.toInstant())) {
     499           0 :             invalidate(key);
     500           0 :             missCount.incrementAndGet();
     501           0 :             return null;
     502             :           }
     503             : 
     504          16 :           V val = valueSerializer.deserialize(r.getBytes(1));
     505          16 :           ValueHolder<V> h = new ValueHolder<>(val, created.toInstant());
     506          16 :           h.clean = true;
     507          16 :           hitCount.incrementAndGet();
     508          16 :           touch(c, key);
     509          16 :           return h;
     510           1 :         } finally {
     511          16 :           c.get.clearParameters();
     512             :         }
     513           0 :       } catch (IOException | SQLException e) {
     514           0 :         if (!isOldClassNameError(e)) {
     515           0 :           logger.atWarning().withCause(e).log("Cannot read cache %s for %s", url, key);
     516             :         }
     517           0 :         c = close(c);
     518           0 :         return null;
     519             :       } finally {
     520          16 :         release(c);
     521             :       }
     522             :     }
     523             : 
     524             :     private static boolean isOldClassNameError(Throwable t) {
     525           0 :       for (Throwable c : Throwables.getCausalChain(t)) {
     526           0 :         if (c instanceof ClassNotFoundException && OLD_CLASS_NAMES.contains(c.getMessage())) {
     527           0 :           return true;
     528             :         }
     529           0 :       }
     530           0 :       return false;
     531             :     }
     532             : 
     533             :     private boolean expired(Instant created) {
     534          16 :       if (expireAfterWrite == null) {
     535          16 :         return false;
     536             :       }
     537           0 :       Duration age = Duration.between(created, TimeUtil.now());
     538           0 :       return age.compareTo(expireAfterWrite) > 0;
     539             :     }
     540             : 
     541             :     private boolean needsRefresh(Instant created) {
     542          16 :       if (refreshAfterWrite == null) {
     543          16 :         return false;
     544             :       }
     545           1 :       Duration age = Duration.between(created, TimeUtil.now());
     546           1 :       return age.compareTo(refreshAfterWrite) > 0;
     547             :     }
     548             : 
     549             :     private void touch(SqlHandle c, K key) throws IOException, SQLException {
     550          16 :       if (c.touch == null) {
     551          16 :         c.touch = c.conn.prepareStatement("UPDATE data SET accessed=? WHERE k=? AND version=?");
     552             :       }
     553             :       try {
     554          16 :         c.touch.setTimestamp(1, new Timestamp(TimeUtil.nowMs()));
     555          16 :         keyType.set(c.touch, 2, key);
     556          16 :         c.touch.setInt(3, version);
     557          16 :         c.touch.executeUpdate();
     558             :       } finally {
     559          16 :         c.touch.clearParameters();
     560             :       }
     561          16 :     }
     562             : 
     563             :     void put(K key, ValueHolder<V> holder) {
     564          16 :       if (holder.clean) {
     565           0 :         return;
     566             :       }
     567             : 
     568          16 :       BloomFilter<K> b = bloomFilter;
     569          16 :       if (b != null) {
     570          16 :         b.put(key);
     571          16 :         bloomFilter = b;
     572             :       }
     573             : 
     574          16 :       SqlHandle c = null;
     575             :       try {
     576          16 :         c = acquire();
     577          16 :         if (c.put == null) {
     578          16 :           c.put =
     579          16 :               c.conn.prepareStatement(
     580             :                   "MERGE INTO data (k, v, version, created, accessed) VALUES(?,?,?,?,?)");
     581             :         }
     582             :         try {
     583          16 :           keyType.set(c.put, 1, key);
     584          16 :           c.put.setBytes(2, valueSerializer.serialize(holder.value));
     585          16 :           c.put.setInt(3, version);
     586          16 :           c.put.setTimestamp(4, Timestamp.from(holder.created));
     587          16 :           c.put.setTimestamp(5, new Timestamp(TimeUtil.nowMs()));
     588          16 :           c.put.executeUpdate();
     589          16 :           holder.clean = true;
     590             :         } finally {
     591          16 :           c.put.clearParameters();
     592             :         }
     593           0 :       } catch (IOException | SQLException e) {
     594           0 :         logger.atWarning().withCause(e).log("Cannot put into cache %s", url);
     595           0 :         c = close(c);
     596             :       } finally {
     597          16 :         release(c);
     598             :       }
     599          16 :     }
     600             : 
     601             :     void invalidate(K key) {
     602           0 :       SqlHandle c = null;
     603             :       try {
     604           0 :         c = acquire();
     605           0 :         invalidate(c, key);
     606           0 :       } catch (IOException | SQLException e) {
     607           0 :         logger.atWarning().withCause(e).log("Cannot invalidate cache %s", url);
     608           0 :         c = close(c);
     609             :       } finally {
     610           0 :         release(c);
     611             :       }
     612           0 :     }
     613             : 
     614             :     private void invalidate(SqlHandle c, K key) throws IOException, SQLException {
     615           0 :       if (c.invalidate == null) {
     616           0 :         c.invalidate = c.conn.prepareStatement("DELETE FROM data WHERE k=? and version=?");
     617             :       }
     618             :       try {
     619           0 :         keyType.set(c.invalidate, 1, key);
     620           0 :         c.invalidate.setInt(2, version);
     621           0 :         c.invalidate.executeUpdate();
     622             :       } finally {
     623           0 :         c.invalidate.clearParameters();
     624             :       }
     625           0 :     }
     626             : 
     627             :     void invalidateAll() {
     628           0 :       SqlHandle c = null;
     629             :       try {
     630           0 :         c = acquire();
     631           0 :         try (Statement s = c.conn.createStatement()) {
     632           0 :           s.executeUpdate("DELETE FROM data");
     633             :         }
     634           0 :         bloomFilter = newBloomFilter();
     635           0 :       } catch (SQLException e) {
     636           0 :         logger.atWarning().withCause(e).log("Cannot invalidate cache %s", url);
     637           0 :         c = close(c);
     638             :       } finally {
     639           0 :         release(c);
     640             :       }
     641           0 :     }
     642             : 
     643             :     void prune(Cache<K, ?> mem) {
     644           1 :       SqlHandle c = null;
     645             :       try {
     646           1 :         c = acquire();
     647           1 :         try (PreparedStatement ps = c.conn.prepareStatement("DELETE FROM data WHERE version!=?")) {
     648           1 :           ps.setInt(1, version);
     649           1 :           int oldEntries = ps.executeUpdate();
     650           1 :           if (oldEntries > 0) {
     651           0 :             logger.atInfo().log(
     652             :                 "Pruned %d entries not matching version %d from cache %s",
     653           0 :                 oldEntries, version, url);
     654             :           }
     655             :         }
     656           1 :         try (Statement s = c.conn.createStatement()) {
     657             :           // Compute size without restricting to version (although obsolete data was just pruned
     658             :           // anyway).
     659             :           long used;
     660           1 :           try (ResultSet r = s.executeQuery("SELECT SUM(space) FROM data")) {
     661           1 :             used = r.next() ? r.getLong(1) : 0;
     662             :           }
     663           1 :           if (used <= maxSize) {
     664           1 :             return;
     665             :           }
     666             : 
     667           0 :           try (ResultSet r =
     668           0 :               s.executeQuery("SELECT k, space, created FROM data ORDER BY accessed")) {
     669           0 :             while (maxSize < used && r.next()) {
     670           0 :               K key = keyType.get(r, 1);
     671           0 :               Timestamp created = r.getTimestamp(3);
     672           0 :               if (mem.getIfPresent(key) != null && !expired(created.toInstant())) {
     673           0 :                 touch(c, key);
     674             :               } else {
     675           0 :                 invalidate(c, key);
     676           0 :                 used -= r.getLong(2);
     677             :               }
     678           0 :             }
     679             :           }
     680           1 :         }
     681           0 :       } catch (IOException | SQLException e) {
     682           0 :         logger.atWarning().withCause(e).log("Cannot prune cache %s", url);
     683           0 :         c = close(c);
     684             :       } finally {
     685           1 :         release(c);
     686             :       }
     687           0 :     }
     688             : 
     689             :     DiskStats diskStats() {
     690          16 :       long size = 0;
     691          16 :       long space = 0;
     692          16 :       SqlHandle c = null;
     693             :       try {
     694          16 :         c = acquire();
     695          16 :         try (Statement s = c.conn.createStatement();
     696             :             // Stats include total size regardless of version.
     697          16 :             ResultSet r = s.executeQuery("SELECT COUNT(*), SUM(space) FROM data")) {
     698          16 :           if (r.next()) {
     699          16 :             size = r.getLong(1);
     700          16 :             space = r.getLong(2);
     701             :           }
     702             :         }
     703           0 :       } catch (SQLException e) {
     704           0 :         logger.atWarning().withCause(e).log("Cannot get DiskStats for %s", url);
     705           0 :         c = close(c);
     706             :       } finally {
     707          16 :         release(c);
     708             :       }
     709          16 :       return new DiskStats(size, space, hitCount.get(), missCount.get());
     710             :     }
     711             : 
     712             :     private SqlHandle acquire() throws SQLException {
     713          16 :       SqlHandle h = handles.poll();
     714          16 :       return h != null ? h : new SqlHandle(url, keyType);
     715             :     }
     716             : 
     717             :     private void release(SqlHandle h) {
     718          16 :       if (h != null && !handles.offer(h)) {
     719           0 :         h.close();
     720             :       }
     721          16 :     }
     722             : 
     723             :     @Nullable
     724             :     private SqlHandle close(SqlHandle h) {
     725           3 :       if (h != null) {
     726           0 :         h.close();
     727             :       }
     728           3 :       return null;
     729             :     }
     730             : 
     731             :     private BloomFilter<K> newBloomFilter() {
     732          16 :       int cnt = Math.max(64 * 1024, 2 * estimatedSize);
     733          16 :       return BloomFilter.create(keyType.funnel(), cnt);
     734             :     }
     735             :   }
     736             : 
     737             :   static class SqlHandle {
     738             :     private final String url;
     739             :     Connection conn;
     740             :     PreparedStatement get;
     741             :     PreparedStatement put;
     742             :     PreparedStatement touch;
     743             :     PreparedStatement invalidate;
     744             : 
     745          16 :     SqlHandle(String url, KeyType<?> type) throws SQLException {
     746          16 :       this.url = url;
     747          16 :       this.conn = org.h2.Driver.load().connect(url, null);
     748          16 :       try (Statement stmt = conn.createStatement()) {
     749          16 :         stmt.addBatch(
     750             :             "CREATE TABLE IF NOT EXISTS data"
     751             :                 + "(k "
     752          16 :                 + type.columnType()
     753             :                 + " NOT NULL PRIMARY KEY HASH"
     754             :                 + ",v OTHER NOT NULL"
     755             :                 + ",created TIMESTAMP NOT NULL"
     756             :                 + ",accessed TIMESTAMP NOT NULL"
     757             :                 + ")");
     758          16 :         stmt.addBatch(
     759             :             "ALTER TABLE data ADD COLUMN IF NOT EXISTS "
     760             :                 + "space BIGINT AS OCTET_LENGTH(k) + OCTET_LENGTH(v)");
     761          16 :         stmt.addBatch("ALTER TABLE data ADD COLUMN IF NOT EXISTS version INT DEFAULT 0 NOT NULL");
     762          16 :         stmt.executeBatch();
     763             :       }
     764          16 :     }
     765             : 
     766             :     void close() {
     767          15 :       get = closeStatement(get);
     768          15 :       put = closeStatement(put);
     769          15 :       touch = closeStatement(touch);
     770          15 :       invalidate = closeStatement(invalidate);
     771             : 
     772          15 :       if (conn != null) {
     773             :         try {
     774          15 :           conn.close();
     775           0 :         } catch (SQLException e) {
     776           0 :           logger.atWarning().withCause(e).log("Cannot close connection to %s", url);
     777             :         } finally {
     778          15 :           conn = null;
     779             :         }
     780             :       }
     781          15 :     }
     782             : 
     783             :     @Nullable
     784             :     private PreparedStatement closeStatement(PreparedStatement ps) {
     785          15 :       if (ps != null) {
     786             :         try {
     787          15 :           ps.close();
     788           0 :         } catch (SQLException e) {
     789           0 :           logger.atWarning().withCause(e).log("Cannot close statement for %s", url);
     790          15 :         }
     791             :       }
     792          15 :       return null;
     793             :     }
     794             :   }
     795             : }

Generated by: LCOV version 1.16+git.20220603.dfeb750