Line data Source code
1 : // Copyright (C) 2022 The Android Open Source Project 2 : // 3 : // Licensed under the Apache License, Version 2.0 (the "License"); 4 : // you may not use this file except in compliance with the License. 5 : // You may obtain a copy of the License at 6 : // 7 : // http://www.apache.org/licenses/LICENSE-2.0 8 : // 9 : // Unless required by applicable law or agreed to in writing, software 10 : // distributed under the License is distributed on an "AS IS" BASIS, 11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 : // See the License for the specific language governing permissions and 13 : // limitations under the License. 14 : 15 : package com.google.gerrit.index.query; 16 : 17 : import static com.google.common.base.Preconditions.checkArgument; 18 : 19 : import com.google.common.collect.FluentIterable; 20 : import com.google.common.collect.ImmutableList; 21 : import com.google.common.collect.Iterables; 22 : import com.google.common.collect.Ordering; 23 : import com.google.gerrit.exceptions.StorageException; 24 : import com.google.gerrit.index.IndexConfig; 25 : import com.google.gerrit.index.PaginationType; 26 : import com.google.gerrit.index.QueryOptions; 27 : import java.util.ArrayList; 28 : import java.util.List; 29 : 30 : public class PaginatingSource<T> implements DataSource<T> { 31 : protected final DataSource<T> source; 32 : private final int start; 33 : private final int cardinality; 34 : private final IndexConfig indexConfig; 35 : 36 121 : public PaginatingSource(DataSource<T> source, int start, IndexConfig indexConfig) { 37 121 : checkArgument(start >= 0, "negative start: %s", start); 38 121 : this.source = source; 39 121 : this.start = start; 40 121 : this.cardinality = source.getCardinality(); 41 121 : this.indexConfig = indexConfig; 42 121 : } 43 : 44 : @Override 45 : public ResultSet<T> read() { 46 121 : if (source == null) { 47 0 : throw new StorageException("No DataSource: " + this); 48 : } 49 : 50 : // ResultSets are lazy. Calling #read here first and then dealing with ResultSets only when 51 : // requested allows the index to run asynchronous queries. 52 121 : ResultSet<T> resultSet = source.read(); 53 121 : return new LazyResultSet<>( 54 : () -> { 55 121 : List<T> r = new ArrayList<>(); 56 121 : T last = null; 57 121 : int pageResultSize = 0; 58 120 : for (T data : buffer(resultSet)) { 59 106 : if (!isMatchable() || match(data)) { 60 106 : r.add(data); 61 : } 62 106 : last = data; 63 106 : pageResultSize++; 64 106 : } 65 : 66 120 : if (last != null && source instanceof Paginated) { 67 : // Restart source and continue if we have not filled the 68 : // full limit the caller wants. 69 : // 70 : @SuppressWarnings("unchecked") 71 106 : Paginated<T> p = (Paginated<T>) source; 72 106 : QueryOptions opts = p.getOptions(); 73 106 : final int limit = opts.limit(); 74 106 : int pageSize = opts.pageSize(); 75 106 : int pageSizeMultiplier = opts.pageSizeMultiplier(); 76 106 : Object searchAfter = resultSet.searchAfter(); 77 106 : int nextStart = pageResultSize; 78 106 : while (pageResultSize == pageSize && r.size() <= limit) { // get 1 more than the limit 79 6 : pageSize = getNextPageSize(pageSize, pageSizeMultiplier); 80 : ResultSet<T> next = 81 6 : indexConfig.paginationType().equals(PaginationType.SEARCH_AFTER) 82 4 : ? p.restart(searchAfter, pageSize) 83 6 : : p.restart(nextStart, pageSize); 84 6 : pageResultSize = 0; 85 6 : for (T data : buffer(next)) { 86 6 : if (match(data)) { 87 6 : r.add(data); 88 : } 89 6 : pageResultSize++; 90 6 : } 91 6 : nextStart += pageResultSize; 92 6 : searchAfter = next.searchAfter(); 93 6 : } 94 : } 95 : 96 120 : if (start >= r.size()) { 97 115 : return ImmutableList.of(); 98 106 : } else if (start > 0) { 99 13 : return ImmutableList.copyOf(r.subList(start, r.size())); 100 : } 101 106 : return ImmutableList.copyOf(r); 102 : }); 103 : } 104 : 105 : @Override 106 : public ResultSet<FieldBundle> readRaw() { 107 : // TODO(hiesel): Implement 108 0 : throw new UnsupportedOperationException("not implemented"); 109 : } 110 : 111 : private Iterable<T> buffer(ResultSet<T> scanner) { 112 121 : return FluentIterable.from(Iterables.partition(scanner, 50)) 113 121 : .transformAndConcat(this::transformBuffer); 114 : } 115 : 116 : /** 117 : * Checks whether the given object matches. 118 : * 119 : * @param object the object to be matched 120 : * @return whether the given object matches 121 : */ 122 : protected boolean match(T object) { 123 89 : return true; 124 : } 125 : 126 : protected boolean isMatchable() { 127 89 : return true; 128 : } 129 : 130 : protected List<T> transformBuffer(List<T> buffer) { 131 106 : return buffer; 132 : } 133 : 134 : @Override 135 : public int getCardinality() { 136 0 : return cardinality; 137 : } 138 : 139 : private int getNextPageSize(int pageSize, int pageSizeMultiplier) { 140 6 : List<Integer> possiblePageSizes = new ArrayList<>(3); 141 : try { 142 6 : possiblePageSizes.add(Math.multiplyExact(pageSize, pageSizeMultiplier)); 143 0 : } catch (ArithmeticException e) { 144 0 : possiblePageSizes.add(Integer.MAX_VALUE); 145 6 : } 146 6 : if (indexConfig.maxPageSize() > 0) { 147 6 : possiblePageSizes.add(indexConfig.maxPageSize()); 148 : } 149 6 : if (indexConfig.maxLimit() > 0) { 150 6 : possiblePageSizes.add(indexConfig.maxLimit()); 151 : } 152 6 : return Ordering.natural().min(possiblePageSizes); 153 : } 154 : }