Merged 229:231 from hyracks_online_aggregation branch

git-svn-id: https://hyracks.googlecode.com/svn/trunk@233 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
index 1bdfe4b..c6e6cd3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -14,9 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.exceptions;
 
-import java.io.IOException;
-
-public class HyracksDataException extends IOException {
+public class HyracksDataException extends HyracksException {
     private static final long serialVersionUID = 1L;
 
     public HyracksDataException() {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
index 450708b..43913ce 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
@@ -1,6 +1,8 @@
 package edu.uci.ics.hyracks.api.exceptions;
 
-public class HyracksException extends Exception {
+import java.io.IOException;
+
+public class HyracksException extends IOException {
     private static final long serialVersionUID = 1L;
 
     public HyracksException() {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
index c2f207e..008fdb1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -16,10 +16,12 @@
 
 import java.util.UUID;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
 public interface IJobLifecycleListener {
-    public void notifyJobCreation(UUID jobId, JobSpecification jobSpec);
+    public void notifyJobCreation(UUID jobId, JobSpecification jobSpec) throws HyracksException;
 
-    public void notifyJobStart(UUID jobId);
+    public void notifyJobStart(UUID jobId) throws HyracksException;
 
-    public void notifyJobFinish(UUID jobId);
+    public void notifyJobFinish(UUID jobId) throws HyracksException;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index 05a75f9..1451c69 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -22,6 +22,8 @@
 import java.io.ObjectOutputStream;
 import java.io.ObjectStreamClass;
 import java.io.Serializable;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
 
 public class JavaSerializationUtils {
     public static byte[] serialize(Serializable jobSpec) throws IOException {
@@ -32,8 +34,14 @@
     }
 
     public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
-        ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
-        return ois.readObject();
+        ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
+            return ois.readObject();
+        } finally {
+            Thread.currentThread().setContextClassLoader(ctxCL);
+        }
     }
 
     private static class ClassLoaderObjectInputStream extends ObjectInputStream {
@@ -49,5 +57,33 @@
         protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
             return Class.forName(desc.getName(), false, classLoader);
         }
+
+        @Override
+        protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+            ClassLoader nonPublicLoader = null;
+            boolean hasNonPublicInterface = false;
+
+            // define proxy in class loader of non-public interface(s), if any
+            Class[] classObjs = new Class[interfaces.length];
+            for (int i = 0; i < interfaces.length; i++) {
+                Class cl = Class.forName(interfaces[i], false, classLoader);
+                if ((cl.getModifiers() & Modifier.PUBLIC) == 0) {
+                    if (hasNonPublicInterface) {
+                        if (nonPublicLoader != cl.getClassLoader()) {
+                            throw new IllegalAccessError("conflicting non-public interface class loaders");
+                        }
+                    } else {
+                        nonPublicLoader = cl.getClassLoader();
+                        hasNonPublicInterface = true;
+                    }
+                }
+                classObjs[i] = cl;
+            }
+            try {
+                return Proxy.getProxyClass(hasNonPublicInterface ? nonPublicLoader : classLoader, classObjs);
+            } catch (IllegalArgumentException e) {
+                throw new ClassNotFoundException(null, e);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 8ced75d..5ca0269 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -59,19 +59,19 @@
         jobLifecycleListeners.add(jobLifecycleListener);
     }
 
-    public synchronized void notifyJobStart(UUID jobId) {
+    public synchronized void notifyJobStart(UUID jobId) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
             l.notifyJobStart(jobId);
         }
     }
 
-    public synchronized void notifyJobFinish(UUID jobId) {
+    public synchronized void notifyJobFinish(UUID jobId) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
             l.notifyJobFinish(jobId);
         }
     }
 
-    public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) {
+    public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
             l.notifyJobCreation(jobId, specification);
         }
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
index 4df524c..c746f34 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.control.cc.job;
 
 import java.io.IOException;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index 2368e3b..3337385 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
@@ -49,102 +48,43 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+    public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
-        RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+        final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
         final IAccumulatingAggregator aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
                 recordDescriptors[0]);
         final ByteBuffer copyFrame = ctx.getResourceManager().allocateFrame();
-        final FrameTupleAccessor inFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
         final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
         copyFrameAccessor.reset(copyFrame);
         ByteBuffer outFrame = ctx.getResourceManager().allocateFrame();
         final FrameTupleAppender appender = new FrameTupleAppender(ctx);
         appender.reset(outFrame, true);
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-            private boolean first;
+            private PreclusteredGroupWriter pgw;
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
-                first = true;
+                pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc, writer);
+                pgw.open();
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                inFrameAccessor.reset(buffer);
-                int nTuples = inFrameAccessor.getTupleCount();
-                for (int i = 0; i < nTuples; ++i) {
-                    if (first) {
-                        aggregator.init(inFrameAccessor, i);
-                        first = false;
-                    } else {
-                        if (i == 0) {
-                            switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1,
-                                    inFrameAccessor, i);
-                        } else {
-                            switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
-                        }
-                    }
-                    aggregator.accumulate(inFrameAccessor, i);
-                }
-                FrameUtils.copy(buffer, copyFrame);
-            }
-
-            private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
-                    FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
-                if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
-                    writeOutput(prevTupleAccessor, prevTupleIndex);
-                    aggregator.init(currTupleAccessor, currTupleIndex);
-                }
-            }
-
-            private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
-                    throws HyracksDataException {
-                if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
-                    FrameUtils.flushFrame(appender.getBuffer(), writer);
-                    appender.reset(appender.getBuffer(), true);
-                    if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
-                        throw new IllegalStateException();
-                    }
-                }
-            }
-
-            private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
-                for (int i = 0; i < comparators.length; ++i) {
-                    int fIdx = groupFields[i];
-                    int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
-                            + a1.getFieldStartOffset(t1Idx, fIdx);
-                    int l1 = a1.getFieldLength(t1Idx, fIdx);
-                    int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
-                            + a2.getFieldStartOffset(t2Idx, fIdx);
-                    int l2 = a2.getFieldLength(t2Idx, fIdx);
-                    if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
-                        return false;
-                    }
-                }
-                return true;
+                pgw.nextFrame(buffer);
             }
 
             @Override
             public void flush() throws HyracksDataException {
-                FrameUtils.flushFrame(appender.getBuffer(), writer);
-                appender.reset(appender.getBuffer(), true);
+                pgw.flush();
             }
 
             @Override
             public void close() throws HyracksDataException {
-                if (!first) {
-                    writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-                    if (appender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(appender.getBuffer(), writer);
-                    }
-                }
-                writer.close();
+                pgw.close();
             }
         };
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
new file mode 100644
index 0000000..93c8b3f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public class PreclusteredGroupWriter implements IFrameWriter {
+    private final int[] groupFields;
+    private final IBinaryComparator[] comparators;
+    private final IAccumulatingAggregator aggregator;
+    private final IFrameWriter writer;
+    private final ByteBuffer copyFrame;
+    private final FrameTupleAccessor inFrameAccessor;
+    private final FrameTupleAccessor copyFrameAccessor;
+    private final ByteBuffer outFrame;
+    private final FrameTupleAppender appender;
+    private boolean first;
+
+    public PreclusteredGroupWriter(IHyracksContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IAccumulatingAggregator aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+        this.groupFields = groupFields;
+        this.comparators = comparators;
+        this.aggregator = aggregator;
+        this.writer = writer;
+        copyFrame = ctx.getResourceManager().allocateFrame();
+        inFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
+        copyFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
+        copyFrameAccessor.reset(copyFrame);
+        outFrame = ctx.getResourceManager().allocateFrame();
+        appender = new FrameTupleAppender(ctx);
+        appender.reset(outFrame, true);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+        first = true;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inFrameAccessor.reset(buffer);
+        int nTuples = inFrameAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; ++i) {
+            if (first) {
+                aggregator.init(inFrameAccessor, i);
+                first = false;
+            } else {
+                if (i == 0) {
+                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+                } else {
+                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                }
+            }
+            aggregator.accumulate(inFrameAccessor, i);
+        }
+        FrameUtils.copy(buffer, copyFrame);
+    }
+
+    private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+            FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+        if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+            writeOutput(prevTupleAccessor, prevTupleIndex);
+            aggregator.init(currTupleAccessor, currTupleIndex);
+        }
+    }
+
+    private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+            throws HyracksDataException {
+        if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+            FrameUtils.flushFrame(appender.getBuffer(), writer);
+            appender.reset(appender.getBuffer(), true);
+            if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+                throw new IllegalStateException();
+            }
+        }
+    }
+
+    private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+        for (int i = 0; i < comparators.length; ++i) {
+            int fIdx = groupFields[i];
+            int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+            int l1 = a1.getFieldLength(t1Idx, fIdx);
+            int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+            int l2 = a2.getFieldLength(t2Idx, fIdx);
+            if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        FrameUtils.flushFrame(appender.getBuffer(), writer);
+        appender.reset(appender.getBuffer(), true);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (!first) {
+            writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(appender.getBuffer(), writer);
+            }
+        }
+        writer.close();
+    }
+}
\ No newline at end of file