[ASTERIXDB-2664][TEST] Add Run File Test

	- user model changes: no
	- storage format changes: no
	- interface changes: no

	details:
	-added Run File Test

Change-Id: I856f699d04642eb95fdb89c8dfed2a689bb2eeaa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4743
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/io/RunFileTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/io/RunFileTest.java
new file mode 100644
index 0000000..599a1ce
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/io/RunFileTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.io;
+
+import java.io.DataOutput;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for RunFileReader and RunFileWriter.
+ * <p>
+ * Runs test for single and multiple frames written to the run file.
+ * The reading can be done in sequence, out of sequence, reverse
+ * and in separate sessions between writing.
+ *
+ * @see org.apache.hyracks.dataflow.common.io.RunFileReader
+ * @see org.apache.hyracks.dataflow.common.io.RunFileWriter
+ */
+
+public class RunFileTest {
+
+    private static final int TEST_FRAME_SIZE = 256;
+    private static final int TEST_FRAME_SIZE_ALTERNATE = 512;
+    private static final int MULTIPLE_FRAMES = 5;
+    private static final int NUMBER_OF_TUPLES = 10;
+    private static final int NUMBER_OF_TUPLES_ALTERNATE = 100;
+    private static final int FIELD_COUNT = 2;
+
+    private final IHyracksTaskContext ctx = TestUtils.create(TEST_FRAME_SIZE);
+
+    private RunFileWriter writer;
+
+    @Before
+    public void setup() throws HyracksDataException {
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("RunFileTest");
+        IIOManager ioManager = ctx.getIoManager();
+        writer = new RunFileWriter(file, ioManager);
+    }
+
+    @Test
+    public void testSingleFrame() throws HyracksDataException {
+
+        //Declare fields
+        IFrame frame = new VSizeFrame(ctx);
+        IFrame readFrame = new VSizeFrame(ctx);
+
+        // Writer test
+        writer.open();
+        writeFrame(frame);
+
+        // Reading what was written
+        RunFileReader reader;
+        reader = writer.createDeleteOnCloseReader();
+        writer.close();
+        reader.open();
+        reader.nextFrame(readFrame);
+        Assert.assertArrayEquals("Reading frame bytes", frame.getBuffer().array(), readFrame.getBuffer().array());
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleFrame() throws HyracksDataException {
+
+        //Declare fields
+        IFrame[] frames = getFrames(MULTIPLE_FRAMES);
+
+        // Writer test
+        writer.open();
+        writeFrames(frames);
+        IFrame[] readFrames = getFrames(MULTIPLE_FRAMES);
+
+        // Reading what was written
+        RunFileReader reader;
+        reader = writer.createDeleteOnCloseReader();
+        writer.close();
+        reader.open();
+        readFrames(frames, readFrames, reader);
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleFrameMultipleReads() throws HyracksDataException {
+
+        //Declare Fields
+        IFrame[] frames = getFrames(MULTIPLE_FRAMES);
+
+        // Writer test
+        writer.open();
+        writeFrames(frames);
+        IFrame[] readFrames = getFrames(MULTIPLE_FRAMES);
+
+        // Reading what was written
+        RunFileReader reader = writer.createDeleteOnCloseReader();
+        writer.close();
+        reader.open();
+        readFrames(frames, readFrames, reader);
+        reader.seek(0);
+        readFrames(frames, readFrames, reader);
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleFrameRandomReads() throws HyracksDataException {
+
+        //Declare fields
+        IFrame[] frames = getFrames(MULTIPLE_FRAMES);
+        IFrame[] readFrames = getFrames(MULTIPLE_FRAMES);
+        long[] frameOffsets = new long[frames.length];
+
+        // Writer test
+        writer.open();
+        writeFramesWithOffsets(frames, frameOffsets, 1);
+
+        // Reading what was written
+        RunFileReader reader;
+        reader = writer.createDeleteOnCloseReader();
+        writer.close();
+        reader.open();
+        readFrames(frames, readFrames, reader);
+        readFramesBackwards(frames, readFrames, frameOffsets, reader);
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleDifferentSizedFrames() throws HyracksDataException {
+
+        //Declare fields
+        IFrame[] frames = getFramesAlternate(MULTIPLE_FRAMES);
+        IFrame[] readFrames = getFrames(MULTIPLE_FRAMES);
+        long[] frameOffsets = new long[frames.length];
+
+        // Writer test
+        writer.open();
+        writeFramesWithDifferentSizes(frames, frameOffsets);
+
+        // Reading what was written
+        RunFileReader reader;
+        reader = writer.createDeleteOnCloseReader();
+        writer.close();
+        reader.open();
+        readFrames(frames, readFrames, reader);
+        readFramesBackwards(frames, readFrames, frameOffsets, reader);
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleWriteReadSessions() throws HyracksDataException {
+
+        //Declare fields
+        IFrame[] framesAll = getFrames(MULTIPLE_FRAMES * 2);
+        IFrame[] readFrames = getFrames(framesAll.length);
+        long[] frameOffsets = new long[framesAll.length];
+
+        // Writer test
+        writer.open();
+        writeFramesWithOffsets(framesAll, frameOffsets, 1);
+
+        // Reading what was written
+        readMultipleFramesForwardsAndBackwards(framesAll, readFrames, frameOffsets, 1);
+
+        // Second write session
+        writeFramesWithOffsets(framesAll, frameOffsets, 2);
+
+        // Reading what was written in both sections
+        readMultipleFramesForwardsAndBackwards(framesAll, readFrames, frameOffsets, 2);
+        writer.close();
+    }
+
+    private IFrame[] getFrames(int count) throws HyracksDataException {
+        IFrame[] frames = new IFrame[count];
+        for (int f = 0; f < frames.length; f++) {
+            frames[f] = new VSizeFrame(ctx);
+        }
+        return frames;
+    }
+
+    private IFrame[] getFramesAlternate(int count) throws HyracksDataException {
+        if (count < 2) {
+            throw new HyracksDataException("Must provide more than one frame.");
+        }
+        IFrame[] frames = new IFrame[count];
+        frames[0] = new VSizeFrame(ctx);
+        frames[1] = new VSizeFrame(ctx, TEST_FRAME_SIZE_ALTERNATE);
+        for (int f = 2; f < frames.length; f++) {
+            frames[f] = new VSizeFrame(ctx);
+        }
+        return frames;
+    }
+
+    public void writeTupleToFrame(FrameTupleAppender appender, int tupleCount) throws HyracksDataException {
+
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(FIELD_COUNT);
+        DataOutput dos = tb.getDataOutput();
+
+        Random rnd = new Random();
+        rnd.setSeed(50);
+        for (int i = 0; i < tupleCount; i++) {
+            int f0 = rnd.nextInt() % 100000;
+            int f1 = 5;
+
+            tb.reset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
+            tb.addFieldEndOffset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
+            tb.addFieldEndOffset();
+
+            if (appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                break;
+            }
+        }
+    }
+
+    public void writeFrame(IFrame frame) throws HyracksDataException {
+
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+        writeTupleToFrame(appender, NUMBER_OF_TUPLES);
+
+        writer.nextFrame(frame.getBuffer());
+    }
+
+    public void writeFrames(IFrame[] frames) throws HyracksDataException {
+
+        FrameTupleAppender appender = new FrameTupleAppender();
+
+        for (int f = 0; f < MULTIPLE_FRAMES; f++) {
+            appender.reset(frames[f], true);
+            writeTupleToFrame(appender, NUMBER_OF_TUPLES);
+            writer.nextFrame(frames[f].getBuffer());
+        }
+    }
+
+    public void writeFramesWithOffsets(IFrame[] frames, long[] frameOffsets, int writeSession)
+            throws HyracksDataException {
+
+        FrameTupleAppender appender = new FrameTupleAppender();
+
+        for (int f = 0; f < MULTIPLE_FRAMES * writeSession; f++) {
+            appender.reset(frames[f], true);
+            writeTupleToFrame(appender, NUMBER_OF_TUPLES);
+            frameOffsets[f] = writer.getFileSize();
+            writer.nextFrame(frames[f].getBuffer());
+        }
+    }
+
+    public void writeFramesWithDifferentSizes(IFrame[] frames, long[] frameOffsets) throws HyracksDataException {
+
+        FrameTupleAppender appender = new FrameTupleAppender();
+
+        for (int f = 0; f < frames.length; f++) {
+            appender.reset(frames[f], false);
+            int tupleCount = (f == 1 ? NUMBER_OF_TUPLES_ALTERNATE : NUMBER_OF_TUPLES);
+            writeTupleToFrame(appender, tupleCount);
+            frameOffsets[f] = writer.getFileSize();
+            writer.nextFrame(frames[f].getBuffer());
+        }
+    }
+
+    public void readFrames(IFrame[] frames, IFrame[] readFrames, RunFileReader reader) throws HyracksDataException {
+
+        for (int f = 0; f < frames.length; f++) {
+            reader.nextFrame(readFrames[f]);
+            Assert.assertArrayEquals("Reading frame[" + f + "] bytes", frames[f].getBuffer().array(),
+                    readFrames[f].getBuffer().array());
+        }
+    }
+
+    public void readFramesBackwards(IFrame[] frames, IFrame[] readFrames, long[] frameOffsets, RunFileReader reader)
+            throws HyracksDataException {
+
+        for (int f = frames.length - 1; f >= 0; f--) {
+            reader.seek(frameOffsets[f]);
+            reader.nextFrame(readFrames[f]);
+            Assert.assertArrayEquals("Reading backwards frame[" + f + "] bytes", frames[f].getBuffer().array(),
+                    readFrames[f].getBuffer().array());
+        }
+    }
+
+    public void readMultipleFramesForwardsAndBackwards(IFrame[] framesAll, IFrame[] readFrames, long[] frameOffsets,
+            int readSession) throws HyracksDataException {
+
+        RunFileReader reader;
+        reader = writer.createReader();
+        reader.open();
+        for (int f = 0; f < MULTIPLE_FRAMES * readSession; f++) {
+            reader.nextFrame(readFrames[f]);
+            Assert.assertArrayEquals("Reading frame[" + f + "] bytes", framesAll[f].getBuffer().array(),
+                    readFrames[f].getBuffer().array());
+        }
+        for (int f = MULTIPLE_FRAMES * readSession - 1; f >= 0; f--) {
+            reader.seek(frameOffsets[f]);
+            reader.nextFrame(readFrames[f]);
+            Assert.assertArrayEquals("Reading backwards frame[" + f + "] bytes", framesAll[f].getBuffer().array(),
+                    readFrames[f].getBuffer().array());
+        }
+        reader.close();
+    }
+}