Implement the code to read the results which are already serialized.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1236 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index f4b091a..5fd4f84 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -72,8 +72,7 @@
             JSONObject jsonResponse = new JSONObject();
             JSONArray results = new JSONArray();
             while (resultReader.read(buffer) > 0) {
-                results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor(),
-                        resultReader.getRecordDescriptor()));
+                results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
             }
             jsonResponse.put("results", results);
             out.write(jsonResponse.toString());
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
index 964c4b0..f36e4e3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
@@ -26,7 +26,6 @@
 import org.json.JSONObject;
 
 import edu.uci.ics.asterix.result.ResultReader;
-import edu.uci.ics.asterix.runtime.formats.FormatUtils;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -64,7 +63,7 @@
             /* TODO(madhusudancs): We need to find a way to JSON serialize default format obtained from
              * metadataProvider in the AQLTranslator and store it as part of the result handle.
              */
-            ResultReader resultReader = new ResultReader(hcc, FormatUtils.getDefaultFormat());
+            ResultReader resultReader = new ResultReader(hcc);
             resultReader.open(jobId, rsId);
 
             JSONObject jsonResponse = new JSONObject();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index dfd6e7e..94e8dbd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -48,10 +48,13 @@
         PrintWriter out = response.getWriter();
 
         DisplayFormat format = DisplayFormat.HTML;
-        if (request.getContentType().equals("application/json")) {
-            format = DisplayFormat.JSON;
-        } else if (request.getContentType().equals("text/plain")) {
+
+        String contentType = request.getContentType();
+
+        if ((contentType == null) || (contentType.equals("text/plain"))) {
             format = DisplayFormat.TEXT;
+        } else if (contentType.equals("application/json")) {
+            format = DisplayFormat.JSON;
         }
 
         String query = getQueryParameter(request);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index ccc72c1..45b45f8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -68,6 +68,7 @@
 import edu.uci.ics.asterix.file.FeedOperations;
 import edu.uci.ics.asterix.file.IndexOperations;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.formats.nontagged.AqlResultSerializerFactoryProvider;
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -105,6 +106,7 @@
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -159,6 +161,7 @@
         List<QueryResult> executionResult = new ArrayList<QueryResult>();
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
+        IResultSerializerFactoryProvider resultSerializerFactoryProvider = AqlResultSerializerFactoryProvider.INSTANCE;
         Map<String, String> config = new HashMap<String, String>();
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
 
@@ -167,6 +170,7 @@
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
             metadataProvider.setWriterFactory(writerFactory);
+            metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
             metadataProvider.setOutputFile(outputFile);
             metadataProvider.setConfig(config);
             jobsToExecute.clear();
@@ -294,13 +298,12 @@
                         response.put("handle", handle);
                     } else {
                         ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
-                        ResultReader resultReader = new ResultReader(hcc, metadataProvider.getFormat());
+                        ResultReader resultReader = new ResultReader(hcc);
                         resultReader.open(jobId, metadataProvider.getResultSetId());
                         buffer.clear();
                         JSONArray results = new JSONArray();
                         while (resultReader.read(buffer) > 0) {
-                            results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor(),
-                                    resultReader.getRecordDescriptor()));
+                            results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
                         }
                         response.put("results", results);
                     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultFrameTupleAccessor.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultFrameTupleAccessor.java
