Merged online_aggregation @186:220
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@221 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 4ba5869..c739a5e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -14,5 +14,10 @@
*/
package edu.uci.ics.hyracks.api.application;
+import java.io.Serializable;
+
public interface IApplicationContext {
+ public ClassLoader getClassLoader();
+
+ public Serializable getDestributedState();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
index ceae504..efbf0df 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
@@ -18,6 +18,4 @@
public void start() throws Exception;
public void stop() throws Exception;
-
- public void setApplicationContext(IApplicationContext appCtx);
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
new file mode 100644
index 0000000..a431939
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+
+/**
+ * Application context that the cluster controller passes to an application's
+ * {@link ICCBootstrap}.
+ */
+public interface ICCApplicationContext extends IApplicationContext {
+    /**
+     * Sets the application's distributed state, i.e. the value reported by
+     * {@link #getDestributedState()}.
+     *
+     * @param state the serializable state object
+     */
+    void setDistributedState(Serializable state);
+
+    /**
+     * Installs the factory used to build a job specification from the bytes submitted for a job.
+     *
+     * @param jobSpecFactory the factory to install
+     */
+    void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
+
+    /**
+     * Registers a listener for job lifecycle notifications.
+     *
+     * @param jobLifecycleListener the listener to register
+     */
+    void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
new file mode 100644
index 0000000..3acbe5e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Bootstrap hook for application code that runs inside the cluster controller.
+ */
+public interface ICCBootstrap extends IBootstrap {
+    /**
+     * Hands the bootstrap its application context.
+     *
+     * @param appCtx the cluster-controller application context
+     */
+    void setApplicationContext(ICCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
new file mode 100644
index 0000000..9e63446
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+/**
+ * Application context that is handed to an application's {@link INCBootstrap}.
+ */
+public interface INCApplicationContext extends IApplicationContext {
+    /**
+     * Sets the application's distributed state.
+     *
+     * @param state the serializable state object
+     */
+    void setDistributedState(Serializable state);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
new file mode 100644
index 0000000..4aa1ac3
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Bootstrap hook for application code on the node-controller side.
+ */
+public interface INCBootstrap extends IBootstrap {
+    /**
+     * Hands the bootstrap its application context.
+     *
+     * @param appCtx the node-controller application context
+     */
+    void setApplicationContext(INCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 753079b..779e9ac 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -14,10 +14,7 @@
*/
package edu.uci.ics.hyracks.api.client;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.util.EnumSet;
import java.util.UUID;
@@ -31,6 +28,7 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
abstract class AbstractHyracksConnection implements IHyracksClientConnection {
private final String ccHost;
@@ -73,14 +71,7 @@
@Override
public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- return hci.createJob(appName, serialize(jobSpec), jobFlags);
- }
-
- private byte[] serialize(JobSpecification jobSpec) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(jobSpec);
- return baos.toByteArray();
+ return hci.createJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
}
@Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
index 535c925..d90ac60 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.api.context;
+import java.util.UUID;
+
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IResourceManager;
@@ -23,4 +25,6 @@
public int getFrameSize();
public ICounterContext getCounterContext();
+
+ public UUID getJobId();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
index cd8c067..94e5bd3 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
@@ -49,7 +49,7 @@
public void notifyRegistration(IClusterController ccs) throws Exception;
- public void createApplication(String appName, boolean deployHar) throws Exception;
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState) throws Exception;
public void destroyApplication(String appName) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
new file mode 100644
index 0000000..c2f207e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.util.UUID;
+
+/**
+ * Callback interface for observing the lifecycle of jobs, identified by their job id.
+ */
+public interface IJobLifecycleListener {
+    /**
+     * Called when a job has been created.
+     *
+     * @param jobId the id of the new job
+     * @param jobSpec the specification the job was created from
+     */
+    void notifyJobCreation(UUID jobId, JobSpecification jobSpec);
+
+    /**
+     * Called when a job starts.
+     *
+     * @param jobId the id of the job
+     */
+    void notifyJobStart(UUID jobId);
+
+    /**
+     * Called when a job finishes.
+     *
+     * @param jobId the id of the job
+     */
+    void notifyJobFinish(UUID jobId);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSpecificationFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSpecificationFactory.java
new file mode 100644
index 0000000..722aaae
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSpecificationFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * Strategy for turning the raw bytes submitted for a job into a {@link JobSpecification}.
+ */
+public interface IJobSpecificationFactory {
+    /**
+     * Builds the job specification for a submitted job.
+     *
+     * @param bytes the job payload as received by the cluster controller
+     * @param bootstrap the application's cluster-controller bootstrap
+     * @param appCtx the application context of the application the job belongs to
+     * @return the job specification to run
+     * @throws HyracksException if no specification can be produced from {@code bytes}
+     */
+    JobSpecification createJobSpecification(byte[] bytes, ICCBootstrap bootstrap, ICCApplicationContext appCtx)
+            throws HyracksException;
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index e724deb..fce9c22 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -46,6 +46,8 @@
private final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap;
+ private final Map<String, Serializable> properties;
+
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
@@ -53,6 +55,7 @@
opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
opOutputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
+ properties = new HashMap<String, Serializable>();
}
public void addRoot(IOperatorDescriptor op) {
@@ -68,6 +71,14 @@
new Pair<IOperatorDescriptor, Integer>(producerOp, producerPort),
new Pair<IOperatorDescriptor, Integer>(consumerOp, consumerPort)));
}
+
+ public void setProperty(String name, Serializable value) {
+ properties.put(name, value);
+ }
+
+ public Serializable getProperty(String name) {
+ return properties.get(name);
+ }
private <T> void extend(List<T> list, int index) {
int n = list.size();
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
new file mode 100644
index 0000000..05a75f9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
+
+/**
+ * Helpers for converting objects to and from Java-serialized byte arrays.
+ */
+public class JavaSerializationUtils {
+    /**
+     * Serializes an object with standard Java serialization.
+     *
+     * @param jobSpec the object to serialize; despite the name, any serializable object is accepted
+     * @return the serialized form of {@code jobSpec}
+     * @throws IOException if the object graph cannot be written
+     */
+    public static byte[] serialize(Serializable jobSpec) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(jobSpec);
+        } finally {
+            // Closing flushes any data still buffered by the object stream into baos.
+            oos.close();
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Deserializes an object, resolving its classes through the given class loader.
+     * <p>
+     * NOTE(review): this is native Java deserialization; callers should only pass bytes from trusted sources.
+     *
+     * @param bytes the serialized form, as produced by {@link #serialize(Serializable)}
+     * @param classLoader the class loader used to look up the classes named in the stream
+     * @return the deserialized object
+     * @throws IOException if the bytes are not a readable serialization stream
+     * @throws ClassNotFoundException if a class named in the stream cannot be resolved
+     */
+    public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
+        ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
+        try {
+            return ois.readObject();
+        } finally {
+            ois.close();
+        }
+    }
+
+    /**
+     * Object input stream that looks classes up in a caller-supplied class loader.
+     */
+    private static class ClassLoaderObjectInputStream extends ObjectInputStream {
+        private final ClassLoader classLoader;
+
+        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
+                SecurityException {
+            super(in);
+            this.classLoader = classLoader;
+        }
+
+        @Override
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+            try {
+                return Class.forName(desc.getName(), false, classLoader);
+            } catch (ClassNotFoundException e) {
+                // Class.forName cannot resolve primitive type names such as "int"; the default
+                // implementation can, so defer to it before giving up.
+                return super.resolveClass(desc);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e411517..a9fd347 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -24,7 +24,6 @@
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -56,7 +55,6 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
-import org.json.JSONArray;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
@@ -74,6 +72,8 @@
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.web.handlers.util.IJSONOutputFunction;
import edu.uci.ics.hyracks.control.cc.web.handlers.util.JSONOutputRequestHandler;
@@ -92,7 +92,7 @@
private final Map<String, NodeControllerState> nodeRegistry;
- private final Map<String, ApplicationContext> applications;
+ private final Map<String, CCApplicationContext> applications;
private final ServerContext serverCtx;
@@ -111,7 +111,7 @@
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
- applications = new Hashtable<String, ApplicationContext>();
+ applications = new Hashtable<String, CCApplicationContext>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
ClusterControllerService.class.getName()));
Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
@@ -148,11 +148,19 @@
@Override
public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- ApplicationContext appCtx = applications.get(appName);
+ CCApplicationContext appCtx = getApplicationContext(appName);
if (appCtx == null) {
throw new HyracksException("No application with id " + appName + " found");
}
- return jobManager.createJob(appName, (JobSpecification) appCtx.deserialize(jobSpec), jobFlags);
+ UUID jobId = UUID.randomUUID();
+ JobSpecification specification = appCtx.createJobSpecification(jobId, jobSpec);
+ jobManager.createJob(jobId, appName, specification, jobFlags);
+ appCtx.notifyJobCreation(jobId, specification);
+ return jobId;
+ }
+
+ public synchronized CCApplicationContext getApplicationContext(String appName) {
+ return applications.get(appName);
}
@Override
@@ -328,7 +336,7 @@
}
String appName = parts[0];
ApplicationContext appCtx;
- appCtx = applications.get(appName);
+ appCtx = getApplicationContext(appName);
if (appCtx != null) {
if (HttpMethods.PUT.equals(request.getMethod())) {
OutputStream os = appCtx.getHarOutputStream();
@@ -384,7 +392,7 @@
if (applications.containsKey(appName)) {
throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
- ApplicationContext appCtx = new ApplicationContext(serverCtx, appName);
+ CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
applications.put(appName, appCtx);
}
}
@@ -404,8 +412,10 @@
@Override
public void startApplication(final String appName) throws Exception {
- ApplicationContext appCtx = applications.get(appName);
+ ApplicationContext appCtx = getApplicationContext(appName);
+ appCtx.initializeClassPath();
appCtx.initialize();
+ final byte[] serializedDistributedState = JavaSerializationUtils.serialize(appCtx.getDestributedState());
final boolean deployHar = appCtx.containsHar();
RemoteOp<Void>[] ops;
synchronized (this) {
@@ -419,7 +429,7 @@
@Override
public Void execute(INodeController node) throws Exception {
- node.createApplication(appName, deployHar);
+ node.createApplication(appName, deployHar, serializedDistributedState);
return null;
}
});
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
index 1f14b3b..88076c9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -23,7 +23,8 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
public interface IJobManager {
- public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+ public void createJob(UUID jobId, String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+ throws Exception;
public void start(UUID jobId) throws Exception;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
index 9ed4f00..2eabaf1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -60,6 +60,7 @@
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
public class JOLJobManagerImpl implements IJobManager {
private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
@@ -302,7 +303,8 @@
try {
Object[] data = t.toArray();
UUID jobId = (UUID) data[0];
- Set<String> ts = (Set<String>) data[1];
+ String appName = (String) data[1];
+ Set<String> ts = (Set<String>) data[2];
ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
.size()];
int i = 0;
@@ -317,6 +319,10 @@
jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
jolRuntime.evaluate();
}
+ CCApplicationContext appCtx = ccs.getApplicationContext(appName);
+ if (appCtx != null) {
+ appCtx.notifyJobFinish(jobId);
+ }
} catch (Exception e) {
}
}
@@ -378,9 +384,8 @@
}
@Override
- public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- final UUID jobId = UUID.randomUUID();
-
+ public void createJob(final UUID jobId, String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+ throws Exception {
final JobPlanBuilder builder = new JobPlanBuilder();
builder.init(jobSpec, jobFlags);
@@ -443,8 +448,6 @@
jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
jolRuntime.evaluate();
-
- return jobId;
}
private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
@@ -868,7 +871,7 @@
private static Key PRIMARY_KEY = new Key(0);
@SuppressWarnings("unchecked")
- private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class };
+ private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, Set.class };
public JobCleanUpTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
new file mode 100644
index 0000000..8ced75d
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -0,0 +1,112 @@
+package edu.uci.ics.hyracks.control.cc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.job.DeserializingJobSpecificationFactory;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+/**
+ * Application context used on the cluster controller. On top of {@link ApplicationContext} it holds the
+ * job specification factory and the job lifecycle listeners registered by the application.
+ */
+public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+    private IJobSpecificationFactory jobSpecFactory;
+
+    // Every access to this list happens in a synchronized method of this object.
+    private final List<IJobLifecycleListener> jobLifecycleListeners;
+
+    /**
+     * Creates the context with the default, deserializing job specification factory and no listeners.
+     *
+     * @param serverCtx the server context, passed to the superclass
+     * @param appName the application name, passed to the superclass
+     * @throws IOException propagated from the superclass constructor
+     */
+    public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+        super(serverCtx, appName);
+        jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
+        jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
+    }
+
+    /**
+     * Gives the bootstrap this context and then starts it. The bootstrap must be an {@link ICCBootstrap}.
+     */
+    @Override
+    protected void start() throws Exception {
+        ((ICCBootstrap) bootstrap).setApplicationContext(this);
+        bootstrap.start();
+    }
+
+    @Override
+    public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
+        this.jobSpecFactory = jobSpecFactory;
+    }
+
+    /**
+     * Builds a job specification from the submitted bytes using the current factory.
+     *
+     * @param jobId the id assigned to the job; not used by this method at present
+     * @param bytes the submitted job payload
+     * @return the specification produced by the factory
+     * @throws HyracksException if the factory fails
+     */
+    public JobSpecification createJobSpecification(UUID jobId, byte[] bytes) throws HyracksException {
+        return jobSpecFactory.createJobSpecification(bytes, (ICCBootstrap) bootstrap, this);
+    }
+
+    /**
+     * Stops the bootstrap, if the application has one.
+     */
+    @Override
+    protected void stop() throws Exception {
+        if (bootstrap != null) {
+            bootstrap.stop();
+        }
+    }
+
+    @Override
+    public void setDistributedState(Serializable state) {
+        this.distributedState = state;
+    }
+
+    /**
+     * Registers a listener. Synchronized because the notify methods iterate the same list under this lock;
+     * an unsynchronized add from another thread could corrupt or break that iteration.
+     */
+    @Override
+    public synchronized void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener) {
+        jobLifecycleListeners.add(jobLifecycleListener);
+    }
+
+    /** Forwards a job-start notification to every registered listener. */
+    public synchronized void notifyJobStart(UUID jobId) {
+        for (IJobLifecycleListener l : jobLifecycleListeners) {
+            l.notifyJobStart(jobId);
+        }
+    }
+
+    /** Forwards a job-finish notification to every registered listener. */
+    public synchronized void notifyJobFinish(UUID jobId) {
+        for (IJobLifecycleListener l : jobLifecycleListeners) {
+            l.notifyJobFinish(jobId);
+        }
+    }
+
+    /** Forwards a job-creation notification to every registered listener. */
+    public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) {
+        for (IJobLifecycleListener l : jobLifecycleListeners) {
+            l.notifyJobCreation(jobId, specification);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
new file mode 100644
index 0000000..4df524c
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+
+/**
+ * Job specification factory that treats the submitted bytes as a Java-serialized {@link JobSpecification}.
+ */
+public class DeserializingJobSpecificationFactory implements IJobSpecificationFactory {
+    /** The only instance; the constructor is private. */
+    public static final IJobSpecificationFactory INSTANCE = new DeserializingJobSpecificationFactory();
+
+    private DeserializingJobSpecificationFactory() {
+    }
+
+    /**
+     * Deserializes {@code bytes} with the application's class loader and returns the result as a job
+     * specification. The {@code bootstrap} argument is not used.
+     *
+     * @throws HyracksException wrapping any I/O or class-resolution failure during deserialization
+     */
+    @Override
+    public JobSpecification createJobSpecification(byte[] bytes, ICCBootstrap bootstrap, ICCApplicationContext appCtx)
+            throws HyracksException {
+        final ClassLoader appClassLoader = appCtx.getClassLoader();
+        final Object decoded;
+        try {
+            decoded = JavaSerializationUtils.deserialize(bytes, appClassLoader);
+        } catch (ClassNotFoundException cnfe) {
+            throw new HyracksException(cnfe);
+        } catch (IOException ioe) {
+            throw new HyracksException(ioe);
+        }
+        return (JobSpecification) decoded;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
index a717128..8174098 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -324,13 +324,14 @@
NewStats.putAll(SMap);
};
-define(jobcleanup_agg, {UUID, Set});
+define(jobcleanup_agg, {UUID, String, Set});
-jobcleanup_agg(JobId, set<NodeId>) :-
+jobcleanup_agg(JobId, AppName, set<NodeId>) :-
stagestart#insert(JobId, StageNumber, Attempt),
stagefinish(JobId, _, Attempt, _),
attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+ job(JobId, AppName, _, _, _, _),
notin jobstage(JobId, StageNumber);
-jobcleanup(JobId, NodeIdSet) :-
- jobcleanup_agg(JobId, NodeIdSet);
+jobcleanup(JobId, AppName, NodeIdSet) :-
+ jobcleanup_agg(JobId, AppName, NodeIdSet);
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index c499ac9..6a494aa 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -14,14 +14,13 @@
*/
package edu.uci.ics.hyracks.control.common.application;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
import java.io.OutputStream;
+import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -39,20 +38,22 @@
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
-public class ApplicationContext implements IApplicationContext {
+public abstract class ApplicationContext implements IApplicationContext {
private static final String APPLICATION_ROOT = "applications";
private static final String CLUSTER_CONTROLLER_BOOTSTRAP_CLASS_KEY = "cc.bootstrap.class";
private static final String NODE_CONTROLLER_BOOTSTRAP_CLASS_KEY = "nc.bootstrap.class";
- private ServerContext serverCtx;
- private final String appName;
- private final File applicationRootDir;
- private ClassLoader classLoader;
- private ApplicationStatus status;
- private Properties deploymentDescriptor;
- private IBootstrap bootstrap;
+ protected ServerContext serverCtx;
+ protected final String appName;
+ protected final File applicationRootDir;
+ protected ClassLoader classLoader;
+ protected ApplicationStatus status;
+ protected Properties deploymentDescriptor;
+ protected IBootstrap bootstrap;
+ protected Serializable distributedState;
public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
this.serverCtx = serverCtx;
@@ -67,10 +68,7 @@
return appName;
}
- public void initialize() throws Exception {
- if (status != ApplicationStatus.CREATED) {
- throw new IllegalStateException();
- }
+ public void initializeClassPath() throws Exception {
if (expandArchive()) {
File expandedFolder = getExpandedFolder();
List<URL> urls = new ArrayList<URL>();
@@ -83,7 +81,16 @@
});
classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
deploymentDescriptor = parseDeploymentDescriptor();
+ } else {
+ classLoader = getClass().getClassLoader();
+ }
+ }
+ public void initialize() throws Exception {
+ if (status != ApplicationStatus.CREATED) {
+ throw new IllegalStateException();
+ }
+ if (expandArchive()) {
String bootstrapClass = null;
switch (serverCtx.getServerType()) {
case CLUSTER_CONTROLLER: {
@@ -97,15 +104,16 @@
}
if (bootstrapClass != null) {
bootstrap = (IBootstrap) classLoader.loadClass(bootstrapClass).newInstance();
- bootstrap.setApplicationContext(this);
- bootstrap.start();
+ start();
}
- } else {
- classLoader = getClass().getClassLoader();
}
status = ApplicationStatus.INITIALIZED;
}
+ protected abstract void start() throws Exception;
+
+ protected abstract void stop() throws Exception;
+
private void findJarFiles(File dir, List<URL> urls) throws MalformedURLException {
for (File f : dir.listFiles()) {
if (f.isDirectory()) {
@@ -161,16 +169,13 @@
public void deinitialize() throws Exception {
status = ApplicationStatus.DEINITIALIZED;
- if (bootstrap != null) {
- bootstrap.stop();
- }
+ stop();
File expandedFolder = getExpandedFolder();
FileUtils.deleteDirectory(expandedFolder);
}
public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
- ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
- return ois.readObject();
+ return JavaSerializationUtils.deserialize(bytes, classLoader); // NOTE(review): presumably resolves classes against classLoader as the removed ClassLoaderObjectInputStream did — confirm in JavaSerializationUtils
}
public OutputStream getHarOutputStream() throws IOException {
@@ -188,4 +193,14 @@
public boolean containsHar() {
return getArchiveFile().exists();
}
+
+ @Override
+ public Serializable getDestributedState() { // the "Destributed" spelling comes from the IApplicationContext declaration; rename both together or not at all
+ return distributedState;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() { // returns the loader assigned by initializeClassPath(): a URLClassLoader over the expanded archive, or this class's own loader
+ return classLoader;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ClassLoaderObjectInputStream.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ClassLoaderObjectInputStream.java
deleted file mode 100644
index 1ae29ba..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ClassLoaderObjectInputStream.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package edu.uci.ics.hyracks.control.common.application;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
-
-public class ClassLoaderObjectInputStream extends ObjectInputStream {
- private ClassLoader classLoader;
-
- protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
- SecurityException {
- super(in);
- this.classLoader = classLoader;
- }
-
- protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
- return Class.forName(desc.getName(), false, classLoader);
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3690ed7..9733d94 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.Serializable;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.rmi.registry.LocateRegistry;
@@ -76,6 +77,7 @@
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.comm.ConnectionManager;
import edu.uci.ics.hyracks.control.nc.comm.DemuxDataReceiveListenerFactory;
import edu.uci.ics.hyracks.control.nc.runtime.DelegateHyracksContext;
@@ -107,7 +109,7 @@
private final ServerContext serverCtx;
- private final Map<String, ApplicationContext> applications;
+ private final Map<String, NCApplicationContext> applications;
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
@@ -123,7 +125,7 @@
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
NodeControllerService.class.getName()), id));
- applications = new Hashtable<String, ApplicationContext>();
+ applications = new Hashtable<String, NCApplicationContext>();
}
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -218,7 +220,8 @@
Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
joblet.setStagelet(stageId, stagelet);
- IHyracksContext stageletContext = new DelegateHyracksContext(ctx, stagelet.getStageletCounterContext());
+ IHyracksContext stageletContext = new DelegateHyracksContext(ctx, jobId,
+ stagelet.getStageletCounterContext());
final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
@@ -328,7 +331,7 @@
final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
- final IHyracksContext stageletContext = new DelegateHyracksContext(ctx,
+ final IHyracksContext stageletContext = new DelegateHyracksContext(ctx, jobId,
stagelet.getStageletCounterContext());
final JobSpecification spec = plan.getJobSpecification();
@@ -554,13 +557,14 @@
}
@Override
- public void createApplication(String appName, boolean deployHar) throws Exception {
- ApplicationContext appCtx;
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+ throws Exception {
+ NCApplicationContext appCtx;
synchronized (applications) {
if (applications.containsKey(appName)) {
throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
- appCtx = new ApplicationContext(serverCtx, appName);
+ appCtx = new NCApplicationContext(serverCtx, appName);
applications.put(appName, appCtx);
}
if (deployHar) {
@@ -577,6 +581,8 @@
is.close();
}
}
+ appCtx.initializeClassPath();
+ appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
appCtx.initialize();
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
new file mode 100644
index 0000000..c894b4a
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.nc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+public class NCApplicationContext extends ApplicationContext implements INCApplicationContext { // ApplicationContext variant instantiated by NodeControllerService for each created application
+ public NCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+ super(serverCtx, appName);
+ }
+
+ @Override
+ public void setDistributedState(Serializable state) { // writes the inherited distributedState field that getDestributedState() returns
+ distributedState = state;
+ }
+
+ @Override
+ protected void start() throws Exception {
+ ((INCBootstrap) bootstrap).setApplicationContext(this); // give the bootstrap its context before starting it; the cast fails with ClassCastException if the loaded bootstrap is not an INCBootstrap
+ bootstrap.start();
+ }
+
+ @Override
+ protected void stop() throws Exception {
+ if (bootstrap != null) { // bootstrap is only assigned in initialize() when a bootstrap class name was found
+ bootstrap.stop();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
index 12c1a76..963c2e7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.nc.runtime;
+import java.util.UUID;
+
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IResourceManager;
@@ -23,9 +25,12 @@
private final ICounterContext counterContext;
- public DelegateHyracksContext(IHyracksContext delegate, ICounterContext counterContext) {
+ private final UUID jobId;
+
+ public DelegateHyracksContext(IHyracksContext delegate, UUID jobId, ICounterContext counterContext) { // jobId is stored unchanged and served back by getJobId()
this.delegate = delegate;
this.counterContext = counterContext;
+ this.jobId = jobId;
}
@Override
@@ -39,6 +44,11 @@
}
@Override
+ public UUID getJobId() { // answered locally from the constructor argument, not forwarded to the delegate
+ return jobId;
+ }
+
+ @Override
public ICounterContext getCounterContext() {
return counterContext;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index 7eddfc3..fb7a01d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.control.nc.runtime;
+import java.util.UUID;
+
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IResourceManager;
@@ -41,4 +43,9 @@
public ICounterContext getCounterContext() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public UUID getJobId() { // not supported on the root context, mirroring getCounterContext() in this class
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
index ed34fcb..0d9e2b4 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -38,4 +39,9 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
+
+ public static int getAbsoluteFieldStartOffset(IFrameTupleAccessor accessor, int tuple, int field) { // sums tuple start, field-slot length and the field's start offset; assumes getFieldStartOffset() is relative to the end of the slot area — TODO confirm
+ return accessor.getTupleStartOffset(tuple) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tuple, field);
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
new file mode 100644
index 0000000..71a3110
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class HadoopNewPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
+ AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory { // exposes an org.apache.hadoop.mapreduce Partitioner as an ITuplePartitionComputerFactory
+ private static final long serialVersionUID = 1L;
+ private final ISerializerDeserializer<K> keyIO; // deserializes tuple field 0 into the partitioner key
+ private final ISerializerDeserializer<V> valueIO; // deserializes tuple field 1 into the partitioner value
+
+ public HadoopNewPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
+ ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
+ super(klass);
+ this.keyIO = keyIO;
+ this.valueIO = valueIO;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream(); // one stream per computer, re-pointed at each field before it is read
+ private final DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength() // NOTE(review): same arithmetic as FrameUtils.getAbsoluteFieldStartOffset(accessor, tIndex, 0)
+ + accessor.getFieldStartOffset(tIndex, 0);
+ bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+ K key = keyIO.deserialize(dis);
+ int valueStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength() // same computation for field 1
+ + accessor.getFieldStartOffset(tIndex, 1);
+ bbis.setByteBuffer(accessor.getBuffer(), valueStart);
+ V value = valueIO.deserialize(dis);
+ return instance.getPartition(key, value, nParts); // instance is not declared in this class; presumably inherited from AbstractClassBasedDelegate — confirm
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
index d06d6ff..4efa614 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
@@ -14,7 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.hadoop.data;
-import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.RawComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -23,15 +23,15 @@
public class WritableComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
private static final long serialVersionUID = 1L;
- private Class<? extends WritableComparator> cmpClass;
+ private Class<? extends RawComparator<T>> cmpClass;
- public WritableComparingBinaryComparatorFactory(Class<? extends WritableComparator> cmpClass) {
+ public WritableComparingBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
this.cmpClass = cmpClass;
}
@Override
public IBinaryComparator createBinaryComparator() {
- final WritableComparator instance = ReflectionUtils.createInstance(cmpClass);
+ final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
return new IBinaryComparator() {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
diff --git a/hyracks-dataflow-std/.classpath b/hyracks-dataflow-std/.classpath
index 86f50f4..31cf404 100644
--- a/hyracks-dataflow-std/.classpath
+++ b/hyracks-dataflow-std/.classpath
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 228d22b..36a49ee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -15,24 +15,13 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -40,15 +29,10 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String FRAMESORTER = "framesorter";
@@ -104,61 +88,29 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0]);
- final int maxSortFrames = framesLimit - 1;
+ final ExternalSortRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields,
+ firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0], framesLimit);
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private LinkedList<File> runs;
-
@Override
public void open() throws HyracksDataException {
- runs = new LinkedList<File>();
- frameSorter.reset();
+ runGen.open(); // run list and sorter reset now happen inside ExternalSortRunGenerator.open()
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (frameSorter.getFrameCount() >= maxSortFrames) {
- flushFramesToRun();
- }
- frameSorter.insertFrame(buffer);
+ runGen.nextFrame(buffer); // the spill-when-full check and the insert moved into ExternalSortRunGenerator.nextFrame()
}
@Override
public void close() throws HyracksDataException {
- if (frameSorter.getFrameCount() > 0) {
- if (runs.size() <= 0) {
- frameSorter.sortFrames();
- env.set(FRAMESORTER, frameSorter);
- } else {
- flushFramesToRun();
- }
- }
- env.set(RUNS, runs);
- }
-
- private void flushFramesToRun() throws HyracksDataException {
- frameSorter.sortFrames();
- File runFile;
- try {
- runFile = ctx.getResourceManager().createFile(
- ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter writer = new RunFileWriter(runFile);
- writer.open();
- try {
- frameSorter.flushFrames(writer);
- } finally {
- writer.close();
- }
- frameSorter.reset();
- runs.add(runFile);
+ runGen.close(); // sorts in memory when nothing was spilled, otherwise spills the remaining frames as a final run
+ env.set(FRAMESORTER, runGen.getFrameSorter()); // now published unconditionally; the removed code set it only when no runs had been written
+ env.set(RUNS, runGen.getRuns()); // hands over the generator's own list, not a copy
}
@Override
public void flush() throws HyracksDataException {
+ runGen.flush(); // ExternalSortRunGenerator.flush() has an empty body, so this currently does nothing
}
};
return op;
@@ -176,283 +128,19 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- private List<ByteBuffer> inFrames;
- private ByteBuffer outFrame;
- LinkedList<File> runs;
- private FrameTupleAppender outFrameAppender;
-
@Override
public void initialize() throws HyracksDataException {
- runs = (LinkedList<File>) env.get(RUNS);
- writer.open();
- try {
- if (runs.size() <= 0) {
- FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
- if (frameSorter != null) {
- frameSorter.flushFrames(writer);
- }
- env.set(FRAMESORTER, null);
- } else {
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.getResourceManager().allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx);
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.getResourceManager().allocateFrame());
- }
- int passCount = 0;
- while (runs.size() > 0) {
- passCount++;
- try {
- doPass(runs, passCount);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- }
- } finally {
- writer.close();
- }
+ List<File> runs = (List<File>) env.get(RUNS); // unchecked cast of the value the sort activity's close() stored under RUNS
+ FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER); // value the sort activity's close() stored under FRAMESORTER
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
+ comparatorFactories, recordDescriptors[0], framesLimit, writer);
+ merger.process(); // NOTE(review): writer.open()/close() are no longer called here; presumably process() now owns the writer lifecycle — confirm
+ env.set(FRAMESORTER, null); // clear both environment entries once merging has returned
env.set(RUNS, null);
}
-
- // creates a new run from runs that can fit in memory.
- private void doPass(LinkedList<File> runs, int passCount) throws HyracksDataException, IOException {
- File newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
- if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- newRun = ctx.getResourceManager().createFile(
- ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
- writer = new RunFileWriter(newRun);
- writer.open();
- }
- try {
- RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
- inFrames.size(), comparator);
- int[] tupleIndexes = new int[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- tupleIndexes[i] = 0;
- int runIndex = topTuples.peek().getRunid();
- runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
- runCursors[runIndex].open();
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- } else {
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- while (!topTuples.areRunsExhausted()) {
- ReferenceEntry top = topTuples.peek();
- int runIndex = top.getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- int tupleIndex = top.getTupleIndex();
-
- if (!outFrameAppender.append(fta, tupleIndex)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!outFrameAppender.append(fta, tupleIndex)) {
- throw new IllegalStateException();
- }
- }
-
- ++tupleIndexes[runIndex];
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- }
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
- runs.subList(0, inFrames.size()).clear();
- if (!finalPass) {
- runs.add(0, newRun);
- }
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
-
- private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- if (exists) {
- topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws IOException {
- if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
- return false;
- } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
- ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
- if (runCursors[runIndex].nextFrame(buf)) {
- tupleIndexes[runIndex] = 0;
- return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- } else {
- return false;
- }
- } else {
- return true;
- }
- }
-
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessor[index] = null;
- }
};
return op;
}
}
-
- private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
- public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
- int j1 = (Integer) tp1.getTupleIndex();
- int j2 = (Integer) tp2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < sortFields.length; ++f) {
- int fIdx = sortFields[f];
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- }
-
- private class RunFileWriter implements IFrameWriter {
- private final File file;
- private FileChannel channel;
-
- public RunFileWriter(File file) {
- this.file = file;
- }
-
- @Override
- public void open() throws HyracksDataException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file, "rw");
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
- }
- channel = raf.getChannel();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- int remain = buffer.capacity();
- while (remain > 0) {
- int len;
- try {
- len = channel.write(buffer);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- if (len < 0) {
- throw new HyracksDataException("Error writing data");
- }
- remain -= len;
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (channel != null) {
- try {
- channel.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- }
- }
-
- public static class RunFileReader implements IFrameReader {
- private final File file;
- private FileChannel channel;
-
- public RunFileReader(File file) throws FileNotFoundException {
- this.file = file;
- }
-
- @Override
- public void open() throws HyracksDataException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file, "r");
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
- }
- channel = raf.getChannel();
- }
-
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- buffer.clear();
- int remain = buffer.capacity();
- while (remain > 0) {
- int len;
- try {
- len = channel.read(buffer);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- if (len < 0) {
- return false;
- }
- remain -= len;
- }
- return true;
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- channel.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
new file mode 100644
index 0000000..e2a08c2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ExternalSortRunGenerator implements IFrameWriter { // collects incoming frames in a FrameSorter and writes them out as sorted run files whenever the frame budget is reached
+ private final IHyracksContext ctx;
+ private final FrameSorter frameSorter;
+ private final List<File> runs; // run files written so far, in creation order
+ private final int maxSortFrames;
+
+ public ExternalSortRunGenerator(IHyracksContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, int framesLimit) {
+ this.ctx = ctx;
+ frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
+ runs = new LinkedList<File>();
+ maxSortFrames = framesLimit - 1; // one frame of the budget is withheld from the sorter — presumably reserved for output; TODO confirm
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ runs.clear(); // empties the same list object that getRuns() hands out
+ frameSorter.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (frameSorter.getFrameCount() >= maxSortFrames) { // sorter is at capacity: spill it as a run before taking this frame
+ flushFramesToRun();
+ }
+ frameSorter.insertFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (frameSorter.getFrameCount() > 0) {
+ if (runs.size() <= 0) { // nothing spilled so far: sort in place and leave the data in the sorter
+ frameSorter.sortFrames();
+ } else { // earlier runs exist: spill the remainder too, which also resets the sorter
+ flushFramesToRun();
+ }
+ }
+ }
+
+ private void flushFramesToRun() throws HyracksDataException { // sorts the buffered frames, writes them to a new ".run" file, then resets the sorter and records the file
+ frameSorter.sortFrames();
+ File runFile;
+ try {
+ runFile = ctx.getResourceManager().createFile(ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile);
+ writer.open();
+ try {
+ frameSorter.flushFrames(writer);
+ } finally {
+ writer.close(); // close the run writer even when flushFrames() throws
+ }
+ frameSorter.reset();
+ runs.add(runFile); // only reached when the run was written without an exception
+ }
+
+ @Override
+ public void flush() throws HyracksDataException { // empty body: no action is taken on flush
+ }
+
+ public FrameSorter getFrameSorter() { // after close() with no spilled runs, this sorter holds the sorted in-memory frames
+ return frameSorter;
+ }
+
+ public List<File> getRuns() { // returns the internal mutable list, not a copy
+ return runs;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
new file mode 100644
index 0000000..91114b2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+/**
+ * Merges sorted run files with a priority queue, using at most {@code framesLimit} frames:
+ * one output frame plus one input frame per run being merged.
+ */
+public class ExternalSortRunMerger {
+    private final IHyracksContext ctx;
+    private final FrameSorter frameSorter;
+    private final List<File> runs;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDesc;
+    private final int framesLimit;
+    private final IFrameWriter writer;
+    private List<ByteBuffer> inFrames;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+
+    public ExternalSortRunMerger(IHyracksContext ctx, FrameSorter frameSorter, List<File> runs, int[] sortFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, int framesLimit,
+            IFrameWriter writer) {
+        this.ctx = ctx;
+        this.frameSorter = frameSorter;
+        this.runs = runs;
+        this.sortFields = sortFields;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDesc = recordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+    }
+
+    /**
+     * Merges the runs. With no runs at all, the frames held by the in-memory sorter (if one
+     * was supplied) are flushed to the output writer instead.
+     *
+     * @param doFinalPass if {@code true}, the output writer is opened, all runs are merged
+     *            into it and it is closed again; if {@code false}, only intermediate passes
+     *            run, stopping as soon as the remaining runs fit in a single merge, and the
+     *            output writer is neither opened nor closed here
+     * @throws HyracksDataException if a merge pass fails; the cause is wrapped
+     */
+    public void process(boolean doFinalPass) throws HyracksDataException {
+        if (doFinalPass) {
+            writer.open();
+        }
+        try {
+            if (runs.size() <= 0) {
+                if (frameSorter != null) {
+                    frameSorter.flushFrames(writer);
+                }
+            } else {
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.getResourceManager().allocateFrame();
+                outFrameAppender = new FrameTupleAppender(ctx);
+                outFrameAppender.reset(outFrame, true);
+                for (int i = 0; i < framesLimit - 1; ++i) {
+                    inFrames.add(ctx.getResourceManager().allocateFrame());
+                }
+                int passCount = 0;
+                while (runs.size() > 0) {
+                    if (!doFinalPass && runs.size() + 1 <= framesLimit) {
+                        // The remaining runs fit in one merge, which the caller asked us not
+                        // to perform. doPass() would return without consuming any run, so
+                        // looping on would never terminate.
+                        break;
+                    }
+                    passCount++;
+                    try {
+                        doPass(runs, passCount, doFinalPass);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+        } finally {
+            if (doFinalPass) {
+                writer.close();
+            }
+        }
+    }
+
+    /** Merges all runs into the output writer; same as {@code process(true)}. */
+    public void process() throws HyracksDataException {
+        process(true);
+    }
+
+    /**
+     * Runs one merge pass. If all runs fit ({@code runs.size() + 1 <= framesLimit}) this is
+     * the final pass and output goes to the operator's writer (or nothing happens when
+     * {@code doFinalPass} is false); otherwise the first {@code inFrames.size()} runs are
+     * merged into a new run file that replaces them at the head of {@code runs}.
+     */
+    private void doPass(List<File> runs, int passCount, boolean doFinalPass) throws HyracksDataException, IOException {
+        File newRun = null;
+        IFrameWriter writer = this.writer;
+        boolean finalPass = false;
+        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+            if (!doFinalPass) {
+                return;
+            }
+            finalPass = true;
+            // Drop the input frames that have no run to read from.
+            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+                inFrames.remove(i);
+            }
+        } else {
+            newRun = ctx.getResourceManager().createFile(ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
+            writer = new RunFileWriter(newRun);
+            writer.open();
+        }
+        // Declared outside the try block so the finally clause can close whatever is still open.
+        RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
+        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+        try {
+            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDesc, inFrames.size(),
+                    comparator);
+            int[] tupleIndexes = new int[inFrames.size()];
+            for (int i = 0; i < inFrames.size(); i++) {
+                tupleIndexes[i] = 0;
+                int runIndex = topTuples.peek().getRunid();
+                // Publish the cursor only once it is open, so cleanup never sees an unopened reader.
+                RunFileReader cursor = new RunFileReader(runs.get(runIndex));
+                cursor.open();
+                runCursors[runIndex] = cursor;
+                if (cursor.nextFrame(inFrames.get(runIndex))) {
+                    tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDesc);
+                    tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+                    setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+                } else {
+                    closeRun(runIndex, runCursors, tupleAccessors);
+                }
+            }
+
+            while (!topTuples.areRunsExhausted()) {
+                ReferenceEntry top = topTuples.peek();
+                int runIndex = top.getRunid();
+                FrameTupleAccessor fta = top.getAccessor();
+                int tupleIndex = top.getTupleIndex();
+
+                if (!outFrameAppender.append(fta, tupleIndex)) {
+                    // Output frame is full: ship it and retry on an empty frame.
+                    FrameUtils.flushFrame(outFrame, writer);
+                    outFrameAppender.reset(outFrame, true);
+                    if (!outFrameAppender.append(fta, tupleIndex)) {
+                        throw new IllegalStateException();
+                    }
+                }
+
+                ++tupleIndexes[runIndex];
+                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+            }
+            if (outFrameAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+                outFrameAppender.reset(outFrame, true);
+            }
+            runs.subList(0, inFrames.size()).clear();
+            if (!finalPass) {
+                runs.add(0, newRun);
+            }
+        } finally {
+            try {
+                // Cursors still open here were not drained (e.g. the pass failed part-way);
+                // close them so their file handles are not leaked.
+                for (int i = 0; i < runCursors.length; i++) {
+                    if (runCursors[i] != null) {
+                        closeRun(i, runCursors, tupleAccessors);
+                    }
+                }
+            } finally {
+                if (!finalPass) {
+                    writer.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Hands the run's next tuple to the queue through {@code popAndReplace}, or pops the
+     * queue and closes the run when the run has no more tuples.
+     */
+    private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
+        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+        if (exists) {
+            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    /**
+     * Tells whether the run has a tuple at its current index, loading the run's next frame
+     * into its input buffer (and resetting the index to 0) when the current frame is used up.
+     */
+    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors) throws IOException {
+        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+            return false;
+        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
+            if (runCursors[runIndex].nextFrame(buf)) {
+                tupleIndexes[runIndex] = 0;
+                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+            } else {
+                return false;
+            }
+        } else {
+            return true;
+        }
+    }
+
+    /**
+     * Closes the run's reader, if it is still open, and clears its cursor and accessor slots.
+     */
+    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+            throws HyracksDataException {
+        // hasNextTuple() reports false for an already-closed run, so this can be reached with
+        // an empty slot; guard against it instead of failing with a NullPointerException.
+        if (runCursors[index] != null) {
+            runCursors[index].close();
+        }
+        runCursors[index] = null;
+        tupleAccessor[index] = null;
+    }
+
+    /**
+     * Builds a comparator that orders queue entries by the sort fields of the tuples they
+     * reference, comparing field by field until one differs.
+     */
+    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntry>() {
+            public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                int j1 = tp1.getTupleIndex();
+                int j2 = tp2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < sortFields.length; ++f) {
+                    int fIdx = sortFields[f];
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return 0;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java
new file mode 100644
index 0000000..f31100e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * {@link IFrameReader} that reads a run file back one frame at a time through a
+ * {@link FileChannel}.
+ */
+public class RunFileReader implements IFrameReader {
+    private final File file;
+    private FileChannel channel;
+
+    public RunFileReader(File file) throws FileNotFoundException {
+        this.file = file;
+    }
+
+    /**
+     * Opens the run file read-only and keeps its channel for subsequent reads.
+     *
+     * @throws HyracksDataException if the file does not exist or cannot be opened
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        RandomAccessFile raf;
+        try {
+            raf = new RandomAccessFile(file, "r");
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        channel = raf.getChannel();
+    }
+
+    /**
+     * Clears the buffer and fills it to capacity from the current file position.
+     *
+     * @param buffer frame to fill; any previous content is discarded
+     * @return {@code true} if the buffer was filled completely, {@code false} if end of file
+     *         was reached first (a trailing partial frame is reported as {@code false} too)
+     * @throws HyracksDataException if the underlying channel read fails
+     */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        buffer.clear();
+        int remain = buffer.capacity();
+        while (remain > 0) {
+            int len;
+            try {
+                len = channel.read(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            if (len < 0) {
+                return false;
+            }
+            remain -= len;
+        }
+        return true;
+    }
+
+    /**
+     * Closes the channel if {@link #open()} created one; otherwise does nothing, so the
+     * reader can be closed safely even when it was never (successfully) opened.
+     *
+     * @throws HyracksDataException if closing the channel fails
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java
new file mode 100644
index 0000000..fd0a03e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * {@link IFrameWriter} that writes each frame it receives to a run file through a
+ * {@link FileChannel}.
+ */
+public class RunFileWriter implements IFrameWriter {
+    private final File file;
+    private FileChannel channel;
+
+    public RunFileWriter(File file) {
+        this.file = file;
+    }
+
+    /**
+     * Opens the run file for read/write access and keeps its channel for subsequent writes.
+     *
+     * @throws HyracksDataException if the file cannot be opened
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        RandomAccessFile raf;
+        try {
+            raf = new RandomAccessFile(file, "rw");
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        channel = raf.getChannel();
+    }
+
+    /**
+     * Writes everything between the buffer's position and limit to the run file.
+     *
+     * @param buffer frame to write; its position is advanced to its limit
+     * @throws HyracksDataException if the underlying channel write fails
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Loop on the buffer's own remaining count rather than on capacity(): a buffer whose
+        // position/limit do not span the whole capacity would otherwise never reach zero and
+        // the loop would spin forever on zero-length writes.
+        while (buffer.hasRemaining()) {
+            int len;
+            try {
+                len = channel.write(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            if (len < 0) {
+                throw new HyracksDataException("Error writing data");
+            }
+        }
+    }
+
+    /**
+     * Closes the channel if {@link #open()} created one; otherwise does nothing.
+     *
+     * @throws HyracksDataException if closing the channel fails
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    /** No-op: this writer keeps no buffered data of its own. */
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
index 4be1349..e87d8bd 100644
--- a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
+++ b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
@@ -2,14 +2,14 @@
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.application.IApplicationContext;
-import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
-public class NCBootstrap implements IBootstrap {
+public class NCBootstrap implements INCBootstrap {
private static final Logger LOGGER = Logger.getLogger(NCBootstrap.class.getName());
- private IApplicationContext appCtx;
-
+ private INCApplicationContext appCtx;
+
@Override
public void start() throws Exception {
LOGGER.info("Starting NC Bootstrap");
@@ -24,7 +24,7 @@
}
@Override
- public void setApplicationContext(IApplicationContext appCtx) {
+ public void setApplicationContext(INCApplicationContext appCtx) {
this.appCtx = appCtx;
}
-}
+}
\ No newline at end of file