Reverting the merge of fullstack_hyracks_result_distribution branch until all the tests pass.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3033 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-common/pom.xml b/algebricks/algebricks-common/pom.xml
index c93fe3e..a5677a5 100644
--- a/algebricks/algebricks-common/pom.xml
+++ b/algebricks/algebricks-common/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/algebricks/algebricks-compiler/pom.xml b/algebricks/algebricks-compiler/pom.xml
index f1f4521..8dc083d 100644
--- a/algebricks/algebricks-compiler/pom.xml
+++ b/algebricks/algebricks-compiler/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/algebricks/algebricks-core/pom.xml b/algebricks/algebricks-core/pom.xml
index 118ea57..a74a540 100644
--- a/algebricks/algebricks-core/pom.xml
+++ b/algebricks/algebricks-core/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 32cfb9a..a969372 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -7,7 +7,6 @@
BTREE_SEARCH,
STATS,
DATASOURCE_SCAN,
- DISTRIBUTE_RESULT,
EMPTY_TUPLE_SOURCE,
EXTERNAL_GROUP_BY,
IN_MEMORY_HASH_JOIN,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 82187e3..899b633 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -51,10 +51,6 @@
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
throws AlgebricksException;
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
- JobSpecification spec) throws AlgebricksException;
-
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
deleted file mode 100644
index d835da4..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
-import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class DistributeResultPOperator extends AbstractPhysicalOperator {
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.DISTRIBUTE_RESULT;
- }
-
- @Override
- public boolean isMicroOperator() {
- return false;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- ILogicalOperator op2 = op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent) {
- WriteOperator write = (WriteOperator) op;
- IDataSink sink = write.getDataSink();
- IPartitioningProperty pp = sink.getPartitioningProperty();
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
- return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- WriteOperator resultOp = (WriteOperator) op;
- IMetadataProvider mp = context.getMetadataProvider();
-
- JobSpecification spec = builder.getJobSpec();
-
- int[] columns = new int[resultOp.getExpressions().size()];
- int i = 0;
- for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
- ILogicalExpression expr = exprRef.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException("Only writing variable expressions is supported.");
- }
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- LogicalVariable v = varRef.getVariableReference();
- columns[i++] = inputSchemas[0].findVariable(v);
- }
- RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
- context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
-
- IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
- context, columns);
-
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
- resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
-
- builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
- builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
- ILogicalOperator src = resultOp.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, resultOp, 0);
- }
-}
diff --git a/algebricks/algebricks-data/pom.xml b/algebricks/algebricks-data/pom.xml
index 3f7592a..3d927d9 100644
--- a/algebricks/algebricks-data/pom.xml
+++ b/algebricks/algebricks-data/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
deleted file mode 100644
index a6ebf01..0000000
--- a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.data;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public interface IResultSerializerFactoryProvider extends Serializable {
- /**
- * Returns a result serializer factory
- *
- * @param fields
- * - A position of the fields in the order it should be written in the output.
- * @param printerFactories
- * - A printer factory array to print the tuple containing different fields.
- * @param writerFactory
- * - A writer factory to write the serialized data to the print stream.
- * @param inputRecordDesc
- * - The record descriptor describing the input frame to be serialized.
- * @return A new instance of result serialized appender.
- */
- public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
- IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc);
-}
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index ea36fb6..954938a 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 0efe0ae..d678803 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -62,10 +62,13 @@
@SuppressWarnings("unchecked")
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<String> dataSource,
- List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException {
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+ IDataSource<String> dataSource,
+ List<LogicalVariable> scanVariables,
+ List<LogicalVariable> projectVariables, boolean projectPushed,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+ JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
PigletFileDataSource ds = (PigletFileDataSource) dataSource;
FileSplit[] fileSplits = ds.getFileSplits();
@@ -136,13 +139,6 @@
}
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
- JobSpecification spec) throws AlgebricksException {
- return null;
- }
-
- @Override
public IDataSourceIndex<String, String> findDataSourceIndex(String indexId, String dataSourceId)
throws AlgebricksException {
return null;
@@ -191,7 +187,7 @@
// TODO Auto-generated method stub
return null;
}
-
+
@Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return FN_MAP.get(fid);
diff --git a/algebricks/algebricks-rewriter/pom.xml b/algebricks/algebricks-rewriter/pom.xml
index 448221d..41979d3 100644
--- a/algebricks/algebricks-rewriter/pom.xml
+++ b/algebricks/algebricks-rewriter/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 666f01a..38cf96e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -38,7 +38,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -51,6 +50,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -242,7 +242,7 @@
break;
}
case WRITE: {
- op.setPhysicalOperator(new DistributeResultPOperator());
+ op.setPhysicalOperator(new SinkWritePOperator());
break;
}
case WRITE_RESULT: {
@@ -267,8 +267,8 @@
List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
- op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, opInsDel
- .getFilterExpression(), opInsDel.getDataSourceIndex()));
+ op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
+ opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
break;
}
case SINK: {
diff --git a/algebricks/algebricks-runtime/pom.xml b/algebricks/algebricks-runtime/pom.xml
index 70243d2..e40dfb0 100644
--- a/algebricks/algebricks-runtime/pom.xml
+++ b/algebricks/algebricks-runtime/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index 28597a7..148f087 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -70,7 +70,6 @@
for (int t = 0; t < nTuple; t++) {
try {
writer.printTuple(tAccess, t);
- printStream.println();
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
index 762e133..9c53241 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -47,6 +47,7 @@
}
printers[i].print(tAccess.getBuffer().array(), fldStart, fldLen, printStream);
}
+ printStream.println();
}
};
}
diff --git a/algebricks/algebricks-tests/pom.xml b/algebricks/algebricks-tests/pom.xml
index 228baa9..19e6711 100644
--- a/algebricks/algebricks-tests/pom.xml
+++ b/algebricks/algebricks-tests/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>