Merged 229:231 from hyracks_online_aggregation branch
git-svn-id: https://hyracks.googlecode.com/svn/trunk@233 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
index 1bdfe4b..c6e6cd3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -14,9 +14,7 @@
*/
package edu.uci.ics.hyracks.api.exceptions;
-import java.io.IOException;
-
-public class HyracksDataException extends IOException {
+public class HyracksDataException extends HyracksException {
private static final long serialVersionUID = 1L;
public HyracksDataException() {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
index 450708b..43913ce 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
@@ -1,6 +1,8 @@
package edu.uci.ics.hyracks.api.exceptions;
-public class HyracksException extends Exception {
+import java.io.IOException;
+
+public class HyracksException extends IOException {
private static final long serialVersionUID = 1L;
public HyracksException() {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
index c2f207e..008fdb1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -16,10 +16,12 @@
import java.util.UUID;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * Listener for job lifecycle notifications. Every callback is declared to throw
+ * {@link HyracksException}, so an implementation can report a failure to whoever notifies it.
+ */
public interface IJobLifecycleListener {
+ /**
+ * Notification that carries a job id together with the job's specification.
+ * NOTE(review): presumably delivered when the job is created - confirm against the caller.
+ *
+ * @param jobId id of the job
+ * @param jobSpec specification of the job
+ * @throws HyracksException if the listener fails to handle the notification
+ */
- public void notifyJobCreation(UUID jobId, JobSpecification jobSpec);
+ public void notifyJobCreation(UUID jobId, JobSpecification jobSpec) throws HyracksException;
+ /**
+ * Notification for a job id.
+ * NOTE(review): presumably delivered when the job starts running - confirm against the caller.
+ *
+ * @param jobId id of the job
+ * @throws HyracksException if the listener fails to handle the notification
+ */
- public void notifyJobStart(UUID jobId);
+ public void notifyJobStart(UUID jobId) throws HyracksException;
+ /**
+ * Notification for a job id.
+ * NOTE(review): presumably delivered when the job has finished - confirm against the caller.
+ *
+ * @param jobId id of the job
+ * @throws HyracksException if the listener fails to handle the notification
+ */
- public void notifyJobFinish(UUID jobId);
+ public void notifyJobFinish(UUID jobId) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index 05a75f9..1451c69 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -22,6 +22,8 @@
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
public class JavaSerializationUtils {
public static byte[] serialize(Serializable jobSpec) throws IOException {
@@ -32,8 +34,14 @@
}
+ /**
+ * Deserializes one object from {@code bytes}, resolving classes through {@code classLoader}.
+ * While the object is being read, the calling thread's context class loader is set to
+ * {@code classLoader}; the previous context class loader is restored afterwards, also on failure.
+ *
+ * @param bytes serialized form of the object
+ * @param classLoader class loader used to resolve the classes named in the stream
+ * @return the object returned by {@link ObjectInputStream#readObject()}
+ * @throws IOException if reading the stream fails
+ * @throws ClassNotFoundException if a class named in the stream cannot be resolved
+ */
public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
- ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
- return ois.readObject();
+ // Remember the caller's context class loader so the finally block can put it back.
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ // NOTE(review): presumably for code run during readObject() that consults the context class loader - confirm.
+ Thread.currentThread().setContextClassLoader(classLoader);
+ ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
+ return ois.readObject();
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
}
private static class ClassLoaderObjectInputStream extends ObjectInputStream {
@@ -49,5 +57,33 @@
protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
return Class.forName(desc.getName(), false, classLoader);
}
+
+ /**
+ * Resolves the class of a serialized dynamic proxy using this stream's {@code classLoader}.
+ * <p>
+ * Every interface name is loaded through {@code classLoader}. If at least one of the interfaces is
+ * non-public, the proxy class is requested from that interface's own class loader (all non-public
+ * interfaces must share one loader); otherwise it is requested from {@code classLoader}.
+ *
+ * @param interfaces names of the interfaces implemented by the serialized proxy class
+ * @return the proxy class implementing the named interfaces
+ * @throws IOException declared by the overridden method; this implementation does not throw it itself
+ * @throws ClassNotFoundException if an interface cannot be loaded or the proxy class cannot be obtained
+ * @throws IllegalAccessError if the non-public interfaces come from different class loaders
+ */
+ @Override
+ protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+ ClassLoader nonPublicLoader = null;
+ boolean hasNonPublicInterface = false;
+
+ // define proxy in class loader of non-public interface(s), if any
+ Class<?>[] classObjs = new Class<?>[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++) {
+ Class<?> cl = Class.forName(interfaces[i], false, classLoader);
+ if (!Modifier.isPublic(cl.getModifiers())) {
+ if (hasNonPublicInterface) {
+ if (nonPublicLoader != cl.getClassLoader()) {
+ throw new IllegalAccessError("conflicting non-public interface class loaders");
+ }
+ } else {
+ nonPublicLoader = cl.getClassLoader();
+ hasNonPublicInterface = true;
+ }
+ }
+ classObjs[i] = cl;
+ }
+ try {
+ return Proxy.getProxyClass(hasNonPublicInterface ? nonPublicLoader : classLoader, classObjs);
+ } catch (IllegalArgumentException e) {
+ // Report an unusable interface set the same way as an unresolvable class, keeping the cause.
+ throw new ClassNotFoundException(null, e);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 8ced75d..5ca0269 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -59,19 +59,19 @@
jobLifecycleListeners.add(jobLifecycleListener);
}
+ /**
+ * Forwards the notification to every listener in {@code jobLifecycleListeners} by calling its
+ * {@code notifyJobStart(jobId)}. An exception thrown by a listener is not caught here, so it ends
+ * the loop and the remaining listeners are not called.
+ *
+ * @param jobId id of the job, passed unchanged to each listener
+ * @throws HyracksException if a listener throws it
+ */
- public synchronized void notifyJobStart(UUID jobId) {
+ public synchronized void notifyJobStart(UUID jobId) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobStart(jobId);
}
}
+ /**
+ * Forwards the notification to every listener in {@code jobLifecycleListeners} by calling its
+ * {@code notifyJobFinish(jobId)}. An exception thrown by a listener is not caught here, so it ends
+ * the loop and the remaining listeners are not called.
+ *
+ * @param jobId id of the job, passed unchanged to each listener
+ * @throws HyracksException if a listener throws it
+ */
- public synchronized void notifyJobFinish(UUID jobId) {
+ public synchronized void notifyJobFinish(UUID jobId) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobFinish(jobId);
}
}
- public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) {
+ public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobCreation(jobId, specification);
}
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
index 4df524c..c746f34 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.control.cc.job;
import java.io.IOException;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index 2368e3b..3337385 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -49,102 +48,43 @@
}
+ /**
+ * Builds the push runtime for this operator. The returned pushable is a thin adapter: grouping and
+ * aggregation are delegated to a {@link PreclusteredGroupWriter} that is created in {@code open()}
+ * and receives every {@code nextFrame}, {@code flush} and {@code close} call.
+ *
+ * @throws HyracksDataException declared for failures while setting up the runtime
+ */
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final IAccumulatingAggregator aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc,
recordDescriptors[0]);
- final ByteBuffer copyFrame = ctx.getResourceManager().allocateFrame();
- final FrameTupleAccessor inFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
- final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
- copyFrameAccessor.reset(copyFrame);
- ByteBuffer outFrame = ctx.getResourceManager().allocateFrame();
- final FrameTupleAppender appender = new FrameTupleAppender(ctx);
- appender.reset(outFrame, true);
+ // No frames, accessors or appender are set up here: PreclusteredGroupWriter allocates its own
+ // in its constructor, so keeping them would only allocate two frames that nothing reads.
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private boolean first;
+ private PreclusteredGroupWriter pgw;
@Override
public void open() throws HyracksDataException {
- writer.open();
- first = true;
+ pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator, inRecordDesc, writer);
+ pgw.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inFrameAccessor.reset(buffer);
- int nTuples = inFrameAccessor.getTupleCount();
- for (int i = 0; i < nTuples; ++i) {
- if (first) {
- aggregator.init(inFrameAccessor, i);
- first = false;
- } else {
- if (i == 0) {
- switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1,
- inFrameAccessor, i);
- } else {
- switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
- }
- }
- aggregator.accumulate(inFrameAccessor, i);
- }
- FrameUtils.copy(buffer, copyFrame);
- }
-
- private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
- FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
- if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
- writeOutput(prevTupleAccessor, prevTupleIndex);
- aggregator.init(currTupleAccessor, currTupleIndex);
- }
- }
-
- private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
- throws HyracksDataException {
- if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- appender.reset(appender.getBuffer(), true);
- if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
- throw new IllegalStateException();
- }
- }
- }
-
- private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
- for (int i = 0; i < comparators.length; ++i) {
- int fIdx = groupFields[i];
- int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
- + a1.getFieldStartOffset(t1Idx, fIdx);
- int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
- + a2.getFieldStartOffset(t2Idx, fIdx);
- int l2 = a2.getFieldLength(t2Idx, fIdx);
- if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
- return false;
- }
- }
- return true;
+ pgw.nextFrame(buffer);
}
@Override
public void flush() throws HyracksDataException {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- appender.reset(appender.getBuffer(), true);
+ pgw.flush();
}
@Override
public void close() throws HyracksDataException {
- if (!first) {
- writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- }
- }
- writer.close();
+ pgw.close();
}
};
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
new file mode 100644
index 0000000..93c8b3f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Frame writer that aggregates runs of consecutive tuples with equal group fields and hands one
+ * aggregator output per run to a downstream {@link IFrameWriter}.
+ * <p>
+ * Only adjacent tuples are compared, so the tuples of a group must arrive contiguously. A copy of the
+ * most recent non-empty input frame is kept so that its last tuple can be compared with the first
+ * tuple of the next frame and used to emit the final group in {@link #close()}.
+ */
+public class PreclusteredGroupWriter implements IFrameWriter {
+ private final int[] groupFields;
+ private final IBinaryComparator[] comparators;
+ private final IAccumulatingAggregator aggregator;
+ private final IFrameWriter writer;
+ private final ByteBuffer copyFrame;
+ private final FrameTupleAccessor inFrameAccessor;
+ private final FrameTupleAccessor copyFrameAccessor;
+ private final ByteBuffer outFrame;
+ private final FrameTupleAppender appender;
+ // True from open() until the first tuple has been seen.
+ private boolean first;
+
+ /**
+ * @param ctx context used to allocate the copy and output frames
+ * @param groupFields indexes of the fields that define a group
+ * @param comparators {@code comparators[i]} compares field {@code groupFields[i]} of two tuples
+ * @param aggregator aggregator that is initialized, fed and asked for output once per group
+ * @param inRecordDesc record descriptor of the incoming tuples
+ * @param writer downstream writer that receives the output frames
+ */
+ public PreclusteredGroupWriter(IHyracksContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+ IAccumulatingAggregator aggregator, RecordDescriptor inRecordDesc, IFrameWriter writer) {
+ this.groupFields = groupFields;
+ this.comparators = comparators;
+ this.aggregator = aggregator;
+ this.writer = writer;
+ copyFrame = ctx.getResourceManager().allocateFrame();
+ inFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
+ copyFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
+ copyFrameAccessor.reset(copyFrame);
+ outFrame = ctx.getResourceManager().allocateFrame();
+ appender = new FrameTupleAppender(ctx);
+ appender.reset(outFrame, true);
+ }
+
+ /** Opens the downstream writer and re-arms the first-tuple marker. */
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ first = true;
+ }
+
+ /**
+ * Feeds every tuple of {@code buffer} to the aggregator and emits the finished group whenever the
+ * group fields differ between two consecutive tuples.
+ * <p>
+ * A frame without tuples is ignored. Copying it would replace the retained previous frame, and the
+ * "last tuple of the previous frame" lookups here and in {@link #close()} would then use index -1.
+ */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inFrameAccessor.reset(buffer);
+ int nTuples = inFrameAccessor.getTupleCount();
+ if (nTuples == 0) {
+ return;
+ }
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
+ aggregator.init(inFrameAccessor, i);
+ first = false;
+ } else {
+ if (i == 0) {
+ // The previous tuple is the last tuple of the retained copy of the preceding frame.
+ switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+ } else {
+ switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ }
+ }
+ aggregator.accumulate(inFrameAccessor, i);
+ }
+ // Keep this frame so its last tuple can act as the previous tuple for the next frame and for close().
+ FrameUtils.copy(buffer, copyFrame);
+ }
+
+ /** Emits the current group and initializes a new one when the two tuples are not in the same group. */
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ writeOutput(prevTupleAccessor, prevTupleIndex);
+ aggregator.init(currTupleAccessor, currTupleIndex);
+ }
+ }
+
+ /**
+ * Asks the aggregator to append its output for the group ending at the given tuple. If it returns false,
+ * the output frame is flushed downstream and the call is retried once; a second failure is an error.
+ */
+ private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+ throws HyracksDataException {
+ if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+ FrameUtils.flushFrame(appender.getBuffer(), writer);
+ appender.reset(appender.getBuffer(), true);
+ if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+ throw new IllegalStateException("aggregator output failed again after flushing the output frame");
+ }
+ }
+ }
+
+ /** Returns true when the two tuples compare equal on every group field. */
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ for (int i = 0; i < comparators.length; ++i) {
+ int fIdx = groupFields[i];
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
+ int l1 = a1.getFieldLength(t1Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /** Pushes the current output frame downstream and resets the appender onto the same buffer. */
+ @Override
+ public void flush() throws HyracksDataException {
+ FrameUtils.flushFrame(appender.getBuffer(), writer);
+ appender.reset(appender.getBuffer(), true);
+ }
+
+ /**
+ * Emits the last open group if any tuple was seen, flushes output that is still buffered and closes
+ * the downstream writer.
+ */
+ @Override
+ public void close() throws HyracksDataException {
+ if (!first) {
+ writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(appender.getBuffer(), writer);
+ }
+ }
+ writer.close();
+ }
+}
\ No newline at end of file