package org.eclipse.jgit.internal.storage.dfs;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig.IndexEventConsumer;
import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.junit.TestRng;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.revwalk.RevCommit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class DfsBlockCacheTest {
	@Rule
	public TestName testName = new TestName();

	private TestRng rng;

	private DfsBlockCache cache;

	private ExecutorService pool;

	@Before
	public void setUp() {
		rng = new TestRng(testName.getMethodName());
		pool = Executors.newFixedThreadPool(10);
		resetCache();
	}

	@SuppressWarnings("resource")
	@Test
	public void streamKeyReusesBlocks() throws Exception {
		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		long oldSize = LongStream.of(cache.getCurrentSize()).sum();
		assertTrue(oldSize > 2000);
		assertEquals(0, LongStream.of(cache.getHitCount()).sum());

		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();
		InMemoryRepository r2 = new InMemoryRepository(repo);
		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r2.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
		assertEquals(oldSize, LongStream.of(cache.getCurrentSize()).sum());
	}

	@SuppressWarnings("resource")
	@Test
	public void weirdBlockSize() throws Exception {
		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);

		byte[] content1 = rng.nextBytes(4);
		byte[] content2 = rng.nextBytes(424242);
		ObjectId id1;
		ObjectId id2;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id1 = ins.insert(OBJ_BLOB, content1);
			id2 = ins.insert(OBJ_BLOB, content2);
			ins.flush();
		}

		resetCache();
		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();

		InMemoryRepository r2 = new InMemoryRepository(repo);
		r2.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r2.newObjectReader()) {
			byte[] actual = rdr.open(id1, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content1, actual));
		}

		InMemoryRepository r3 = new InMemoryRepository(repo);
		r3.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
		r3.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r3.newObjectReader()) {
			byte[] actual = rdr.open(id2, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content2, actual));
		}
	}

	@SuppressWarnings("resource")
	@Test
	public void hasCacheHotMap() throws Exception {
		Map<PackExt, Integer> cacheHotMap = new HashMap<>();
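		// Give INDEX blocks a hot count of 3 so they are favored when the
		// cache evicts; the assertions below expect evictions to hit only
		// PACK blocks.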
		cacheHotMap.put(PackExt.INDEX, Integer.valueOf(3));
		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4).setCacheHotMap(cacheHotMap));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}

		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(0, cache.getEvictions()[PackExt.INDEX.getPosition()]);
	}

	@SuppressWarnings("resource")
	@Test
	public void hasIndexEventConsumerOnlyLoaded() throws Exception {
		AtomicInteger loaded = new AtomicInteger();
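		// shouldReportEvictedEvent() is left at its default of false, so only
		// load events reach this consumer.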
		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
			@Override
			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
					long loadMicros, long bytes,
					Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(cacheHit);
				assertTrue(lastEvictionDuration.isZero());
				loaded.incrementAndGet();
			}
		};

		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4)
				.setIndexEventConsumer(indexEventConsumer));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}

		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
		assertEquals(1, loaded.get());
	}

	@SuppressWarnings("resource")
	@Test
	public void hasIndexEventConsumerLoadedAndEvicted() throws Exception {
		AtomicInteger loaded = new AtomicInteger();
		AtomicInteger evicted = new AtomicInteger();
		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
			@Override
			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
					long loadMicros, long bytes,
					Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(cacheHit);
				assertTrue(lastEvictionDuration.isZero());
				loaded.incrementAndGet();
			}

			@Override
			public void acceptEvictedEvent(int packExtPos, long bytes,
					int totalCacheHitCount, Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(totalCacheHitCount > 0);
				assertTrue(lastEvictionDuration.isZero());
				evicted.incrementAndGet();
			}
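			// Opt in to eviction callbacks; by default only load events are
			// reported.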
			@Override
			public boolean shouldReportEvictedEvent() {
				return true;
			}
		};

		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4)
				.setIndexEventConsumer(indexEventConsumer));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}

		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
		assertEquals(1, loaded.get());
		assertEquals(1, evicted.get());
	}

	@Test
	public void noConcurrencySerializedReads_oneRepo() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
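		// Reset the cache with concurrency level 1, i.e. no parallel loads.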
		resetCache(1);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
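			// Only load the non-garbage (GC) pack.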
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
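		// The reverse index has no pack extension; its miss count lands in
		// slot 0.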
		assertEquals(1, cache.getMissCount()[0]);
	}

	@SuppressWarnings("resource")
	@Test
	public void noConcurrencySerializedReads_twoRepos() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(1);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();

		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}

		waitForExecutorPoolTermination();
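		// Two distinct repositories: one miss per repo for each index type.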
		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[0]);
	}

	@SuppressWarnings("resource")
	@Test
	public void lowConcurrencyParallelReads_twoRepos() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(2);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();

		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}

		waitForExecutorPoolTermination();
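		// Parallel reads of two distinct repositories still miss exactly once
		// per repo for each index type.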
		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[0]);
	}

	@SuppressWarnings("resource")
	@Test
	public void lowConcurrencyParallelReads_twoReposAndIndex()
			throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(2);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();

		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack1.getPackIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
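		// Loading the bitmap index also loads the pack index, so the extra
		// getPackIndex() call on pack1 causes no additional miss.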
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[0]);
	}

	@Test
	public void highConcurrencyParallelReads_oneRepo() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		resetCache();

		DfsReader reader = (DfsReader) r1.newObjectReader();
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
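			// Only load the non-garbage (GC) pack.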
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[0]);
	}

	@Test
	public void highConcurrencyParallelReads_oneRepoParallelReverseIndex()
			throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		resetCache();

		DfsReader reader = (DfsReader) r1.newObjectReader();
		reader.getOptions().setLoadRevIndexInParallel(true);
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
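			// Only load the non-garbage (GC) pack.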
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[0]);
	}

	private void resetCache() {
		resetCache(32);
	}

	private void resetCache(int concurrencyLevel) {
		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setConcurrencyLevel(concurrencyLevel).setBlockLimit(1 << 20));
		cache = DfsBlockCache.getInstance();
	}

	private InMemoryRepository createRepoWithBitmap(String repoName)
			throws Exception {
		DfsRepositoryDescription repoDesc = new DfsRepositoryDescription(
				repoName);
		InMemoryRepository repo = new InMemoryRepository(repoDesc);
		try (TestRepository<InMemoryRepository> repository = new TestRepository<>(
				repo)) {
			RevCommit commit = repository.branch("/refs/ref1" + repoName)
					.commit().add("blob1", "blob1" + repoName).create();
			repository.branch("/refs/ref2" + repoName).commit()
					.add("blob2", "blob2" + repoName).parent(commit).create();
		}
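		// GC repacks the objects and writes the bitmap index the tests use.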
		new DfsGarbageCollector(repo).pack(null);
		return repo;
	}

	private void asyncRun(Callable<?> call) {
		pool.execute(() -> {
			try {
				call.call();
			} catch (Exception e) {
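				// Ignore; the assertions check cache state after the pool
				// terminates.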
			}
		});
	}

	private void waitForExecutorPoolTermination() throws Exception {
		pool.shutdown();
		pool.awaitTermination(500, MILLISECONDS);
		assertTrue("Threads did not complete, likely due to a deadlock.",
				pool.isTerminated());
	}
}