Merge "Merge commit 'aa7ff7b' from 'gerrit/mad-hatter'"
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 0b07bc4..0e51f28 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -91,65 +91,85 @@
private static final long FILTERED_MEMORY_COMPONENT_SIZE = 16 * 1024l;
@BeforeClass
- public static void setUp() throws Exception {
- System.out.println("SetUp: ");
- TestHelper.deleteExistingInstanceFiles();
- String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
- + File.separator + "resources" + File.separator + "cc.conf";
- nc = new TestNodeController(configPath, false);
+ public static void setUp() {
+ try {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator + "cc.conf";
+ nc = new TestNodeController(configPath, false);
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
@Before
- public void initializeTest() throws Exception {
+ public void initializeTest() {
// initialize NC before each test
- initializeNc();
- initializeTestCtx();
- createIndex();
- readIndex();
- tupleGenerator = StorageTestUtils.getTupleGenerator();
+ try {
+ initializeNc();
+ initializeTestCtx();
+ createIndex();
+ readIndex();
+ tupleGenerator = StorageTestUtils.getTupleGenerator();
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
@After
- public void deinitializeTest() throws Exception {
- dropIndex();
- // cleanup after each test case
- nc.deInit(true);
- nc.clearOpts();
+ public void deinitializeTest() {
+ try {
+ dropIndex();
+ // cleanup after each test case
+ nc.deInit(true);
+ nc.clearOpts();
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
@Test
- public void testFlushes() throws Exception {
- List<Thread> threads = new ArrayList<>();
- int records = 16 * 1024;
- int threadsPerPartition = 2;
- AtomicReference<Exception> exceptionRef = new AtomicReference<>();
- for (int p = 0; p < NUM_PARTITIONS; p++) {
- for (int t = 0; t < threadsPerPartition; t++) {
- threads.add(insertRecords(records, p, false, exceptionRef));
- threads.add(insertRecords(records, p, true, exceptionRef));
+ public void testFlushes() {
+ try {
+ List<Thread> threads = new ArrayList<>();
+ int records = 16 * 1024;
+ int threadsPerPartition = 2;
+ AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+ for (int p = 0; p < NUM_PARTITIONS; p++) {
+ for (int t = 0; t < threadsPerPartition; t++) {
+ threads.add(insertRecords(records, p, false, exceptionRef));
+ threads.add(insertRecords(records, p, true, exceptionRef));
+ }
}
- }
- for (Thread thread : threads) {
- thread.join();
- }
- if (exceptionRef.get() != null) {
- exceptionRef.get().printStackTrace();
- Assert.fail();
- }
- for (int i = 0; i < NUM_PARTITIONS; i++) {
- Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
- Assert.assertTrue(
- primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
- .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ if (exceptionRef.get() != null) {
+ exceptionRef.get().printStackTrace();
+ Assert.fail();
+ }
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
+ Assert.assertTrue(
+ primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
+ .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
- Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
- Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
- .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
- .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
- }
+ Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
+ Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
+ .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
+ .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+ }
- nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
- nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
private void initializeNc() throws Exception {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index 5177b25..c0197b0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -452,33 +452,36 @@
private void scheduleFlush() throws HyracksDataException {
synchronized (GlobalVirtualBufferCache.this) {
- if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
- return;
- }
int cycles = 0;
- // find the first modified memory component while avoiding infinite loops
- while (cycles <= primaryIndexes.size()
- && !primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
- flushPtr = (flushPtr + 1) % primaryIndexes.size();
- cycles++;
- }
- if (primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
- // flush the current memory component
- flushingIndex = primaryIndexes.get(flushPtr);
+ while (vbc.getUsage() >= flushPageBudget && flushingIndex == null && cycles <= primaryIndexes.size()) {
+ // find the first modified memory component while avoiding infinite loops
+ while (cycles <= primaryIndexes.size()
+ && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
+ flushPtr = (flushPtr + 1) % primaryIndexes.size();
+ cycles++;
+ }
+
+ ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
flushPtr = (flushPtr + 1) % primaryIndexes.size();
// we need to manually flush this memory component because it may be idle at this point
// note that this is different from flushing a filtered memory component
PrimaryIndexOperationTracker opTracker =
- (PrimaryIndexOperationTracker) flushingIndex.getOperationTracker();
+ (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
synchronized (opTracker) {
- opTracker.setFlushOnExit(true);
- opTracker.flushIfNeeded();
- // If the flush cannot be scheduled at this time, then there must be active writers.
- // The flush will be eventually scheduled when writers exit
+ boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
+ if (flushable && !opTracker.isFlushLogCreated()) {
+ // if the flush log has already been created, then we can simply wait for
+ // that flush to complete
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ // If the flush cannot be scheduled at this time, then there must be active writers.
+ // The flush will be eventually scheduled when writers exit
+ }
+ if (flushable || opTracker.isFlushLogCreated()) {
+ flushingIndex = primaryIndex;
+ break;
+ }
}
- } else {
- throw new IllegalStateException(
- "Cannot find modified memory component after checking all primary indexes");
}
}
}