View Javadoc
1   /*
2    * Copyright (C) 2017, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.storage.dfs;
12  
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig.IndexEventConsumer;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.junit.TestRng;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.revwalk.RevCommit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
42  
43  public class DfsBlockCacheTest {
44  	@Rule
45  	public TestName testName = new TestName();
46  	private TestRng rng;
47  	private DfsBlockCache cache;
48  	private ExecutorService pool;
49  
50  	@Before
51  	public void setUp() {
52  		rng = new TestRng(testName.getMethodName());
53  		pool = Executors.newFixedThreadPool(10);
54  		resetCache();
55  	}
56  
57  	@SuppressWarnings("resource")
58  	@Test
59  	public void streamKeyReusesBlocks() throws Exception {
60  		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
61  		InMemoryRepository r1 = new InMemoryRepository(repo);
62  		byte[] content = rng.nextBytes(424242);
63  		ObjectId id;
64  		try (ObjectInserter ins = r1.newObjectInserter()) {
65  			id = ins.insert(OBJ_BLOB, content);
66  			ins.flush();
67  		}
68  
69  		long oldSize = LongStream.of(cache.getCurrentSize()).sum();
70  		assertTrue(oldSize > 2000);
71  		assertEquals(0, LongStream.of(cache.getHitCount()).sum());
72  
73  		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();
74  		InMemoryRepository r2 = new InMemoryRepository(repo);
75  		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
76  		try (ObjectReader rdr = r2.newObjectReader()) {
77  			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
78  			assertTrue(Arrays.equals(content, actual));
79  		}
80  		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
81  		assertEquals(oldSize, LongStream.of(cache.getCurrentSize()).sum());
82  	}
83  
84  	@SuppressWarnings("resource")
85  	@Test
86  	public void weirdBlockSize() throws Exception {
87  		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
88  		InMemoryRepository r1 = new InMemoryRepository(repo);
89  
90  		byte[] content1 = rng.nextBytes(4);
91  		byte[] content2 = rng.nextBytes(424242);
92  		ObjectId id1;
93  		ObjectId id2;
94  		try (ObjectInserter ins = r1.newObjectInserter()) {
95  			id1 = ins.insert(OBJ_BLOB, content1);
96  			id2 = ins.insert(OBJ_BLOB, content2);
97  			ins.flush();
98  		}
99  
100 		resetCache();
101 		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();
102 
103 		InMemoryRepository r2 = new InMemoryRepository(repo);
104 		r2.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
105 		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
106 		try (ObjectReader rdr = r2.newObjectReader()) {
107 			byte[] actual = rdr.open(id1, OBJ_BLOB).getBytes();
108 			assertTrue(Arrays.equals(content1, actual));
109 		}
110 
111 		InMemoryRepository r3 = new InMemoryRepository(repo);
112 		r3.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
113 		r3.getObjectDatabase().commitPack(packs, Collections.emptyList());
114 		try (ObjectReader rdr = r3.newObjectReader()) {
115 			byte[] actual = rdr.open(id2, OBJ_BLOB).getBytes();
116 			assertTrue(Arrays.equals(content2, actual));
117 		}
118 	}
119 
120 	@SuppressWarnings("resource")
121 	@Test
122 	public void hasCacheHotMap() throws Exception {
123 		Map<PackExt, Integer> cacheHotMap = new HashMap<>();
124 		// Pack index will be kept in cache longer.
125 		cacheHotMap.put(PackExt.INDEX, Integer.valueOf(3));
126 		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
127 				.setBlockLimit(512 * 4).setCacheHotMap(cacheHotMap));
128 		cache = DfsBlockCache.getInstance();
129 
130 		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
131 		InMemoryRepository r1 = new InMemoryRepository(repo);
132 		byte[] content = rng.nextBytes(424242);
133 		ObjectId id;
134 		try (ObjectInserter ins = r1.newObjectInserter()) {
135 			id = ins.insert(OBJ_BLOB, content);
136 			ins.flush();
137 		}
138 
139 		try (ObjectReader rdr = r1.newObjectReader()) {
140 			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
141 			assertTrue(Arrays.equals(content, actual));
142 		}
143 		// All cache entries are hot and cache is at capacity.
144 		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
145 		assertEquals(99, cache.getFillPercentage());
146 
147 		InMemoryRepository r2 = new InMemoryRepository(repo);
148 		content = rng.nextBytes(424242);
149 		try (ObjectInserter ins = r2.newObjectInserter()) {
150 			ins.insert(OBJ_BLOB, content);
151 			ins.flush();
152 		}
153 		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
154 		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
155 		assertEquals(0, cache.getEvictions()[PackExt.INDEX.getPosition()]);
156 	}
157 
158 	@SuppressWarnings("resource")
159 	@Test
160 	public void hasIndexEventConsumerOnlyLoaded() throws Exception {
161 		AtomicInteger loaded = new AtomicInteger();
162 		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
163 			@Override
164 			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
165 					long loadMicros, long bytes,
166 					Duration lastEvictionDuration) {
167 				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
168 				assertTrue(cacheHit);
169 				assertTrue(lastEvictionDuration.isZero());
170 				loaded.incrementAndGet();
171 			}
172 		};
173 
174 		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
175 				.setBlockLimit(512 * 4)
176 				.setIndexEventConsumer(indexEventConsumer));
177 		cache = DfsBlockCache.getInstance();
178 
179 		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
180 		InMemoryRepository r1 = new InMemoryRepository(repo);
181 		byte[] content = rng.nextBytes(424242);
182 		ObjectId id;
183 		try (ObjectInserter ins = r1.newObjectInserter()) {
184 			id = ins.insert(OBJ_BLOB, content);
185 			ins.flush();
186 		}
187 
188 		try (ObjectReader rdr = r1.newObjectReader()) {
189 			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
190 			assertTrue(Arrays.equals(content, actual));
191 		}
192 		// All cache entries are hot and cache is at capacity.
193 		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
194 		assertEquals(99, cache.getFillPercentage());
195 
196 		InMemoryRepository r2 = new InMemoryRepository(repo);
197 		content = rng.nextBytes(424242);
198 		try (ObjectInserter ins = r2.newObjectInserter()) {
199 			ins.insert(OBJ_BLOB, content);
200 			ins.flush();
201 		}
202 		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
203 		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
204 		assertEquals(1, loaded.get());
205 	}
206 
207 	@SuppressWarnings("resource")
208 	@Test
209 	public void hasIndexEventConsumerLoadedAndEvicted() throws Exception {
210 		AtomicInteger loaded = new AtomicInteger();
211 		AtomicInteger evicted = new AtomicInteger();
212 		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
213 			@Override
214 			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
215 					long loadMicros, long bytes,
216 					Duration lastEvictionDuration) {
217 				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
218 				assertTrue(cacheHit);
219 				assertTrue(lastEvictionDuration.isZero());
220 				loaded.incrementAndGet();
221 			}
222 
223 			@Override
224 			public void acceptEvictedEvent(int packExtPos, long bytes,
225 					int totalCacheHitCount, Duration lastEvictionDuration) {
226 				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
227 				assertTrue(totalCacheHitCount > 0);
228 				assertTrue(lastEvictionDuration.isZero());
229 				evicted.incrementAndGet();
230 			}
231 
232 			@Override
233 			public boolean shouldReportEvictedEvent() {
234 				return true;
235 			}
236 		};
237 
238 		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
239 				.setBlockLimit(512 * 4)
240 				.setIndexEventConsumer(indexEventConsumer));
241 		cache = DfsBlockCache.getInstance();
242 
243 		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
244 		InMemoryRepository r1 = new InMemoryRepository(repo);
245 		byte[] content = rng.nextBytes(424242);
246 		ObjectId id;
247 		try (ObjectInserter ins = r1.newObjectInserter()) {
248 			id = ins.insert(OBJ_BLOB, content);
249 			ins.flush();
250 		}
251 
252 		try (ObjectReader rdr = r1.newObjectReader()) {
253 			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
254 			assertTrue(Arrays.equals(content, actual));
255 		}
256 		// All cache entries are hot and cache is at capacity.
257 		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
258 		assertEquals(99, cache.getFillPercentage());
259 
260 		InMemoryRepository r2 = new InMemoryRepository(repo);
261 		content = rng.nextBytes(424242);
262 		try (ObjectInserter ins = r2.newObjectInserter()) {
263 			ins.insert(OBJ_BLOB, content);
264 			ins.flush();
265 		}
266 		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
267 		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
268 		assertEquals(1, loaded.get());
269 		assertEquals(1, evicted.get());
270 	}
271 
272 	@Test
273 	public void noConcurrencySerializedReads_oneRepo() throws Exception {
274 		InMemoryRepository r1 = createRepoWithBitmap("test");
275 		// Reset cache with concurrency Level at 1 i.e. no concurrency.
276 		resetCache(1);
277 
278 		DfsReader reader = (DfsReader) r1.newObjectReader();
279 		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
280 			// Only load non-garbage pack with bitmap.
281 			if (pack.isGarbage()) {
282 				continue;
283 			}
284 			asyncRun(() -> pack.getBitmapIndex(reader));
285 			asyncRun(() -> pack.getPackIndex(reader));
286 			asyncRun(() -> pack.getBitmapIndex(reader));
287 		}
288 		waitForExecutorPoolTermination();
289 
290 		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
291 		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
292 		// Reverse index has no pack extension, it defaults to 0.
293 		assertEquals(1, cache.getMissCount()[0]);
294 	}
295 
296 	@SuppressWarnings("resource")
297 	@Test
298 	public void noConcurrencySerializedReads_twoRepos() throws Exception {
299 		InMemoryRepository r1 = createRepoWithBitmap("test1");
300 		InMemoryRepository r2 = createRepoWithBitmap("test2");
301 		resetCache(1);
302 
303 		DfsReader reader = (DfsReader) r1.newObjectReader();
304 		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
305 		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
306 		// Safety check that both repos have the same number of packs.
307 		assertEquals(r1Packs.length, r2Packs.length);
308 
309 		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
310 			DfsPackFile pack1 = r1Packs[i];
311 			DfsPackFile pack2 = r2Packs[i];
312 			if (pack1.isGarbage() || pack2.isGarbage()) {
313 				continue;
314 			}
315 			asyncRun(() -> pack1.getBitmapIndex(reader));
316 			asyncRun(() -> pack2.getBitmapIndex(reader));
317 		}
318 
319 		waitForExecutorPoolTermination();
320 		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
321 		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
322 		assertEquals(2, cache.getMissCount()[0]);
323 	}
324 
325 	@SuppressWarnings("resource")
326 	@Test
327 	public void lowConcurrencyParallelReads_twoRepos() throws Exception {
328 		InMemoryRepository r1 = createRepoWithBitmap("test1");
329 		InMemoryRepository r2 = createRepoWithBitmap("test2");
330 		resetCache(2);
331 
332 		DfsReader reader = (DfsReader) r1.newObjectReader();
333 		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
334 		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
335 		// Safety check that both repos have the same number of packs.
336 		assertEquals(r1Packs.length, r2Packs.length);
337 
338 		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
339 			DfsPackFile pack1 = r1Packs[i];
340 			DfsPackFile pack2 = r2Packs[i];
341 			if (pack1.isGarbage() || pack2.isGarbage()) {
342 				continue;
343 			}
344 			asyncRun(() -> pack1.getBitmapIndex(reader));
345 			asyncRun(() -> pack2.getBitmapIndex(reader));
346 		}
347 
348 		waitForExecutorPoolTermination();
349 		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
350 		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
351 		assertEquals(2, cache.getMissCount()[0]);
352 	}
353 
354 	@SuppressWarnings("resource")
355 	@Test
356 	public void lowConcurrencyParallelReads_twoReposAndIndex()
357 			throws Exception {
358 		InMemoryRepository r1 = createRepoWithBitmap("test1");
359 		InMemoryRepository r2 = createRepoWithBitmap("test2");
360 		resetCache(2);
361 
362 		DfsReader reader = (DfsReader) r1.newObjectReader();
363 		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
364 		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
365 		// Safety check that both repos have the same number of packs.
366 		assertEquals(r1Packs.length, r2Packs.length);
367 
368 		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
369 			DfsPackFile pack1 = r1Packs[i];
370 			DfsPackFile pack2 = r2Packs[i];
371 			if (pack1.isGarbage() || pack2.isGarbage()) {
372 				continue;
373 			}
374 			asyncRun(() -> pack1.getBitmapIndex(reader));
375 			asyncRun(() -> pack1.getPackIndex(reader));
376 			asyncRun(() -> pack2.getBitmapIndex(reader));
377 		}
378 		waitForExecutorPoolTermination();
379 
380 		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
381 		// Index is loaded once for each repo.
382 		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
383 		assertEquals(2, cache.getMissCount()[0]);
384 	}
385 
386 	@Test
387 	public void highConcurrencyParallelReads_oneRepo() throws Exception {
388 		InMemoryRepository r1 = createRepoWithBitmap("test");
389 		resetCache();
390 
391 		DfsReader reader = (DfsReader) r1.newObjectReader();
392 		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
393 			// Only load non-garbage pack with bitmap.
394 			if (pack.isGarbage()) {
395 				continue;
396 			}
397 			asyncRun(() -> pack.getBitmapIndex(reader));
398 			asyncRun(() -> pack.getPackIndex(reader));
399 			asyncRun(() -> pack.getBitmapIndex(reader));
400 		}
401 		waitForExecutorPoolTermination();
402 
403 		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
404 		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
405 		assertEquals(1, cache.getMissCount()[0]);
406 	}
407 
408 	@Test
409 	public void highConcurrencyParallelReads_oneRepoParallelReverseIndex()
410 			throws Exception {
411 		InMemoryRepository r1 = createRepoWithBitmap("test");
412 		resetCache();
413 
414 		DfsReader reader = (DfsReader) r1.newObjectReader();
415 		reader.getOptions().setLoadRevIndexInParallel(true);
416 		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
417 			// Only load non-garbage pack with bitmap.
418 			if (pack.isGarbage()) {
419 				continue;
420 			}
421 			asyncRun(() -> pack.getBitmapIndex(reader));
422 			asyncRun(() -> pack.getPackIndex(reader));
423 			asyncRun(() -> pack.getBitmapIndex(reader));
424 		}
425 		waitForExecutorPoolTermination();
426 
427 		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
428 		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
429 		assertEquals(1, cache.getMissCount()[0]);
430 	}
431 
432 	private void resetCache() {
433 		resetCache(32);
434 	}
435 
436 	private void resetCache(int concurrencyLevel) {
437 		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
438 				.setConcurrencyLevel(concurrencyLevel).setBlockLimit(1 << 20));
439 		cache = DfsBlockCache.getInstance();
440 	}
441 
442 	private InMemoryRepository createRepoWithBitmap(String repoName)
443 			throws Exception {
444 		DfsRepositoryDescription repoDesc = new DfsRepositoryDescription(
445 				repoName);
446 		InMemoryRepository repo = new InMemoryRepository(repoDesc);
447 		try (TestRepository<InMemoryRepository> repository = new TestRepository<>(
448 				repo)) {
449 			RevCommit commit = repository.branch("/refs/ref1" + repoName)
450 					.commit().add("blob1", "blob1" + repoName).create();
451 			repository.branch("/refs/ref2" + repoName).commit()
452 					.add("blob2", "blob2" + repoName).parent(commit).create();
453 		}
454 		new DfsGarbageCollector(repo).pack(null);
455 		return repo;
456 	}
457 
458 	private void asyncRun(Callable<?> call) {
459 		pool.execute(() -> {
460 			try {
461 				call.call();
462 			} catch (Exception e) {
463 				// Ignore.
464 			}
465 		});
466 	}
467 
468 	private void waitForExecutorPoolTermination() throws Exception {
469 		pool.shutdown();
470 		pool.awaitTermination(500, MILLISECONDS);
471 		assertTrue("Threads did not complete, likely due to a deadlock.",
472 				pool.isTerminated());
473 	}
474 }