[NO ISSUE][TEST] Test flushing of empty component

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Some use cases require flushing of component that have no data
but some Metadata. This change adds a test to ensure that always
works.

Change-Id: If921323dfe03cbd70edc3a8ea8e01226d7527bb3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2421
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java
new file mode 100644
index 0000000..f9421a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.nio.file.Paths;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.test.base.TestMethodTracer;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.DataUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+public class FlushMetadataOnlyTest {
+    private static TestNodeController nc;
+    private static TestLsmBtree lsmBtree;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+    private static IHyracksTaskContext ctx;
+    private static IIndexDataflowHelper indexDataflowHelper;
+    private static final int PARTITION = 0;
+
+    @Rule
+    public TestRule watcher = new TestMethodTracer();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = Paths.get(System.getProperty("user.dir"), "src", "test", "resources", "cc.conf").toString();
+        nc = new TestNodeController(configPath, false);
+        nc.init();
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Before
+    public void createIndex() throws Exception {
+        PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        JobId jobId = nc.newJobId();
+        ctx = nc.createTestContext(jobId, PARTITION, false);
+        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+        indexDataflowHelper.open();
+        lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+        indexDataflowHelper.close();
+    }
+
+    @Test
+    public void testFlushMetadataOnlyComponent() throws Exception {
+        // allow all operations
+        StorageTestUtils.allowAllOps(lsmBtree);
+        // ensure no disk component and memory component is empty
+        Assert.assertEquals(0, lsmBtree.getDiskComponents().size());
+        Assert.assertFalse(lsmBtree.isMemoryComponentsAllocated());
+        MutableArrayValueReference key = new MutableArrayValueReference("FlushMetadataOnlyTestKey".getBytes());
+        MutableArrayValueReference value = new MutableArrayValueReference("FlushMetadataOnlyTestValue".getBytes());
+        indexDataflowHelper.open();
+        ILSMIndexAccessor accessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        accessor.updateMeta(key, value);
+        Assert.assertTrue(lsmBtree.isMemoryComponentsAllocated());
+        Assert.assertTrue(lsmBtree.getCurrentMemoryComponent().isModified());
+        indexDataflowHelper.close();
+        // flush synchronously
+        StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false);
+        // assert one disk component
+        Assert.assertEquals(1, lsmBtree.getDiskComponents().size());
+        VoidPointable pointable = VoidPointable.FACTORY.createPointable();
+        ComponentUtils.get(lsmBtree, key, pointable);
+        Assert.assertTrue(DataUtils.equals(pointable, value));
+        // ensure that we can search this component
+        StorageTestUtils.searchAndAssertCount(nc, PARTITION, 0);
+    }
+
+    @After
+    public void destroyIndex() throws Exception {
+        indexDataflowHelper.destroy();
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 8ed4bb6..3886115 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -187,8 +187,7 @@
     private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
         //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
         if (modificationCallback != NoOpOperationCallback.INSTANCE) {
-            numActiveOperations.decrementAndGet();
-            if (numActiveOperations.get() < 0) {
+            if (numActiveOperations.decrementAndGet() < 0) {
                 throw new IllegalStateException("The number of active operations cannot be negative!");
             }
             ((AbstractOperationCallback) modificationCallback).afterOperation();
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
index 5de0b84..23f4b66 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
@@ -56,4 +56,45 @@
     public static void copyInto(IValueReference value, byte[] copy, int offset) {
         System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, offset, value.getLength());
     }
+
+    /**
+     * Check whether two value references are equals
+     *
+     * @param first
+     *            first value
+     * @param second
+     *            second value
+     * @return true if the two values are equal, false otherwise
+     */
+    public static boolean equals(IValueReference first, IValueReference second) { // NOSONAR
+        if (first.getLength() != second.getLength()) {
+            return false;
+        }
+        return equalsInRange(first.getByteArray(), first.getStartOffset(), second.getByteArray(),
+                second.getStartOffset(), first.getLength());
+    }
+
+    /**
+     * Check whether subranges of two byte arrays are equal
+     *
+     * @param arr1
+     *            first array
+     * @param offset1
+     *            first offset
+     * @param arr2
+     *            second array
+     * @param offset2
+     *            second offset
+     * @param length
+     *            the length of the window
+     * @return true if the two arrays have equal subranges, false otherwise
+     */
+    public static boolean equalsInRange(byte[] arr1, int offset1, byte[] arr2, int offset2, int length) {
+        for (int i = 0; i < length; i++) {
+            if (arr1[offset1 + i] != arr2[offset2 + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
 }