[ASTERIXDB-2304] Ensure Flush is Finished in FlushRecoveryTest

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Previously in LSMFlushRecoveryTest, it was possible during recovery
to check component ids before a flush had finished (flushes are
asynchronous), which caused intermittent failures. This patch fixes
the problem by waiting for active IOs to complete before checking
component ids (see the sketch after this list).
- Fix the problem of config options overridden through
TestNodeController not taking effect.
- Also fix [ASTERIXDB-2309] to ensure that only the indexes of a given
partition are flushed upon seeing a FLUSH log record.
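
A minimal sketch of the three fixes, using names from the patch below
(the key lines are lifted from the diff; the surrounding control flow
is abbreviated):

    // (1) LSMFlushRecoveryTest: after restarting the NC, wait for any
    // flushes scheduled by recovery before inspecting component ids
    DatasetInfo dsInfo = dsLifecycleMgr.getDatasetInfo(dataset.getDatasetId());
    dsInfo.waitForIO();   // blocks until active IOs (e.g. flushes) finish
    checkComponentIds();  // safe now: no flush can race with this check

    // (2) AsterixHyracksIntegrationUtil: apply option overrides to each
    // NC's ConfigManager before the NodeControllerService is created,
    // not only to the CC-side ConfigManager
    opts.forEach(opt -> ncConfigManager.set(nodeId, opt.getLeft(), opt.getRight()));

    // (3) RecoveryManager: when redoing a FLUSH log record, flush only
    // the open indexes that belong to the record's partition
    for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
        if (iInfo.isOpen() && iInfo.getPartition() == partition) {
            // schedule the flush of this index ...
        }
    }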

Change-Id: I1704c6606c7c7bef226ae31961c347c6ebb76c2a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2437
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index e23be96..74277ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -387,7 +387,7 @@
                             // if an index has no ongoing updates, then its memory component must be empty
                             // and there is nothing to flush
                             for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-                                if (iInfo.isOpen()) {
+                                if (iInfo.isOpen() && iInfo.getPartition() == partition) {
                                     maxDiskLastLsn = resourceId2MaxLSNMap.get(iInfo.getResourceId());
                                     index = iInfo.getIndex();
                                     AbstractLSMIOOperationCallback ioCallback =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index ee96d00..acc3970 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -137,11 +137,12 @@
                 ncConfigManager = new ConfigManager(new String[] { "-config-file", confFile });
             }
             ncApplication.registerConfig(ncConfigManager);
+            opts.forEach(opt -> ncConfigManager.set(nodeId, opt.getLeft(), opt.getRight()));
             nodeControllers.add(
                     new NodeControllerService(fixupIODevices(createNCConfig(nodeId, ncConfigManager)), ncApplication));
         }
 
-        opts.stream().forEach(opt -> configManager.set(opt.getLeft(), opt.getRight()));
+        opts.forEach(opt -> configManager.set(opt.getLeft(), opt.getRight()));
         cc.start();
 
         // Starts ncs.
@@ -359,6 +360,10 @@
         opts.add(Pair.of(name, value));
     }
 
+    public void clearOptions() {
+        opts.clear();
+    }
+
     /**
      * @return the asterix-app absolute path if found, otherwise the default user path.
      */
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index e8e38eb..5cda9f2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -175,6 +175,7 @@
 
     public void clearOpts() {
         options.clear();
+        ExecutionTestUtil.integrationUtil.clearOptions();
     }
 
     public TxnId getTxnJobId(IHyracksTaskContext ctx) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index b10e9b1..dabb32c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.dataflow;
 
 import java.io.File;
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -32,7 +33,9 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.StorageProperties.Option;
+import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
@@ -43,6 +46,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -51,6 +55,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
@@ -64,25 +69,27 @@
 import org.junit.Test;
 
 public class LSMFlushRecoveryTest {
-
     private static TestNodeController nc;
     private static Dataset dataset;
-    private static PrimaryIndexInfo primaryIndexInfo;
-    private static SecondaryIndexInfo secondaryIndexInfo;
-    private static TestLsmBtree primaryIndex;
-    private static TestLsmBtree secondaryIndex;
+    private static PrimaryIndexInfo[] primaryIndexInfos;
+    private static SecondaryIndexInfo[] secondaryIndexInfo;
+    private static TestLsmBtree[] primaryIndexes;
+    private static TestLsmBtree[] secondaryIndexes;
     private static Index secondaryIndexEntity;
     private static NCAppRuntimeContext ncAppCtx;
     private static IDatasetLifecycleManager dsLifecycleMgr;
 
-    private static IHyracksTaskContext ctx;
-    private static IIndexDataflowHelper primaryIndexDataflowHelper;
-    private static IIndexDataflowHelper secondaryIndexDataflowHelper;
+    private static IHyracksTaskContext[] testCtxs;
+    private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
+    private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
     private static ITransactionContext txnCtx;
-    private static LSMInsertDeleteOperatorNodePushable insertOp;
-    private static final int PARTITION = 0;
+    private static LSMInsertDeleteOperatorNodePushable[] insertOps;
     private static TupleGenerator tupleGenerator;
 
+    private static final int NUM_PARTITIONS = 2;
+    private static final int PARTITION_0 = 0;
+    private static final int PARTITION_1 = 1;
+
     private static final String SECONDARY_INDEX_NAME = "TestIdx";
     private static final IndexType SECONDARY_INDEX_TYPE = IndexType.BTREE;
     private static final List<List<String>> SECONDARY_INDEX_FIELD_NAMES =
@@ -106,36 +113,41 @@
         initializeTestCtx();
         createIndex();
         readIndex();
-        insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, secondaryIndexEntity);
+        createInsertOps();
         tupleGenerator = StorageTestUtils.getTupleGenerator();
     }
 
     @After
-    public void testRecovery() {
-        try {
-            // right now we've inserted 1000 records to the index, and each record is at least 12 bytes.
-            // thus, the memory component size is at least 12KB.
-            List<Pair<IOption, Object>> opts = new ArrayList<>();
-            opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, "128MB"));
-            opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, "10000"));
-            nc.setOpts(opts);
-            nc.init(false);
-            initializeTestCtx();
-            readIndex();
-            checkComponentIds();
-            insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, secondaryIndexEntity);
-            // insert more records
-            insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
-            checkComponentIds();
+    public void testRecovery() throws Exception {
+        // right now we've inserted 1000 records into the index, and each record is at least 12 bytes.
+        // thus, the memory component size is at least 12KB.
+        List<Pair<IOption, Object>> opts = new ArrayList<>();
+        opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 20 * 1024 * 1024L));
+        opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE, 1 * 1024));
+        // each memory component only gets 4 pages (we have 2 partitions, 2 memory components/partition)
+        // and some reserved memory for metadata dataset
+        opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 1024));
+        nc.setOpts(opts);
+        initializeNc(false);
+        initializeTestCtx();
+        readIndex();
+        // wait for ongoing flushes triggered by recovery to finish
+        DatasetInfo dsInfo = dsLifecycleMgr.getDatasetInfo(dataset.getDatasetId());
+        dsInfo.waitForIO();
 
-            dropIndex();
-            // cleanup after each test case
-            nc.deInit(true);
-            nc.clearOpts();
-        } catch (Throwable e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
-        }
+        checkComponentIds();
+        // insert more records
+        createInsertOps();
+        insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT,
+                true);
+
+        dsInfo.waitForIO();
+        checkComponentIds();
+
+        dropIndex();
+        // cleanup after each test case
+        nc.deInit(true);
+        nc.clearOpts();
     }
 
     private void initializeNc(boolean cleanUpOnStart) throws Exception {
@@ -145,60 +157,84 @@
     }
 
     private void createIndex() throws Exception {
-        primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
-        dataset = primaryIndexInfo.getDataset();
+        dataset = StorageTestUtils.DATASET;
         secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME,
                 SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS,
                 SECONDARY_INDEX_FIELD_TYPES, false, false, false, 0);
-        secondaryIndexInfo = nc.createSecondaryIndex(primaryIndexInfo, secondaryIndexEntity,
-                StorageTestUtils.STORAGE_MANAGER, PARTITION);
+
+        primaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
+        secondaryIndexInfo = new SecondaryIndexInfo[NUM_PARTITIONS];
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            primaryIndexInfos[i] = StorageTestUtils.createPrimaryIndex(nc, i);
+            secondaryIndexInfo[i] = nc.createSecondaryIndex(primaryIndexInfos[i], secondaryIndexEntity,
+                    StorageTestUtils.STORAGE_MANAGER, i);
+        }
+
     }
 
     private void initializeTestCtx() throws Exception {
         JobId jobId = nc.newJobId();
-        ctx = nc.createTestContext(jobId, PARTITION, false);
-        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+        testCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            testCtxs[i] = nc.createTestContext(jobId, i, false);
+        }
+        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
     }
 
     private void readIndex() throws HyracksDataException {
-        IndexDataflowHelperFactory primaryHelperFactory =
-                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-        primaryIndexDataflowHelper = primaryHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
-        primaryIndexDataflowHelper.open();
-        primaryIndex = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance();
-        primaryIndexDataflowHelper.close();
+        primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        primaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            IIndexDataflowHelperFactory factory =
+                    new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfos[i].getFileSplitProvider());
+            primaryIndexDataflowHelpers[i] = factory.create(testCtxs[i].getJobletContext().getServiceContext(), i);
+            primaryIndexDataflowHelpers[i].open();
+            primaryIndexes[i] = (TestLsmBtree) primaryIndexDataflowHelpers[i].getIndexInstance();
+            primaryIndexDataflowHelpers[i].close();
+        }
 
-        IndexDataflowHelperFactory secodnaryIHelperFactory =
-                new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
-        secondaryIndexDataflowHelper =
-                secodnaryIHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
-        secondaryIndexDataflowHelper.open();
-        secondaryIndex = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance();
-        secondaryIndexDataflowHelper.close();
+        secondaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        secondaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            IIndexDataflowHelperFactory factory = new IndexDataflowHelperFactory(nc.getStorageManager(),
+                    secondaryIndexInfo[i].getFileSplitProvider());
+            secondaryIndexDataflowHelpers[i] = factory.create(testCtxs[i].getJobletContext().getServiceContext(), i);
+            secondaryIndexDataflowHelpers[i].open();
+            secondaryIndexes[i] = (TestLsmBtree) secondaryIndexDataflowHelpers[i].getIndexInstance();
+            secondaryIndexDataflowHelpers[i].close();
+        }
+    }
+
+    private void createInsertOps() throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            insertOps[i] = StorageTestUtils.getInsertPipeline(nc, testCtxs[i], secondaryIndexEntity);
+        }
     }
 
     private void dropIndex() throws HyracksDataException {
-        primaryIndexDataflowHelper.destroy();
-        secondaryIndexDataflowHelper.destroy();
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            primaryIndexDataflowHelpers[i].destroy();
+            secondaryIndexDataflowHelpers[i].destroy();
+        }
     }
 
     @Test
     public void testBothFlushSucceed() throws Exception {
-        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
         // shutdown the server
         nc.deInit(false);
     }
 
     @Test
     public void testSecondaryFlushFails() throws Exception {
-        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
 
-        primaryIndex.clearFlushCallbacks();
-        secondaryIndex.clearFlushCallbacks();
+        primaryIndexes[PARTITION_0].clearFlushCallbacks();
+        secondaryIndexes[PARTITION_0].clearFlushCallbacks();
 
-        Semaphore primaryFlushSemaphore = new Semaphore(0);
-        secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+        secondaryIndexes[PARTITION_0].addFlushCallback(new ITestOpCallback<Semaphore>() {
             @Override
             public void before(Semaphore t) throws HyracksDataException {
                 throw new HyracksDataException("Kill the flush thread");
@@ -210,8 +246,9 @@
             }
         });
 
-        primaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE);
-        primaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+        Semaphore primaryFlushSemaphore = new Semaphore(0);
+        primaryIndexes[PARTITION_0].addFlushCallback(AllowTestOpCallback.INSTANCE);
+        primaryIndexes[PARTITION_0].addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
             @Override
             public void before(Void t) throws HyracksDataException {
 
@@ -222,11 +259,11 @@
                 primaryFlushSemaphore.release();
             }
         });
-        StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+        StorageTestUtils.flush(dsLifecycleMgr, primaryIndexes[PARTITION_0], true);
 
         primaryFlushSemaphore.acquire();
-        List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
-        List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> primaryComponents = primaryIndexes[PARTITION_0].getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = secondaryIndexes[PARTITION_0].getDiskComponents();
         Assert.assertEquals(primaryComponents.size(), secondaryComponents.size() + 1);
         // shutdown the NC
         nc.deInit(false);
@@ -234,14 +271,12 @@
 
     @Test
     public void testPrimaryFlushFails() throws Exception {
-        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
 
-        primaryIndex.clearFlushCallbacks();
-        secondaryIndex.clearFlushCallbacks();
+        primaryIndexes[PARTITION_0].clearFlushCallbacks();
+        secondaryIndexes[PARTITION_0].clearFlushCallbacks();
 
-        Semaphore secondaryFlushSemaphore = new Semaphore(0);
-
-        primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+        primaryIndexes[PARTITION_0].addFlushCallback(new ITestOpCallback<Semaphore>() {
             @Override
             public void before(Semaphore t) throws HyracksDataException {
                 throw new HyracksDataException("Kill the flush thread");
@@ -253,8 +288,9 @@
             }
         });
 
-        secondaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE);
-        secondaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+        Semaphore secondaryFlushSemaphore = new Semaphore(0);
+        secondaryIndexes[PARTITION_0].addFlushCallback(AllowTestOpCallback.INSTANCE);
+        secondaryIndexes[PARTITION_0].addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
             @Override
             public void before(Void t) throws HyracksDataException {
 
@@ -265,27 +301,85 @@
                 secondaryFlushSemaphore.release();
             }
         });
-        StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+        StorageTestUtils.flush(dsLifecycleMgr, primaryIndexes[PARTITION_0], true);
 
         secondaryFlushSemaphore.acquire();
-        List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
-        List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> primaryComponents = primaryIndexes[PARTITION_0].getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = secondaryIndexes[PARTITION_0].getDiskComponents();
         Assert.assertEquals(secondaryComponents.size(), primaryComponents.size() + 1);
         // shutdown the NC
         nc.deInit(false);
     }
 
     @Test
-    public void testBothFlushFail() throws Exception {
-        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+    public void testMultiPartition() throws Exception {
+        // insert records into partition 0
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT,
+                false);
+        // insert records into partition 1
+        insertRecords(PARTITION_1, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
+        StorageTestUtils.waitForOperations(primaryIndexes[PARTITION_0]);
+        StorageTestUtils.waitForOperations(primaryIndexes[PARTITION_1]);
 
-        primaryIndex.clearFlushCallbacks();
-        secondaryIndex.clearFlushCallbacks();
+        // now both partitions have some extra records in their memory components
+        Assert.assertTrue(primaryIndexes[PARTITION_0].getCurrentMemoryComponent().isModified());
+        Assert.assertTrue(primaryIndexes[PARTITION_1].getCurrentMemoryComponent().isModified());
+
+        primaryIndexes[PARTITION_0].clearFlushCallbacks();
+        secondaryIndexes[PARTITION_0].clearFlushCallbacks();
+
+        primaryIndexes[PARTITION_0].addFlushCallback(new ITestOpCallback<Semaphore>() {
+            @Override
+            public void before(Semaphore t) throws HyracksDataException {
+                throw new HyracksDataException("Kill the flush thread");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+
+            }
+        });
+
+        Semaphore flushSemaphore = new Semaphore(0);
+        secondaryIndexes[PARTITION_0].addFlushCallback(AllowTestOpCallback.INSTANCE);
+        secondaryIndexes[PARTITION_0].addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+            @Override
+            public void before(Void t) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+                flushSemaphore.release();
+            }
+        });
+
+        // only flush partition 0
+        StorageTestUtils.flushPartition(dsLifecycleMgr, primaryIndexes[PARTITION_0], true);
+        flushSemaphore.acquire(1);
+
+        List<ILSMDiskComponent> primaryComponents = primaryIndexes[PARTITION_0].getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = secondaryIndexes[PARTITION_0].getDiskComponents();
+        Assert.assertEquals(secondaryComponents.size(), primaryComponents.size() + 1);
+
+        Assert.assertEquals(secondaryIndexes[PARTITION_1].getDiskComponents().size(),
+                primaryIndexes[PARTITION_1].getDiskComponents().size());
+        // shutdown the NC
+        // upon recovery, it's expected that the FLUSH record on partition 0 does not affect partition 1
+        nc.deInit(false);
+    }
+
+    @Test
+    public void testBothFlushFail() throws Exception {
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
+
+        primaryIndexes[PARTITION_0].clearFlushCallbacks();
+        secondaryIndexes[PARTITION_0].clearFlushCallbacks();
 
         Semaphore primaryFlushSemaphore = new Semaphore(0);
         Semaphore secondaryFlushSemaphore = new Semaphore(0);
 
-        primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+        primaryIndexes[PARTITION_0].addFlushCallback(new ITestOpCallback<Semaphore>() {
             @Override
             public void before(Semaphore t) throws HyracksDataException {
                 primaryFlushSemaphore.release();
@@ -298,7 +392,7 @@
             }
         });
 
-        secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+        secondaryIndexes[PARTITION_0].addFlushCallback(new ITestOpCallback<Semaphore>() {
             @Override
             public void before(Semaphore t) throws HyracksDataException {
                 secondaryFlushSemaphore.release();
@@ -310,51 +404,60 @@
 
             }
         });
-        StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+        StorageTestUtils.flush(dsLifecycleMgr, primaryIndexes[PARTITION_0], true);
 
         primaryFlushSemaphore.acquire();
         secondaryFlushSemaphore.acquire();
-        List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
-        List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> primaryComponents = primaryIndexes[PARTITION_0].getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = secondaryIndexes[PARTITION_0].getDiskComponents();
         Assert.assertEquals(secondaryComponents.size(), primaryComponents.size());
         // shutdown the NC
         nc.deInit(false);
     }
 
-    private void insertRecords(int totalNumRecords, int recordsPerComponent) throws Exception {
-        StorageTestUtils.allowAllOps(primaryIndex);
-        StorageTestUtils.allowAllOps(secondaryIndex);
-        insertOp.open();
-        VSizeFrame frame = new VSizeFrame(ctx);
+    private void insertRecords(int partition, int totalNumRecords, int recordsPerComponent, boolean commit)
+            throws Exception {
+        StorageTestUtils.allowAllOps(primaryIndexes[partition]);
+        StorageTestUtils.allowAllOps(secondaryIndexes[partition]);
+        insertOps[partition].open();
+        VSizeFrame frame = new VSizeFrame(testCtxs[partition]);
         FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
         for (int i = 0; i < totalNumRecords; i++) {
             // flush every RECORDS_PER_COMPONENT records
-            if (i % recordsPerComponent == 0 && i + 1 != totalNumRecords) {
+            if (i % recordsPerComponent == 0 && i != 0 && i + 1 != totalNumRecords) {
                 if (tupleAppender.getTupleCount() > 0) {
-                    tupleAppender.write(insertOp, true);
+                    tupleAppender.write(insertOps[partition], true);
                 }
-                StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, false);
+                StorageTestUtils.flushPartition(dsLifecycleMgr, primaryIndexes[partition], false);
             }
             ITupleReference tuple = tupleGenerator.next();
-            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOps[partition]);
         }
         if (tupleAppender.getTupleCount() > 0) {
-            tupleAppender.write(insertOp, true);
+            tupleAppender.write(insertOps[partition], true);
         }
-        insertOp.close();
-        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        insertOps[partition].close();
+        if (commit) {
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        }
     }
 
     private void checkComponentIds() throws HyracksDataException {
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            checkComponentIds(i);
+        }
+    }
+
+    private void checkComponentIds(int partitionIndex) throws HyracksDataException {
         // check memory component
-        if (primaryIndex.isMemoryComponentsAllocated()) {
-            ILSMMemoryComponent primaryMemComponent = primaryIndex.getCurrentMemoryComponent();
-            ILSMMemoryComponent secondaryMemComponent = secondaryIndex.getCurrentMemoryComponent();
+        if (primaryIndexes[partitionIndex].isMemoryComponentsAllocated()) {
+            ILSMMemoryComponent primaryMemComponent = primaryIndexes[partitionIndex].getCurrentMemoryComponent();
+            ILSMMemoryComponent secondaryMemComponent = secondaryIndexes[partitionIndex].getCurrentMemoryComponent();
             Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId());
         }
 
-        List<ILSMDiskComponent> primaryDiskComponents = primaryIndex.getDiskComponents();
-        List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> primaryDiskComponents = primaryIndexes[partitionIndex].getDiskComponents();
+        List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndexes[partitionIndex].getDiskComponents();
 
         Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size());
         for (int i = 0; i < primaryDiskComponents.size(); i++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index b2b03b3..390286a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
@@ -37,6 +38,7 @@
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -175,6 +177,35 @@
                 closeAnswer, deepCopyInputFrames);
     }
 
+    public static void flushPartition(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, boolean async)
+            throws Exception {
+        flushPartition(dsLifecycleMgr, lsmBtree, DATASET, async);
+    }
+
+    public static void flushPartition(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
+            boolean async) throws Exception {
+        waitForOperations(lsmBtree);
+        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmBtree.getOperationTracker();
+        opTracker.setFlushOnExit(true);
+        opTracker.flushIfNeeded();
+
+        long maxWaitTime = TimeUnit.MINUTES.toNanos(1); // 1min
+        // wait until the flush log record has been flushed, i.e., the flush is scheduled
+        long before = System.nanoTime();
+        while (opTracker.isFlushLogCreated()) {
+            Thread.sleep(5); // NOSONAR: Test code with a timeout
+            if (System.nanoTime() - before > maxWaitTime) {
+                throw new IllegalStateException(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before)
+                        + "ms passed without scheduling the flush operation");
+            }
+        }
+
+        if (!async) {
+            DatasetInfo dsInfo = dsLifecycleMgr.getDatasetInfo(dataset.getDatasetId());
+            dsInfo.waitForIO();
+        }
+    }
+
     public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, boolean async)
             throws Exception {
         flush(dsLifecycleMgr, lsmBtree, DATASET, async);