Merged fullstack_staging branch into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/algebricks/algebricks-runtime/pom.xml b/fullstack/algebricks/algebricks-runtime/pom.xml
new file mode 100644
index 0000000..1e9fed2
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>algebricks-runtime</artifactId>
+ <name>algebricks-runtime</name>
+ <!-- No groupId or version is declared in this POM, so both are inherited from the parent below. -->
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>algebricks</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+ <!-- Compile at the Java 1.6 source level and emit 1.6-compatible bytecode. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies> <!-- Hyracks and Algebricks modules, each pinned to the same 0.2.2-SNAPSHOT version as the parent. -->
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-rtree</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>algebricks-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>algebricks-data</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
new file mode 100644
index 0000000..0658d62
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/** Factory for aggregate evaluators that count the tuples passed to {@code step}. */
+public class TupleCountAggregateFunctionFactory implements IAggregateEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates a counting evaluator with its own output buffer; {@code ctx} is not used. */
+    @Override
+    public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
+        final ArrayBackedValueStorage countBuffer = new ArrayBackedValueStorage();
+        return new IAggregateEvaluator() {
+            private int tupleCount; // tuples stepped over since the last init()
+
+            @Override
+            public void init() throws AlgebricksException {
+                tupleCount = 0;
+            }
+
+            @Override
+            public void step(IFrameTupleReference tuple) throws AlgebricksException {
+                tupleCount += 1;
+            }
+
+            /** Writes the count via {@code writeInt} and points {@code result} at the buffer. */
+            @Override
+            public void finish(IPointable result) throws AlgebricksException {
+                try {
+                    countBuffer.reset();
+                    countBuffer.getDataOutput().writeInt(tupleCount);
+                    result.set(countBuffer);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..fa73f6c
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/** Factory for running-aggregate evaluators that emit the tuple count seen so far on every step. */
+public class TupleCountRunningAggregateFunctionFactory implements IRunningAggregateEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates a counting evaluator that owns a private output buffer. */
+    @Override
+    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException {
+        final ArrayBackedValueStorage countBuffer = new ArrayBackedValueStorage();
+        return new IRunningAggregateEvaluator() {
+            private int tupleCount; // tuples stepped over since the last init()
+
+            @Override
+            public void init() throws AlgebricksException {
+                tupleCount = 0;
+            }
+
+            /** Increments the count, writes it via {@code writeInt}, and points {@code result} at the buffer. */
+            @Override
+            public void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                tupleCount += 1;
+                try {
+                    countBuffer.reset();
+                    countBuffer.getDataOutput().writeInt(tupleCount);
+                    result.set(countBuffer);
+                } catch (IOException ioe) {
+                    throw new AlgebricksException(ioe);
+                }
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
new file mode 100644
index 0000000..50f014b
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/** Serializable holder for an array of push-runtime factories and an array of record descriptors. */
+public class AlgebricksPipeline implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final IPushRuntimeFactory[] runtimeFactories;
+    private final RecordDescriptor[] recordDescriptors;
+
+    /** Stores both arrays by reference; no defensive copy is made. */
+    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors) {
+        this.recordDescriptors = recordDescriptors;
+        this.runtimeFactories = runtimeFactories;
+    }
+
+    /** Returns the factory array that was passed to the constructor. */
+    public IPushRuntimeFactory[] getRuntimeFactories() {
+        return this.runtimeFactories;
+    }
+
+    /** Returns the record-descriptor array that was passed to the constructor. */
+    public RecordDescriptor[] getRecordDescriptors() {
+        return this.recordDescriptors;
+    }
+
+    /** Returns the field count of the last record descriptor (the array must be non-empty). */
+    public int getOutputWidth() {
+        final RecordDescriptor lastDescriptor = recordDescriptors[recordDescriptors.length - 1];
+        return lastDescriptor.getFieldCount();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
new file mode 100644
index 0000000..9b014ee
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Aggregate evaluator exposing {@code init}, a per-tuple {@code step}, and a {@code finish} that takes the result pointable. */
+public interface IAggregateEvaluator {
+    void init() throws AlgebricksException;
+
+    void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    void finish(IPointable result) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..83bfbf7
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+/** Serializable factory that creates an {@link IAggregateEvaluator} for a given task context. */
+public interface IAggregateEvaluatorFactory extends Serializable {
+    IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
new file mode 100644
index 0000000..c768674
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Aggregate function with {@code init}, a per-tuple {@code step}, and argument-less final and partial finish calls. */
+public interface ICopyAggregateFunction {
+    /** Should be called each time a new aggregate value is computed. */
+    void init() throws AlgebricksException;
+
+    void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+    void finish() throws AlgebricksException;
+
+    void finishPartial() throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
new file mode 100644
index 0000000..702f392
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+/** Serializable factory that creates an {@link ICopyAggregateFunction} given an {@link IDataOutputProvider}. */
+public interface ICopyAggregateFunctionFactory extends Serializable {
+    ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluator.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluator.java
new file mode 100644
index 0000000..58d7569
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluator.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Evaluator over a single tuple; it takes no result argument (ICopyEvaluatorFactory passes an output provider at creation). */
+public interface ICopyEvaluator {
+    void evaluate(IFrameTupleReference tuple) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
new file mode 100644
index 0000000..649902c
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+/** Serializable factory that creates an {@link ICopyEvaluator} given an {@link IDataOutputProvider}. */
+public interface ICopyEvaluatorFactory extends Serializable {
+    ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
new file mode 100644
index 0000000..3b14b28
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Running aggregate with {@code init} and a per-tuple {@code step}; neither method returns a value. */
+public interface ICopyRunningAggregateFunction {
+    void init() throws AlgebricksException;
+
+    void step(IFrameTupleReference tuple) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..1065020
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+/** Serializable factory that creates an {@link ICopyRunningAggregateFunction} given an {@link IDataOutputProvider}. */
+public interface ICopyRunningAggregateFunctionFactory extends Serializable {
+    ICopyRunningAggregateFunction createRunningAggregateFunction(
+            IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
new file mode 100644
index 0000000..47aec7d
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunction.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ICopySerializableAggregateFunction {
+    /**
+     * Initializes the space occupied by the internal state.
+     * Nothing is returned; the method is declared {@code void}.
+     *
+     * @param state
+     *            output that the initial internal state is written to
+     */
+    void init(DataOutput state) throws AlgebricksException;
+
+    /**
+     * Updates the internal state with one input tuple.
+     * @param tuple input tuple to fold into the state
+     * @param data array holding the state bytes (presumably those written by init; confirm with callers)
+     * @param start presumably the offset of the state within {@code data}
+     * @param len presumably the length of the state in bytes
+     */
+    void step(IFrameTupleReference tuple, byte[] data, int start, int len) throws AlgebricksException;
+
+    /**
+     * Outputs the state to {@code result}.
+     * @param data array holding the state bytes (presumably as updated by step; confirm with callers)
+     * @param start presumably the offset of the state within {@code data}
+     * @param len presumably the length of the state in bytes
+     * @param result output that the result is written to
+     */
+    void finish(byte[] data, int start, int len, DataOutput result) throws AlgebricksException;
+
+    /**
+     * Outputs the partial state to {@code partialResult}.
+     * @param data array holding the state bytes (presumably as updated by step; confirm with callers)
+     * @param start presumably the offset of the state within {@code data}
+     * @param len presumably the length of the state in bytes
+     * @param partialResult output that the partial result is written to
+     */
+    void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
new file mode 100644
index 0000000..69d56a8
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+/** Serializable factory that creates {@link ICopySerializableAggregateFunction} instances; it takes no arguments. */
+public interface ICopySerializableAggregateFunctionFactory extends Serializable {
+    ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
new file mode 100644
index 0000000..3443403
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Unnesting function that is initialized with a tuple and then driven through {@code step()}. */
+public interface ICopyUnnestingFunction {
+    void init(IFrameTupleReference tuple) throws AlgebricksException;
+
+    /** NOTE(review): the meaning of the boolean result is not documented here; confirm with implementations. */
+    boolean step() throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
new file mode 100644
index 0000000..e43da8b
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+/** Serializable factory that creates an {@link ICopyUnnestingFunction} given an {@link IDataOutputProvider}. */
+public interface ICopyUnnestingFunctionFactory extends Serializable {
+    ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
new file mode 100644
index 0000000..26fcc67
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+/** An {@link IFrameWriter} that also accepts indexed frame writers (each with a record descriptor) and indexed input descriptors. */
+public interface IPushRuntime extends IFrameWriter {
+    void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
new file mode 100644
index 0000000..f1d01e6
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+/** Serializable factory that creates an {@link IPushRuntime} for a given task context. */
+public interface IPushRuntimeFactory extends Serializable {
+    IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluator.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluator.java
new file mode 100644
index 0000000..f76a33c
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Running aggregate evaluator whose {@code step} receives both the tuple and the pointable for the result. */
+public interface IRunningAggregateEvaluator {
+    void init() throws AlgebricksException;
+
+    void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..80f6511
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+/** Serializable factory that creates {@link IRunningAggregateEvaluator} instances; it takes no arguments. */
+public interface IRunningAggregateEvaluatorFactory extends Serializable {
+    IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluator.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluator.java
new file mode 100644
index 0000000..e22097a
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluator.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Evaluates an expression against one tuple and reports the outcome through the supplied {@link IPointable}. */
+public interface IScalarEvaluator {
+    void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
new file mode 100644
index 0000000..e40b96c
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+/** Serializable factory that builds an {@link IScalarEvaluator} for a given task context. */
+public interface IScalarEvaluatorFactory extends Serializable {
+    IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
new file mode 100644
index 0000000..f65192a
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Two-phase evaluator: {@code init} receives a tuple, after which {@code step} is handed a result pointable. */
+public interface IUnnestingEvaluator {
+    void init(IFrameTupleReference tuple) throws AlgebricksException;
+    // NOTE(review): presumably returns false once no further item is available -- confirm with implementations.
+    boolean step(IPointable result) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
new file mode 100644
index 0000000..8bca2b9
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+/** Serializable factory that builds an {@link IUnnestingEvaluator} for a given task context. */
+public interface IUnnestingEvaluatorFactory extends Serializable {
+    IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
new file mode 100644
index 0000000..8ab92ee
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.context;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+/** Registry of {@link BTree} instances keyed by file id, paired with a lock that callers drive explicitly. */
+public class AsterixBTreeRegistry {
+    // get/register/unregister never acquire the lock themselves; only lock()/unlock() touch it.
+    private final HashMap<Integer, BTree> treesByFileId = new HashMap<Integer, BTree>();
+    private final Lock guard = new ReentrantLock();
+    /** Returns the tree stored under {@code fileId}, or {@code null} when the map holds none. */
+    public BTree get(int fileId) {
+        return treesByFileId.get(Integer.valueOf(fileId));
+    }
+
+    // TODO: not very high concurrency, but good enough for now
+    public void lock() {
+        guard.lock();
+    }
+
+    public void unlock() {
+        guard.unlock();
+    }
+    /** Stores {@code btree} under {@code fileId}, replacing whatever was stored there before. */
+    public void register(int fileId, BTree btree) {
+        treesByFileId.put(Integer.valueOf(fileId), btree);
+    }
+    /** Drops the entry for {@code fileId}; does nothing when there is no such entry. */
+    public void unregister(int fileId) {
+        treesByFileId.remove(Integer.valueOf(fileId));
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
new file mode 100644
index 0000000..5f5d19c
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Copy-evaluator factory whose evaluators write the raw bytes of one tuple field to their output. */
+public class ColumnAccessEvalFactory implements ICopyEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int fieldIndex;
+
+    /** @param fieldIndex index of the tuple field that created evaluators copy */
+    public ColumnAccessEvalFactory(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("ColumnAccess(").append(fieldIndex).append(")").toString();
+    }
+
+    /** Builds an evaluator bound to the {@link DataOutput} obtained from {@code output} at creation time. */
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        final DataOutput sink = output.getDataOutput();
+
+        return new ICopyEvaluator() {
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                try {
+                    // Forward the field's byte range exactly as the tuple reports it.
+                    sink.write(tuple.getFieldData(fieldIndex), tuple.getFieldStart(fieldIndex),
+                            tuple.getFieldLength(fieldIndex));
+                } catch (IOException cause) {
+                    throw new AlgebricksException(cause);
+                }
+            }
+        };
+    }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
new file mode 100644
index 0000000..c537c93
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Copy-evaluator factory whose evaluators write the same byte array for every tuple they are given. */
+public class ConstantEvalFactory implements ICopyEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    // Kept by reference: the constructor stores the caller's array without copying it.
+    private final byte[] value;
+
+    public ConstantEvalFactory(byte[] value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "Constant";
+    }
+
+    @Override
+    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+        final DataOutput sink = output.getDataOutput();
+        return new ICopyEvaluator() {
+            @Override
+            public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                // The tuple argument is not read; the output depends only on the stored bytes.
+                final int byteCount = value.length;
+                try {
+                    sink.write(value, 0, byteCount);
+                } catch (IOException cause) {
+                    throw new AlgebricksException(cause);
+                }
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
new file mode 100644
index 0000000..4f7c35a
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Scalar-evaluator factory whose evaluators set the result to the same stored byte array on every call. */
+public class ConstantEvaluatorFactory implements IScalarEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] value;
+
+    public ConstantEvaluatorFactory(byte[] value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "Constant";
+    }
+    /** Builds an evaluator that reads neither {@code ctx} nor its input tuple. */
+    @Override
+    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
+            @Override
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                final int byteCount = value.length;
+                result.set(value, 0, byteCount);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
new file mode 100644
index 0000000..29328ff
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Scalar-evaluator factory whose evaluators hand {@code result} the byte range of one tuple field. */
+public class TupleFieldEvaluatorFactory implements IScalarEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final int fieldIndex;
+
+    public TupleFieldEvaluatorFactory(int fieldIndex) {
+        this.fieldIndex = fieldIndex;
+    }
+
+    @Override
+    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new IScalarEvaluator() {
+            @Override
+            public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                final byte[] bytes = tuple.getFieldData(fieldIndex);
+                result.set(bytes, tuple.getFieldStart(fieldIndex), tuple.getFieldLength(fieldIndex));
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
new file mode 100644
index 0000000..a975195
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+/** Runtime factory for an operator that folds every incoming tuple into one tuple of aggregate values. */
+public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+    private static final long serialVersionUID = 1L;
+    // One evaluator factory per field of the emitted tuple.
+    private IAggregateEvaluatorFactory[] aggregFactories;
+
+    public AggregateRuntimeFactory(IAggregateEvaluatorFactory[] aggregFactories) {
+        super(null);
+        this.aggregFactories = aggregFactories;
+    }
+    // NOTE(review): the "assign [" label looks copied from another runtime; kept as-is in case it is matched on.
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("assign [");
+        for (int i = 0; i < aggregFactories.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(aggregFactories[i]);
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+    /** Creates the push runtime that drives the evaluators through open(), nextFrame() and close(). */
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IAggregateEvaluator[] aggregs = new IAggregateEvaluator[aggregFactories.length];
+            private IPointable result = VoidPointable.FACTORY.createPointable();
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
+            private boolean first = true;
+            private boolean failed = false;
+            // Creates the evaluators on the first call only, then (re)initializes them on every call.
+            @Override
+            public void open() throws HyracksDataException {
+                try {
+                    if (first) {
+                        first = false;
+                        initAccessAppendRef(ctx);
+                        for (int i = 0; i < aggregFactories.length; i++) {
+                            aggregs[i] = aggregFactories[i].createAggregateEvaluator(ctx);
+                        }
+                    }
+                    for (int i = 0; i < aggregFactories.length; i++) {
+                        aggregs[i].init();
+                    }
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    processTuple(tRef);
+                }
+            }
+            // Emits the aggregate tuple unless fail() was called; the base-class close runs in every case.
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    if (!failed) {
+                        computeAggregate();
+                        appendToFrameFromTupleBuilder(tupleBuilder);
+                    }
+                } finally {
+                    super.close();
+                }
+            }
+
+            private void computeAggregate() throws HyracksDataException {
+                tupleBuilder.reset();
+                for (int f = 0; f < aggregs.length; f++) {
+                    try {
+                        aggregs[f].finish(result);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+                }
+            }
+
+            private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException {
+                for (int f = 0; f < aggregs.length; f++) {
+                    try {
+                        aggregs[f].step(tupleRef);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            // Records the failure so that close() does not publish a result built from incomplete input.
+            @Override
+            public void fail() throws HyracksDataException {
+                failed = true;
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..2ae6f72
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+/** Aggregator-descriptor factory that runs each group's tuples through nested subplan pipelines and emits key, decor and subplan output fields as one tuple. */
+public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private AlgebricksPipeline[] subplans;
+ private int[] keyFieldIdx;
+ private int[] decorFieldIdx;
+
+ public NestedPlansAccumulatingAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx,
+ int[] decorFieldIdx) {
+ this.subplans = subplans;
+ this.keyFieldIdx = keyFieldIdx;
+ this.decorFieldIdx = decorFieldIdx;
+ }
+ // Assembles one pipeline per subplan, all writing to one shared AggregatorOutput, then returns the descriptor.
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
+
+ final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
+ decorFieldIdx.length);
+ final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+ for (int i = 0; i < subplans.length; i++) {
+ try {
+ pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ return new IAggregatorDescriptor() {
+ // Starts a group: seeds the shared builder with key then decor fields, opens every pipeline, feeds the first tuple.
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+ tb.reset();
+ for (int i = 0; i < keyFieldIdx.length; ++i) {
+ tb.addField(accessor, tIndex, keyFieldIdx[i]);
+ }
+ for (int i = 0; i < decorFieldIdx.length; ++i) {
+ tb.addField(accessor, tIndex, decorFieldIdx[i]);
+ }
+ for (int i = 0; i < pipelines.length; ++i) {
+ pipelines[i].open();
+ }
+
+ // aggregate the first tuple
+ for (int i = 0; i < pipelines.length; i++) {
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ }
+ }
+ // Feeds one more tuple to every pipeline; stateAccessor, stateTupleIndex and state are not used here.
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // it only works if the output of the aggregator fits in one
+ // frame
+ for (int i = 0; i < pipelines.length; i++) {
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ }
+ }
+ // Closes every pipeline (selecting its input index on the shared output first), then fills tupleBuilder.
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ for (int i = 0; i < pipelines.length; i++) {
+ outputWriter.setInputIdx(i);
+ pipelines[i].close();
+ }
+ // Copy every field of the shared builder into tupleBuilder; 'offset' below holds the field's length.
+ tupleBuilder.reset();
+ ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+ byte[] data = tb.getByteArray();
+ int[] fieldEnds = tb.getFieldEndOffsets();
+ int start = 0;
+ int offset = 0;
+ for (int i = 0; i < fieldEnds.length; i++) {
+ if (i > 0)
+ start = fieldEnds[i - 1];
+ offset = fieldEnds[i] - start;
+ tupleBuilder.addField(data, start, offset);
+ }
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState();
+ }
+
+ @Override
+ public void reset() {
+ // no-op
+ }
+ // Partial results are not produced by this aggregator; any call throws IllegalStateException.
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("this method should not be called");
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ };
+ }
+ // Creates the subplan's runtimes from last to first, pointing each at the one created before it (the last at writer); returns the first.
+ private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ // plug the operators
+ IFrameWriter start = writer;
+ IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+ RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+ for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+ IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
+ newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ if (i > 0) {
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+ } else {
+ // the nts (presumably the nested tuple source) has the same input and output rec. desc.
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+ }
+ start = newRuntime;
+ }
+ return start;
+ }
+
+ /**
+ * We suppose for now, that each subplan only produces one tuple.
+ */
+ private static class AggregatorOutput implements IFrameWriter {
+
+ // inputIdx selects which subplan's accessor and output width nextFrame() uses; it is set via setInputIdx().
+ private FrameTupleAccessor[] tAccess;
+ private RecordDescriptor[] inputRecDesc;
+ private int inputIdx;
+ private ArrayTupleBuilder tb;
+ private AlgebricksPipeline[] subplans;
+
+ public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
+ this.subplans = subplans;
+ // Each subplan's last record descriptor is the one used to read the frames passed to nextFrame().
+ int totalAggFields = 0;
+ this.inputRecDesc = new RecordDescriptor[subplans.length];
+ for (int i = 0; i < subplans.length; i++) {
+ RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+ this.inputRecDesc[i] = rd[rd.length - 1];
+ totalAggFields += subplans[i].getOutputWidth();
+ }
+ tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+
+ this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+ for (int i = 0; i < inputRecDesc.length; i++) {
+ tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ // no-op
+ }
+
+ /**
+ * Since each pipeline only produces one tuple, this method is only
+ * called by the close method of the pipelines.
+ */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ int tIndex = 0;
+ int w = subplans[inputIdx].getOutputWidth();
+ IFrameTupleAccessor accessor = tAccess[inputIdx];
+ accessor.reset(buffer);
+ for (int f = 0; f < w; f++) {
+ tb.addField(accessor, tIndex, f);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // no-op
+ }
+
+ public void setInputIdx(int inputIdx) {
+ this.inputIdx = inputIdx;
+ }
+
+ public ArrayTupleBuilder getTupleBuilder() {
+ return tb;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..4d5d3d6
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -0,0 +1,147 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+/**
+ * Creates aggregator descriptors whose running aggregate state is stored as
+ * serialized bytes in the group tuple, in the fields that follow the key fields.
+ */
+public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+     private static final long serialVersionUID = 1L;
+     private ICopySerializableAggregateFunctionFactory[] aggFactories;
+
+     /**
+      * @param aggFactories one factory per aggregate function, in output order
+      */
+     public SerializableAggregatorDescriptorFactory(ICopySerializableAggregateFunctionFactory[] aggFactories) {
+         this.aggFactories = aggFactories;
+     }
+
+     /**
+      * Builds the descriptor. Only {@code keyFields} is consulted (for its length);
+      * the remaining arguments are not used by this implementation.
+      */
+     @Override
+     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+             RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
+             throws HyracksDataException {
+         // Index of the first aggregate-state field: states come right after the keys.
+         final int numKeys = keyFields.length;
+
+         /**
+          * one IAggregatorDescriptor instance per Gby operator
+          */
+         return new IAggregatorDescriptor() {
+             private FrameTupleReference inputTuple = new FrameTupleReference();
+             private ICopySerializableAggregateFunction[] functions = new ICopySerializableAggregateFunction[aggFactories.length];
+             // Byte length of each function's state field, measured in init() and reused afterwards.
+             private int[] stateLengths = new int[aggFactories.length];
+
+             @Override
+             public AggregateState createAggregateStates() {
+                 return new AggregateState();
+             }
+
+             /**
+              * Appends one initial state field per function to {@code tb}, then feeds the
+              * first input tuple into each of those states.
+              */
+             @Override
+             public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+                     throws HyracksDataException {
+                 final DataOutput stateOut = tb.getDataOutput();
+                 inputTuple.reset(accessor, tIndex);
+                 try {
+                     // Pass 1: lazily create each function and let it write its initial state.
+                     for (int k = 0; k < functions.length; k++) {
+                         final int sizeBefore = tb.getSize();
+                         if (functions[k] == null) {
+                             functions[k] = aggFactories[k].createAggregateFunction();
+                         }
+                         functions[k].init(stateOut);
+                         tb.addFieldEndOffset();
+                         stateLengths[k] = tb.getSize() - sizeBefore;
+                     }
+
+                     // Pass 2: doing initial aggregate on the states just written.
+                     for (int k = 0; k < functions.length; k++) {
+                         final byte[] bytes = tb.getByteArray();
+                         final int precedingField = numKeys + k - 1;
+                         // A state starts where the preceding field ends (or at 0 if there is none).
+                         final int stateStart = precedingField < 0 ? 0 : tb.getFieldEndOffsets()[precedingField];
+                         functions[k].step(inputTuple, bytes, stateStart, stateLengths[k]);
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+             }
+
+             /**
+              * Folds the input tuple into the serialized states held in the state tuple.
+              */
+             @Override
+             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                 inputTuple.reset(accessor, tIndex);
+                 // Absolute position of the state tuple's data area inside the frame.
+                 final int dataAreaStart = stateAccessor.getTupleStartOffset(stateTupleIndex)
+                         + stateAccessor.getFieldSlotsLength();
+                 try {
+                     for (int k = 0; k < functions.length; k++) {
+                         final byte[] bytes = stateAccessor.getBuffer().array();
+                         final int stateStart = dataAreaStart
+                                 + stateAccessor.getFieldStartOffset(stateTupleIndex, numKeys + k);
+                         functions[k].step(inputTuple, bytes, stateStart, stateLengths[k]);
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+             }
+
+             /**
+              * Writes one partial-result field per function to {@code tb}, reading the
+              * states back to back starting at the first state field of the tuple.
+              */
+             @Override
+             public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                     AggregateState state) throws HyracksDataException {
+                 final byte[] bytes = accessor.getBuffer().array();
+                 int cursor = firstStateOffset(accessor, tIndex);
+                 try {
+                     for (int k = 0; k < functions.length; k++) {
+                         functions[k].finishPartial(bytes, cursor, stateLengths[k], tb.getDataOutput());
+                         cursor += stateLengths[k];
+                         tb.addFieldEndOffset();
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+             }
+
+             /**
+              * Writes one final-result field per function to {@code tb}, reading the
+              * states back to back starting at the first state field of the tuple.
+              */
+             @Override
+             public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                     AggregateState state) throws HyracksDataException {
+                 final byte[] bytes = accessor.getBuffer().array();
+                 int cursor = firstStateOffset(accessor, tIndex);
+                 try {
+                     for (int k = 0; k < functions.length; k++) {
+                         functions[k].finish(bytes, cursor, stateLengths[k], tb.getDataOutput());
+                         cursor += stateLengths[k];
+                         tb.addFieldEndOffset();
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+             }
+
+             @Override
+             public void reset() {
+
+             }
+
+             @Override
+             public void close() {
+                 reset();
+             }
+
+             // Absolute buffer offset of the first aggregate-state field of tuple tIndex.
+             private int firstStateOffset(IFrameTupleAccessor accessor, int tIndex) {
+                 final int tupleStart = accessor.getTupleStartOffset(tIndex);
+                 final int firstStateField = accessor.getFieldStartOffset(tIndex, numKeys);
+                 return tupleStart + accessor.getFieldSlotsLength() + firstStateField;
+             }
+
+         };
+     }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..83925cc
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+/**
+ * Creates aggregator descriptors that keep their running state inside
+ * {@link IAggregateEvaluator} objects (carried in the {@link AggregateState})
+ * rather than in the group tuple. Partial results are not supported.
+ */
+public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+     private static final long serialVersionUID = 1L;
+     private IAggregateEvaluatorFactory[] aggFactories;
+
+     /**
+      * @param aggFactories one evaluator factory per aggregate, in output order
+      * @param keys not used by this implementation
+      * @param fdColumns not used by this implementation
+      */
+     public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateEvaluatorFactory[] aggFactories, int[] keys,
+             int[] fdColumns) {
+         this.aggFactories = aggFactories;
+     }
+
+     /**
+      * Builds the descriptor; of the arguments only {@code ctx} is used, to create
+      * the evaluators.
+      */
+     @Override
+     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+             RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+
+         return new IAggregatorDescriptor() {
+
+             private FrameTupleReference inputTuple = new FrameTupleReference();
+             // Reused holder for the result bytes each evaluator produces in finish().
+             private IPointable result = VoidPointable.FACTORY.createPointable();
+
+             /** Creates one evaluator per factory and wraps the array as the aggregate state. */
+             @Override
+             public AggregateState createAggregateStates() {
+                 IAggregateEvaluator[] evaluators = new IAggregateEvaluator[aggFactories.length];
+                 try {
+                     for (int k = 0; k < evaluators.length; k++) {
+                         evaluators[k] = aggFactories[k].createAggregateEvaluator(ctx);
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new IllegalStateException(e);
+                 }
+                 return new AggregateState(evaluators);
+             }
+
+             /** Re-initializes every evaluator, then feeds it the first tuple of the group. */
+             @Override
+             public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                     AggregateState state) throws HyracksDataException {
+                 IAggregateEvaluator[] evaluators = (IAggregateEvaluator[]) state.state;
+
+                 // initialize aggregate functions
+                 try {
+                     for (IAggregateEvaluator evaluator : evaluators) {
+                         evaluator.init();
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+
+                 stepAll(evaluators, accessor, tIndex);
+             }
+
+             /** Feeds one more tuple to every evaluator; the state accessor is not consulted. */
+             @Override
+             public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+                     int stateTupleIndex, AggregateState state) throws HyracksDataException {
+                 stepAll((IAggregateEvaluator[]) state.state, accessor, tIndex);
+             }
+
+             /** Appends one field per evaluator, holding that evaluator's finished result. */
+             @Override
+             public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                     AggregateState state) throws HyracksDataException {
+                 IAggregateEvaluator[] evaluators = (IAggregateEvaluator[]) state.state;
+                 try {
+                     for (IAggregateEvaluator evaluator : evaluators) {
+                         evaluator.finish(result);
+                         tupleBuilder.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+             }
+
+             /** Always throws: this aggregator does not produce partial results. */
+             @Override
+             public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                     AggregateState state) throws HyracksDataException {
+                 throw new IllegalStateException("this method should not be called");
+             }
+
+             @Override
+             public void reset() {
+
+             }
+
+             @Override
+             public void close() {
+
+             }
+
+             // Points the shared tuple reference at (accessor, tIndex) and steps every evaluator on it.
+             private void stepAll(IAggregateEvaluator[] evaluators, IFrameTupleAccessor accessor, int tIndex)
+                     throws HyracksDataException {
+                 inputTuple.reset(accessor, tIndex);
+                 try {
+                     for (IAggregateEvaluator evaluator : evaluators) {
+                         evaluator.step(inputTuple);
+                     }
+                 } catch (AlgebricksException e) {
+                     throw new HyracksDataException(e);
+                 }
+             }
+
+         };
+     }
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
new file mode 100644
index 0000000..cd9931b
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends AbstractOneInputOneOutputPushRuntime {
+
+ protected FrameTupleAppender appender;
+ protected ByteBuffer frame;
+ protected FrameTupleAccessor tAccess;
+ protected FrameTupleReference tRef;
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (!failed) {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ }
+ writer.close();
+ appender.reset(frame, true);
+ }
+
+ protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+ }
+ }
+ }
+
+ protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+ if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
+ }
+ }
+ }
+
+ protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
+ if (!appender.append(tAccess, tIndex)) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tAccess, tIndex)) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendTupleToFrame).");
+ }
+ }
+ }
+
+ protected final void initAccessAppend(IHyracksTaskContext ctx) {
+ // if (allocFrame) {
+ frame = ctx.allocateFrame();
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(frame, true);
+ // }
+ tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+ }
+
+ protected final void initAccessAppendRef(IHyracksTaskContext ctx) {
+ initAccessAppend(ctx);
+ tRef = new FrameTupleReference();
+ }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
new file mode 100644
index 0000000..22f7df6
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Push runtime with one input and one output; remembers the record descriptor of
+ * its single input.
+ */
+public abstract class AbstractOneInputOneOutputPushRuntime extends AbstractOneInputPushRuntime {
+
+     /** Descriptor of the tuples arriving on the input. */
+     protected RecordDescriptor inputRecordDesc;
+
+     /**
+      * Stores the input descriptor. The {@code index} argument is not consulted.
+      *
+      * @param index input index (ignored)
+      * @param recordDescriptor descriptor of the input tuples
+      */
+     @Override
+     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+         inputRecordDesc = recordDescriptor;
+     }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
new file mode 100644
index 0000000..f2c1008
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base factory for one-input/one-output push runtimes. Holds the projection list
+ * for subclasses and routes {@link #createPushRuntime(IHyracksTaskContext)} to
+ * {@link #createOneOutputPushRuntime(IHyracksTaskContext)}.
+ */
+public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRuntimeFactory {
+
+     private static final long serialVersionUID = 1L;
+
+     /** Projection list as passed to the constructor; may be {@code null} if the caller passed none. */
+     protected int[] projectionList;
+
+     /**
+      * @param projectionList projection list made available to subclasses
+      */
+     public AbstractOneInputOneOutputRuntimeFactory(int[] projectionList) {
+         this.projectionList = projectionList;
+     }
+
+     /**
+      * Delegates to {@link #createOneOutputPushRuntime(IHyracksTaskContext)}.
+      */
+     @Override
+     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+         final AbstractOneInputOneOutputPushRuntime runtime = createOneOutputPushRuntime(ctx);
+         return runtime;
+     }
+
+     /**
+      * Creates the concrete runtime for the given task context.
+      *
+      * @param ctx task context the runtime will run in
+      * @return the new runtime
+      * @throws AlgebricksException if the runtime cannot be created
+      */
+     public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
+             throws AlgebricksException;
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
new file mode 100644
index 0000000..e600264
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base push runtime that owns a single downstream writer and tracks whether
+ * {@link #fail()} has been called.
+ */
+public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
+     /** Downstream writer receiving this runtime's output. */
+     protected IFrameWriter writer;
+     /** Descriptor of the tuples sent to {@link #writer}. */
+     protected RecordDescriptor outputRecordDesc;
+     /** Set to {@code true} by {@link #fail()}. */
+     protected boolean failed;
+
+     /**
+      * Stores the output writer and its record descriptor. The {@code index}
+      * argument is not consulted.
+      */
+     @Override
+     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+         this.outputRecordDesc = recordDesc;
+         this.writer = writer;
+     }
+
+     /**
+      * Marks this runtime as failed and forwards the failure to the writer.
+      */
+     @Override
+     public void fail() throws HyracksDataException {
+         this.failed = true;
+         this.writer.fail();
+     }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
new file mode 100644
index 0000000..0fbe51f
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Base push runtime with one input and no output: it records its input
+ * descriptor and rejects any attempt to attach a frame writer.
+ */
+public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime {
+     /** Descriptor of the tuples arriving on the input. */
+     protected RecordDescriptor inputRecordDesc;
+
+     /**
+      * Stores the input descriptor. The {@code index} argument is not consulted.
+      */
+     @Override
+     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+         inputRecordDesc = recordDescriptor;
+     }
+
+     /**
+      * Always throws: a sink has no output to attach a writer to.
+      *
+      * @throws IllegalStateException on every call
+      */
+     @Override
+     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+         throw new IllegalStateException();
+     }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
new file mode 100644
index 0000000..1d23d14
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base push runtime that only produces output: it accepts no input frames and no
+ * input record descriptor.
+ */
+public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInputPushRuntime {
+
+     /**
+      * Always throws: this runtime does not accept an input record descriptor.
+      *
+      * @throws UnsupportedOperationException on every call
+      */
+     @Override
+     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+         throw new UnsupportedOperationException();
+     }
+
+     /**
+      * Always throws: this runtime does not accept input frames.
+      *
+      * @throws UnsupportedOperationException on every call
+      */
+     @Override
+     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+         throw new UnsupportedOperationException();
+     }
+
+     /**
+      * Forwards the failure to the writer.
+      * NOTE(review): unlike the inherited implementation this override does not set
+      * the {@code failed} flag - confirm whether that is intended.
+      */
+     @Override
+     public void fail() throws HyracksDataException {
+         this.writer.fail();
+     }
+
+     /** Does nothing; the writer is not closed here. */
+     @Override
+     public void close() throws HyracksDataException {
+     }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
new file mode 100644
index 0000000..f175367
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for a sink runtime whose {@code open}, {@code nextFrame}, {@code fail}
+ * and {@code close} all do nothing, so every frame pushed into it is discarded.
+ */
+public class SinkRuntimeFactory implements IPushRuntimeFactory {
+
+     private static final long serialVersionUID = 1L;
+
+     public SinkRuntimeFactory() {
+     }
+
+     /**
+      * @return the fixed label {@code "sink"}
+      */
+     @Override
+     public String toString() {
+         return "sink";
+     }
+
+     /**
+      * Creates a new no-op sink. The task context is not used.
+      */
+     @Override
+     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+         final IPushRuntime discardingSink = new AbstractOneInputSinkPushRuntime() {
+
+             @Override
+             public void open() throws HyracksDataException {
+                 // nothing to set up
+             }
+
+             @Override
+             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                 // incoming frames are dropped
+             }
+
+             @Override
+             public void fail() throws HyracksDataException {
+                 // nothing downstream to notify
+             }
+
+             @Override
+             public void close() throws HyracksDataException {
+                 // nothing to release
+             }
+         };
+         return discardingSink;
+     }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
new file mode 100644
index 0000000..388e6c9
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+/**
+ * Factory for a push runtime that groups its input by wrapping a
+ * {@link PreclusteredGroupWriter}: every runtime call is forwarded to a group
+ * writer that emits its results to this runtime's output writer.
+ */
+public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+     private static final long serialVersionUID = 1L;
+     private final int[] groupFields;
+     private final IBinaryComparatorFactory[] comparatorFactories;
+     private final IAggregatorDescriptorFactory aggregatorFactory;
+     private final RecordDescriptor inRecordDesc;
+     private final RecordDescriptor outRecordDesc;
+
+     /**
+      * @param groupFields indexes of the grouping fields in the input tuples
+      * @param comparatorFactories factories for the comparators handed to the group writer
+      * @param aggregatorFactory factory for the aggregator applied to each group
+      * @param inRecordDesc descriptor of the input tuples
+      * @param outRecordDesc descriptor of the output tuples
+      * @param projectionList must be {@code null}; projections are not supported here
+      * @throws NotImplementedException if {@code projectionList} is not {@code null}
+      */
+     public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+             RecordDescriptor outRecordDesc, int[] projectionList) {
+         super(projectionList);
+         // Projection push-down is not implemented for this runtime, so a projection list is rejected.
+         if (projectionList != null) {
+             throw new NotImplementedException("Cannot push projection into MicroPreClusteredGroupRuntime.");
+         }
+         this.groupFields = groupFields;
+         this.comparatorFactories = comparatorFactories;
+         this.aggregatorFactory = aggregatorFactory;
+         this.inRecordDesc = inRecordDesc;
+         this.outRecordDesc = outRecordDesc;
+     }
+
+     /**
+      * Creates the comparators and the aggregator once, then returns a runtime that
+      * builds its {@link PreclusteredGroupWriter} in {@code open()} and forwards
+      * {@code nextFrame}, {@code fail} and {@code close} to it.
+      *
+      * @param ctx task context passed to the aggregator factory and the group writer
+      * @throws AlgebricksException wrapping any {@link HyracksDataException} raised during setup
+      */
+     @Override
+     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+             throws AlgebricksException {
+         try {
+             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+             for (int i = 0; i < comparatorFactories.length; ++i) {
+                 comparators[i] = comparatorFactories[i].createBinaryComparator();
+             }
+             // The same field list is passed for the keys and for the keys in partial results.
+             final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+                     outRecordDesc, groupFields, groupFields);
+
+             return new AbstractOneInputOneOutputPushRuntime() {
+
+                 private PreclusteredGroupWriter pgw;
+
+                 @Override
+                 public void open() throws HyracksDataException {
+                     // Created here rather than up front because it is built with this runtime's writer field.
+                     pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+                             outRecordDesc, writer);
+                     pgw.open();
+                 }
+
+                 @Override
+                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                     pgw.nextFrame(buffer);
+                 }
+
+                 @Override
+                 public void fail() throws HyracksDataException {
+                     pgw.fail();
+                 }
+
+                 @Override
+                 public void close() throws HyracksDataException {
+                     pgw.close();
+                 }
+             };
+         } catch (HyracksDataException e) {
+             throw new AlgebricksException(e);
+         }
+
+     }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
new file mode 100644
index 0000000..f2659f5
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.nio.ByteBuffer;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * Operator descriptor that runs a pipeline of Algebricks micro-operators (push runtimes) inside a
+ * single Hyracks operator.
+ * <p>
+ * With an input arity of zero the pipeline is treated as a source: it is assembled, opened and
+ * closed in one go. Otherwise the pipeline is assembled on the first {@code open()} and every
+ * incoming frame is pushed into its first runtime.
+ */
+public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    // runtime factories and record descriptors of the local micro-operator pipeline
+    private final AlgebricksPipeline pipeline;
+
+    /**
+     * @param spec
+     *            registry handed to the super constructor
+     * @param inputArity
+     *            number of inputs of this operator
+     * @param outputArity
+     *            number of outputs of this operator
+     * @param runtimeFactories
+     *            factories of the micro-operators, in pipeline order
+     * @param internalRecordDescriptors
+     *            record descriptors of the pipeline; when {@code outputArity} is 1 the last one
+     *            becomes the output record descriptor of this operator
+     */
+    public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
+            IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
+        super(spec, inputArity, outputArity);
+        if (outputArity == 1) {
+            final int lastIndex = internalRecordDescriptors.length - 1;
+            this.recordDescriptors[0] = internalRecordDescriptors[lastIndex];
+        }
+        this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
+    }
+
+    /** @return the micro-operator pipeline wrapped by this descriptor */
+    public AlgebricksPipeline getPipeline() {
+        return pipeline;
+    }
+
+    /** Adds the runtime factories under the key {@code "micro-operators"} to the inherited JSON. */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject description = super.toJSON();
+        description.put("micro-operators", pipeline.getRuntimeFactories());
+        return description;
+    }
+
+    /** Lists the string form of every runtime factory, one per line, inside braces. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Asterix { \n");
+        for (IPushRuntimeFactory factory : pipeline.getRuntimeFactories()) {
+            text.append(" ").append(factory.toString()).append(";\n");
+        }
+        return text.append("}").toString();
+    }
+
+    /** Picks the source variant when there is no input, and the one-input variant otherwise. */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        if (inputArity != 0) {
+            return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        }
+        return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+    }
+
+    /**
+     * Builds the pushable used when the operator has no input: {@code initialize()} assembles the
+     * pipeline and then opens and closes its first runtime.
+     */
+    private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            public void initialize() throws HyracksDataException {
+                RecordDescriptor outDesc = null;
+                if (outputArity > 0) {
+                    outDesc = AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0];
+                }
+                // a source pipeline has no input record descriptor
+                PipelineAssembler assembler = new PipelineAssembler(pipeline, inputArity, outputArity, null,
+                        outDesc);
+
+                final IFrameWriter head;
+                try {
+                    head = assembler.assemblePipeline(writer, ctx);
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+                head.open();
+                head.close();
+            }
+        };
+    }
+
+    /**
+     * Builds the pushable used when the operator has an input: the pipeline is assembled on the
+     * first {@code open()} and all frame-writer calls are forwarded to its first runtime.
+     */
+    private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            // first runtime of the assembled pipeline; null until the first open()
+            private IFrameWriter startOfPipeline;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (startOfPipeline == null) {
+                    startOfPipeline = assembleStartOfPipeline();
+                }
+                startOfPipeline.open();
+            }
+
+            // Assembles the pipeline between this operator's input and its downstream writer.
+            private IFrameWriter assembleStartOfPipeline() throws HyracksDataException {
+                RecordDescriptor outDesc = null;
+                if (outputArity > 0) {
+                    outDesc = AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0];
+                }
+                RecordDescriptor inDesc = recordDescProvider.getInputRecordDescriptor(
+                        AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+                PipelineAssembler assembler = new PipelineAssembler(pipeline, inputArity, outputArity, inDesc,
+                        outDesc);
+                try {
+                    return assembler.assemblePipeline(writer, ctx);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                startOfPipeline.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                startOfPipeline.close();
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                startOfPipeline.fail();
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
new file mode 100644
index 0000000..92410c2
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Instantiates the runtimes of an {@link AlgebricksPipeline} and chains them together, back to
+ * front, so that each runtime writes into the one that follows it.
+ */
+public class PipelineAssembler {
+
+    // record descriptors at the two ends of the pipeline
+    private final RecordDescriptor pipelineInputRecordDescriptor;
+    private final RecordDescriptor pipelineOutputRecordDescriptor;
+
+    private final int inputArity;
+    private final int outputArity;
+    // factories (and intermediate record descriptors) of the pipeline to assemble
+    private final AlgebricksPipeline pipeline;
+
+    /**
+     * @param pipeline
+     *            the runtime factories and their record descriptors
+     * @param inputArity
+     *            the first runtime gets an input record descriptor only when this is positive
+     * @param outputArity
+     *            the last runtime is connected to the final writer only when this is 1
+     * @param pipelineInputRecordDescriptor
+     *            descriptor of the records entering the first runtime
+     * @param pipelineOutputRecordDescriptor
+     *            descriptor of the records leaving the last runtime
+     */
+    public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+            RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+        this.pipeline = pipeline;
+        this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
+        this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
+        this.inputArity = inputArity;
+        this.outputArity = outputArity;
+    }
+
+    /**
+     * Creates one runtime per factory, walking the pipeline from its last factory to its first,
+     * and wires each runtime to the writer created just before it.
+     *
+     * @param writer
+     *            the writer that receives the output of the last runtime
+     * @param ctx
+     *            task context passed to every factory
+     * @return the first runtime of the pipeline, or {@code writer} itself when there are no
+     *         factories
+     * @throws AlgebricksException
+     *             if a factory fails to create its runtime
+     */
+    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws AlgebricksException {
+        IFrameWriter downstream = writer;
+        final int lastIndex = pipeline.getRuntimeFactories().length - 1;
+        for (int idx = lastIndex; idx >= 0; idx--) {
+            IPushRuntime runtime = pipeline.getRuntimeFactories()[idx].createPushRuntime(ctx);
+
+            // output side: inner runtimes use their own descriptor, the last one the pipeline's
+            if (idx < lastIndex) {
+                runtime.setFrameWriter(0, downstream, pipeline.getRecordDescriptors()[idx]);
+            } else if (outputArity == 1) {
+                runtime.setFrameWriter(0, downstream, pipelineOutputRecordDescriptor);
+            }
+
+            // input side: the descriptor of the preceding runtime, or the pipeline's for the first
+            if (idx > 0) {
+                runtime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[idx - 1]);
+            } else if (inputArity > 0) {
+                runtime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor);
+            }
+
+            downstream = runtime;
+        }
+        return downstream;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
new file mode 100644
index 0000000..f1ee570
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Factory for the subplan runtime: for every input tuple a nested pipeline is run, and the tuples
+ * it produces are concatenated with that input tuple. When the nested pipeline produces no frame
+ * at all, the input tuple is emitted once, padded with nulls.
+ */
+public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AlgebricksPipeline pipeline;
+    private final RecordDescriptor inputRecordDesc;
+    private final INullWriterFactory[] nullWriterFactories;
+
+    /**
+     * @param pipeline
+     *            the nested pipeline run once per input tuple
+     * @param nullWriterFactories
+     *            one factory per field to be filled with null when the nested pipeline is empty
+     * @param inputRecordDesc
+     *            descriptor of the tuples fed into the nested pipeline
+     * @param projectionList
+     *            must be {@code null}; a projection is not implemented for this runtime
+     */
+    public SubplanRuntimeFactory(AlgebricksPipeline pipeline, INullWriterFactory[] nullWriterFactories,
+            RecordDescriptor inputRecordDesc, int[] projectionList) {
+        super(projectionList);
+        this.pipeline = pipeline;
+        this.nullWriterFactories = nullWriterFactories;
+        this.inputRecordDesc = inputRecordDesc;
+        if (projectionList != null) {
+            throw new NotImplementedException();
+        }
+    }
+
+    /** Lists the string form of every nested runtime factory, one per line, inside braces. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Subplan { \n");
+        for (IPushRuntimeFactory factory : pipeline.getRuntimeFactories()) {
+            text.append(" ").append(factory.toString()).append(";\n");
+        }
+        return text.append("}").toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        // no output record descriptor is supplied for the nested pipeline
+        final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, null);
+
+        final INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
+        for (int w = 0; w < nullWriterFactories.length; w++) {
+            nullWriters[w] = nullWriterFactories[w].createNullWriter();
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            /**
+             * End of the nested pipeline: concatenates the current outer tuple with every tuple
+             * of the frames it receives.
+             */
+            class TupleOuterProduct implements IFrameWriter {
+
+                // set once a frame from the nested pipeline has been processed since the last open()
+                private boolean smthWasWritten = false;
+                // reads the frames produced by the last runtime of the nested pipeline
+                private FrameTupleAccessor ta = new FrameTupleAccessor(ctx.getFrameSize(),
+                        pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
+                private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
+
+                @Override
+                public void open() throws HyracksDataException {
+                    smthWasWritten = false;
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ta.reset(buffer);
+                    final int innerCount = ta.getTupleCount();
+                    for (int inner = 0; inner < innerCount; inner++) {
+                        if (appendJoined(inner)) {
+                            continue;
+                        }
+                        // output frame is full: flush it and retry on an empty frame
+                        FrameUtils.flushFrame(frame, writer);
+                        appender.reset(frame, true);
+                        if (!appendJoined(inner)) {
+                            throw new IllegalStateException("Could not write frame.");
+                        }
+                    }
+                    smthWasWritten = true;
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    if (smthWasWritten) {
+                        return;
+                    }
+                    // nothing came out of the nested pipeline: emit the outer tuple with nulls
+                    appendNullsToTuple();
+                    appendToFrameFromTupleBuilder(tb);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
+
+                // Appends (outer tuple ++ inner tuple) to the output frame; false if it does not fit.
+                private boolean appendJoined(int innerIndex) throws HyracksDataException {
+                    return appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, innerIndex);
+                }
+
+                // Fills the tuple builder with the fields of the outer tuple followed by the nulls.
+                private void appendNullsToTuple() throws HyracksDataException {
+                    tb.reset();
+                    final int outerFields = tRef.getFieldCount();
+                    for (int field = 0; field < outerFields; field++) {
+                        tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), field);
+                    }
+                    DataOutput out = tb.getDataOutput();
+                    for (int w = 0; w < nullWriters.length; w++) {
+                        nullWriters[w].writeNull(out);
+                        tb.addFieldEndOffset();
+                    }
+                }
+
+            }
+
+            // the sink must exist before the nested pipeline is assembled in front of it
+            IFrameWriter endPipe = new TupleOuterProduct();
+
+            NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+
+            boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppendRef(ctx);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int outerCount = tAccess.getTupleCount();
+                // run the nested pipeline once per outer tuple
+                for (int outer = 0; outer < outerCount; outer++) {
+                    tRef.reset(tAccess, outer);
+                    startOfPipeline.writeTuple(buffer, outer);
+                    startOfPipeline.open();
+                    startOfPipeline.close();
+                }
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
new file mode 100644
index 0000000..fd2710d
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+
+/**
+ * Factory for a runtime that buffers all incoming frames in a {@link FrameSorter} and, on close,
+ * sorts them and flushes the result to the downstream writer.
+ */
+public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] sortFields;
+    private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private IBinaryComparatorFactory[] comparatorFactories;
+
+    /**
+     * @param sortFields
+     *            the fields to sort on
+     * @param firstKeyNormalizerFactory
+     *            normalized-key computer factory handed to the sorter
+     * @param comparatorFactories
+     *            comparator factories handed to the sorter
+     * @param projectionList
+     *            must be {@code null}; a non-null list is rejected with a
+     *            {@link NotImplementedException}
+     */
+    public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+        super(projectionList);
+        // Pushing a projection into the sort is not supported, so a projection list is refused.
+        if (projectionList != null) {
+            throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
+        }
+        this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputPushRuntime() {
+
+            // created on the first open() and reused (after a reset) on later ones
+            FrameSorter frameSorter;
+
+            @Override
+            public void open() throws HyracksDataException {
+                final boolean firstOpen = (frameSorter == null);
+                if (firstOpen) {
+                    frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                            outputRecordDesc);
+                }
+                frameSorter.reset();
+                writer.open();
+            }
+
+            /** Only buffers the frame; nothing is emitted before {@code close()}. */
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                frameSorter.insertFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            /** Sorts the buffered frames, writes them downstream and closes the writer. */
+            @Override
+            public void close() throws HyracksDataException {
+                frameSorter.sortFrames();
+                frameSorter.flushFrames(writer);
+                writer.close();
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
new file mode 100644
index 0000000..e47384d
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Factory for the assign runtime, which builds each output tuple from the projected columns:
+ * a column listed in {@code outColumns} is computed by its scalar evaluator, any other column is
+ * copied from the input tuple.
+ */
+public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private int[] outColumns;
+    private IScalarEvaluatorFactory[] evalFactories;
+
+    /**
+     * @param outColumns
+     *            a sorted array of columns into which the results are written; it is looked up
+     *            with a binary search, hence the ordering requirement
+     * @param evalFactories
+     *            evaluator factories, the i-th one producing the value of the i-th out column
+     * @param projectionList
+     *            an array of columns to be projected
+     */
+    public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.evalFactories = evalFactories;
+    }
+
+    /** Renders the factory as {@code assign [columns] := [evaluator factories]}. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("assign [");
+        for (int c = 0; c < outColumns.length; c++) {
+            if (c != 0) {
+                text.append(", ");
+            }
+            text.append(outColumns[c]);
+        }
+        text.append("] := [");
+        for (int e = 0; e < evalFactories.length; e++) {
+            if (e != 0) {
+                text.append(", ");
+            }
+            text.append(evalFactories[e]);
+        }
+        return text.append("]").toString();
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        // For each projected column: its position in outColumns, or a negative value when the
+        // column is not an assigned one and has to be copied from the input.
+        final int[] projectionToOutColumns = new int[projectionList.length];
+        for (int p = 0; p < projectionList.length; p++) {
+            projectionToOutColumns[p] = Arrays.binarySearch(outColumns, projectionList[p]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable result = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private boolean first = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    initAccessAppendRef(ctx);
+                    first = false;
+                    createEvaluators();
+                }
+                writer.open();
+            }
+
+            // Instantiates one evaluator per factory; done only on the first open().
+            private void createEvaluators() throws HyracksDataException {
+                final int count = evalFactories.length;
+                for (int e = 0; e < count; e++) {
+                    try {
+                        eval[e] = evalFactories[e].createScalarEvaluator(ctx);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int tuple = 0; tuple < tupleCount; tuple++) {
+                    tRef.reset(tAccess, tuple);
+                    produceTuple(tupleBuilder, tAccess, tuple, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+            }
+
+            // Fills the builder with the projected fields of one tuple, in projection order.
+            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    FrameTupleReference tupleRef) throws HyracksDataException {
+                tb.reset();
+                for (int p = 0; p < projectionList.length; p++) {
+                    final int evalIndex = projectionToOutColumns[p];
+                    if (evalIndex < 0) {
+                        // not an assigned column: copy the field straight from the input tuple
+                        tb.addField(accessor, tIndex, projectionList[p]);
+                        continue;
+                    }
+                    try {
+                        eval[evalIndex].evaluate(tupleRef, result);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..dbe1ab1
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Factory for a source runtime that, when opened, emits one frame holding a single tuple with
+ * zero fields and then closes its writer.
+ */
+public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public EmptyTupleSourceRuntimeFactory() {
+    }
+
+    @Override
+    public String toString() {
+        return "ets";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) {
+        return new AbstractOneInputSourcePushRuntime() {
+
+            private ByteBuffer frame = ctx.allocateFrame();
+            // a builder with zero fields, i.e. the empty tuple
+            private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+            private FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+
+            /** Performs the whole life cycle: open the writer, push the one frame, close. */
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+                appender.reset(frame, true);
+                final boolean appended = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                        tb.getSize());
+                if (!appended) {
+                    throw new IllegalStateException();
+                }
+                FrameUtils.flushFrame(frame, writer);
+                writer.close();
+            }
+        };
+    }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..42724f7
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for the runtime that sits at the start of a nested pipeline. Tuples are handed to it
+ * one at a time through {@link NestedTupleSourceRuntime#writeTuple(ByteBuffer, int)} rather than
+ * through {@code nextFrame}.
+ */
+public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public NestedTupleSourceRuntimeFactory() {
+    }
+
+    @Override
+    public String toString() {
+        return "nts";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+        return new NestedTupleSourceRuntime(ctx);
+    }
+
+    /** Source runtime of a nested pipeline; it is fed single tuples by its owner. */
+    public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+        public NestedTupleSourceRuntime(IHyracksTaskContext ctx) {
+            initAccessAppend(ctx);
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            writer.open();
+        }
+
+        /**
+         * Appends one tuple of the given frame to this runtime's output.
+         *
+         * @param inputBuffer
+         *            the frame that contains the tuple
+         * @param tIndex
+         *            index of the tuple inside {@code inputBuffer}
+         */
+        public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
+            tAccess.reset(inputBuffer);
+            appendTupleToFrame(tIndex);
+        }
+
+        /** Not supported: input must arrive through {@link #writeTuple(ByteBuffer, int)}. */
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public void fail() throws HyracksDataException {
+            writer.fail();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
new file mode 100644
index 0000000..a54c96d
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final int NO_DEFAULT_BRANCH = -1; // defaultBranchIndex value meaning "drop unmatched tuples"
+
+    private final ICopyEvaluatorFactory[] evalFactories;
+    private final IBinaryBooleanInspector boolInspector;
+    private final int defaultBranchIndex;
+    // Declares one output per predicate, plus one more when defaultBranchIndex == evalFactories.length.
+    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
+            IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
+        super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
+        for (int i = 0; i < recordDescriptors.length; i++) { // all outputs, incl. an extra default branch
+            recordDescriptors[i] = rDesc;
+        }
+        this.evalFactories = evalFactories;
+        this.boolInspector = boolInspector;
+        this.defaultBranchIndex = defaultBranchIndex;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+            private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
+            private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
+            private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
+            private final FrameTupleReference frameTuple = new FrameTupleReference();
+            // One evaluator per predicate only: an extra default output has no predicate to evaluate.
+            private final ICopyEvaluator[] evals = new ICopyEvaluator[evalFactories.length];
+            private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
+            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
+            private final DataOutput tupleDos = tupleBuilder.getDataOutput();
+
+            @Override
+            public void close() throws HyracksDataException {
+                // Flush (possibly not full) buffers that have data, and close writers.
+                for (int i = 0; i < outputArity; i++) {
+                    tupleAppender.reset(writeBuffers[i], false);
+                    if (tupleAppender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(writeBuffers[i], writers[i]);
+                    }
+                    writers[i].close();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.fail();
+                }
+            }
+            // Sends each tuple to the first output whose predicate holds, else to the default branch if any.
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                accessor.reset(buffer);
+                int tupleCount = accessor.getTupleCount();
+                for (int i = 0; i < tupleCount; i++) {
+                    frameTuple.reset(accessor, i);
+                    boolean found = false;
+                    for (int j = 0; j < evals.length; j++) {
+                        try {
+                            evalBuf.reset();
+                            evals[j].evaluate(frameTuple);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
+                        if (found) {
+                            copyAndAppendTuple(j);
+                            break;
+                        }
+                    }
+                    // Optionally write to default output branch.
+                    if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
+                        copyAndAppendTuple(defaultBranchIndex);
+                    }
+                }
+            }
+            // Copies the current frameTuple to output outputIndex, flushing that output's frame first if full.
+            private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
+                // Copy tuple into tuple builder.
+                try {
+                    tupleBuilder.reset();
+                    for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                        tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+                                frameTuple.getFieldLength(i));
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+                // Append to frame; the shared appender must be re-pointed at this output's buffer first.
+                tupleAppender.reset(writeBuffers[outputIndex], false);
+                if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
+                    tupleAppender.reset(writeBuffers[outputIndex], true);
+                    if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                        throw new IllegalStateException("Tuple does not fit into an empty output frame");
+                    }
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+                // Create write buffers.
+                for (int i = 0; i < outputArity; i++) {
+                    writeBuffers[i] = ctx.allocateFrame();
+                    // Make sure to clear all buffers, since we are reusing the tupleAppender.
+                    tupleAppender.reset(writeBuffers[i], true);
+                }
+                // Create evaluators for partitioning.
+                try {
+                    for (int i = 0; i < evalFactories.length; i++) {
+                        evals[i] = evalFactories[i].createEvaluator(evalBuf);
+                    }
+                } catch (AlgebricksException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
+
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
new file mode 100644
index 0000000..5b53fc2
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PrinterRuntimeFactory implements IPushRuntimeFactory {
+    // Builds sink runtimes that print the configured columns of every tuple to System.out.
+    private static final long serialVersionUID = 1L;
+
+    private final int[] printColumns;
+    private final IPrinterFactory[] printerFactories;
+    private final RecordDescriptor inputRecordDesc;
+
+    public PrinterRuntimeFactory(int[] printColumns, IPrinterFactory[] printerFactories,
+            RecordDescriptor inputRecordDesc) {
+        this.printColumns = printColumns;
+        this.printerFactories = printerFactories;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    // Renders as "print [c0; c1; ...]", listing the column indexes held in printColumns.
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("print [");
+        String separator = "";
+        for (int column : printColumns) {
+            text.append(separator).append(column);
+            separator = "; ";
+        }
+        return text.append("]").toString();
+    }
+
+    // Obtains a writer over System.out from PrinterBasedWriterFactory and wraps it in a SinkWriterRuntime;
+    // the four-argument SinkWriterRuntime constructor is used, so no autoClose flag is passed.
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+        IAWriter stdoutWriter = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out,
+                printerFactories, inputRecordDesc);
+        return new SinkWriterRuntime(stdoutWriter, ctx, System.out, inputRecordDesc);
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
new file mode 100644
index 0000000..c15e050
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+    // Builds runtimes that emit, per input tuple, the projected columns with running-aggregate results filled in.
+    private static final long serialVersionUID = 1L;
+
+    private final int[] outColumns;
+    private final IRunningAggregateEvaluatorFactory[] runningAggregates;
+
+    /**
+     * @param outColumns
+     *            the columns that receive aggregate results; must be sorted, since it is binary-searched
+     * @param runningAggregates
+     *            evaluator factories; the evaluator from factory i produces the value of outColumns[i]
+     * @param projectionList
+     *            the columns to emit, in output order
+     */
+    public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateEvaluatorFactory[] runningAggregates,
+            int[] projectionList) {
+        super(projectionList);
+        this.outColumns = outColumns;
+        this.runningAggregates = runningAggregates;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("running-aggregate [");
+        String separator = "";
+        for (int column : outColumns) {
+            text.append(separator).append(column);
+            separator = ", ";
+        }
+        text.append("] := [");
+        separator = "";
+        for (IRunningAggregateEvaluatorFactory factory : runningAggregates) {
+            text.append(separator).append(factory);
+            separator = ", ";
+        }
+        return text.append("]").toString();
+    }
+
+    // For each projected column, precomputes its index in outColumns; a negative index marks a column that is
+    // copied from the input tuple instead of being computed.
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        final int[] aggIndexOfProjection = new int[projectionList.length];
+        for (int pos = 0; pos < aggIndexOfProjection.length; pos++) {
+            aggIndexOfProjection[pos] = Arrays.binarySearch(outColumns, projectionList[pos]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
+            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            private boolean first = true;
+
+            // Evaluators are created and initialised on the first open() only; the writer is opened every time.
+            @Override
+            public void open() throws HyracksDataException {
+                initAccessAppendRef(ctx);
+                if (first) {
+                    first = false;
+                    try {
+                        for (int i = 0; i < raggs.length; i++) {
+                            raggs[i] = runningAggregates[i].createRunningAggregateEvaluator();
+                            raggs[i].init();
+                        }
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            // Builds one output tuple per input tuple and appends it to the output frame.
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int tupleIdx = 0; tupleIdx < tupleCount; tupleIdx++) {
+                    tRef.reset(tAccess, tupleIdx);
+                    produceTuple(tupleBuilder, tAccess, tupleIdx, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+            }
+
+            // Fills tb with the projected fields of tuple tIndex, stepping an evaluator for computed columns.
+            private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+                    FrameTupleReference tupleRef) throws HyracksDataException {
+                tb.reset();
+                for (int f = 0; f < projectionList.length; f++) {
+                    final int aggIdx = aggIndexOfProjection[f];
+                    if (aggIdx < 0) {
+                        // Not an aggregate output column: copy the field straight from the input tuple.
+                        tb.addField(accessor, tIndex, projectionList[f]);
+                        continue;
+                    }
+                    try {
+                        raggs[aggIdx].step(tupleRef, p);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                }
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
new file mode 100644
index 0000000..148f087
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
+    // Sink that prints every incoming tuple through an IAWriter; printStream is the stream to flush/close.
+    private final IHyracksTaskContext ctx;
+    private final PrintStream printStream;
+    private final IAWriter writer;
+    private RecordDescriptor inputRecordDesc;
+    private FrameTupleAccessor tAccess;
+    private boolean autoClose = false;
+    private boolean first = true;
+
+    public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
+            RecordDescriptor inputRecordDesc) {
+        this.writer = writer;
+        this.ctx = ctx;
+        this.printStream = printStream;
+        this.inputRecordDesc = inputRecordDesc;
+        this.tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+    }
+
+    public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
+            RecordDescriptor inputRecordDesc, boolean autoClose) {
+        this(writer, ctx, printStream, inputRecordDesc);
+        this.autoClose = autoClose;
+    }
+    // First call only: rebuilds the accessor from the current record descriptor and initialises the writer.
+    @Override
+    public void open() throws HyracksDataException {
+        if (first) {
+            first = false;
+            tAccess = new FrameTupleAccessor(ctx.getFrameSize(), inputRecordDesc);
+            try {
+                writer.init();
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        try {
+            for (int t = 0, nTuple = tAccess.getTupleCount(); t < nTuple; t++) {
+                writer.printTuple(tAccess, t);
+            }
+        } catch (AlgebricksException ae) {
+            throw new HyracksDataException(ae);
+        }
+    }
+    // Always flushes; closes printStream only when this runtime was created with autoClose = true.
+    @Override
+    public void close() throws HyracksDataException {
+        printStream.flush(); // a stream left open (e.g. a shared one) must not keep printed tuples buffered
+        if (autoClose) {
+            printStream.close();
+        }
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+    }
+    // No-op: nothing is flushed or closed on failure.
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
new file mode 100644
index 0000000..1aff1aa
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
+    // Builds sink runtimes that write the selected fields of every tuple to outputFile.
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fields;
+    private final IPrinterFactory[] printerFactories;
+    private final File outputFile;
+    private final RecordDescriptor inputRecordDesc;
+    private final IAWriterFactory writerFactory;
+
+    public SinkWriterRuntimeFactory(int[] fields, IPrinterFactory[] printerFactories, File outputFile,
+            IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc) {
+        this.fields = fields;
+        this.printerFactories = printerFactories;
+        this.outputFile = outputFile;
+        this.writerFactory = writerFactory;
+        this.inputRecordDesc = inputRecordDesc;
+    }
+
+    // Renders as "sink-write [f0; f1; ...] outputFile". NOTE(review): the trailing word is the literal text
+    // "outputFile", not the path held in the outputFile field -- confirm that this is intended.
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("sink-write [");
+        String separator = "";
+        for (int field : fields) {
+            text.append(separator).append(field);
+            separator = "; ";
+        }
+        return text.append("] outputFile").toString();
+    }
+
+    // Opens outputFile for writing right away; the returned runtime is created with autoClose = true.
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        final PrintStream fileStream;
+        try {
+            fileStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+        } catch (FileNotFoundException e) {
+            throw new AlgebricksException(e);
+        }
+        IAWriter fileWriter = writerFactory.createWriter(fields, fileStream, printerFactories, inputRecordDesc);
+        return new SinkWriterRuntime(fileWriter, ctx, fileStream, inputRecordDesc, true);
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
new file mode 100644
index 0000000..796ef0a
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -0,0 +1,98 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+    // Builds runtimes that forward a bounded number of tuples and then throw
+    // HyracksDataException("injected failure") on the next tuple.
+    private static final long serialVersionUID = 1L;
+
+    private IScalarEvaluatorFactory afterObjectsEvalFactory;
+    private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+
+    public StreamDieRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
+            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
+        super(projectionList);
+        this.afterObjectsEvalFactory = maxObjectsEvalFactory;
+        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
+    }
+
+    @Override
+    public String toString() {
+        return "stream-die " + afterObjectsEvalFactory.toString();
+    }
+
+    // Each runtime gets its own integer inspector, evaluator and remaining-tuples counter.
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+        final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator evalAfterObjects;
+            private int toWrite = -1; // tuples still allowed through; negative means not evaluated yet
+
+            // Sets up accessor/appender and the evaluator on the first call only; always opens the writer.
+            @Override
+            public void open() throws HyracksDataException {
+                if (evalAfterObjects == null) {
+                    initAccessAppendRef(ctx);
+                    try {
+                        evalAfterObjects = afterObjectsEvalFactory.createScalarEvaluator(ctx);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            // Forwards tuples (projected when projectionList is non-null) while the budget lasts, then throws.
+            // The budget is evaluated against tuple 0 of the first non-empty frame.
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                if (nTuple == 0) {
+                    // Nothing to forward, and there is no tuple 0 to evaluate the budget against.
+                    return;
+                }
+                if (toWrite < 0) {
+                    toWrite = evaluateInteger(evalAfterObjects, 0);
+                }
+                for (int t = 0; t < nTuple; t++) {
+                    if (toWrite <= 0) {
+                        throw new HyracksDataException("injected failure");
+                    }
+                    toWrite--;
+                    if (projectionList != null) {
+                        appendProjectionToFrame(t, projectionList);
+                    } else {
+                        appendTupleToFrame(t);
+                    }
+                }
+            }
+
+            // Evaluates eval against tuple tIdx of the current frame and decodes the result as an int.
+            private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
+                tRef.reset(tAccess, tIdx);
+                try {
+                    eval.evaluate(tRef, p);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                return bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
new file mode 100644
index 0000000..4e9d51c
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -0,0 +1,136 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+    // Builds runtimes that skip an optional offset of tuples, forward up to a maximum number, and drop the rest.
+    private static final long serialVersionUID = 1L;
+
+    private IScalarEvaluatorFactory maxObjectsEvalFactory;
+    private IScalarEvaluatorFactory offsetEvalFactory;
+    private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+
+    public StreamLimitRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory,
+            IScalarEvaluatorFactory offsetEvalFactory, int[] projectionList,
+            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
+        super(projectionList);
+        this.maxObjectsEvalFactory = maxObjectsEvalFactory;
+        this.offsetEvalFactory = offsetEvalFactory;
+        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
+    }
+
+    // Renders as "stream-limit <max>" or, when an offset factory was supplied, "stream-limit <max>, <offset>".
+    @Override
+    public String toString() {
+        String text = "stream-limit " + maxObjectsEvalFactory.toString();
+        return offsetEvalFactory == null ? text : text + ", " + offsetEvalFactory.toString();
+    }
+
+    // Each runtime owns its evaluators and counters; the integer inspector is created once per runtime.
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+        final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IScalarEvaluator evalMaxObjects;
+            private IScalarEvaluator evalOffset = null;
+            private int toWrite = 0; // how many tuples still to write
+            private int toSkip = 0; // how many tuples still to skip
+            private boolean firstTuple = true;
+            private boolean afterLastTuple = false;
+
+            // Creates the evaluators on the first call only; every call opens the writer and clears the
+            // afterLastTuple flag. NOTE(review): toWrite/toSkip/firstTuple are not reset here -- confirm intended.
+            @Override
+            public void open() throws HyracksDataException {
+                if (evalMaxObjects == null) {
+                    initAccessAppendRef(ctx);
+                    try {
+                        evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);
+                        if (offsetEvalFactory != null) {
+                            evalOffset = offsetEvalFactory.createScalarEvaluator(ctx);
+                        }
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+                afterLastTuple = false;
+            }
+
+            // Consumes toSkip tuples first, then emits up to toWrite tuples (projected when projectionList is
+            // non-null); once both counters are used up, the rest of the input is ignored.
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                if (afterLastTuple) {
+                    // limit already reached: ignore the data
+                    return;
+                }
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                if (tupleCount <= toSkip) {
+                    toSkip -= tupleCount;
+                    return;
+                }
+                int firstIdx = 0;
+                if (toSkip > 0) {
+                    firstIdx = toSkip;
+                    toSkip = 0;
+                }
+                for (int t = firstIdx; t < tupleCount; t++) {
+                    if (firstTuple) {
+                        // limit and offset are evaluated once, against the first tuple that is inspected
+                        firstTuple = false;
+                        toWrite = evaluateInteger(evalMaxObjects, t);
+                        if (evalOffset != null) {
+                            toSkip = evaluateInteger(evalOffset, t);
+                        }
+                    }
+                    if (toSkip > 0) {
+                        toSkip--;
+                        continue;
+                    }
+                    if (toWrite <= 0) {
+                        afterLastTuple = true;
+                        break;
+                    }
+                    toWrite--;
+                    if (projectionList != null) {
+                        appendProjectionToFrame(t, projectionList);
+                    } else {
+                        appendTupleToFrame(t);
+                    }
+                }
+            }
+
+            // Always delegates to the superclass, whether or not the limit was reached.
+            @Override
+            public void close() throws HyracksDataException {
+                super.close();
+            }
+
+            // Evaluates eval against tuple tIdx of the current frame and decodes the result as an int.
+            private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
+                tRef.reset(tAccess, tIdx);
+                try {
+                    eval.evaluate(tRef, p);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                return bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
new file mode 100644
index 0000000..455ad8e
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Runtime factory for a streaming projection. The runtime it creates hands every
+ * tuple of each incoming frame to {@code appendProjectionToFrame} together with
+ * the projection list given at construction time.
+ */
+public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param projectionList
+     *            field positions passed on to the superclass and used for every tuple
+     */
+    public StreamProjectRuntimeFactory(int[] projectionList) {
+        super(projectionList);
+    }
+
+    @Override
+    public String toString() {
+        return "stream-project " + Arrays.toString(projectionList);
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            // True until the first open(); the accessor/appender set-up runs only once.
+            private boolean needsInit = true;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (needsInit) {
+                    needsInit = false;
+                    initAccessAppend(ctx);
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                final int tupleCount = tAccess.getTupleCount();
+                for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
+                    appendProjectionToFrame(tupleIndex, projectionList);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
new file mode 100644
index 0000000..b0c94b7
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+/**
+ * Runtime factory for a streaming selection. The runtime it creates evaluates a
+ * condition on every tuple and appends the tuple (optionally projected) to the
+ * output only when the evaluated bytes decode to {@code true}.
+ */
+public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IScalarEvaluatorFactory cond;
+
+    private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+    /**
+     * @param cond
+     *            factory of the evaluator that computes the selection condition
+     * @param projectionList
+     *            if projectionList is null, then no projection is performed
+     * @param binaryBooleanInspectorFactory
+     *            factory of the inspector that decodes the evaluator output into a boolean
+     */
+    public StreamSelectRuntimeFactory(IScalarEvaluatorFactory cond, int[] projectionList,
+            IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+        super(projectionList);
+        this.cond = cond;
+        this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+    }
+
+    @Override
+    public String toString() {
+        // Concatenation instead of cond.toString() so that a null condition cannot raise an NPE here.
+        return "stream-select " + cond;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
+        final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            // Receives the result of the condition evaluation for the current tuple.
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            // Created on the first open(); its null-ness doubles as the "not yet initialized" flag.
+            private IScalarEvaluator eval;
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (eval == null) {
+                    initAccessAppendRef(ctx);
+                    try {
+                        eval = cond.createScalarEvaluator(ctx);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    try {
+                        eval.evaluate(tRef, p);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                    // Only tuples whose condition decodes to true are forwarded.
+                    if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
+                        if (projectionList != null) {
+                            appendProjectionToFrame(t, projectionList);
+                        } else {
+                            appendTupleToFrame(t);
+                        }
+                    }
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
new file mode 100644
index 0000000..3f2765f
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * Runtime factory that streams tuples through an external command. Each input
+ * tuple is printed as one delimited text line to the command's standard input;
+ * the command's standard output is handed to a tuple parser that feeds the
+ * downstream writer, and its standard error is copied to {@code System.err}.
+ */
+public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String command;
+    private final IPrinterFactory[] printerFactories;
+    private final char fieldDelimiter;
+    private final ITupleParserFactory parserFactory;
+
+    /**
+     * @param command
+     *            command line started through {@code Runtime.exec} on every open()
+     * @param printerFactories
+     *            one printer factory per input field, in field order
+     * @param fieldDelimiter
+     *            character printed after every field
+     * @param parserFactory
+     *            factory of the parser that turns the command's output into tuples
+     */
+    public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
+            ITupleParserFactory parserFactory) {
+        super(null);
+        this.command = command;
+        this.printerFactories = printerFactories;
+        this.fieldDelimiter = fieldDelimiter;
+        this.parserFactory = parserFactory;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+        final IPrinter[] printers = new IPrinter[printerFactories.length];
+        for (int i = 0; i < printerFactories.length; i++) {
+            printers[i] = printerFactories[i].createPrinter();
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            /** Parses the command's standard output into tuples for the downstream writer. */
+            final class ForwardScriptOutput implements Runnable {
+
+                private final InputStream inStream;
+                private final ITupleParser parser;
+                // Set by the forwarding thread when parsing fails; read by close() after join().
+                private volatile HyracksDataException failure;
+
+                public ForwardScriptOutput(ITupleParser parser, InputStream inStream) {
+                    this.parser = parser;
+                    this.inStream = inStream;
+                }
+
+                @Override
+                public void run() {
+                    try {
+                        parser.parse(inStream, writer);
+                    } catch (HyracksDataException e) {
+                        // An exception thrown from this thread would never reach the operator;
+                        // keep it so that close() can report it to the caller.
+                        failure = e;
+                    } finally {
+                        try {
+                            inStream.close();
+                        } catch (Exception ignored) {
+                            // best effort: the stream is discarded either way
+                        }
+                    }
+                }
+            }
+
+            /** Copies the command's standard error, line by line, to a print stream. */
+            final class DumpInStreamToPrintStream implements Runnable {
+
+                private final BufferedReader reader;
+                private final PrintStream printStream;
+
+                public DumpInStreamToPrintStream(InputStream inStream, PrintStream printStream) {
+                    this.reader = new BufferedReader(new InputStreamReader(inStream));
+                    this.printStream = printStream;
+                }
+
+                @Override
+                public void run() {
+                    String s;
+                    try {
+                        while ((s = reader.readLine()) != null) {
+                            printStream.println(s);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        try {
+                            reader.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        // The target is System.err, which is shared by the whole JVM:
+                        // flush it, but never close it.
+                        printStream.flush();
+                    }
+                }
+
+            }
+
+            private Process process;
+            private PrintStream ps;
+            private boolean first = true;
+            private ForwardScriptOutput scriptOutput;
+            private Thread outputPipe;
+            private Thread dumpStderr;
+
+            /**
+             * Starts the external command together with the two threads that drain
+             * its standard output and standard error.
+             */
+            @Override
+            public void open() throws HyracksDataException {
+                if (first) {
+                    first = false;
+                    initAccessAppendRef(ctx);
+                }
+
+                try {
+                    ITupleParser parser = parserFactory.createTupleParser(ctx);
+                    process = Runtime.getRuntime().exec(command);
+                    ps = new PrintStream(process.getOutputStream());
+                    scriptOutput = new ForwardScriptOutput(parser, process.getInputStream());
+                    outputPipe = new Thread(scriptOutput);
+                    outputPipe.start();
+                    DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
+                            System.err);
+                    dumpStderr = new Thread(disps);
+                    dumpStderr.start();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /**
+             * Prints every tuple of the frame to the command's standard input, one
+             * line per tuple.
+             */
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    for (int i = 0; i < printers.length; i++) {
+                        try {
+                            printers[i].print(buffer.array(), tRef.getFieldStart(i), tRef.getFieldLength(i), ps);
+                        } catch (AlgebricksException e) {
+                            throw new HyracksDataException(e);
+                        }
+                        // NOTE(review): the delimiter is also printed after the last field, so
+                        // every line ends with a trailing delimiter - confirm this is intended.
+                        ps.print(fieldDelimiter);
+                        if (i == printers.length - 1) {
+                            ps.print('\n');
+                        }
+                    }
+                }
+            }
+
+            /**
+             * Signals end of input to the command, waits for it and for both drain
+             * threads, and then closes the downstream operator.
+             *
+             * @throws HyracksDataException
+             *             if the wait is interrupted, the command exits with a non-zero
+             *             status, or parsing of the command's output failed
+             */
+            @Override
+            public void close() throws HyracksDataException {
+                // first close the printer printing to the process
+                ps.close();
+                int ret = 0;
+                // then wait for the process to finish
+
+                try {
+                    ret = process.waitFor();
+                    outputPipe.join();
+                    dumpStderr.join();
+                } catch (InterruptedException e) {
+                    // keep the interruption visible to the code further up the stack
+                    Thread.currentThread().interrupt();
+                    throw new HyracksDataException(e);
+                }
+                if (ret != 0) {
+                    throw new HyracksDataException("Process exit value: " + ret);
+                }
+                if (scriptOutput.failure != null) {
+                    throw scriptOutput.failure;
+                }
+                // close the following operator in the chain
+                super.close();
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
new file mode 100644
index 0000000..d42a294
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Runtime factory for an unnest operator. For every input tuple the runtime
+ * initializes an unnesting evaluator and emits one output tuple per value the
+ * evaluator produces.
+ */
+public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int outCol;
+    private final IUnnestingEvaluatorFactory unnestingFactory;
+    // Position of outCol inside projectionList, or -1 when outCol is not projected.
+    private final int outColPos;
+    private final boolean outColIsProjected;
+
+    // Each call to step(p) on the unnesting evaluator that returns true leaves
+    // the next unnested value in p. A false return means there are no more
+    // values for the current input tuple.
+
+    /**
+     * @param outCol
+     *            column that receives the unnested value
+     * @param unnestingFactory
+     *            factory of the evaluator that produces the unnested values
+     * @param projectionList
+     *            columns of the output tuple; must not be null because its length is read here
+     */
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+        super(projectionList);
+        this.outCol = outCol;
+        this.unnestingFactory = unnestingFactory;
+        // When outCol occurs more than once, the last occurrence wins.
+        int pos = -1;
+        for (int f = 0; f < projectionList.length; f++) {
+            if (projectionList[f] == outCol) {
+                pos = f;
+            }
+        }
+        outColPos = pos;
+        outColIsProjected = outColPos >= 0;
+    }
+
+    @Override
+    public String toString() {
+        return "unnest " + outCol + " <- " + unnestingFactory;
+    }
+
+    @Override
+    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
+            throws AlgebricksException {
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+            // Receives the value produced by each successful step() call.
+            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private IUnnestingEvaluator agg;
+            private ArrayTupleBuilder tupleBuilder;
+
+            @Override
+            public void open() throws HyracksDataException {
+                initAccessAppendRef(ctx);
+                try {
+                    agg = unnestingFactory.createUnnestingEvaluator(ctx);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+                tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                int nTuple = tAccess.getTupleCount();
+                for (int t = 0; t < nTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    try {
+                        agg.init(tRef);
+                        boolean goon = true;
+                        do {
+                            tupleBuilder.reset();
+                            if (!agg.step(p)) {
+                                goon = false;
+                            } else {
+                                if (!outColIsProjected) {
+                                    appendProjectionToFrame(t, projectionList);
+                                } else {
+                                    // NOTE(review): input fields are read at position f, not at
+                                    // projectionList[f]; this presumably relies on the projection
+                                    // being the identity - confirm against the callers.
+                                    for (int f = 0; f < outColPos; f++) {
+                                        tupleBuilder.addField(tAccess, t, f);
+                                    }
+                                    tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                                    for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                        tupleBuilder.addField(tAccess, t, f);
+                                    }
+                                }
+                                // NOTE(review): this also runs on the non-projected branch, where
+                                // the builder was reset and holds no fields - verify whether an
+                                // extra append is intended there.
+                                appendToFrameFromTupleBuilder(tupleBuilder);
+                            }
+                        } while (goon);
+                    } catch (AlgebricksException ae) {
+                        throw new HyracksDataException(ae);
+                    }
+                }
+            }
+        };
+    }
+
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
new file mode 100644
index 0000000..9c53241
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -0,0 +1,54 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Writer factory whose writers print the selected fields of a tuple through
+ * per-field {@link IPrinter}s, separating fields with "; " and ending every
+ * tuple with a line break.
+ */
+public class PrinterBasedWriterFactory implements IAWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final PrinterBasedWriterFactory INSTANCE = new PrinterBasedWriterFactory();
+
+    public PrinterBasedWriterFactory() {
+    }
+
+    /**
+     * Creates a writer that prints to {@code printStream}. One printer is created
+     * per entry of {@code printerFactories}; the printer at position {@code i} is
+     * used for the field {@code fields[i]}.
+     */
+    @Override
+    public IAWriter createWriter(final int[] fields, final PrintStream printStream,
+            final IPrinterFactory[] printerFactories, RecordDescriptor inputRecordDescriptor) {
+        final int printerCount = printerFactories.length;
+        final IPrinter[] printers = new IPrinter[printerCount];
+        for (int k = 0; k < printerCount; k++) {
+            printers[k] = printerFactories[k].createPrinter();
+        }
+
+        return new IAWriter() {
+
+            @Override
+            public void init() throws AlgebricksException {
+                for (IPrinter printer : printers) {
+                    printer.init();
+                }
+            }
+
+            @Override
+            public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+                for (int pos = 0; pos < fields.length; pos++) {
+                    final int field = fields[pos];
+                    // Absolute offset of the field: tuple start + slot area + offset within the tuple.
+                    final int fieldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+                            + tAccess.getFieldStartOffset(tIdx, field);
+                    final int fieldLength = tAccess.getFieldLength(tIdx, field);
+                    if (pos != 0) {
+                        printStream.print("; ");
+                    }
+                    printers[pos].print(tAccess.getBuffer().array(), fieldStart, fieldLength, printStream);
+                }
+                printStream.println();
+            }
+        };
+    }
+}
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
new file mode 100644
index 0000000..61f822e
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Writer factory whose writers emit raw bytes: {@code init()} writes the
+ * Java-serialized input record descriptor, and {@code printTuple} writes the
+ * bytes of each selected field without any separator.
+ */
+public class SerializedDataWriterFactory implements IAWriterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a writer over {@code ps}. The {@code printerFactories} argument is
+     * not used by this implementation.
+     */
+    @Override
+    public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories,
+            final RecordDescriptor inputRecordDescriptor) {
+        return new IAWriter() {
+
+            /**
+             * Writes the serialized record descriptor to the output stream.
+             *
+             * @throws AlgebricksException
+             *             wrapping any IOException raised while serializing
+             */
+            @Override
+            public void init() throws AlgebricksException {
+                // dump the SerializerDeserializers to disk
+                try {
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    ObjectOutputStream oos = new ObjectOutputStream(baos);
+                    oos.writeObject(inputRecordDescriptor);
+                    // ObjectOutputStream buffers internally: close it before copying so that
+                    // the complete serialized form has reached baos.
+                    oos.close();
+                    baos.writeTo(ps);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+
+            @Override
+            public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+                for (int i = 0; i < fields.length; i++) {
+                    // Absolute offset of the field: tuple start + slot area + offset within the tuple.
+                    int fldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+                            + tAccess.getFieldStartOffset(tIdx, fields[i]);
+                    int fldLen = tAccess.getFieldLength(tIdx, fields[i]);
+                    ps.write(tAccess.getBuffer().array(), fldStart, fldLen);
+                }
+            }
+        };
+    }
+}