new file mode 100644
index 0000000..9d4712b0
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultFrameTupleAccessor.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.result;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class ResultFrameTupleAccessor implements IFrameTupleAccessor {
+
+    private final int frameSize;
+    private ByteBuffer buffer;
+
+    public ResultFrameTupleAccessor(int frameSize) {
+        this.frameSize = frameSize;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return getFieldCount() * 4;
+    }
+
+    public void prettyPrint() {
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream dis = new DataInputStream(bbis);
+        int tc = getTupleCount();
+        System.err.println("TC: " + tc);
+        for (int i = 0; i < tc; ++i) {
+            System.err.print(i + ":(" + getTupleStartOffset(i) + ", " + getTupleEndOffset(i) + ")[");
+
+            bbis.setByteBuffer(buffer, getTupleStartOffset(i));
+            System.err.print(dis);
+
+            System.err.println("]");
+        }
+    }
+
+    @Override
+    public int getFieldCount() {
+        return 1;
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
index 8137e02..a366abd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -17,29 +17,22 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
 import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class ResultReader {
     private final DatasetClientContext datasetClientCtx;
 
     private final IHyracksDataset hyracksDataset;
 
-    private final IDataFormat format;
-
-    private RecordDescriptor recordDescriptor;
-
-    private FrameTupleAccessor frameTupleAccessor;
+    private IFrameTupleAccessor frameTupleAccessor;
 
     // Number of parallel result reader buffers
     private static final int NUM_READERS = 1;
@@ -47,21 +40,15 @@
     // 32K buffer size;
     public static final int FRAME_SIZE = 32768;
 
-    public ResultReader(IHyracksClientConnection hcc, IDataFormat format) throws Exception {
-        this.format = format;
-
+    public ResultReader(IHyracksClientConnection hcc) throws Exception {
         datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
         hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
     }
 
     public void open(JobId jobId, ResultSetId resultSetId) throws IOException, ClassNotFoundException {
         hyracksDataset.open(jobId, resultSetId);
-        byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
 
-        recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
-                .getSerdeProvider().getClass().getClassLoader());
-
-        frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
+        frameTupleAccessor = new ResultFrameTupleAccessor(datasetClientCtx.getFrameSize());
     }
 
     public Status getStatus() {
@@ -72,11 +59,7 @@
         return hyracksDataset.read(buffer);
     }
 
-    public FrameTupleAccessor getFrameTupleAccessor() {
+    public IFrameTupleAccessor getFrameTupleAccessor() {
         return frameTupleAccessor;
     }
-
-    public RecordDescriptor getRecordDescriptor() {
-        return recordDescriptor;
-    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
index adb8573..e923821 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
@@ -14,41 +14,38 @@
  */
 package edu.uci.ics.asterix.result;
 
-import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import edu.uci.ics.asterix.om.base.IAObject;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
 public class ResultUtils {
-    public static JSONArray getJSONFromBuffer(ByteBuffer buffer, FrameTupleAccessor fta,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
+    public static JSONArray getJSONFromBuffer(ByteBuffer buffer, IFrameTupleAccessor fta) throws HyracksDataException {
         JSONArray resultRecords = new JSONArray();
         ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream di = new DataInputStream(bbis);
 
         try {
             fta.reset(buffer);
             for (int tIndex = 0; tIndex < fta.getTupleCount(); tIndex++) {
-                int start = fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength();
+                int start = fta.getTupleStartOffset(tIndex);
+                int length = fta.getTupleEndOffset(tIndex) - start;
                 bbis.setByteBuffer(buffer, start);
-                Object[] record = new Object[recordDescriptor.getFieldCount()];
-                JSONArray resultRecord = new JSONArray();
-                for (int i = 0; i < record.length; ++i) {
-                    IAObject instance = (IAObject) recordDescriptor.getFields()[i].deserialize(di);
-                    resultRecord.put(instance.toJSON());
-                }
-                resultRecords.put(resultRecord);
+                byte[] recordBytes = new byte[length];
+                bbis.read(recordBytes, 0, length);
+                resultRecords.put(new String(recordBytes, 0, length));
             }
-        } catch (JSONException e) {
-            throw new HyracksDataException(e);
+        } finally {
+            try {
+                bbis.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
         }
         return resultRecords;
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 9183d6f..7fa3659 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -83,11 +83,13 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -123,6 +125,7 @@
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
     private ResultSetId resultSetId;
+    private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     private long jobTxnId;
 
     private final Dataverse defaultDataverse;
@@ -195,6 +198,14 @@
         this.resultSetId = resultSetId;
     }
 
+    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
+        this.resultSerializerFactoryProvider = rafp;
+    }
+
+    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+        return resultSerializerFactoryProvider;
+    }
+
     @Override
     public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
         AqlSourceId aqlId = (AqlSourceId) id;
@@ -554,7 +565,8 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec) throws AlgebricksException {
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
         ResultSetDataSink rsds = (ResultSetDataSink) sink;
         ResultSetSinkId rssId = (ResultSetSinkId) rsds.getId();
         ResultSetId rsId = rssId.getResultSetId();
@@ -562,7 +574,10 @@
 
         ResultWriterOperatorDescriptor resultWriter = null;
         try {
-            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, recordDescriptor);
+            IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
+                    .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory(), inputDesc);
+            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, inputDesc,
+                    resultSerializedAppenderFactory);
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlResultSerializerFactoryProvider.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..0f8d73c
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlResultSerializerFactoryProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.formats.nontagged;
+
+import java.io.PrintStream;
+import java.nio.BufferOverflowException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class AqlResultSerializerFactoryProvider implements IResultSerializerFactoryProvider {
+    private static final long serialVersionUID = 1L;
+
+    public static final AqlResultSerializerFactoryProvider INSTANCE = new AqlResultSerializerFactoryProvider();
+
+    private AqlResultSerializerFactoryProvider() {
+    }
+
+    @Override
+    public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields,
+            final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory,
+            final RecordDescriptor inputRecordDesc) {
+        return new IResultSerializerFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IResultSerializer createResultSerializer(PrintStream printStream) {
+                final IAWriter writer = writerFactory.createWriter(fields, printStream, printerFactories,
+                        inputRecordDesc);
+
+                return new IResultSerializer() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
+                        try {
+                            writer.printTuple(tAccess, tIdx);
+                        } catch (BufferOverflowException e) {
+                            return false;
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        return true;
+                    }
+                };
+            }
+        };
+    }
+}