Fix Upsert Pipeline
Change-Id: I5c19d448f9664ecaeac600668a6dbdcf40673c56
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1308
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
index 304eb0c..64cca8c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
@@ -43,11 +43,6 @@
this.fta = fta;
}
- public void prettyPrint(ByteBuffer frame) {
- fta.reset(frame);
- fta.prettyPrint();
- }
-
@Override
public ByteBuffer handle(HyracksDataException th, ByteBuffer frame) {
try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index e05aa25..f6eeb66 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -26,10 +26,10 @@
import java.util.Map;
import org.apache.asterix.common.config.AsterixStorageProperties;
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.context.ITransactionSubsystemProvider;
import org.apache.asterix.common.context.TransactionSubsystemProvider;
@@ -42,8 +42,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -1097,7 +1097,7 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec) throws AlgebricksException {
@@ -1119,22 +1119,22 @@
int i = 0;
// set the keys' permutations
for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
+ int idx = inputSchema.findVariable(varKey);
fieldPermutation[i] = idx;
bloomFilterKeyFields[i] = i;
i++;
}
// set the record permutation
- fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+ fieldPermutation[i++] = inputSchema.findVariable(payload);
// set the filters' permutations.
if (numFilterFields > 0) {
- int idx = propagatedSchema.findVariable(filterKeys.get(0));
+ int idx = inputSchema.findVariable(filterKeys.get(0));
fieldPermutation[i++] = idx;
}
if (additionalNonFilterFields != null) {
for (LogicalVariable var : additionalNonFilterFields) {
- int idx = propagatedSchema.findVariable(var);
+ int idx = inputSchema.findVariable(var);
fieldPermutation[i++] = idx;
}
}
@@ -1195,22 +1195,19 @@
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
- for (int j = 0; j < recordDesc.getFieldCount(); j++) {
- outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
- outputSerDes[j] = recordDesc.getFields()[j];
- }
- outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
- .getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
- outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
- .getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+ // add the previous record first
+ int f = 0;
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+ f++;
+ // add the previous meta second
if (dataset.hasMetaPart()) {
- outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
- .getSerdeProvider().getSerializerDeserializer(metaItemType);
- outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
- .getTypeTraitProvider().getTypeTrait(metaItemType);
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(
+ metaItemType);
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ f++;
}
-
+ // add the previous filter third
int fieldIdx = -1;
if (numFilterFields > 0) {
String filterField = DatasetUtils.getFilterField(dataset).get(0);
@@ -1220,10 +1217,15 @@
}
}
fieldIdx = i;
- outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
- .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
- outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider()
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType
+ .getFieldTypes()[fieldIdx]);
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ f++;
+ }
+ for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
+ outputSerDes[j + f] = recordDesc.getFields()[j];
}
RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index afd6019..96f9e76 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -78,6 +78,8 @@
private ARecordPointable recPointable;
private DataOutput prevDos;
private final boolean hasMeta;
+ private final int filterFieldIndex;
+ private final int metaFieldIndex;
public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
@@ -93,6 +95,8 @@
key.setFieldPermutation(searchKeyPermutations);
hasMeta = (fieldPermutation.length > numOfPrimaryKeys + 1) && (filterFieldIndex < 0
|| (filterFieldIndex >= 0 && (fieldPermutation.length > numOfPrimaryKeys + 2)));
+ this.metaFieldIndex = numOfPrimaryKeys + 1;
+ this.filterFieldIndex = numOfPrimaryKeys + (hasMeta ? 2 : 1);
if (filterFieldIndex >= 0) {
isFiltered = true;
this.recordType = recordType;
@@ -101,7 +105,6 @@
this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
}
-
}
// we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -141,8 +144,8 @@
.createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAsterixAppRuntimeContext runtimeCtx =
- (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
runtimeCtx.getTransactionSubsystem().getLogManager());
} catch (Exception e) {
@@ -156,41 +159,12 @@
searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
}
- private void writeOutput(int tupleIndex, boolean recordWasInserted) throws IOException {
- boolean recordWasDeleted = prevTuple != null;
- tb.reset();
+ private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
tb.addFieldEndOffset();
}
- if (recordWasDeleted) {
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
- prevTuple.getFieldLength(numOfPrimaryKeys));
- tb.addFieldEndOffset();
- // if has meta, then append meta
- if (hasMeta) {
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys + 1), prevTuple.getFieldStart(numOfPrimaryKeys + 1),
- prevTuple.getFieldLength(numOfPrimaryKeys + 1));
- tb.addFieldEndOffset();
- }
- // if with filters, append the filter
- if (isFiltered) {
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
- prevTuple.getFieldStart(numOfPrimaryKeys + (hasMeta ? 2 : 1)),
- prevTuple.getFieldLength(numOfPrimaryKeys + (hasMeta ? 2 : 1)));
- tb.addFieldEndOffset();
- }
- } else {
- addNullField();
- if (hasMeta) {
- addNullField();
- }
- // if with filters, append null
- if (isFiltered) {
- addNullField();
- }
- }
if (recordWasInserted || recordWasDeleted) {
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
}
@@ -214,6 +188,7 @@
int i = 0;
try {
while (i < tupleCount) {
+ tb.reset();
boolean recordWasInserted = false;
tuple.reset(accessor, i);
resetSearchPredicate(i);
@@ -222,10 +197,26 @@
cursor.next();
prevTuple = cursor.getTuple();
cursor.reset();
- modCallback.setOp(Operation.DELETE);
if (isFiltered) {
prevTuple = getPrevTupleWithFilter(prevTuple);
}
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+ prevTuple.getFieldLength(numOfPrimaryKeys));
+ tb.addFieldEndOffset();
+ // if has meta, then append meta
+ if (hasMeta) {
+ dos.write(prevTuple.getFieldData(metaFieldIndex), prevTuple.getFieldStart(metaFieldIndex),
+ prevTuple.getFieldLength(metaFieldIndex));
+ tb.addFieldEndOffset();
+ }
+ // if with filters, append the filter
+ if (isFiltered) {
+ dos.write(prevTuple.getFieldData(filterFieldIndex),
+ prevTuple.getFieldStart(filterFieldIndex),
+ prevTuple.getFieldLength(filterFieldIndex));
+ tb.addFieldEndOffset();
+ }
+ modCallback.setOp(Operation.DELETE);
if (i == 0) {
lsmAccessor.delete(prevTuple);
} else {
@@ -233,6 +224,14 @@
}
} else {
prevTuple = null;
+ addNullField();
+ if (hasMeta) {
+ addNullField();
+ }
+ // if with filters, append null
+ if (isFiltered) {
+ addNullField();
+ }
cursor.reset();
}
if (!isNull(tuple, numOfPrimaryKeys)) {
@@ -244,7 +243,7 @@
}
recordWasInserted = true;
}
- writeOutput(i, recordWasInserted);
+ writeOutput(i, recordWasInserted, prevTuple != null);
i++;
}
appender.write(writer, true);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 68710a5..6947942 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -193,7 +193,7 @@
public IFunctionInfo lookupFunction(FunctionIdentifier fid);
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
- IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification jobSpec) throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 5dc327a..9838c12 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -84,7 +84,6 @@
@Override
public void recomputeSchema() throws AlgebricksException {
schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
if (operation == Kind.UPSERT) {
// The upsert case also produces the previous record
schema.add(prevRecordVar);
@@ -95,6 +94,7 @@
schema.add(prevFilterVar);
}
}
+ schema.addAll(inputs.get(0).getValue().getSchema());
}
public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
@@ -146,7 +146,6 @@
@Override
public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
throws AlgebricksException {
- target.addAllVariables(sources[0]);
if (operation == Kind.UPSERT) {
target.addVariable(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
@@ -158,6 +157,7 @@
target.addVariable(prevFilterVar);
}
}
+ target.addAllVariables(sources[0]);
}
};
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 28f4e5e..59ccd84 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -34,11 +34,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -295,7 +295,12 @@
@Override
public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
+ // produced first
+ VariableUtilities.getProducedVariables(op, schemaVariables);
+ // then propagated
+ for (Mutable<ILogicalOperator> c : op.getInputs()) {
+ VariableUtilities.getLiveVariables(c.getValue(), schemaVariables);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 3c9cddf..6ded4a3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -114,7 +114,7 @@
runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
additionalFilteringKeys, inputDesc, context, spec);
} else if (operation == Kind.UPSERT) {
- runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ runtimeAndConstraints = mp.getUpsertRuntime(dataSource, inputSchemas[0], typeEnv, keys, payload,
additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec);
} else {
throw new AlgebricksException("Unsupported Operation " + operation);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index d99e2f2..cefada7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -18,18 +18,12 @@
*/
package org.apache.hyracks.dataflow.common.comm.io;
-import java.io.DataInputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.apache.hyracks.api.comm.FrameConstants;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.util.IntSerDeUtils;
/**
@@ -118,132 +112,8 @@
return getFieldCount() * FrameConstants.SIZE_LEN;
}
- public void prettyPrint(String prefix) {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- int tc = getTupleCount();
- StringBuilder sb = new StringBuilder();
- sb.append(prefix).append("TC: " + tc).append("\n");
- for (int i = 0; i < tc; ++i) {
- prettyPrint(i, bbis, dis, sb);
- }
- System.err.println(sb.toString());
- }
-
- public void prettyPrint() {
- prettyPrint("");
- }
-
- protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
- sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
- for (int j = 0; j < getFieldCount(); ++j) {
- sb.append(" ");
- if (j > 0) {
- sb.append("|");
- }
- sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
- sb.append("{");
- bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
- try {
- sb.append(recordDescriptor.getFields()[j].deserialize(dis));
- } catch (Exception e) {
- e.printStackTrace();
- sb.append("Failed to deserialize field" + j);
- }
- sb.append("}");
- }
- sb.append("\n");
- }
-
- public void prettyPrint(int tid) {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- StringBuilder sb = new StringBuilder();
- prettyPrint(tid, bbis, dis, sb);
- System.err.println(sb.toString());
- }
-
@Override
public int getFieldCount() {
return recordDescriptor.getFieldCount();
}
-
- /*
- * The two methods below can be used for debugging.
- * They are safe as they don't print records. Printing records
- * using IserializerDeserializer can print incorrect results or throw exceptions.
- * A better way yet would be to use record pointable.
- */
- public void prettyPrint(String prefix, int[] recordFields) throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- int tc = getTupleCount();
- StringBuilder sb = new StringBuilder();
- sb.append(prefix).append("TC: " + tc).append("\n");
- for (int i = 0; i < tc; ++i) {
- prettyPrint(i, bbis, dis, sb, recordFields);
- }
- System.err.println(sb.toString());
- }
-
- public void prettyPrint(int tIdx, int[] recordFields) throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- StringBuilder sb = new StringBuilder();
- prettyPrint(tIdx, bbis, dis, sb, recordFields);
- System.err.println(sb.toString());
- }
-
- public void prettyPrint(ITupleReference tuple, int fieldsIdx, int descIdx) throws HyracksDataException {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", "
- + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") ");
- sb.append("{");
- ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx));
- bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx));
- sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis));
- sb.append("}");
- sb.append("\n");
- System.err.println(sb.toString());
- }
-
- public void prettyPrint(ITupleReference tuple, int[] descF) throws HyracksDataException {
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream dis = new DataInputStream(bbis);
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- for (int j = 0; j < descF.length; ++j) {
- sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", "
- + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") ");
- sb.append("{");
- ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j));
- bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j));
- sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis));
- sb.append("}");
- }
- sb.append("\n");
- System.err.println(sb.toString());
- }
-
- protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
- int[] recordFields) throws IOException {
- Arrays.sort(recordFields);
- sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
- for (int j = 0; j < getFieldCount(); ++j) {
- sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
- sb.append("{");
- bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
- if (Arrays.binarySearch(recordFields, j) >= 0) {
- sb.append("{a record field: only print using pointable:");
- sb.append("tag->" + dis.readByte() + "}");
- } else {
- sb.append(recordDescriptor.getFields()[j].deserialize(dis));
- }
- sb.append("}");
- }
- sb.append("\n");
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
index d05b3ed..874ac46 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ResultFrameTupleAccessor.java
@@ -18,10 +18,6 @@
*/
package org.apache.hyracks.dataflow.common.comm.io;
-import java.io.DataInputStream;
-
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
public class ResultFrameTupleAccessor extends FrameTupleAccessor {
public ResultFrameTupleAccessor() {
@@ -29,16 +25,6 @@
}
@Override
- protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
- sb.append(tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
-
- bbis.setByteBuffer(getBuffer(), getTupleStartOffset(tid));
- sb.append(dis);
-
- sb.append("]\n");
- }
-
- @Override
public int getFieldCount() {
return 1;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java
new file mode 100644
index 0000000..aa27c42
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/FrameDebugUtils.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * A Util class used for inspecting frames
+ * for debugging purposes
+ */
+public class FrameDebugUtils {
+ private FrameDebugUtils() {
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param recordDescriptor
+ * @param prefix
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix) {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ int tc = fta.getTupleCount();
+ StringBuilder sb = new StringBuilder();
+ sb.append(prefix).append("TC: " + tc).append("\n");
+ for (int i = 0; i < tc; ++i) {
+ prettyPrint(fta, recordDescriptor, i, bbis, dis, sb);
+ }
+ System.err.println(sb.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param recordDescriptor
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor) {
+ prettyPrint(fta, recordDescriptor, "");
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param operator
+ */
+ public void prettyPrintTags(IFrameTupleAccessor fta, String operator) {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ int tc = fta.getTupleCount();
+ StringBuilder sb = new StringBuilder();
+ sb.append(operator + ":");
+ sb.append("TC: " + tc).append("\n");
+ for (int i = 0; i < tc; ++i) {
+ prettyPrintTag(fta, i, bbis, dis, sb);
+ }
+ System.err.println(sb.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param tid
+ * @param bbis
+ * @param dis
+ * @param sb
+ */
+ protected void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream bbis, DataInputStream dis,
+ StringBuilder sb) {
+ sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")[");
+ for (int j = 0; j < fta.getFieldCount(); ++j) {
+ sb.append(" ");
+ if (j > 0) {
+ sb.append("|");
+ }
+ sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") ");
+ sb.append("{");
+ sb.append(Byte.toString(fta.getBuffer().array()[fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength()
+ + fta.getFieldStartOffset(tid, j)]));
+ sb.append("}");
+ }
+ sb.append("\n");
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param recordDescriptor
+ * @param tid
+ * @param bbis
+ * @param dis
+ * @param sb
+ */
+ protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid,
+ ByteBufferInputStream bbis, DataInputStream dis,
+ StringBuilder sb) {
+ sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")[");
+ for (int j = 0; j < fta.getFieldCount(); ++j) {
+ sb.append(" ");
+ if (j > 0) {
+ sb.append("|");
+ }
+ sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") ");
+ sb.append("{");
+ bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta
+ .getFieldStartOffset(tid, j));
+ try {
+ sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+ } catch (Exception e) {
+ e.printStackTrace();
+ sb.append("Failed to deserialize field" + j);
+ }
+ sb.append("}");
+ }
+ sb.append("\n");
+ }
+
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param recordDescriptor
+ * @param tid
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid) {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ StringBuilder sb = new StringBuilder();
+ prettyPrint(fta, recordDescriptor, tid, bbis, dis, sb);
+ System.err.println(sb.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Debugging method
+ * They are safe as they don't print records. Printing records
+ * using IserializerDeserializer can print incorrect results or throw exceptions.
+ * A better way yet would be to use record pointable.
+ * @param fta
+ * @param recordDescriptor
+ * @param prefix
+ * @param recordFields
+ * @throws IOException
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String prefix,
+ int[] recordFields) throws IOException {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ int tc = fta.getTupleCount();
+ StringBuilder sb = new StringBuilder();
+ sb.append(prefix).append("TC: " + tc).append("\n");
+ for (int i = 0; i < tc; ++i) {
+ prettyPrint(fta, recordDescriptor, i, bbis, dis, sb, recordFields);
+ }
+ System.err.println(sb.toString());
+ }
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param recordDescriptor
+ * @param tIdx
+ * @param recordFields
+ * @throws IOException
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tIdx, int[] recordFields)
+ throws IOException {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ StringBuilder sb = new StringBuilder();
+ prettyPrint(fta, recordDescriptor, tIdx, bbis, dis, sb, recordFields);
+ System.err.println(sb.toString());
+ }
+ }
+
+ /**
+ * Debugging method
+ * @param tuple
+ * @param fieldsIdx
+ * @param descIdx
+ * @throws HyracksDataException
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple,
+ int fieldsIdx, int descIdx)
+ throws HyracksDataException {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ sb.append("f" + fieldsIdx + ":(" + tuple.getFieldStart(fieldsIdx) + ", "
+ + (tuple.getFieldLength(fieldsIdx) + tuple.getFieldStart(fieldsIdx)) + ") ");
+ sb.append("{");
+ ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(fieldsIdx));
+ bbis.setByteBuffer(bytebuff, tuple.getFieldStart(fieldsIdx));
+ sb.append(recordDescriptor.getFields()[descIdx].deserialize(dis));
+ sb.append("}");
+ sb.append("\n");
+ System.err.println(sb.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Debugging method
+ * @param tuple
+ * @param descF
+ * @throws HyracksDataException
+ */
+ public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference tuple,
+ int[] descF) throws HyracksDataException {
+ try (ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis)) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (int j = 0; j < descF.length; ++j) {
+ sb.append("f" + j + ":(" + tuple.getFieldStart(j) + ", "
+ + (tuple.getFieldLength(j) + tuple.getFieldStart(j)) + ") ");
+ sb.append("{");
+ ByteBuffer bytebuff = ByteBuffer.wrap(tuple.getFieldData(j));
+ bbis.setByteBuffer(bytebuff, tuple.getFieldStart(j));
+ sb.append(recordDescriptor.getFields()[descF[j]].deserialize(dis));
+ sb.append("}");
+ }
+ sb.append("\n");
+ System.err.println(sb.toString());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Debugging method
+ * @param fta
+ * @param recordDescriptor
+ * @param tid
+ * @param bbis
+ * @param dis
+ * @param sb
+ * @param recordFields
+ * @throws IOException
+ */
+ protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int tid,
+ ByteBufferInputStream bbis, DataInputStream dis,
+ StringBuilder sb,
+ int[] recordFields) throws IOException {
+ Arrays.sort(recordFields);
+ sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid) + ")[");
+ for (int j = 0; j < fta.getFieldCount(); ++j) {
+ sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid, j) + ") ");
+ sb.append("{");
+ bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta
+ .getFieldSlotsLength() + fta.getFieldStartOffset(tid, j));
+ if (Arrays.binarySearch(recordFields, j) >= 0) {
+ sb.append("{a record field: only print using pointable:");
+ sb.append("tag->" + dis.readByte() + "}");
+ } else {
+ sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+ }
+ sb.append("}");
+ }
+ sb.append("\n");
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index 567b7df..c68d59d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -31,7 +31,6 @@
import static org.junit.Assert.assertTrue;
import java.io.DataInputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -40,8 +39,6 @@
import java.util.Map;
import java.util.TreeMap;
-import junit.extensions.PA;
-
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameReader;
@@ -62,6 +59,8 @@
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.junit.Test;
+import junit.extensions.PA;
+
public class RunMergingFrameReaderTest {
static IBinaryComparator[] Comparators = new IBinaryComparator[] { ComparatorFactories[0].createBinaryComparator(),
ComparatorFactories[1].createBinaryComparator(), };
@@ -125,12 +124,6 @@
return true;
}
- private void printFrame(ByteBuffer buffer) {
- FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
- fta.reset(buffer);
- fta.prettyPrint();
- }
-
@Override
public void close() throws HyracksDataException {
}