Line data Source code
1 : // Copyright (C) 2016 The Android Open Source Project
2 : //
3 : // Licensed under the Apache License, Version 2.0 (the "License");
4 : // you may not use this file except in compliance with the License.
5 : // You may obtain a copy of the License at
6 : //
7 : // http://www.apache.org/licenses/LICENSE-2.0
8 : //
9 : // Unless required by applicable law or agreed to in writing, software
10 : // distributed under the License is distributed on an "AS IS" BASIS,
11 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : // See the License for the specific language governing permissions and
13 : // limitations under the License.
14 :
15 : package com.google.gerrit.index.query;
16 :
17 : import static com.google.common.base.Preconditions.checkArgument;
18 : import static com.google.common.base.Preconditions.checkState;
19 : import static com.google.common.collect.ImmutableList.toImmutableList;
20 : import static com.google.common.flogger.LazyArgs.lazy;
21 : import static java.util.stream.Collectors.toList;
22 :
23 : import com.google.common.base.Throwables;
24 : import com.google.common.collect.ImmutableList;
25 : import com.google.common.collect.ImmutableSet;
26 : import com.google.common.collect.Ordering;
27 : import com.google.common.flogger.FluentLogger;
28 : import com.google.gerrit.common.Nullable;
29 : import com.google.gerrit.exceptions.StorageException;
30 : import com.google.gerrit.index.Index;
31 : import com.google.gerrit.index.IndexCollection;
32 : import com.google.gerrit.index.IndexConfig;
33 : import com.google.gerrit.index.IndexRewriter;
34 : import com.google.gerrit.index.QueryOptions;
35 : import com.google.gerrit.index.SchemaDefinitions;
36 : import com.google.gerrit.metrics.Description;
37 : import com.google.gerrit.metrics.Field;
38 : import com.google.gerrit.metrics.MetricMaker;
39 : import com.google.gerrit.metrics.Timer1;
40 : import com.google.gerrit.server.logging.CallerFinder;
41 : import com.google.gerrit.server.logging.Metadata;
42 : import java.util.ArrayList;
43 : import java.util.List;
44 : import java.util.Optional;
45 : import java.util.Set;
46 : import java.util.concurrent.TimeUnit;
47 : import java.util.concurrent.atomic.AtomicBoolean;
48 : import java.util.function.IntSupplier;
49 : import java.util.stream.IntStream;
50 :
51 : /**
52 : * Lower-level implementation for executing a single query over a secondary index.
53 : *
54 : * <p>Instances are one-time-use. Other singleton classes should inject a Provider rather than
55 : * holding on to a single instance.
56 : */
57 : public abstract class QueryProcessor<T> {
// Shared logger for this class; used below for fine-level diagnostics about executed queries.
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
59 :
60 : protected static class Metrics {
61 : final Timer1<String> executionTime;
62 :
63 151 : Metrics(MetricMaker metricMaker) {
64 151 : executionTime =
65 151 : metricMaker.newTimer(
66 : "query/query_latency",
67 : new Description("Successful query latency, accumulated over the life of the process")
68 151 : .setCumulative()
69 151 : .setUnit(Description.Units.MILLISECONDS),
70 151 : Field.ofString("index", Metadata.Builder::indexName)
71 151 : .description("index name")
72 151 : .build());
73 151 : }
74 : }
75 :
76 : private final Metrics metrics;
77 : private final SchemaDefinitions<T> schemaDef;
78 : private final IndexConfig indexConfig;
79 : private final IndexCollection<?, T, ? extends Index<?, T>> indexes;
80 : private final IndexRewriter<T> rewriter;
81 : private final String limitField;
82 : private final IntSupplier userQueryLimit;
83 : private final CallerFinder callerFinder;
84 :
85 : // This class is not generally thread-safe, but programmer error may result in it being shared
86 : // across threads. At least ensure the bit for checking if it's been used is threadsafe.
87 : private final AtomicBoolean used;
88 :
89 : protected int start;
90 :
91 151 : private boolean enforceVisibility = true;
92 : private int userProvidedLimit;
93 : private boolean isNoLimit;
94 : private Set<String> requestedFields;
95 :
96 : protected QueryProcessor(
97 : MetricMaker metricMaker,
98 : SchemaDefinitions<T> schemaDef,
99 : IndexConfig indexConfig,
100 : IndexCollection<?, T, ? extends Index<?, T>> indexes,
101 : IndexRewriter<T> rewriter,
102 : String limitField,
103 151 : IntSupplier userQueryLimit) {
104 151 : this.metrics = new Metrics(metricMaker);
105 151 : this.schemaDef = schemaDef;
106 151 : this.indexConfig = indexConfig;
107 151 : this.indexes = indexes;
108 151 : this.rewriter = rewriter;
109 151 : this.limitField = limitField;
110 151 : this.userQueryLimit = userQueryLimit;
111 151 : this.used = new AtomicBoolean(false);
112 151 : this.callerFinder =
113 151 : CallerFinder.builder()
114 151 : .addTarget(InternalQuery.class)
115 151 : .addTarget(QueryProcessor.class)
116 151 : .matchSubClasses(true)
117 151 : .skip(1)
118 151 : .build();
119 151 : }
120 :
121 : public QueryProcessor<T> setStart(int n) {
122 31 : start = n;
123 31 : return this;
124 : }
125 :
126 : /**
127 : * Specify whether to enforce visibility by filtering out results that are not visible to the
128 : * user.
129 : *
130 : * <p>Enforcing visibility may have performance consequences, as the index system may need to
131 : * post-filter a large number of results to fill even a modest limit.
132 : *
133 : * <p>If visibility is enforced, the user's {@code queryLimit} global capability is also used to
134 : * bound the total number of results. If this capability is non-positive, this results in the
135 : * entire query processor being {@link #isDisabled() disabled}.
136 : *
137 : * @param enforce whether to enforce visibility.
138 : * @return this.
139 : */
140 : public QueryProcessor<T> enforceVisibility(boolean enforce) {
141 151 : enforceVisibility = enforce;
142 151 : return this;
143 : }
144 :
145 : /**
146 : * Set an end-user-provided limit on the number of results returned.
147 : *
148 : * <p>Since this limit is provided by an end user, it may exceed the limit that they are
149 : * authorized to use. This is allowed; the processor will take multiple possible limits into
150 : * account and choose the one that makes the most sense.
151 : *
152 : * @param n limit; zero or negative means no limit.
153 : * @return this.
154 : */
155 : public QueryProcessor<T> setUserProvidedLimit(int n) {
156 116 : userProvidedLimit = n;
157 116 : return this;
158 : }
159 :
160 : public QueryProcessor<T> setNoLimit(boolean isNoLimit) {
161 22 : this.isNoLimit = isNoLimit;
162 22 : return this;
163 : }
164 :
165 : public QueryProcessor<T> setRequestedFields(Set<String> fields) {
166 109 : requestedFields = fields;
167 109 : return this;
168 : }
169 :
170 : /**
171 : * Query for entities that match a structured query.
172 : *
173 : * @see #query(List)
174 : * @param query the query.
175 : * @return results of the query.
176 : */
177 : public QueryResult<T> query(Predicate<T> query) throws QueryParseException {
178 151 : return query(ImmutableList.of(query)).get(0);
179 : }
180 :
181 : /**
182 : * Perform multiple queries in parallel.
183 : *
184 : * <p>If querying is disabled, short-circuits the index and returns empty results. Callers that
185 : * wish to distinguish this case from a query returning no results from the index may call {@link
186 : * #isDisabled()} themselves.
187 : *
188 : * @param queries list of queries.
189 : * @return results of the queries, one QueryResult per input query, in the same order as the
190 : * input.
191 : */
192 : public List<QueryResult<T>> query(List<Predicate<T>> queries) throws QueryParseException {
193 : try {
194 151 : return query(null, queries);
195 0 : } catch (StorageException e) {
196 0 : if (e.getCause() != null) {
197 0 : Throwables.throwIfInstanceOf(e.getCause(), QueryParseException.class);
198 : }
199 0 : throw e;
200 : }
201 : }
202 :
203 : private List<QueryResult<T>> query(
204 : @Nullable List<String> queryStrings, List<Predicate<T>> queries) throws QueryParseException {
205 151 : long startNanos = System.nanoTime();
206 151 : checkState(!used.getAndSet(true), "%s has already been used", getClass().getSimpleName());
207 151 : int cnt = queries.size();
208 151 : if (queryStrings != null) {
209 0 : int qs = queryStrings.size();
210 0 : checkArgument(qs == cnt, "got %s query strings but %s predicates", qs, cnt);
211 : }
212 151 : if (cnt == 0) {
213 0 : return ImmutableList.of();
214 : }
215 151 : if (isDisabled()) {
216 0 : return disabledResults(queryStrings, queries);
217 : }
218 :
219 151 : logger.atFine().log(
220 : "Executing %d %s index queries for %s",
221 151 : cnt, schemaDef.getName(), callerFinder.findCallerLazy());
222 : List<QueryResult<T>> out;
223 : try {
224 : // Parse and rewrite all queries.
225 151 : List<Integer> limits = new ArrayList<>(cnt);
226 151 : List<Predicate<T>> predicates = new ArrayList<>(cnt);
227 151 : List<DataSource<T>> sources = new ArrayList<>(cnt);
228 151 : int queryCount = 0;
229 151 : for (Predicate<T> q : queries) {
230 151 : checkSupportedForQueries(q);
231 151 : int limit = getEffectiveLimit(q);
232 151 : limits.add(limit);
233 151 : int initialPageSize = getInitialPageSize(limit);
234 :
235 151 : if (initialPageSize == getBackendSupportedLimit()) {
236 151 : initialPageSize--;
237 : }
238 :
239 151 : int page = (start / limit) + 1;
240 151 : if (page > indexConfig.maxPages()) {
241 4 : throw new QueryParseException(
242 4 : "Cannot go beyond page " + indexConfig.maxPages() + " of results");
243 : }
244 :
245 : // Always bump initial page size by 1, even if this results in exceeding the
246 : // permitted max for this user. The only way to see if there are more entities
247 : // is to ask for one more result from the query.
248 : try {
249 151 : initialPageSize = Math.addExact(initialPageSize, 1);
250 0 : } catch (ArithmeticException e) {
251 0 : initialPageSize = Integer.MAX_VALUE;
252 151 : }
253 :
254 : // If pageSizeMultiplier is set to 1 (default), update it to 10 for no-limit queries as
255 : // it helps improve performance and also prevents no-limit queries from severely degrading
256 : // when pagination type is OFFSET.
257 151 : int pageSizeMultiplier = indexConfig.pageSizeMultiplier();
258 151 : if (isNoLimit && pageSizeMultiplier == 1) {
259 5 : pageSizeMultiplier = 10;
260 : }
261 :
262 151 : QueryOptions opts =
263 151 : createOptions(
264 : indexConfig,
265 : start,
266 : initialPageSize,
267 : pageSizeMultiplier,
268 : limit,
269 151 : getRequestedFields());
270 151 : logger.atFine().log("Query options: %s", opts);
271 151 : Predicate<T> pred = rewriter.rewrite(q, opts);
272 151 : if (enforceVisibility) {
273 117 : pred = enforceVisibility(pred);
274 : }
275 151 : predicates.add(pred);
276 151 : logger.atFine().log(
277 : "%s index query[%d]:\n%s",
278 151 : schemaDef.getName(),
279 151 : queryCount++,
280 151 : pred instanceof IndexedQuery ? pred.getChild(0) : pred);
281 :
282 : @SuppressWarnings("unchecked")
283 151 : DataSource<T> s = (DataSource<T>) pred;
284 151 : if (initialPageSize < limit && !(pred instanceof AndSource)) {
285 106 : s = new PaginatingSource<>(s, start, indexConfig);
286 : }
287 151 : sources.add(s);
288 151 : }
289 :
290 : // Run each query asynchronously, if supported.
291 151 : List<ResultSet<T>> matches = new ArrayList<>(cnt);
292 151 : for (DataSource<T> s : sources) {
293 151 : matches.add(s.read());
294 151 : }
295 :
296 151 : out = new ArrayList<>(cnt);
297 151 : for (int i = 0; i < cnt; i++) {
298 151 : ImmutableList<T> matchesList = matches.get(i).toList();
299 151 : logger.atFine().log(
300 : "Matches[%d]:\n%s",
301 151 : i, lazy(() -> matchesList.stream().map(this::formatForLogging).collect(toList())));
302 151 : out.add(
303 151 : QueryResult.create(
304 151 : queryStrings != null ? queryStrings.get(i) : null,
305 151 : predicates.get(i),
306 151 : limits.get(i),
307 : matchesList));
308 : }
309 :
310 : // Only measure successful queries that actually touched the index.
311 151 : metrics.executionTime.record(
312 151 : schemaDef.getName(), System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
313 0 : } catch (StorageException e) {
314 0 : Optional<QueryParseException> qpe = findQueryParseException(e);
315 0 : if (qpe.isPresent()) {
316 0 : throw new QueryParseException(qpe.get().getMessage(), e);
317 : }
318 0 : throw e;
319 151 : }
320 151 : return out;
321 : }
322 :
323 : private void checkSupportedForQueries(Predicate<T> predicate) throws QueryParseException {
324 151 : List<Predicate<T>> descendants = predicate.getFlattenedPredicateList();
325 151 : for (Predicate<T> p : descendants) {
326 151 : if (!p.supportedForQueries()) {
327 4 : throw new QueryParseException(String.format("Operator '%s' cannot be used in queries", p));
328 : }
329 151 : }
330 151 : }
331 :
332 : private static <T> ImmutableList<QueryResult<T>> disabledResults(
333 : List<String> queryStrings, List<Predicate<T>> queries) {
334 0 : return IntStream.range(0, queries.size())
335 0 : .mapToObj(
336 : i ->
337 0 : QueryResult.create(
338 0 : queryStrings != null ? queryStrings.get(i) : null,
339 0 : queries.get(i),
340 : 0,
341 0 : ImmutableList.of()))
342 0 : .collect(toImmutableList());
343 : }
344 :
345 : protected QueryOptions createOptions(
346 : IndexConfig indexConfig,
347 : int start,
348 : int pageSize,
349 : int pageSizeMultiplier,
350 : int limit,
351 : Set<String> requestedFields) {
352 151 : return QueryOptions.create(
353 : indexConfig, start, pageSize, pageSizeMultiplier, limit, requestedFields);
354 : }
355 :
/**
 * Invoked after the query was rewritten, and only when visibility enforcement is enabled.
 * Subclasses must override this method to filter out results that are not visible to the
 * calling user.
 *
 * @param pred the rewritten query
 * @return the modified query
 */
protected abstract Predicate<T> enforceVisibility(Predicate<T> pred);
364 :
365 : private Set<String> getRequestedFields() {
366 151 : if (requestedFields != null) {
367 109 : return requestedFields;
368 : }
369 151 : Index<?, T> index = indexes.getSearchIndex();
370 151 : return index != null ? index.getSchema().getStoredFields() : ImmutableSet.of();
371 : }
372 :
373 : /**
374 : * Check whether querying should be disabled.
375 : *
376 : * <p>Currently, the only condition that can disable the whole query processor is if both {@link
377 : * #enforceVisibility(boolean) visibility is enforced} and the user has a non-positive maximum
378 : * value for the {@code queryLimit} capability.
379 : *
380 : * <p>If querying is disabled, all calls to {@link #query(Predicate)} and {@link #query(List)}
381 : * will return empty results. This method can be used if callers wish to distinguish this case
382 : * from a query returning no results from the index.
383 : *
384 : * @return true if querying should be disabled.
385 : */
386 : public boolean isDisabled() {
387 151 : return enforceVisibility && getPermittedLimit() <= 0;
388 : }
389 :
390 : private int getPermittedLimit() {
391 151 : return enforceVisibility ? userQueryLimit.getAsInt() : Integer.MAX_VALUE;
392 : }
393 :
394 : private int getBackendSupportedLimit() {
395 151 : return indexConfig.maxLimit();
396 : }
397 :
398 : public int getEffectiveLimit(Predicate<T> p) {
399 151 : if (isNoLimit == true) {
400 5 : return Integer.MAX_VALUE;
401 : }
402 151 : List<Integer> possibleLimits = new ArrayList<>(4);
403 151 : possibleLimits.add(getBackendSupportedLimit());
404 151 : possibleLimits.add(getPermittedLimit());
405 151 : if (userProvidedLimit > 0) {
406 115 : possibleLimits.add(userProvidedLimit);
407 : }
408 151 : if (limitField != null) {
409 151 : Integer limitFromPredicate = LimitPredicate.getLimit(limitField, p);
410 151 : if (limitFromPredicate != null) {
411 5 : possibleLimits.add(limitFromPredicate);
412 : }
413 : }
414 151 : int result = Ordering.natural().min(possibleLimits);
415 : // Should have short-circuited from #query or thrown some other exception before getting here.
416 151 : checkState(result > 0, "effective limit should be positive");
417 :
418 151 : return result;
419 : }
420 :
421 : private static Optional<QueryParseException> findQueryParseException(Throwable t) {
422 0 : return Throwables.getCausalChain(t).stream()
423 0 : .filter(c -> c instanceof QueryParseException)
424 0 : .map(QueryParseException.class::cast)
425 0 : .findFirst();
426 : }
427 :
428 : protected IntSupplier getUserQueryLimit() {
429 111 : return userQueryLimit;
430 : }
431 :
432 : protected int getInitialPageSize(int queryLimit) {
433 151 : return queryLimit;
434 : }
435 :
/**
 * Formats a single query match for the fine-level log output of matches.
 *
 * @param t the matched entity.
 * @return the string to log for the entity.
 */
protected abstract String formatForLogging(T t);

// NOTE(review): not referenced within this class; presumably the size of the index as seen by
// subclasses or callers; confirm against implementations.
protected abstract int getIndexSize();

// NOTE(review): not referenced within this class; presumably a batch size used by subclasses
// or callers; confirm against implementations.
protected abstract int getBatchSize();
441 : }
|