Implement the code to read the results which are already serialized.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1236 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index f4b091a..5fd4f84 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -72,8 +72,7 @@
JSONObject jsonResponse = new JSONObject();
JSONArray results = new JSONArray();
while (resultReader.read(buffer) > 0) {
- results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor(),
- resultReader.getRecordDescriptor()));
+ results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
}
jsonResponse.put("results", results);
out.write(jsonResponse.toString());
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
index 964c4b0..f36e4e3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
@@ -26,7 +26,6 @@
import org.json.JSONObject;
import edu.uci.ics.asterix.result.ResultReader;
-import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -64,7 +63,7 @@
/* TODO(madhusudancs): We need to find a way to JSON serialize default format obtained from
* metadataProvider in the AQLTranslator and store it as part of the result handle.
*/
- ResultReader resultReader = new ResultReader(hcc, FormatUtils.getDefaultFormat());
+ ResultReader resultReader = new ResultReader(hcc);
resultReader.open(jobId, rsId);
JSONObject jsonResponse = new JSONObject();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index dfd6e7e..94e8dbd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -48,10 +48,13 @@
PrintWriter out = response.getWriter();
DisplayFormat format = DisplayFormat.HTML;
- if (request.getContentType().equals("application/json")) {
- format = DisplayFormat.JSON;
- } else if (request.getContentType().equals("text/plain")) {
+
+ String contentType = request.getContentType();
+
+ if ((contentType == null) || (contentType.equals("text/plain"))) {
format = DisplayFormat.TEXT;
+ } else if (contentType.equals("application/json")) {
+ format = DisplayFormat.JSON;
}
String query = getQueryParameter(request);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index ccc72c1..45b45f8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -68,6 +68,7 @@
import edu.uci.ics.asterix.file.FeedOperations;
import edu.uci.ics.asterix.file.IndexOperations;
import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.formats.nontagged.AqlResultSerializerFactoryProvider;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -105,6 +106,7 @@
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -159,6 +161,7 @@
List<QueryResult> executionResult = new ArrayList<QueryResult>();
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
+ IResultSerializerFactoryProvider resultSerializerFactoryProvider = AqlResultSerializerFactoryProvider.INSTANCE;
Map<String, String> config = new HashMap<String, String>();
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
@@ -167,6 +170,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
metadataProvider.setWriterFactory(writerFactory);
+ metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
metadataProvider.setOutputFile(outputFile);
metadataProvider.setConfig(config);
jobsToExecute.clear();
@@ -294,13 +298,12 @@
response.put("handle", handle);
} else {
ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
- ResultReader resultReader = new ResultReader(hcc, metadataProvider.getFormat());
+ ResultReader resultReader = new ResultReader(hcc);
resultReader.open(jobId, metadataProvider.getResultSetId());
buffer.clear();
JSONArray results = new JSONArray();
while (resultReader.read(buffer) > 0) {
- results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor(),
- resultReader.getRecordDescriptor()));
+ results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
}
response.put("results", results);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultFrameTupleAccessor.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultFrameTupleAccessor.java
new file mode 100644
index 0000000..9d4712b0
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultFrameTupleAccessor.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.result;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class ResultFrameTupleAccessor implements IFrameTupleAccessor {
+
+ private final int frameSize;
+ private ByteBuffer buffer;
+
+ public ResultFrameTupleAccessor(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ @Override
+ public void reset(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public int getTupleCount() {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+ }
+
+ @Override
+ public int getTupleStartOffset(int tupleIndex) {
+ return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+ }
+
+ @Override
+ public int getTupleEndOffset(int tupleIndex) {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+ }
+
+ @Override
+ public int getFieldStartOffset(int tupleIndex, int fIdx) {
+ return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+ }
+
+ @Override
+ public int getFieldEndOffset(int tupleIndex, int fIdx) {
+ return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+ }
+
+ @Override
+ public int getFieldLength(int tupleIndex, int fIdx) {
+ return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+ }
+
+ @Override
+ public int getFieldSlotsLength() {
+ return getFieldCount() * 4;
+ }
+
+ public void prettyPrint() {
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis);
+ int tc = getTupleCount();
+ System.err.println("TC: " + tc);
+ for (int i = 0; i < tc; ++i) {
+ System.err.print(i + ":(" + getTupleStartOffset(i) + ", " + getTupleEndOffset(i) + ")[");
+
+ bbis.setByteBuffer(buffer, getTupleStartOffset(i));
+ System.err.print(dis);
+
+ System.err.println("]");
+ }
+ }
+
+ @Override
+ public int getFieldCount() {
+ return 1;
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
index 8137e02..a366abd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -17,29 +17,22 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class ResultReader {
private final DatasetClientContext datasetClientCtx;
private final IHyracksDataset hyracksDataset;
- private final IDataFormat format;
-
- private RecordDescriptor recordDescriptor;
-
- private FrameTupleAccessor frameTupleAccessor;
+ private IFrameTupleAccessor frameTupleAccessor;
// Number of parallel result reader buffers
private static final int NUM_READERS = 1;
@@ -47,21 +40,15 @@
// 32K buffer size;
public static final int FRAME_SIZE = 32768;
- public ResultReader(IHyracksClientConnection hcc, IDataFormat format) throws Exception {
- this.format = format;
-
+ public ResultReader(IHyracksClientConnection hcc) throws Exception {
datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
}
public void open(JobId jobId, ResultSetId resultSetId) throws IOException, ClassNotFoundException {
hyracksDataset.open(jobId, resultSetId);
- byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
- recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
- .getSerdeProvider().getClass().getClassLoader());
-
- frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
+ frameTupleAccessor = new ResultFrameTupleAccessor(datasetClientCtx.getFrameSize());
}
public Status getStatus() {
@@ -72,11 +59,7 @@
return hyracksDataset.read(buffer);
}
- public FrameTupleAccessor getFrameTupleAccessor() {
+ public IFrameTupleAccessor getFrameTupleAccessor() {
return frameTupleAccessor;
}
-
- public RecordDescriptor getRecordDescriptor() {
- return recordDescriptor;
- }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
index adb8573..e923821 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
@@ -14,41 +14,38 @@
*/
package edu.uci.ics.asterix.result;
-import java.io.DataInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import edu.uci.ics.asterix.om.base.IAObject;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
public class ResultUtils {
- public static JSONArray getJSONFromBuffer(ByteBuffer buffer, FrameTupleAccessor fta,
- RecordDescriptor recordDescriptor) throws HyracksDataException {
+ public static JSONArray getJSONFromBuffer(ByteBuffer buffer, IFrameTupleAccessor fta) throws HyracksDataException {
JSONArray resultRecords = new JSONArray();
ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream di = new DataInputStream(bbis);
try {
fta.reset(buffer);
for (int tIndex = 0; tIndex < fta.getTupleCount(); tIndex++) {
- int start = fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength();
+ int start = fta.getTupleStartOffset(tIndex);
+ int length = fta.getTupleEndOffset(tIndex) - start;
bbis.setByteBuffer(buffer, start);
- Object[] record = new Object[recordDescriptor.getFieldCount()];
- JSONArray resultRecord = new JSONArray();
- for (int i = 0; i < record.length; ++i) {
- IAObject instance = (IAObject) recordDescriptor.getFields()[i].deserialize(di);
- resultRecord.put(instance.toJSON());
- }
- resultRecords.put(resultRecord);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.put(new String(recordBytes, 0, length));
}
- } catch (JSONException e) {
- throw new HyracksDataException(e);
+ } finally {
+ try {
+ bbis.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
return resultRecords;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 9183d6f..7fa3659 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -83,11 +83,13 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -123,6 +125,7 @@
private IAWriterFactory writerFactory;
private FileSplit outputFile;
private ResultSetId resultSetId;
+ private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
private long jobTxnId;
private final Dataverse defaultDataverse;
@@ -195,6 +198,14 @@
this.resultSetId = resultSetId;
}
+ public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
+ this.resultSerializerFactoryProvider = rafp;
+ }
+
+ public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+ return resultSerializerFactoryProvider;
+ }
+
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
AqlSourceId aqlId = (AqlSourceId) id;
@@ -554,7 +565,8 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec) throws AlgebricksException {
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException {
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = (ResultSetSinkId) rsds.getId();
ResultSetId rsId = rssId.getResultSetId();
@@ -562,7 +574,10 @@
ResultWriterOperatorDescriptor resultWriter = null;
try {
- resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, recordDescriptor);
+ IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
+ .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory(), inputDesc);
+ resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, inputDesc,
+ resultSerializedAppenderFactory);
} catch (IOException e) {
throw new AlgebricksException(e);
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlResultSerializerFactoryProvider.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..0f8d73c
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlResultSerializerFactoryProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.formats.nontagged;
+
+import java.io.PrintStream;
+import java.nio.BufferOverflowException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class AqlResultSerializerFactoryProvider implements IResultSerializerFactoryProvider {
+ private static final long serialVersionUID = 1L;
+
+ public static final AqlResultSerializerFactoryProvider INSTANCE = new AqlResultSerializerFactoryProvider();
+
+ private AqlResultSerializerFactoryProvider() {
+ }
+
+ @Override
+ public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields,
+ final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory,
+ final RecordDescriptor inputRecordDesc) {
+ return new IResultSerializerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IResultSerializer createResultSerializer(PrintStream printStream) {
+ final IAWriter writer = writerFactory.createWriter(fields, printStream, printerFactories,
+ inputRecordDesc);
+
+ return new IResultSerializer() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
+ try {
+ writer.printTuple(tAccess, tIdx);
+ } catch (BufferOverflowException e) {
+ return false;
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ return true;
+ }
+ };
+ }
+ };
+ }
+}