Line data Source code
1 : // Copyright (C) 2012 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.server.cache.h2;
16 :
17 : import com.google.common.base.Throwables;
18 : import com.google.common.cache.AbstractLoadingCache;
19 : import com.google.common.cache.Cache;
20 : import com.google.common.cache.CacheLoader;
21 : import com.google.common.cache.CacheStats;
22 : import com.google.common.cache.LoadingCache;
23 : import com.google.common.collect.ImmutableMap;
24 : import com.google.common.collect.ImmutableSet;
25 : import com.google.common.flogger.FluentLogger;
26 : import com.google.common.hash.BloomFilter;
27 : import com.google.common.util.concurrent.FutureCallback;
28 : import com.google.common.util.concurrent.Futures;
29 : import com.google.common.util.concurrent.ListenableFuture;
30 : import com.google.gerrit.common.Nullable;
31 : import com.google.gerrit.server.cache.PersistentCache;
32 : import com.google.gerrit.server.cache.serialize.CacheSerializer;
33 : import com.google.gerrit.server.logging.Metadata;
34 : import com.google.gerrit.server.logging.TraceContext;
35 : import com.google.gerrit.server.logging.TraceContext.TraceTimer;
36 : import com.google.gerrit.server.util.time.TimeUtil;
37 : import com.google.inject.TypeLiteral;
38 : import java.io.IOException;
39 : import java.io.InvalidClassException;
40 : import java.sql.Connection;
41 : import java.sql.PreparedStatement;
42 : import java.sql.ResultSet;
43 : import java.sql.SQLException;
44 : import java.sql.Statement;
45 : import java.sql.Timestamp;
46 : import java.time.Duration;
47 : import java.time.Instant;
48 : import java.util.ArrayList;
49 : import java.util.Calendar;
50 : import java.util.HashMap;
51 : import java.util.List;
52 : import java.util.Map;
53 : import java.util.concurrent.ArrayBlockingQueue;
54 : import java.util.concurrent.BlockingQueue;
55 : import java.util.concurrent.Callable;
56 : import java.util.concurrent.ExecutionException;
57 : import java.util.concurrent.Executor;
58 : import java.util.concurrent.Future;
59 : import java.util.concurrent.ScheduledExecutorService;
60 : import java.util.concurrent.TimeUnit;
61 : import java.util.concurrent.atomic.AtomicLong;
62 :
63 : /**
64 : * Hybrid in-memory and database backed cache built on H2.
65 : *
66 : * <p>This cache can be used as either a recall cache, or a loading cache if a CacheLoader was
67 : * supplied to its constructor at build time. Before creating an entry the in-memory cache is
68 : * checked for the item, then the database is checked, and finally the CacheLoader is used to
69 : * construct the item. This is mostly useful for CacheLoaders that are computationally intensive,
70 : * such as the PatchListCache.
71 : *
72 : * <p>Cache stores and invalidations are performed on a background thread, hiding the latency
73 : * associated with serializing the key and value pairs and writing them to the database log.
74 : *
75 : * <p>A BloomFilter is used around the database to reduce the number of SELECTs issued against the
76 : * database for new cache items that have not been seen before, a common operation for the
77 : * PatchListCache. The BloomFilter is sized when the cache starts to be 64,000 entries or double the
78 : * number of items currently in the database table.
79 : *
80 : * <p>This cache does not export its items as a ConcurrentMap.
81 : *
82 : * @see H2CacheFactory
83 : */
84 : public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements PersistentCache {
85 16 : private static final FluentLogger logger = FluentLogger.forEnclosingClass();
86 :
87 16 : private static final ImmutableSet<String> OLD_CLASS_NAMES =
88 16 : ImmutableSet.of("com.google.gerrit.server.change.ChangeKind");
89 :
90 : private final Executor executor;
91 : private final SqlStore<K, V> store;
92 : private final TypeLiteral<K> keyType;
93 : private final Cache<K, ValueHolder<V>> mem;
94 :
95 : H2CacheImpl(
96 : Executor executor,
97 : SqlStore<K, V> store,
98 : TypeLiteral<K> keyType,
99 16 : Cache<K, ValueHolder<V>> mem) {
100 16 : this.executor = executor;
101 16 : this.store = store;
102 16 : this.keyType = keyType;
103 16 : this.mem = mem;
104 16 : }
105 :
106 : @Nullable
107 : @Override
108 : public V getIfPresent(Object objKey) {
109 6 : if (!keyType.getRawType().isInstance(objKey)) {
110 0 : return null;
111 : }
112 :
113 : @SuppressWarnings("unchecked")
114 6 : K key = (K) objKey;
115 :
116 6 : ValueHolder<V> h = mem.getIfPresent(key);
117 6 : if (h != null) {
118 1 : return h.value;
119 : }
120 :
121 6 : if (store.mightContain(key)) {
122 1 : h = store.getIfPresent(key);
123 1 : if (h != null) {
124 1 : mem.put(key, h);
125 1 : return h.value;
126 : }
127 : }
128 6 : return null;
129 : }
130 :
131 : @Override
132 : public V get(K key) throws ExecutionException {
133 16 : if (mem instanceof LoadingCache) {
134 16 : LoadingCache<K, ValueHolder<V>> asLoadingCache = (LoadingCache<K, ValueHolder<V>>) mem;
135 16 : ValueHolder<V> valueHolder = asLoadingCache.get(key);
136 16 : if (store.needsRefresh(valueHolder.created)) {
137 1 : asLoadingCache.refresh(key);
138 : }
139 16 : return valueHolder.value;
140 : }
141 0 : throw new UnsupportedOperationException();
142 : }
143 :
144 : @Override
145 : public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
146 16 : if (mem instanceof LoadingCache) {
147 16 : ImmutableMap.Builder<K, V> result = ImmutableMap.builder();
148 16 : LoadingCache<K, ValueHolder<V>> asLoadingCache = (LoadingCache<K, ValueHolder<V>>) mem;
149 16 : ImmutableMap<K, ValueHolder<V>> values = asLoadingCache.getAll(keys);
150 16 : for (Map.Entry<K, ValueHolder<V>> entry : values.entrySet()) {
151 16 : result.put(entry.getKey(), entry.getValue().value);
152 16 : if (store.needsRefresh(entry.getValue().created)) {
153 0 : asLoadingCache.refresh(entry.getKey());
154 : }
155 16 : }
156 16 : return result.build();
157 : }
158 0 : throw new UnsupportedOperationException();
159 : }
160 :
161 : @Override
162 : public V get(K key, Callable<? extends V> valueLoader) throws ExecutionException {
163 7 : return mem.get(
164 : key,
165 : () -> {
166 7 : if (store.mightContain(key)) {
167 1 : ValueHolder<V> h = store.getIfPresent(key);
168 1 : if (h != null) {
169 1 : return h;
170 : }
171 : }
172 :
173 7 : ValueHolder<V> h =
174 7 : new ValueHolder<>(valueLoader.call(), Instant.ofEpochMilli(TimeUtil.nowMs()));
175 7 : executor.execute(() -> store.put(key, h));
176 7 : return h;
177 : })
178 : .value;
179 : }
180 :
181 : @Override
182 : public void put(K key, V val) {
183 1 : final ValueHolder<V> h = new ValueHolder<>(val, Instant.ofEpochMilli(TimeUtil.nowMs()));
184 1 : mem.put(key, h);
185 1 : executor.execute(() -> store.put(key, h));
186 1 : }
187 :
188 : @SuppressWarnings("unchecked")
189 : @Override
190 : public void invalidate(Object key) {
191 0 : if (keyType.getRawType().isInstance(key) && store.mightContain((K) key)) {
192 0 : executor.execute(() -> store.invalidate((K) key));
193 : }
194 0 : mem.invalidate(key);
195 0 : }
196 :
197 : @Override
198 : public void invalidateAll() {
199 0 : store.invalidateAll();
200 0 : mem.invalidateAll();
201 0 : }
202 :
203 : @Override
204 : public long size() {
205 15 : return mem.size();
206 : }
207 :
208 : @Override
209 : public CacheStats stats() {
210 15 : return mem.stats();
211 : }
212 :
213 : @Override
214 : public DiskStats diskStats() {
215 16 : return store.diskStats();
216 : }
217 :
218 : void start() {
219 15 : store.open();
220 15 : }
221 :
222 : void stop() {
223 15 : for (Map.Entry<K, ValueHolder<V>> e : mem.asMap().entrySet()) {
224 15 : ValueHolder<V> h = e.getValue();
225 15 : if (!h.clean) {
226 6 : store.put(e.getKey(), h);
227 : }
228 15 : }
229 15 : store.close();
230 15 : }
231 :
232 : void prune(ScheduledExecutorService service) {
233 1 : store.prune(mem);
234 :
235 1 : Calendar cal = Calendar.getInstance();
236 1 : cal.set(Calendar.HOUR_OF_DAY, 01);
237 1 : cal.set(Calendar.MINUTE, 0);
238 1 : cal.set(Calendar.SECOND, 0);
239 1 : cal.set(Calendar.MILLISECOND, 0);
240 1 : cal.add(Calendar.DAY_OF_MONTH, 1);
241 :
242 1 : long delay = cal.getTimeInMillis() - TimeUtil.nowMs();
243 : @SuppressWarnings("unused")
244 1 : Future<?> possiblyIgnoredError =
245 1 : service.schedule(() -> prune(service), delay, TimeUnit.MILLISECONDS);
246 1 : }
247 :
248 : static class ValueHolder<V> {
249 : final V value;
250 : final Instant created;
251 : volatile boolean clean;
252 :
253 16 : ValueHolder(V value, Instant created) {
254 16 : this.value = value;
255 16 : this.created = created;
256 16 : }
257 : }
258 :
259 : static class Loader<K, V> extends CacheLoader<K, ValueHolder<V>> {
260 : private final Executor executor;
261 : private final SqlStore<K, V> store;
262 : private final CacheLoader<K, V> loader;
263 :
264 16 : Loader(Executor executor, SqlStore<K, V> store, CacheLoader<K, V> loader) {
265 16 : this.executor = executor;
266 16 : this.store = store;
267 16 : this.loader = loader;
268 16 : }
269 :
270 : @Override
271 : public ValueHolder<V> load(K key) throws Exception {
272 16 : try (TraceTimer timer =
273 16 : TraceContext.newTimer(
274 16 : "Loading value from cache", Metadata.builder().cacheKey(key.toString()).build())) {
275 16 : if (store.mightContain(key)) {
276 15 : ValueHolder<V> h = store.getIfPresent(key);
277 15 : if (h != null) {
278 15 : return h;
279 : }
280 : }
281 :
282 16 : final ValueHolder<V> h =
283 16 : new ValueHolder<>(loader.load(key), Instant.ofEpochMilli(TimeUtil.nowMs()));
284 16 : executor.execute(() -> store.put(key, h));
285 16 : return h;
286 15 : }
287 : }
288 :
289 : @Override
290 : public Map<K, ValueHolder<V>> loadAll(Iterable<? extends K> keys) throws Exception {
291 15 : try (TraceTimer timer = TraceContext.newTimer("Loading multiple values from cache")) {
292 15 : List<K> notInMemory = new ArrayList<>();
293 15 : Map<K, ValueHolder<V>> result = new HashMap<>();
294 15 : for (K key : keys) {
295 15 : if (!store.mightContain(key)) {
296 15 : notInMemory.add(key);
297 15 : continue;
298 : }
299 15 : ValueHolder<V> h = store.getIfPresent(key);
300 15 : if (h != null) {
301 15 : result.put(key, h);
302 : } else {
303 0 : notInMemory.add(key);
304 : }
305 15 : }
306 : try {
307 6 : Map<K, V> remaining = loader.loadAll(notInMemory);
308 6 : Instant instant = Instant.ofEpochMilli(TimeUtil.nowMs());
309 6 : storeInDatabase(remaining, instant);
310 6 : remaining
311 6 : .entrySet()
312 6 : .forEach(e -> result.put(e.getKey(), new ValueHolder<>(e.getValue(), instant)));
313 15 : } catch (UnsupportedLoadingOperationException e) {
314 : // Fallback to the default load() if loadAll() is not implemented
315 15 : for (K k : notInMemory) {
316 15 : result.put(k, load(k)); // No need to storeInDatabase here; load(k) does that.
317 15 : }
318 6 : }
319 15 : return result;
320 : }
321 : }
322 :
323 : @Override
324 : public ListenableFuture<ValueHolder<V>> reload(K key, ValueHolder<V> oldValue)
325 : throws Exception {
326 1 : ListenableFuture<V> reloadedValue = loader.reload(key, oldValue.value);
327 1 : Futures.addCallback(
328 : reloadedValue,
329 1 : new FutureCallback<V>() {
330 : @Override
331 : public void onSuccess(V result) {
332 1 : store.put(key, new ValueHolder<>(result, TimeUtil.now()));
333 1 : }
334 :
335 : @Override
336 : public void onFailure(Throwable t) {
337 0 : logger.atWarning().withCause(t).log("Unable to reload cache value");
338 0 : }
339 : },
340 : executor);
341 :
342 1 : return Futures.transform(reloadedValue, v -> new ValueHolder<>(v, TimeUtil.now()), executor);
343 : }
344 :
345 : private void storeInDatabase(Map<K, V> entries, Instant instant) {
346 6 : executor.execute(
347 : () -> {
348 6 : for (Map.Entry<K, V> entry : entries.entrySet()) {
349 6 : store.put(entry.getKey(), new ValueHolder<>(entry.getValue(), instant));
350 6 : }
351 6 : });
352 6 : }
353 : }
354 :
355 : static class SqlStore<K, V> {
356 : private final String url;
357 : private final KeyType<K> keyType;
358 : private final CacheSerializer<V> valueSerializer;
359 : private final int version;
360 : private final long maxSize;
361 : @Nullable private final Duration expireAfterWrite;
362 : @Nullable private final Duration refreshAfterWrite;
363 : private final BlockingQueue<SqlHandle> handles;
364 16 : private final AtomicLong hitCount = new AtomicLong();
365 16 : private final AtomicLong missCount = new AtomicLong();
366 : private volatile BloomFilter<K> bloomFilter;
367 : private int estimatedSize;
368 :
369 : SqlStore(
370 : String jdbcUrl,
371 : TypeLiteral<K> keyType,
372 : CacheSerializer<K> keySerializer,
373 : CacheSerializer<V> valueSerializer,
374 : int version,
375 : long maxSize,
376 : @Nullable Duration expireAfterWrite,
377 16 : @Nullable Duration refreshAfterWrite) {
378 16 : this.url = jdbcUrl;
379 16 : this.keyType = createKeyType(keyType, keySerializer);
380 16 : this.valueSerializer = valueSerializer;
381 16 : this.version = version;
382 16 : this.maxSize = maxSize;
383 16 : this.expireAfterWrite = expireAfterWrite;
384 16 : this.refreshAfterWrite = refreshAfterWrite;
385 :
386 16 : int cores = Runtime.getRuntime().availableProcessors();
387 16 : int keep = Math.min(cores, 16);
388 16 : this.handles = new ArrayBlockingQueue<>(keep);
389 16 : }
390 :
391 : @SuppressWarnings("unchecked")
392 : private static <T> KeyType<T> createKeyType(
393 : TypeLiteral<T> type, CacheSerializer<T> serializer) {
394 16 : if (type.getRawType() == String.class) {
395 16 : return (KeyType<T>) StringKeyTypeImpl.INSTANCE;
396 : }
397 15 : return new ObjectKeyTypeImpl<>(serializer);
398 : }
399 :
400 : synchronized void open() {
401 15 : if (bloomFilter == null) {
402 15 : bloomFilter = buildBloomFilter();
403 : }
404 15 : }
405 :
406 : void close() {
407 : SqlHandle h;
408 15 : while ((h = handles.poll()) != null) {
409 15 : h.close();
410 : }
411 15 : }
412 :
413 : boolean mightContain(K key) {
414 16 : BloomFilter<K> b = bloomFilter;
415 16 : if (b == null) {
416 16 : synchronized (this) {
417 16 : b = bloomFilter;
418 16 : if (b == null) {
419 16 : b = buildBloomFilter();
420 16 : bloomFilter = b;
421 : }
422 16 : }
423 : }
424 16 : return b == null || b.mightContain(key);
425 : }
426 :
427 : @Nullable
428 : private BloomFilter<K> buildBloomFilter() {
429 16 : SqlHandle c = null;
430 : try {
431 16 : c = acquire();
432 16 : if (estimatedSize <= 0) {
433 16 : try (PreparedStatement ps =
434 16 : c.conn.prepareStatement("SELECT COUNT(*) FROM data WHERE version=?")) {
435 16 : ps.setInt(1, version);
436 16 : try (ResultSet r = ps.executeQuery()) {
437 16 : estimatedSize = r.next() ? r.getInt(1) : 0;
438 : }
439 : }
440 : }
441 :
442 16 : BloomFilter<K> b = newBloomFilter();
443 16 : try (PreparedStatement ps = c.conn.prepareStatement("SELECT k FROM data WHERE version=?")) {
444 16 : ps.setInt(1, version);
445 16 : try (ResultSet r = ps.executeQuery()) {
446 16 : while (r.next()) {
447 16 : b.put(keyType.get(r, 1));
448 : }
449 : }
450 0 : } catch (Exception e) {
451 0 : if (Throwables.getCausalChain(e).stream()
452 0 : .anyMatch(InvalidClassException.class::isInstance)) {
453 : // If deserialization failed using default Java serialization, this means we are using
454 : // the old serialVersionUID-based invalidation strategy. In that case, authors are
455 : // most likely bumping serialVersionUID rather than using the new versioning in the
456 : // CacheBinding. That's ok; we'll continue to support both for now.
457 : // TODO(dborowitz): Remove this case when Java serialization is no longer used.
458 0 : logger.atWarning().log(
459 : "Entries cached for %s have an incompatible class and can't be deserialized. "
460 : + "Cache is flushed.",
461 : url);
462 0 : invalidateAll();
463 : } else {
464 0 : throw e;
465 : }
466 16 : }
467 16 : return b;
468 3 : } catch (IOException | SQLException e) {
469 3 : logger.atWarning().log("Cannot build BloomFilter for %s: %s", url, e.getMessage());
470 3 : c = close(c);
471 3 : return null;
472 : } finally {
473 16 : release(c);
474 : }
475 : }
476 :
477 : @Nullable
478 : ValueHolder<V> getIfPresent(K key) {
479 16 : SqlHandle c = null;
480 : try {
481 16 : c = acquire();
482 16 : if (c.get == null) {
483 16 : c.get = c.conn.prepareStatement("SELECT v, created FROM data WHERE k=? AND version=?");
484 : }
485 16 : keyType.set(c.get, 1, key);
486 :
487 : // Silently no results when the only value in the database is an older version. This will
488 : // result in put overwriting the stored value with the new version, which is intended.
489 16 : c.get.setInt(2, version);
490 :
491 16 : try (ResultSet r = c.get.executeQuery()) {
492 16 : if (!r.next()) {
493 1 : missCount.incrementAndGet();
494 1 : return null;
495 : }
496 :
497 16 : Timestamp created = r.getTimestamp(2);
498 16 : if (expired(created.toInstant())) {
499 0 : invalidate(key);
500 0 : missCount.incrementAndGet();
501 0 : return null;
502 : }
503 :
504 16 : V val = valueSerializer.deserialize(r.getBytes(1));
505 16 : ValueHolder<V> h = new ValueHolder<>(val, created.toInstant());
506 16 : h.clean = true;
507 16 : hitCount.incrementAndGet();
508 16 : touch(c, key);
509 16 : return h;
510 1 : } finally {
511 16 : c.get.clearParameters();
512 : }
513 0 : } catch (IOException | SQLException e) {
514 0 : if (!isOldClassNameError(e)) {
515 0 : logger.atWarning().withCause(e).log("Cannot read cache %s for %s", url, key);
516 : }
517 0 : c = close(c);
518 0 : return null;
519 : } finally {
520 16 : release(c);
521 : }
522 : }
523 :
524 : private static boolean isOldClassNameError(Throwable t) {
525 0 : for (Throwable c : Throwables.getCausalChain(t)) {
526 0 : if (c instanceof ClassNotFoundException && OLD_CLASS_NAMES.contains(c.getMessage())) {
527 0 : return true;
528 : }
529 0 : }
530 0 : return false;
531 : }
532 :
533 : private boolean expired(Instant created) {
534 16 : if (expireAfterWrite == null) {
535 16 : return false;
536 : }
537 0 : Duration age = Duration.between(created, TimeUtil.now());
538 0 : return age.compareTo(expireAfterWrite) > 0;
539 : }
540 :
541 : private boolean needsRefresh(Instant created) {
542 16 : if (refreshAfterWrite == null) {
543 16 : return false;
544 : }
545 1 : Duration age = Duration.between(created, TimeUtil.now());
546 1 : return age.compareTo(refreshAfterWrite) > 0;
547 : }
548 :
549 : private void touch(SqlHandle c, K key) throws IOException, SQLException {
550 16 : if (c.touch == null) {
551 16 : c.touch = c.conn.prepareStatement("UPDATE data SET accessed=? WHERE k=? AND version=?");
552 : }
553 : try {
554 16 : c.touch.setTimestamp(1, new Timestamp(TimeUtil.nowMs()));
555 16 : keyType.set(c.touch, 2, key);
556 16 : c.touch.setInt(3, version);
557 16 : c.touch.executeUpdate();
558 : } finally {
559 16 : c.touch.clearParameters();
560 : }
561 16 : }
562 :
563 : void put(K key, ValueHolder<V> holder) {
564 16 : if (holder.clean) {
565 0 : return;
566 : }
567 :
568 16 : BloomFilter<K> b = bloomFilter;
569 16 : if (b != null) {
570 16 : b.put(key);
571 16 : bloomFilter = b;
572 : }
573 :
574 16 : SqlHandle c = null;
575 : try {
576 16 : c = acquire();
577 16 : if (c.put == null) {
578 16 : c.put =
579 16 : c.conn.prepareStatement(
580 : "MERGE INTO data (k, v, version, created, accessed) VALUES(?,?,?,?,?)");
581 : }
582 : try {
583 16 : keyType.set(c.put, 1, key);
584 16 : c.put.setBytes(2, valueSerializer.serialize(holder.value));
585 16 : c.put.setInt(3, version);
586 16 : c.put.setTimestamp(4, Timestamp.from(holder.created));
587 16 : c.put.setTimestamp(5, new Timestamp(TimeUtil.nowMs()));
588 16 : c.put.executeUpdate();
589 16 : holder.clean = true;
590 : } finally {
591 16 : c.put.clearParameters();
592 : }
593 0 : } catch (IOException | SQLException e) {
594 0 : logger.atWarning().withCause(e).log("Cannot put into cache %s", url);
595 0 : c = close(c);
596 : } finally {
597 16 : release(c);
598 : }
599 16 : }
600 :
601 : void invalidate(K key) {
602 0 : SqlHandle c = null;
603 : try {
604 0 : c = acquire();
605 0 : invalidate(c, key);
606 0 : } catch (IOException | SQLException e) {
607 0 : logger.atWarning().withCause(e).log("Cannot invalidate cache %s", url);
608 0 : c = close(c);
609 : } finally {
610 0 : release(c);
611 : }
612 0 : }
613 :
614 : private void invalidate(SqlHandle c, K key) throws IOException, SQLException {
615 0 : if (c.invalidate == null) {
616 0 : c.invalidate = c.conn.prepareStatement("DELETE FROM data WHERE k=? and version=?");
617 : }
618 : try {
619 0 : keyType.set(c.invalidate, 1, key);
620 0 : c.invalidate.setInt(2, version);
621 0 : c.invalidate.executeUpdate();
622 : } finally {
623 0 : c.invalidate.clearParameters();
624 : }
625 0 : }
626 :
627 : void invalidateAll() {
628 0 : SqlHandle c = null;
629 : try {
630 0 : c = acquire();
631 0 : try (Statement s = c.conn.createStatement()) {
632 0 : s.executeUpdate("DELETE FROM data");
633 : }
634 0 : bloomFilter = newBloomFilter();
635 0 : } catch (SQLException e) {
636 0 : logger.atWarning().withCause(e).log("Cannot invalidate cache %s", url);
637 0 : c = close(c);
638 : } finally {
639 0 : release(c);
640 : }
641 0 : }
642 :
643 : void prune(Cache<K, ?> mem) {
644 1 : SqlHandle c = null;
645 : try {
646 1 : c = acquire();
647 1 : try (PreparedStatement ps = c.conn.prepareStatement("DELETE FROM data WHERE version!=?")) {
648 1 : ps.setInt(1, version);
649 1 : int oldEntries = ps.executeUpdate();
650 1 : if (oldEntries > 0) {
651 0 : logger.atInfo().log(
652 : "Pruned %d entries not matching version %d from cache %s",
653 0 : oldEntries, version, url);
654 : }
655 : }
656 1 : try (Statement s = c.conn.createStatement()) {
657 : // Compute size without restricting to version (although obsolete data was just pruned
658 : // anyway).
659 : long used;
660 1 : try (ResultSet r = s.executeQuery("SELECT SUM(space) FROM data")) {
661 1 : used = r.next() ? r.getLong(1) : 0;
662 : }
663 1 : if (used <= maxSize) {
664 1 : return;
665 : }
666 :
667 0 : try (ResultSet r =
668 0 : s.executeQuery("SELECT k, space, created FROM data ORDER BY accessed")) {
669 0 : while (maxSize < used && r.next()) {
670 0 : K key = keyType.get(r, 1);
671 0 : Timestamp created = r.getTimestamp(3);
672 0 : if (mem.getIfPresent(key) != null && !expired(created.toInstant())) {
673 0 : touch(c, key);
674 : } else {
675 0 : invalidate(c, key);
676 0 : used -= r.getLong(2);
677 : }
678 0 : }
679 : }
680 1 : }
681 0 : } catch (IOException | SQLException e) {
682 0 : logger.atWarning().withCause(e).log("Cannot prune cache %s", url);
683 0 : c = close(c);
684 : } finally {
685 1 : release(c);
686 : }
687 0 : }
688 :
689 : DiskStats diskStats() {
690 16 : long size = 0;
691 16 : long space = 0;
692 16 : SqlHandle c = null;
693 : try {
694 16 : c = acquire();
695 16 : try (Statement s = c.conn.createStatement();
696 : // Stats include total size regardless of version.
697 16 : ResultSet r = s.executeQuery("SELECT COUNT(*), SUM(space) FROM data")) {
698 16 : if (r.next()) {
699 16 : size = r.getLong(1);
700 16 : space = r.getLong(2);
701 : }
702 : }
703 0 : } catch (SQLException e) {
704 0 : logger.atWarning().withCause(e).log("Cannot get DiskStats for %s", url);
705 0 : c = close(c);
706 : } finally {
707 16 : release(c);
708 : }
709 16 : return new DiskStats(size, space, hitCount.get(), missCount.get());
710 : }
711 :
712 : private SqlHandle acquire() throws SQLException {
713 16 : SqlHandle h = handles.poll();
714 16 : return h != null ? h : new SqlHandle(url, keyType);
715 : }
716 :
717 : private void release(SqlHandle h) {
718 16 : if (h != null && !handles.offer(h)) {
719 0 : h.close();
720 : }
721 16 : }
722 :
723 : @Nullable
724 : private SqlHandle close(SqlHandle h) {
725 3 : if (h != null) {
726 0 : h.close();
727 : }
728 3 : return null;
729 : }
730 :
731 : private BloomFilter<K> newBloomFilter() {
732 16 : int cnt = Math.max(64 * 1024, 2 * estimatedSize);
733 16 : return BloomFilter.create(keyType.funnel(), cnt);
734 : }
735 : }
736 :
737 : static class SqlHandle {
738 : private final String url;
739 : Connection conn;
740 : PreparedStatement get;
741 : PreparedStatement put;
742 : PreparedStatement touch;
743 : PreparedStatement invalidate;
744 :
745 16 : SqlHandle(String url, KeyType<?> type) throws SQLException {
746 16 : this.url = url;
747 16 : this.conn = org.h2.Driver.load().connect(url, null);
748 16 : try (Statement stmt = conn.createStatement()) {
749 16 : stmt.addBatch(
750 : "CREATE TABLE IF NOT EXISTS data"
751 : + "(k "
752 16 : + type.columnType()
753 : + " NOT NULL PRIMARY KEY HASH"
754 : + ",v OTHER NOT NULL"
755 : + ",created TIMESTAMP NOT NULL"
756 : + ",accessed TIMESTAMP NOT NULL"
757 : + ")");
758 16 : stmt.addBatch(
759 : "ALTER TABLE data ADD COLUMN IF NOT EXISTS "
760 : + "space BIGINT AS OCTET_LENGTH(k) + OCTET_LENGTH(v)");
761 16 : stmt.addBatch("ALTER TABLE data ADD COLUMN IF NOT EXISTS version INT DEFAULT 0 NOT NULL");
762 16 : stmt.executeBatch();
763 : }
764 16 : }
765 :
766 : void close() {
767 15 : get = closeStatement(get);
768 15 : put = closeStatement(put);
769 15 : touch = closeStatement(touch);
770 15 : invalidate = closeStatement(invalidate);
771 :
772 15 : if (conn != null) {
773 : try {
774 15 : conn.close();
775 0 : } catch (SQLException e) {
776 0 : logger.atWarning().withCause(e).log("Cannot close connection to %s", url);
777 : } finally {
778 15 : conn = null;
779 : }
780 : }
781 15 : }
782 :
783 : @Nullable
784 : private PreparedStatement closeStatement(PreparedStatement ps) {
785 15 : if (ps != null) {
786 : try {
787 15 : ps.close();
788 0 : } catch (SQLException e) {
789 0 : logger.atWarning().withCause(e).log("Cannot close statement for %s", url);
790 15 : }
791 : }
792 15 : return null;
793 : }
794 : }
795 : }
|