merged hyracks_asterix_stabilization r1440:1453
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1488 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/pom.xml b/hyracks-algebricks/hyracks-algebricks-runtime/pom.xml
new file mode 100644
index 0000000..79bcaeb
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/pom.xml
@@ -0,0 +1,51 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-algebricks-runtime</artifactId>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-algebricks</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-rtree</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-algebricks-common</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-algebricks-data</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
new file mode 100644
index 0000000..973fef6
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountAggregateFunctionFactory.java
@@ -0,0 +1,54 @@
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class TupleCountAggregateFunctionFactory implements IAggregateFunctionFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException {
+
+ final DataOutput out = provider.getDataOutput();
+ return new IAggregateFunction() {
+
+ int cnt;
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ ++cnt;
+ }
+
+ @Override
+ public void init() throws AlgebricksException {
+ cnt = 0;
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ out.writeInt(cnt);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ try {
+ out.writeInt(cnt);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..736baaa
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class TupleCountRunningAggregateFunctionFactory implements IRunningAggregateFunctionFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
+ throws AlgebricksException {
+
+ final DataOutput out = provider.getDataOutput();
+
+ return new IRunningAggregateFunction() {
+
+ int cnt;
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ ++cnt;
+ try {
+ out.writeInt(cnt);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void init() throws AlgebricksException {
+ cnt = 0;
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
new file mode 100644
index 0000000..50f014b
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class AlgebricksPipeline implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final IPushRuntimeFactory[] runtimeFactories;
+ private final RecordDescriptor[] recordDescriptors;
+
+ public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors) {
+ this.runtimeFactories = runtimeFactories;
+ this.recordDescriptors = recordDescriptors;
+ // this.projectedColumns = projectedColumns;
+ }
+
+ public IPushRuntimeFactory[] getRuntimeFactories() {
+ return runtimeFactories;
+ }
+
+ public RecordDescriptor[] getRecordDescriptors() {
+ return recordDescriptors;
+ }
+
+ public int getOutputWidth() {
+ return recordDescriptors[recordDescriptors.length - 1].getFieldCount();
+ }
+
+ // public int[] getProjectedColumns() {
+ // return projectedColumns;
+ // }
+
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunction.java
new file mode 100644
index 0000000..8701c24
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunction.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IAggregateFunction {
+ /** should be called each time a new aggregate value is computed */
+ public void init() throws AlgebricksException;
+
+ public void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+ public void finish() throws AlgebricksException;
+
+ public void finishPartial() throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunctionFactory.java
new file mode 100644
index 0000000..3c3bfeb
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IAggregateFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public interface IAggregateFunctionFactory extends Serializable {
+ public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluator.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluator.java
new file mode 100644
index 0000000..11767e3
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluator.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IEvaluator {
+ public void evaluate(IFrameTupleReference tuple) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluatorFactory.java
new file mode 100644
index 0000000..dde64c0
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IEvaluatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public interface IEvaluatorFactory extends Serializable {
+ public IEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
new file mode 100644
index 0000000..26fcc67
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public interface IPushRuntime extends IFrameWriter {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
new file mode 100644
index 0000000..3e349e1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+
+public interface IPushRuntimeFactory extends Serializable {
+ public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunction.java
new file mode 100644
index 0000000..dc67f29
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IRunningAggregateFunction {
+ public void init() throws AlgebricksException;
+
+ public void step(IFrameTupleReference tuple) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunctionFactory.java
new file mode 100644
index 0000000..632b6f3
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IRunningAggregateFunctionFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public interface IRunningAggregateFunctionFactory extends Serializable {
+ public IRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider)
+ throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunction.java
new file mode 100644
index 0000000..3167bce
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunction.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface ISerializableAggregateFunction {
+ /**
+ * initialize the space occupied by internal state
+ *
+ * @param state
+ * @throws AlgebricksException
+ * @return length of the intermediate state
+ */
+ public void init(DataOutput state) throws AlgebricksException;
+
+ /**
+ * update the internal state
+ *
+ * @param tuple
+ * @param state
+ * @throws AlgebricksException
+ */
+ public void step(IFrameTupleReference tuple, byte[] data, int start, int len) throws AlgebricksException;
+
+ /**
+ * output the state to result
+ *
+ * @param state
+ * @param result
+ * @throws AlgebricksException
+ */
+ public void finish(byte[] data, int start, int len, DataOutput result) throws AlgebricksException;
+
+ /**
+ * output the partial state to partial result
+ *
+ * @param state
+ * @param partialResult
+ * @throws AlgebricksException
+ */
+ public void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunctionFactory.java
new file mode 100644
index 0000000..262aa5a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/ISerializableAggregateFunctionFactory.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface ISerializableAggregateFunctionFactory extends Serializable {
+ public ISerializableAggregateFunction createAggregateFunction() throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunction.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunction.java
new file mode 100644
index 0000000..d0e4e06
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public interface IUnnestingFunction {
+ public void init(IFrameTupleReference tuple) throws AlgebricksException;
+
+ public boolean step() throws AlgebricksException;
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunctionFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunctionFactory.java
new file mode 100644
index 0000000..196708d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingFunctionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public interface IUnnestingFunctionFactory extends Serializable {
+ public IUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException;
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
new file mode 100644
index 0000000..8ab92ee
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.context;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+
+public class AsterixBTreeRegistry {
+
+ private HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
+ private Lock registryLock = new ReentrantLock();
+
+ public BTree get(int fileId) {
+ return map.get(fileId);
+ }
+
+ // TODO: not very high concurrency, but good enough for now
+ public void lock() {
+ registryLock.lock();
+ }
+
+ public void unlock() {
+ registryLock.unlock();
+ }
+
+ public void register(int fileId, BTree btree) {
+ map.put(fileId, btree);
+ }
+
+ public void unregister(int fileId) {
+ map.remove(fileId);
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
new file mode 100644
index 0000000..8a1cf0a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/RuntimeContext.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.context;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class RuntimeContext {
+ private IHyracksTaskContext hyracksContext;
+
+ public RuntimeContext() {
+ }
+
+ public IHyracksTaskContext getHyracksContext() {
+ return hyracksContext;
+ }
+
+ public void setHyracksContext(IHyracksTaskContext hyracksContext) {
+ this.hyracksContext = hyracksContext;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
new file mode 100644
index 0000000..d019845
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ColumnAccessEvalFactory implements IEvaluatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int fieldIndex;
+
+ public ColumnAccessEvalFactory(int fieldIndex) {
+ this.fieldIndex = fieldIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnAccess(" + fieldIndex + ")";
+ }
+
+ @Override
+ public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+ return new IEvaluator() {
+
+ private DataOutput out = output.getDataOutput();
+
+ @Override
+ public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ byte[] buffer = tuple.getFieldData(fieldIndex);
+ int start = tuple.getFieldStart(fieldIndex);
+ int length = tuple.getFieldLength(fieldIndex);
+ try {
+ out.write(buffer, start, length);
+ } catch (IOException ioe) {
+ throw new AlgebricksException(ioe);
+ }
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
new file mode 100644
index 0000000..39f56b8
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.evaluators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ConstantEvalFactory implements IEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private byte[] value;
+
+ public ConstantEvalFactory(byte[] value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "Constant";
+ }
+
+ @Override
+ public IEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
+ return new IEvaluator() {
+
+ private DataOutput out = output.getDataOutput();
+
+ @Override
+ public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+ try {
+ out.write(value, 0, value.length);
+ } catch (IOException ioe) {
+ throw new AlgebricksException(ioe);
+ }
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
new file mode 100644
index 0000000..f6a4f9d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class AggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ // private int[] outColumns;
+ private IAggregateFunctionFactory[] aggregFactories;
+
+ public AggregateRuntimeFactory(IAggregateFunctionFactory[] aggregFactories) {
+ super(null);
+ // this.outColumns = outColumns;
+ this.aggregFactories = aggregFactories;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("assign [");
+ for (int i = 0; i < aggregFactories.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(aggregFactories[i]);
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private IAggregateFunction[] aggregs = new IAggregateFunction[aggregFactories.length];
+ private ArrayBackedValueStorage evalOutput = new ArrayBackedValueStorage();
+ private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
+
+ private boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ if (first) {
+ first = false;
+ initAccessAppendRef(context);
+ for (int i = 0; i < aggregFactories.length; i++) {
+ aggregs[i] = aggregFactories[i].createAggregateFunction(evalOutput);
+ }
+ }
+ for (int i = 0; i < aggregFactories.length; i++) {
+ aggregs[i].init();
+ }
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ processTuple(tRef);
+ }
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ computeAggregate();
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ super.close();
+ }
+
+ private void computeAggregate() throws HyracksDataException {
+ tupleBuilder.reset();
+ for (int f = 0; f < aggregs.length; f++) {
+ evalOutput.reset();
+ try {
+ aggregs[f].finish();
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ tupleBuilder.addField(evalOutput.getBytes(), evalOutput.getStartIndex(), evalOutput.getLength());
+ }
+ }
+
+ private void processTuple(FrameTupleReference tupleRef) throws HyracksDataException {
+ for (int f = 0; f < aggregs.length; f++) {
+ try {
+ aggregs[f].step(tupleRef);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..dfd9ec7
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private AlgebricksPipeline[] subplans;
+ private int[] keyFieldIdx;
+ private int[] decorFieldIdx;
+
+ public NestedPlansAccumulatingAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx,
+ int[] decorFieldIdx) {
+ this.subplans = subplans;
+ this.keyFieldIdx = keyFieldIdx;
+ this.decorFieldIdx = decorFieldIdx;
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
+
+ final RuntimeContext rc = new RuntimeContext();
+ rc.setHyracksContext(ctx);
+ final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
+ decorFieldIdx.length);
+ final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
+ for (int i = 0; i < subplans.length; i++) {
+ try {
+ pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, rc);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+ tb.reset();
+ for (int i = 0; i < keyFieldIdx.length; ++i) {
+ tb.addField(accessor, tIndex, keyFieldIdx[i]);
+ }
+ for (int i = 0; i < decorFieldIdx.length; ++i) {
+ tb.addField(accessor, tIndex, decorFieldIdx[i]);
+ }
+ for (int i = 0; i < pipelines.length; ++i) {
+ pipelines[i].open();
+ }
+
+ // aggregate the first tuple
+ for (int i = 0; i < pipelines.length; i++) {
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ }
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // it only works if the output of the aggregator fits in one
+ // frame
+ for (int i = 0; i < pipelines.length; i++) {
+ pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
+ }
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ for (int i = 0; i < pipelines.length; i++) {
+ outputWriter.setInputIdx(i);
+ pipelines[i].close();
+ }
+ // outputWriter.writeTuple(appender);
+ tupleBuilder.reset();
+ ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+ byte[] data = tb.getByteArray();
+ int[] fieldEnds = tb.getFieldEndOffsets();
+ int start = 0;
+ int offset = 0;
+ for (int i = 0; i < fieldEnds.length; i++) {
+ if (i > 0)
+ start = fieldEnds[i - 1];
+ offset = fieldEnds[i] - start;
+ tupleBuilder.addField(data, start, offset);
+ }
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState();
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("this method should not be called");
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ };
+ }
+
+ private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, RuntimeContext rc)
+ throws AlgebricksException {
+ // plug the operators
+ IFrameWriter start = writer;
+ IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
+ RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+ for (int i = runtimeFactories.length - 1; i >= 0; i--) {
+ IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(rc);
+ newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ if (i > 0) {
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
+ } else {
+ // the nts has the same input and output rec. desc.
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
+ }
+ start = newRuntime;
+ }
+ return start;
+ }
+
+ /**
+ * We suppose for now, that each subplan only produces one tuple.
+ */
+ private static class AggregatorOutput implements IFrameWriter {
+
+ // private ByteBuffer frame;
+ private FrameTupleAccessor[] tAccess;
+ private RecordDescriptor[] inputRecDesc;
+ private int inputIdx;
+ private ArrayTupleBuilder tb;
+ private AlgebricksPipeline[] subplans;
+
+ public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
+ this.subplans = subplans;
+ // this.keyFieldIndexes = keyFieldIndexes;
+ int totalAggFields = 0;
+ this.inputRecDesc = new RecordDescriptor[subplans.length];
+ for (int i = 0; i < subplans.length; i++) {
+ RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
+ this.inputRecDesc[i] = rd[rd.length - 1];
+ totalAggFields += subplans[i].getOutputWidth();
+ }
+ tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
+
+ this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
+ for (int i = 0; i < inputRecDesc.length; i++) {
+ tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ /**
+ * Since each pipeline only produces one tuple, this method is only
+ * called by the close method of the pipelines.
+ */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ int tIndex = 0;
+ int w = subplans[inputIdx].getOutputWidth();
+ IFrameTupleAccessor accessor = tAccess[inputIdx];
+ accessor.reset(buffer);
+ for (int f = 0; f < w; f++) {
+ tb.addField(accessor, tIndex, f);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // clearFrame();
+ }
+
+ public void setInputIdx(int inputIdx) {
+ this.inputIdx = inputIdx;
+ }
+
+ public ArrayTupleBuilder getTupleBuilder() {
+ return tb;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..ec9a63c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -0,0 +1,147 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ISerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ISerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+ private ISerializableAggregateFunctionFactory[] aggFactories;
+
+ public SerializableAggregatorDescriptorFactory(ISerializableAggregateFunctionFactory[] aggFactories) {
+ this.aggFactories = aggFactories;
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ final int[] keys = keyFields;
+
+ /**
+ * one IAggregatorDescriptor instance per Gby operator
+ */
+ return new IAggregatorDescriptor() {
+ private FrameTupleReference ftr = new FrameTupleReference();
+ private ISerializableAggregateFunction[] aggs = new ISerializableAggregateFunction[aggFactories.length];
+ private int offsetFieldIndex = keys.length;
+ private int stateFieldLength[] = new int[aggFactories.length];
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState();
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ DataOutput output = tb.getDataOutput();
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ int begin = tb.getSize();
+ if (aggs[i] == null) {
+ aggs[i] = aggFactories[i].createAggregateFunction();
+ }
+ aggs[i].init(output);
+ tb.addFieldEndOffset();
+ stateFieldLength[i] = tb.getSize() - begin;
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ // doing initial aggregate
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ byte[] data = tb.getByteArray();
+ int prevFieldPos = i + keys.length - 1;
+ int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
+ aggs[i].step(ftr, data, start, stateFieldLength[i]);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ftr.reset(accessor, tIndex);
+ int stateTupleStart = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int fieldSlotLength = stateAccessor.getFieldSlotsLength();
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ byte[] data = stateAccessor.getBuffer().array();
+ int start = stateAccessor.getFieldStartOffset(stateTupleIndex, i + keys.length)
+ + stateTupleStart + fieldSlotLength;
+ aggs[i].step(ftr, data, start, stateFieldLength[i]);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte[] data = accessor.getBuffer().array();
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+ int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+ int start = refOffset;
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ aggs[i].finishPartial(data, start, stateFieldLength[i], tb.getDataOutput());
+ start += stateFieldLength[i];
+ tb.addFieldEndOffset();
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte[] data = accessor.getBuffer().array();
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
+ int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
+ int start = refOffset;
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ aggs[i].finish(data, start, stateFieldLength[i], tb.getDataOutput());
+ start += stateFieldLength[i];
+ tb.addFieldEndOffset();
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void close() {
+ reset();
+ }
+
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
new file mode 100644
index 0000000..99feae1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class SimpleAlgebricksAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private IAggregateFunctionFactory[] aggFactories;
+
+ public SimpleAlgebricksAccumulatingAggregatorFactory(IAggregateFunctionFactory[] aggFactories, int[] keys,
+ int[] fdColumns) {
+ this.aggFactories = aggFactories;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
+
+ return new IAggregatorDescriptor() {
+
+ private FrameTupleReference ftr = new FrameTupleReference();
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.first;
+ IAggregateFunction[] agg = aggState.second;
+
+ // initialize aggregate functions
+ for (int i = 0; i < agg.length; i++) {
+ aggOutput[i].reset();
+ try {
+ agg[i].init();
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].step(ftr);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ IAggregateFunction[] agg = aggState.second;
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].step(ftr);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
+ ArrayBackedValueStorage[] aggOutput = aggState.first;
+ IAggregateFunction[] agg = aggState.second;
+ for (int i = 0; i < agg.length; i++) {
+ try {
+ agg[i].finish();
+ tupleBuilder.addField(aggOutput[i].getBytes(), aggOutput[i].getStartIndex(),
+ aggOutput[i].getLength());
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ IAggregateFunction[] agg = new IAggregateFunction[aggFactories.length];
+ ArrayBackedValueStorage[] aggOutput = new ArrayBackedValueStorage[aggFactories.length];
+ for (int i = 0; i < agg.length; i++) {
+ aggOutput[i] = new ArrayBackedValueStorage();
+ try {
+ agg[i] = aggFactories[i].createAggregateFunction(aggOutput[i]);
+ } catch (AlgebricksException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ return new AggregateState(new Pair<ArrayBackedValueStorage[], IAggregateFunction[]>(aggOutput, agg));
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("this method should not be called");
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
new file mode 100644
index 0000000..0ff1b89
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends AbstractOneInputOneOutputPushRuntime {
+
+ protected FrameTupleAppender appender;
+ protected ByteBuffer frame;
+ protected FrameTupleAccessor tAccess;
+ protected FrameTupleReference tRef;
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ writer.close();
+ appender.reset(frame, true);
+ }
+
+ protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+ }
+ }
+ }
+
+ protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+ if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
+ }
+ }
+ }
+
+ protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
+ if (!appender.append(tAccess, tIndex)) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tAccess, tIndex)) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendTupleToFrame).");
+ }
+ }
+ }
+
+ protected final void initAccessAppend(RuntimeContext context) {
+ IHyracksTaskContext hCtx = context.getHyracksContext();
+ // if (allocFrame) {
+ frame = hCtx.allocateFrame();
+ appender = new FrameTupleAppender(hCtx.getFrameSize());
+ appender.reset(frame, true);
+ // }
+ tAccess = new FrameTupleAccessor(hCtx.getFrameSize(), inputRecordDesc);
+ }
+
+ protected final void initAccessAppendRef(RuntimeContext context) {
+ initAccessAppend(context);
+ tRef = new FrameTupleReference();
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
new file mode 100644
index 0000000..22f7df6
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractOneInputOneOutputPushRuntime extends AbstractOneInputPushRuntime {
+
+ protected RecordDescriptor inputRecordDesc;
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ this.inputRecordDesc = recordDescriptor;
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
new file mode 100644
index 0000000..81db95e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+
+public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ protected int[] projectionList;
+
+ public AbstractOneInputOneOutputRuntimeFactory(int[] projectionList) {
+ this.projectionList = projectionList;
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+ return createOneOutputPushRuntime(context);
+ }
+
+ public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(RuntimeContext context)
+ throws AlgebricksException;
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
new file mode 100644
index 0000000..b214f83
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
+ protected IFrameWriter writer;
+ protected RecordDescriptor outputRecordDesc;
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ this.writer = writer;
+ this.outputRecordDesc = recordDesc;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
new file mode 100644
index 0000000..0fbe51f
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractOneInputSinkPushRuntime implements IPushRuntime {
+ protected RecordDescriptor inputRecordDesc;
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ this.inputRecordDesc = recordDescriptor;
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
new file mode 100644
index 0000000..1d23d14
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractOneInputSourcePushRuntime extends AbstractOneInputPushRuntime {
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
new file mode 100644
index 0000000..81e91f6
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class SinkRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public SinkRuntimeFactory() {
+ }
+
+ @Override
+ public String toString() {
+ return "sink";
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+ return new AbstractOneInputSinkPushRuntime() {
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
new file mode 100644
index 0000000..ea83fdd
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final int[] groupFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final RecordDescriptor inRecordDesc;
+ private final RecordDescriptor outRecordDesc;
+
+ public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
+ RecordDescriptor outRecordDesc, int[] projectionList) {
+ super(projectionList);
+ // Obs: the projection list is currently ignored.
+ if (projectionList != null) {
+ throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
+ }
+ this.groupFields = groupFields;
+ this.comparatorFactories = comparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ this.inRecordDesc = inRecordDesc;
+ this.outRecordDesc = outRecordDesc;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(RuntimeContext context)
+ throws AlgebricksException {
+ try {
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ final IHyracksTaskContext ctx = context.getHyracksContext();
+ final IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
+ outRecordDesc, groupFields, groupFields);
+ final ByteBuffer copyFrame = ctx.allocateFrame();
+ final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor.reset(copyFrame);
+ ByteBuffer outFrame = ctx.allocateFrame();
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(outFrame, true);
+
+ return new AbstractOneInputOneOutputPushRuntime() {
+
+ private PreclusteredGroupWriter pgw;
+
+ @Override
+ public void open() throws HyracksDataException {
+ pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc,
+ outRecordDesc, writer);
+ pgw.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ pgw.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ pgw.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ pgw.close();
+ }
+ };
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
new file mode 100644
index 0000000..91fa8a1
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.nio.ByteBuffer;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ // array of factories for building the local runtime pipeline
+ private final AlgebricksPipeline pipeline;
+
+ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
+ IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
+ super(spec, inputArity, outputArity);
+ if (outputArity == 1) {
+ this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+ }
+ this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
+ }
+
+ public AlgebricksPipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = super.toJSON();
+ json.put("micro-operators", pipeline.getRuntimeFactories());
+ return json;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Asterix { \n");
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" " + f.toString() + ";\n");
+ }
+ sb.append("}");
+ // sb.append(super.getInputArity());
+ // sb.append(";");
+ // sb.append(super.getOutputArity());
+ // sb.append(";");
+ return sb.toString();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ if (inputArity == 0) {
+ return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ } else {
+ return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ }
+ }
+
+ private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ public void initialize() throws HyracksDataException {
+ IFrameWriter startOfPipeline;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
+
+ PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
+ pipelineOutputRecordDescriptor);
+ try {
+ RuntimeContext rc = new RuntimeContext();
+ rc.setHyracksContext(ctx);
+ startOfPipeline = pa.assemblePipeline(writer, rc);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ startOfPipeline.open();
+ startOfPipeline.close();
+ }
+ };
+ }
+
+ private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+ private IFrameWriter startOfPipeline;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (startOfPipeline == null) {
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
+ RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
+ AlgebricksMetaOperatorDescriptor.this.getOperatorId(), 0);
+ PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
+ pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
+ try {
+ RuntimeContext rc = new RuntimeContext();
+ rc.setHyracksContext(ctx);
+ startOfPipeline = pa.assemblePipeline(writer, rc);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ startOfPipeline.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ startOfPipeline.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ startOfPipeline.close();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
new file mode 100644
index 0000000..1041438
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PipelineAssembler {
+
+ // array of factories for building the local runtime pipeline
+ private final RecordDescriptor pipelineInputRecordDescriptor;
+ private final RecordDescriptor pipelineOutputRecordDescriptor;
+
+ private final int inputArity;
+ private final int outputArity;
+ private final AlgebricksPipeline pipeline;
+
+ public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+ RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+ this.pipeline = pipeline;
+ this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
+ this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
+ this.inputArity = inputArity;
+ this.outputArity = outputArity;
+ }
+
+ public IFrameWriter assemblePipeline(IFrameWriter writer, RuntimeContext rc) throws AlgebricksException {
+ // plug the operators
+ IFrameWriter start = writer;// this.writer;
+ for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
+ IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(rc);
+ if (i == pipeline.getRuntimeFactories().length - 1) {
+ if (outputArity == 1) {
+ newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+ }
+ } else {
+ newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+ }
+ if (i > 0) {
+ newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
+ } else if (inputArity > 0) {
+ newRuntime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor);
+ }
+ start = newRuntime;
+ }
+ return start;
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
new file mode 100644
index 0000000..c7adf1f
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.meta;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AlgebricksPipeline pipeline;
+ private final RecordDescriptor inputRecordDesc;
+ private final INullWriterFactory[] nullWriterFactories;
+
+ public SubplanRuntimeFactory(AlgebricksPipeline pipeline, INullWriterFactory[] nullWriterFactories,
+ RecordDescriptor inputRecordDesc, int[] projectionList) {
+ super(projectionList);
+ this.pipeline = pipeline;
+ this.nullWriterFactories = nullWriterFactories;
+ this.inputRecordDesc = inputRecordDesc;
+ if (projectionList != null) {
+ throw new NotImplementedException();
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Subplan { \n");
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" " + f.toString() + ";\n");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ @Override
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+
+ RecordDescriptor pipelineOutputRecordDescriptor = null;
+
+ final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc,
+ pipelineOutputRecordDescriptor);
+ final INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
+ for (int i = 0; i < nullWriterFactories.length; i++) {
+ nullWriters[i] = nullWriterFactories[i].createNullWriter();
+ }
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ /**
+ *
+ * Computes the outer product between a given tuple and the frames
+ * passed.
+ *
+ */
+ class TupleOuterProduct implements IFrameWriter {
+
+ private boolean smthWasWritten = false;
+ private IHyracksTaskContext hCtx = context.getHyracksContext();
+ private int frameSize = hCtx.getFrameSize();
+ private FrameTupleAccessor ta = new FrameTupleAccessor(frameSize,
+ pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
+ private ArrayTupleBuilder tb = new ArrayTupleBuilder(nullWriters.length);
+
+ @Override
+ public void open() throws HyracksDataException {
+ smthWasWritten = false;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ta.reset(buffer);
+ int nTuple = ta.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
+ throw new IllegalStateException("Could not write frame.");
+ }
+ }
+ }
+ smthWasWritten = true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (!smthWasWritten) {
+ // the case when we need to write nulls
+ appendNullsToTuple();
+ appendToFrameFromTupleBuilder(tb);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ private void appendNullsToTuple() throws HyracksDataException {
+ tb.reset();
+ int n0 = tRef.getFieldCount();
+ for (int f = 0; f < n0; f++) {
+ tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f);
+ }
+ DataOutput dos = tb.getDataOutput();
+ for (int i = 0; i < nullWriters.length; i++) {
+ nullWriters[i].writeNull(dos);
+ tb.addFieldEndOffset();
+ }
+ }
+
+ }
+
+ IFrameWriter endPipe = new TupleOuterProduct();
+
+ NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, context);
+
+ boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (first) {
+ first = false;
+ initAccessAppendRef(context);
+ }
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ startOfPipeline.writeTuple(buffer, t);
+ startOfPipeline.open();
+ startOfPipeline.close();
+ }
+ }
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
new file mode 100644
index 0000000..3fa3a50
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;
+
+public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int[] sortFields;
+ private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+ private IBinaryComparatorFactory[] comparatorFactories;
+
+ public InMemorySortRuntimeFactory(int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories, int[] projectionList) {
+ super(projectionList);
+ // Obs: the projection list is currently ignored.
+ if (projectionList != null) {
+ throw new NotImplementedException("Cannot push projection into InMemorySortRuntime.");
+ }
+ this.sortFields = sortFields;
+ this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.comparatorFactories = comparatorFactories;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+
+ return new AbstractOneInputOneOutputPushRuntime() {
+
+ FrameSorter frameSorter = null;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (frameSorter == null) {
+ frameSorter = new FrameSorter(context.getHyracksContext(), sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, outputRecordDesc);
+ }
+ frameSorter.reset();
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ frameSorter.insertFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ frameSorter.sortFrames();
+ frameSorter.flushFrames(writer);
+ writer.close();
+ }
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
new file mode 100644
index 0000000..8a89b17
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private int[] outColumns;
+ private IEvaluatorFactory[] evalFactories;
+
+ /**
+ * @param outColumns
+ * a sorted array of columns into which the result is written to
+ * @param evalFactories
+ * @param projectionList
+ * an array of columns to be projected
+ */
+
+ public AssignRuntimeFactory(int[] outColumns, IEvaluatorFactory[] evalFactories, int[] projectionList) {
+ super(projectionList);
+ this.outColumns = outColumns;
+ this.evalFactories = evalFactories;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("assign [");
+ for (int i = 0; i < outColumns.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(outColumns[i]);
+ }
+ sb.append("] := [");
+ for (int i = 0; i < evalFactories.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(evalFactories[i]);
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+ final int[] projectionToOutColumns = new int[projectionList.length];
+ for (int j = 0; j < projectionList.length; j++) {
+ projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+ }
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private ArrayBackedValueStorage evalOutput = new ArrayBackedValueStorage();
+ private IEvaluator[] eval = new IEvaluator[evalFactories.length];
+ private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ private boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (first) {
+ initAccessAppendRef(context);
+ first = false;
+ int n = evalFactories.length;
+ for (int i = 0; i < n; i++) {
+ try {
+ eval[i] = evalFactories[i].createEvaluator(evalOutput);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ }
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ produceTuple(tupleBuilder, tAccess, t, tRef);
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ }
+
+ private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ FrameTupleReference tupleRef) throws HyracksDataException {
+ tb.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int k = projectionToOutColumns[f];
+ if (k >= 0) {
+ evalOutput.reset();
+ try {
+ eval[k].evaluate(tupleRef);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ tb.addField(evalOutput.getBytes(), evalOutput.getStartIndex(), evalOutput.getLength());
+ } else {
+ tb.addField(accessor, tIndex, projectionList[f]);
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..659ba74
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public EmptyTupleSourceRuntimeFactory() {
+ }
+
+ @Override
+ public String toString() {
+ return "ets";
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(final RuntimeContext context) {
+ return new AbstractOneInputSourcePushRuntime() {
+
+ private IHyracksTaskContext hCtx = context.getHyracksContext();
+ private ByteBuffer frame = hCtx.allocateFrame();
+ private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+ private FrameTupleAppender appender = new FrameTupleAppender(hCtx.getFrameSize());
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ FrameUtils.flushFrame(frame, writer);
+ writer.close();
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
new file mode 100644
index 0000000..899147d
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public NestedTupleSourceRuntimeFactory() {
+ }
+
+ @Override
+ public String toString() {
+ return "nts";
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(RuntimeContext context) {
+ return new NestedTupleSourceRuntime(context);
+ }
+
+ public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+ public NestedTupleSourceRuntime(RuntimeContext rc) {
+ initAccessAppend(rc);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
+ tAccess.reset(inputBuffer);
+ appendTupleToFrame(tIndex);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
new file mode 100644
index 0000000..d4040f2
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PrinterRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int[] printColumns;
+ private final IPrinterFactory[] printerFactories;
+ private final RecordDescriptor inputRecordDesc;
+
+ public PrinterRuntimeFactory(int[] printColumns, IPrinterFactory[] printerFactories,
+ RecordDescriptor inputRecordDesc) {
+ this.printColumns = printColumns;
+ this.printerFactories = printerFactories;
+ this.inputRecordDesc = inputRecordDesc;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("print [");
+ for (int i = 0; i < printColumns.length; i++) {
+ if (i > 0) {
+ buf.append("; ");
+ }
+ buf.append(printColumns[i]);
+ }
+ buf.append("]");
+ return buf.toString();
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(final RuntimeContext context) {
+ IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories,
+ inputRecordDesc);
+ return new SinkWriterRuntime(w, context, System.out, inputRecordDesc);
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
new file mode 100644
index 0000000..83976c9
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private int[] outColumns;
+ private IRunningAggregateFunctionFactory[] runningAggregates;
+
+ /**
+ * @param outColumns
+ * a sorted array of columns into which the result is written to
+ * @param runningAggregates
+ * @param projectionList
+ * an array of columns to be projected
+ */
+
+ public RunningAggregateRuntimeFactory(int[] outColumns, IRunningAggregateFunctionFactory[] runningAggregates,
+ int[] projectionList) {
+ super(projectionList);
+ this.outColumns = outColumns;
+ this.runningAggregates = runningAggregates;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("running-aggregate [");
+ for (int i = 0; i < outColumns.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(outColumns[i]);
+ }
+ sb.append("] := [");
+ for (int i = 0; i < runningAggregates.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(runningAggregates[i]);
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+ final int[] projectionToOutColumns = new int[projectionList.length];
+ for (int j = 0; j < projectionList.length; j++) {
+ projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+ }
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private ArrayBackedValueStorage evalOutput = new ArrayBackedValueStorage();
+ private IRunningAggregateFunction[] raggs = new IRunningAggregateFunction[runningAggregates.length];
+ private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ private boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ initAccessAppendRef(context);
+ if (first) {
+ first = false;
+ int n = runningAggregates.length;
+ for (int i = 0; i < n; i++) {
+ try {
+ raggs[i] = runningAggregates[i].createRunningAggregateFunction(evalOutput);
+ raggs[i].init();
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ }
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ produceTuple(tupleBuilder, tAccess, t, tRef);
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ }
+
+ private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ FrameTupleReference tupleRef) throws HyracksDataException {
+ tb.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int k = projectionToOutColumns[f];
+ if (k >= 0) {
+ evalOutput.reset();
+ try {
+ raggs[k].step(tupleRef);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ tb.addField(evalOutput.getBytes(), evalOutput.getStartIndex(), evalOutput.getLength());
+ } else {
+ tb.addField(accessor, tIndex, projectionList[f]);
+ }
+ }
+ }
+
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
new file mode 100644
index 0000000..56a3f0e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
+
+ private final RuntimeContext context;
+ private final PrintStream printStream;
+ private final IAWriter writer;
+ private RecordDescriptor inputRecordDesc;
+ private FrameTupleAccessor tAccess;
+ private boolean autoClose = false;
+ private boolean first = true;
+
+ public SinkWriterRuntime(IAWriter writer, RuntimeContext context, PrintStream printStream,
+ RecordDescriptor inputRecordDesc) {
+ this.writer = writer;
+ this.context = context;
+ this.printStream = printStream;
+ this.inputRecordDesc = inputRecordDesc;
+ this.tAccess = new FrameTupleAccessor(context.getHyracksContext().getFrameSize(), inputRecordDesc);
+ }
+
+ public SinkWriterRuntime(IAWriter writer, RuntimeContext context, PrintStream printStream,
+ RecordDescriptor inputRecordDesc, boolean autoClose) {
+ this(writer, context, printStream, inputRecordDesc);
+ this.autoClose = autoClose;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (first) {
+ first = false;
+ tAccess = new FrameTupleAccessor(context.getHyracksContext().getFrameSize(), inputRecordDesc);
+ try {
+ writer.init();
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ try {
+ writer.printTuple(tAccess, t);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (autoClose) {
+ printStream.close();
+ }
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ this.inputRecordDesc = recordDescriptor;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
new file mode 100644
index 0000000..0641b0c
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int[] fields;
+ private final IPrinterFactory[] printerFactories;
+ private final File outputFile;
+ private final RecordDescriptor inputRecordDesc;
+ private final IAWriterFactory writerFactory;
+
+ public SinkWriterRuntimeFactory(int[] fields, IPrinterFactory[] printerFactories, File outputFile,
+ IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc) {
+ this.fields = fields;
+ this.printerFactories = printerFactories;
+ this.outputFile = outputFile;
+ this.writerFactory = writerFactory;
+ this.inputRecordDesc = inputRecordDesc;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("sink-write " + "[");
+ for (int i = 0; i < fields.length; i++) {
+ if (i > 0) {
+ buf.append("; ");
+ }
+ buf.append(fields[i]);
+ }
+ buf.append("] outputFile");
+ return buf.toString();
+ }
+
+ @Override
+ public IPushRuntime createPushRuntime(RuntimeContext context) throws AlgebricksException {
+ PrintStream filePrintStream = null;
+ try {
+ filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+ } catch (FileNotFoundException e) {
+ throw new AlgebricksException(e);
+ }
+ IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc);
+ return new SinkWriterRuntime(w, context, filePrintStream, inputRecordDesc, true);
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
new file mode 100644
index 0000000..8a12840
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
@@ -0,0 +1,98 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private IEvaluatorFactory aftterObjectsEvalFactory;
+ private IBinaryIntegerInspector binaryIntegerInspector;
+
+ public StreamDieRuntimeFactory(IEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
+ IBinaryIntegerInspector binaryIntegerInspector) {
+ super(projectionList);
+ this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
+ this.binaryIntegerInspector = binaryIntegerInspector;
+ }
+
+ @Override
+ public String toString() {
+ String s = "stream-die " + aftterObjectsEvalFactory.toString();
+ return s;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private IEvaluator evalAfterObjects;
+ private ArrayBackedValueStorage evalOutput;
+ private int toWrite = -1;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (evalAfterObjects == null) {
+ initAccessAppendRef(context);
+ evalOutput = new ArrayBackedValueStorage();
+ try {
+ evalAfterObjects = aftterObjectsEvalFactory.createEvaluator(evalOutput);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ if (toWrite < 0) {
+ toWrite = evaluateInteger(evalAfterObjects, 0);
+ }
+ for (int t = 0; t < nTuple; t++) {
+ if (toWrite > 0) {
+ toWrite--;
+ if (projectionList != null) {
+ appendProjectionToFrame(t, projectionList);
+ } else {
+ appendTupleToFrame(t);
+ }
+ } else {
+ throw new HyracksDataException("injected failure");
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ super.close();
+ }
+
+ private int evaluateInteger(IEvaluator eval, int tIdx) throws HyracksDataException {
+ tRef.reset(tAccess, tIdx);
+ evalOutput.reset();
+ try {
+ eval.evaluate(tRef);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ int lim = binaryIntegerInspector.getIntegerValue(evalOutput.getBytes(), 0, evalOutput.getLength());
+ return lim;
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
new file mode 100644
index 0000000..5a23b98
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -0,0 +1,135 @@
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private IEvaluatorFactory maxObjectsEvalFactory;
+ private IEvaluatorFactory offsetEvalFactory;
+ private IBinaryIntegerInspector binaryIntegerInspector;
+
+ public StreamLimitRuntimeFactory(IEvaluatorFactory maxObjectsEvalFactory, IEvaluatorFactory offsetEvalFactory,
+ int[] projectionList, IBinaryIntegerInspector binaryIntegerInspector) {
+ super(projectionList);
+ this.maxObjectsEvalFactory = maxObjectsEvalFactory;
+ this.offsetEvalFactory = offsetEvalFactory;
+ this.binaryIntegerInspector = binaryIntegerInspector;
+ }
+
+ @Override
+ public String toString() {
+ String s = "stream-limit " + maxObjectsEvalFactory.toString();
+ if (offsetEvalFactory != null) {
+ return s + ", " + offsetEvalFactory.toString();
+ } else {
+ return s;
+ }
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private IEvaluator evalMaxObjects;
+ private ArrayBackedValueStorage evalOutput;
+ private IEvaluator evalOffset = null;
+ private int toWrite = 0; // how many tuples still to write
+ private int toSkip = 0; // how many tuples still to skip
+ private boolean firstTuple = true;
+ private boolean afterLastTuple = false;
+
+ @Override
+ public void open() throws HyracksDataException {
+ // if (first) {
+ if (evalMaxObjects == null) {
+ initAccessAppendRef(context);
+ evalOutput = new ArrayBackedValueStorage();
+ try {
+ evalMaxObjects = maxObjectsEvalFactory.createEvaluator(evalOutput);
+ if (offsetEvalFactory != null) {
+ evalOffset = offsetEvalFactory.createEvaluator(evalOutput);
+ }
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ writer.open();
+ afterLastTuple = false;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (afterLastTuple) {
+ // ignore the data
+ return;
+ }
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ int start = 0;
+ if (nTuple <= toSkip) {
+ toSkip -= nTuple;
+ return;
+ } else if (toSkip > 0) {
+ start = toSkip;
+ toSkip = 0;
+ }
+ for (int t = start; t < nTuple; t++) {
+ if (firstTuple) {
+ firstTuple = false;
+ toWrite = evaluateInteger(evalMaxObjects, t);
+ if (evalOffset != null) {
+ toSkip = evaluateInteger(evalOffset, t);
+ }
+ }
+ if (toSkip > 0) {
+ toSkip--;
+ } else if (toWrite > 0) {
+ toWrite--;
+ if (projectionList != null) {
+ appendProjectionToFrame(t, projectionList);
+ } else {
+ appendTupleToFrame(t);
+ }
+ } else {
+ // close();
+ afterLastTuple = true;
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // if (!afterLastTuple) {
+ super.close();
+ // }
+ }
+
+ private int evaluateInteger(IEvaluator eval, int tIdx) throws HyracksDataException {
+ tRef.reset(tAccess, tIdx);
+ evalOutput.reset();
+ try {
+ eval.evaluate(tRef);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ int lim = binaryIntegerInspector.getIntegerValue(evalOutput.getBytes(), 0, evalOutput.getLength());
+ return lim;
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
new file mode 100644
index 0000000..e452059
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public StreamProjectRuntimeFactory(int[] projectionList) {
+ super(projectionList);
+ }
+
+ @Override
+ public String toString() {
+ return "stream-project " + Arrays.toString(projectionList);
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (first) {
+ first = false;
+ initAccessAppend(context);
+ }
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
+
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
new file mode 100644
index 0000000..1e4f4cc
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private IEvaluatorFactory cond;
+
+ private IBinaryBooleanInspector binaryBooleanInspector;
+
+ /**
+ * @param cond
+ * @param projectionList
+ * if projectionList is null, then no projection is performed
+ */
+ public StreamSelectRuntimeFactory(IEvaluatorFactory cond, int[] projectionList,
+ IBinaryBooleanInspector binaryBooleanInspector) {
+ super(projectionList);
+ this.cond = cond;
+ this.binaryBooleanInspector = binaryBooleanInspector;
+ }
+
+ @Override
+ public String toString() {
+ return "stream-select " + cond.toString();
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context) {
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private IEvaluator eval;
+ private ArrayBackedValueStorage evalOutput;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (eval == null) {
+ initAccessAppendRef(context);
+ evalOutput = new ArrayBackedValueStorage();
+ try {
+ eval = cond.createEvaluator(evalOutput);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ evalOutput.reset();
+ try {
+ eval.evaluate(tRef);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ if (binaryBooleanInspector.getBooleanValue(evalOutput.getBytes(), 0, evalOutput.getLength())) {
+ if (projectionList != null) {
+ appendProjectionToFrame(t, projectionList);
+ } else {
+ appendTupleToFrame(t);
+ }
+ }
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
new file mode 100644
index 0000000..cfc21ad
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private String command;
+ private IPrinterFactory[] printerFactories;
+ private char fieldDelimiter;
+ private ITupleParserFactory parserFactory;
+
+ public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
+ ITupleParserFactory parserFactory) {
+ super(null);
+ this.command = command;
+ this.printerFactories = printerFactories;
+ this.fieldDelimiter = fieldDelimiter;
+ this.parserFactory = parserFactory;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+ final IPrinter[] printers = new IPrinter[printerFactories.length];
+ for (int i = 0; i < printerFactories.length; i++) {
+ printers[i] = printerFactories[i].createPrinter();
+ }
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ final class ForwardScriptOutput implements Runnable {
+
+ private InputStream inStream;
+ private ITupleParser parser;
+
+ public ForwardScriptOutput(ITupleParser parser, InputStream inStream) {
+ this.parser = parser;
+ this.inStream = inStream;
+ }
+
+ @Override
+ public void run() {
+ try {
+ parser.parse(inStream, writer);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ inStream.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ final class DumpInStreamToPrintStream implements Runnable {
+
+ private BufferedReader reader;
+ private PrintStream printStream;
+
+ public DumpInStreamToPrintStream(InputStream inStream, PrintStream printStream) {
+ this.reader = new BufferedReader(new InputStreamReader(inStream));
+ this.printStream = printStream;
+ }
+
+ @Override
+ public void run() {
+ String s;
+ try {
+ while ((s = reader.readLine()) != null) {
+ printStream.println(s);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ printStream.close();
+ }
+ }
+
+ }
+
+ private Process process;
+ private PrintStream ps;
+ private boolean first = true;
+ private Thread outputPipe;
+ private Thread dumpStderr;
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (first) {
+ first = false;
+ initAccessAppendRef(context);
+ }
+
+ try {
+ ITupleParser parser = parserFactory.createTupleParser(context.getHyracksContext());
+ process = Runtime.getRuntime().exec(command);
+ ps = new PrintStream(process.getOutputStream());
+ ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream());
+ outputPipe = new Thread(fso);
+ outputPipe.start();
+ DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
+ System.err);
+ dumpStderr = new Thread(disps);
+ dumpStderr.start();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ for (int i = 0; i < printers.length; i++) {
+ try {
+ printers[i].print(buffer.array(), tRef.getFieldStart(i), tRef.getFieldLength(i), ps);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ ps.print(fieldDelimiter);
+ if (i == printers.length - 1) {
+ ps.print('\n');
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // first close the printer printing to the process
+ ps.close();
+ int ret = 0;
+ // then wait for the process to finish
+
+ try {
+ ret = process.waitFor();
+ outputPipe.join();
+ dumpStderr.join();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ if (ret != 0) {
+ throw new HyracksDataException("Process exit value: " + ret);
+ }
+ // close the following operator in the chain
+ super.close();
+ }
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
new file mode 100644
index 0000000..e32b023
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.context.RuntimeContext;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+
+public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int outCol;
+ private final IUnnestingFunctionFactory unnestingFactory;
+ private int outColPos;
+ private final boolean outColIsProjected;
+
+ // Each time step() is called on the aggregate, a new value is written in
+ // its output. One byte is written before that value and is neglected.
+ // By convention, if the aggregate function writes nothing, it means it
+ // produced the last value.
+
+ public UnnestRuntimeFactory(int outCol, IUnnestingFunctionFactory unnestingFactory, int[] projectionList) {
+ super(projectionList);
+ this.outCol = outCol;
+ this.unnestingFactory = unnestingFactory;
+ outColPos = -1;
+ for (int f = 0; f < projectionList.length; f++) {
+ if (projectionList[f] == outCol) {
+ outColPos = f;
+ }
+ }
+ outColIsProjected = outColPos >= 0;
+ }
+
+ @Override
+ public String toString() {
+ return "unnest " + outCol + " <- " + unnestingFactory;
+ }
+
+ @Override
+ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final RuntimeContext context)
+ throws AlgebricksException {
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private ArrayBackedValueStorage evalOutput;
+ private IUnnestingFunction agg;
+ private ArrayTupleBuilder tupleBuilder;
+
+ @Override
+ public void open() throws HyracksDataException {
+ initAccessAppendRef(context);
+ evalOutput = new ArrayBackedValueStorage();
+ try {
+ agg = unnestingFactory.createUnnestingFunction(evalOutput);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ try {
+ agg.init(tRef);
+ boolean goon = true;
+ do {
+ tupleBuilder.reset();
+ evalOutput.reset();
+ if (!agg.step()) {
+ goon = false;
+ } else {
+ if (!outColIsProjected) {
+ appendProjectionToFrame(t, projectionList);
+ } else {
+ for (int f = 0; f < outColPos; f++) {
+ tupleBuilder.addField(tAccess, t, f);
+ }
+ tupleBuilder.addField(evalOutput.getBytes(), evalOutput.getStartIndex(),
+ evalOutput.getLength());
+ for (int f = outColPos + 1; f < projectionList.length; f++) {
+ tupleBuilder.addField(tAccess, t, f);
+ }
+ }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ } while (goon);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ }
+ };
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
new file mode 100644
index 0000000..9c53241
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -0,0 +1,54 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinter;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class PrinterBasedWriterFactory implements IAWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final PrinterBasedWriterFactory INSTANCE = new PrinterBasedWriterFactory();
+
+ public PrinterBasedWriterFactory() {
+ }
+
+ @Override
+ public IAWriter createWriter(final int[] fields, final PrintStream printStream,
+ final IPrinterFactory[] printerFactories, RecordDescriptor inputRecordDescriptor) {
+ final IPrinter[] printers = new IPrinter[printerFactories.length];
+ for (int i = 0; i < printerFactories.length; i++) {
+ printers[i] = printerFactories[i].createPrinter();
+ }
+
+ return new IAWriter() {
+
+ @Override
+ public void init() throws AlgebricksException {
+ for (int i = 0; i < printers.length; i++) {
+ printers[i].init();
+ }
+ }
+
+ @Override
+ public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+ for (int i = 0; i < fields.length; i++) {
+ int fldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+ + tAccess.getFieldStartOffset(tIdx, fields[i]);
+ int fldLen = tAccess.getFieldLength(tIdx, fields[i]);
+ if (i > 0) {
+ printStream.print("; ");
+ }
+ printers[i].print(tAccess.getBuffer().array(), fldStart, fldLen, printStream);
+ }
+ printStream.println();
+ }
+ };
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
new file mode 100644
index 0000000..61f822e
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.hyracks.algebricks.runtime.writers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class SerializedDataWriterFactory implements IAWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories,
+ final RecordDescriptor inputRecordDescriptor) {
+ return new IAWriter() {
+
+ @Override
+ public void init() throws AlgebricksException {
+ // dump the SerializerDeserializers to disk
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(inputRecordDescriptor);
+ baos.writeTo(ps);
+ oos.close();
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void printTuple(IFrameTupleAccessor tAccess, int tIdx) throws AlgebricksException {
+ for (int i = 0; i < fields.length; i++) {
+ int fldStart = tAccess.getTupleStartOffset(tIdx) + tAccess.getFieldSlotsLength()
+ + tAccess.getFieldStartOffset(tIdx, fields[i]);
+ int fldLen = tAccess.getFieldLength(tIdx, fields[i]);
+ ps.write(tAccess.getBuffer().array(), fldStart, fldLen);
+ }
+ }
+ };
+ }
+}