Merged hyracks-next
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@25 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/.classpath b/hyracks-core/.classpath
index 370f43c..2fb3f21 100644
--- a/hyracks-core/.classpath
+++ b/hyracks-core/.classpath
@@ -2,6 +2,7 @@
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="src" path="src/main/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-core/pom.xml b/hyracks-core/pom.xml
index 04f81dc..0222956 100644
--- a/hyracks-core/pom.xml
+++ b/hyracks-core/pom.xml
@@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-core</artifactId>
- <version>0.0.4</version>
+ <version>0.0.5-SNAPSHOT</version>
<build>
<plugins>
<plugin>
@@ -129,6 +129,13 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>jol</groupId>
+ <artifactId>jol</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
index 125c1aa..9e3cbbe 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.util.UUID;
public interface IConnectionEntry {
ByteBuffer getReadBuffer();
@@ -32,4 +33,16 @@
void close() throws IOException;
void write(ByteBuffer buffer);
+
+ UUID getJobId();
+
+ UUID getStageId();
+
+ void setJobId(UUID jobId);
+
+ void setStageId(UUID stageId);
+
+ boolean aborted();
+
+ void abort();
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
index 91f921c..7e74aee 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
@@ -18,4 +18,8 @@
public interface IDataReceiveListenerFactory {
public IDataReceiveListener getDataReceiveListener(UUID endpointUUID, IConnectionEntry entry, int senderIndex);
+
+ public UUID getJobId();
+
+ public UUID getStageId();
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
index 858bb65..bbd4b4f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
@@ -27,8 +27,8 @@
}
@Override
- public ConstraintType getConstraintType() {
- return ConstraintType.ABSOLUTE;
+ public LocationConstraintType getConstraintType() {
+ return LocationConstraintType.ABSOLUTE;
}
public String getLocationId() {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
new file mode 100644
index 0000000..edfb1c5
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class ChoiceLocationConstraint extends LocationConstraint {
+ private static final long serialVersionUID = 1L;
+
+ private LocationConstraint[] choices;
+
+ public ChoiceLocationConstraint(LocationConstraint... choices) {
+ this.choices = choices;
+ }
+
+ public LocationConstraint[] getChoices() {
+ return choices;
+ }
+
+ @Override
+ public LocationConstraintType getConstraintType() {
+ return LocationConstraintType.CHOICE;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
new file mode 100644
index 0000000..16b7b24
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import java.util.Arrays;
+
+public class ExplicitPartitionConstraint extends PartitionConstraint {
+ private static final long serialVersionUID = 1L;
+
+ private final LocationConstraint[] locationConstraints;
+
+ public ExplicitPartitionConstraint(LocationConstraint[] locationConstraints) {
+ this.locationConstraints = locationConstraints;
+ }
+
+ public LocationConstraint[] getLocationConstraints() {
+ return locationConstraints;
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.deepToString(locationConstraints);
+ }
+
+ @Override
+ public PartitionConstraintType getPartitionConstraintType() {
+ return PartitionConstraintType.EXPLICIT;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
index e88a1b2..1228b90 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
@@ -19,9 +19,10 @@
public abstract class LocationConstraint implements Serializable {
private static final long serialVersionUID = 1L;
- public enum ConstraintType {
+ public enum LocationConstraintType {
ABSOLUTE,
+ CHOICE
}
- public abstract ConstraintType getConstraintType();
+ public abstract LocationConstraintType getConstraintType();
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
index c825539..82a6f7b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
@@ -15,22 +15,14 @@
package edu.uci.ics.hyracks.api.constraints;
import java.io.Serializable;
-import java.util.Arrays;
-public class PartitionConstraint implements Serializable {
+public abstract class PartitionConstraint implements Serializable {
private static final long serialVersionUID = 1L;
- private final LocationConstraint[] locationConstraints;
+ public abstract PartitionConstraintType getPartitionConstraintType();
- public PartitionConstraint(LocationConstraint[] locationConstraints) {
- this.locationConstraints = locationConstraints;
- }
-
- public LocationConstraint[] getLocationConstraints() {
- return locationConstraints;
- }
-
- public String toString() {
- return Arrays.deepToString(locationConstraints);
+ public enum PartitionConstraintType {
+ EXPLICIT,
+ COUNT
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
new file mode 100644
index 0000000..b49d880
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class PartitionCountConstraint extends PartitionConstraint {
+ private static final long serialVersionUID = 1L;
+
+ private final int count;
+
+ public PartitionCountConstraint(int count) {
+ this.count = count;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public PartitionConstraintType getPartitionConstraintType() {
+ return PartitionConstraintType.COUNT;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index af86093..6461a05 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -17,7 +17,6 @@
import java.rmi.Remote;
import java.util.EnumSet;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -27,14 +26,25 @@
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public interface IClusterController extends Remote {
- public void registerNode(INodeController nodeController) throws Exception;
+ public NodeParameters registerNode(INodeController nodeController) throws Exception;
public void unregisterNode(INodeController nodeController) throws Exception;
- public INodeController lookupNode(String id) throws Exception;
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception;
- public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception;
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+ public void nodeHeartbeat(String id) throws Exception;
+
+ /*
+ * Client Application Control Methods.
+ */
+ public void createApplication(String appName) throws Exception;
+
+ public void startApplication(String appName) throws Exception;
+
+ public void destroyApplication(String appName) throws Exception;
/*
* Client Job Control methods.
@@ -48,6 +58,6 @@
public void start(UUID jobId) throws Exception;
public JobStatistics waitForCompletion(UUID jobId) throws Exception;
-
- public Map<String,INodeController> getRegistry() throws Exception;
+
+ public Map<String, INodeController> getRegistry() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index c271356..264254e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -16,28 +16,33 @@
import java.rmi.Remote;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStage;
import edu.uci.ics.hyracks.config.NCConfig;
public interface INodeController extends Remote {
public String getId() throws Exception;
-
+
public NCConfig getConfiguration() throws Exception;
public NodeCapability getNodeCapability() throws Exception;
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
- throws Exception;
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception;
- public void initializeJobletPhase2(UUID jobId, JobPlan plan, JobStage stage,
- Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
+ public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Map<ActivityNodeId, Set<Integer>> tasks,
+ Map<OperatorDescriptorId, Set<Integer>> opPartitions, Map<PortInstanceId, Endpoint> globalPortMap)
+ throws Exception;
- public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception;
+ public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
+
+ public void abortJoblet(UUID jobId, UUID stageId) throws Exception;
public void cleanUpJob(UUID jobId) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
new file mode 100644
index 0000000..3856fea
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.controller;
+
+import java.io.Serializable;
+
+public class NodeParameters implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private int heartbeatPeriod;
+
+ public int getHeartbeatPeriod() {
+ return heartbeatPeriod;
+ }
+
+ public void setHeartbeatPeriod(int heartbeatPeriod) {
+ this.heartbeatPeriod = heartbeatPeriod;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index c7f3175..86d8118 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -46,4 +46,9 @@
ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
return id.equals(other.id);
}
+
+ @Override
+ public String toString() {
+ return "CDID:" + id;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
new file mode 100644
index 0000000..c884998
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public enum Direction {
+ INPUT,
+ OUTPUT,
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 600fd41..b3feef8 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -50,11 +50,15 @@
* Endpoint writer factory.
* @param index
* ordinal index of the data producer partition.
+ * @param nProducerPartitions
+ * Number of partitions of the producing operator.
+ * @param nConsumerPartitions
+ * Number of partitions of the consuming operator.
* @return data writer.
* @throws Exception
*/
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException;
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
/**
* Factory metod to create the receive side reader that reads data from this connector.
@@ -67,11 +71,15 @@
* Connection Demultiplexer
* @param index
* ordinal index of the data consumer partition
+ * @param nProducerPartitions
+ * Number of partitions of the producing operator.
+ * @param nConsumerPartitions
+ * Number of partitions of the consuming operator.
* @return data reader
* @throws HyracksDataException
*/
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException;
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
/**
* Translate this connector descriptor to JSON.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 17a0fc5..ad51370 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -65,21 +65,6 @@
public void setPartitionConstraint(PartitionConstraint partitionConstraint);
/**
- * Returns the final partition locations selected for scheduling. These are decided by Hyracks such that they satisfy the partition constraints.
- *
- * @return array indicating number and node ids of the nodes to schedule the operator runtimes.
- */
- public String[] getPartitions();
-
- /**
- * Sets the partition locations.
- *
- * @param partitions
- * node ids to schedule the operators.
- */
- public void setPartitions(String[] partitions);
-
- /**
* Gets the output record descriptor
*
* @return Array of RecordDescriptor, one per output.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
index d8057d9..844266d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
@@ -17,11 +17,6 @@
import java.io.Serializable;
public final class PortInstanceId implements Serializable {
- public enum Direction {
- INPUT,
- OUTPUT,
- }
-
private static final long serialVersionUID = 1L;
private OperatorDescriptorId odId;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
index 921b239..8784b46 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
@@ -64,8 +64,8 @@
StringBuilder buffer = new StringBuilder();
buffer.append("{\n");
- indent(buffer, 1).append("startTime: '").append(df.format(startTime)).append("',\n");
- indent(buffer, 1).append("endTime: '").append(df.format(endTime)).append("',\n");
+ indent(buffer, 1).append("startTime: '").append(startTime == null ? null : df.format(startTime)).append("',\n");
+ indent(buffer, 1).append("endTime: '").append(endTime == null ? null : df.format(endTime)).append("',\n");
indent(buffer, 1).append("stages: [\n");
boolean first = true;
for (StageStatistics ss : stages) {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
new file mode 100644
index 0000000..2d76c26
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+import java.io.Serializable;
+
+public interface IResource extends Serializable {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
new file mode 100644
index 0000000..3a6f930
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ISpaceSharedResource {
+
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
new file mode 100644
index 0000000..1fb0465
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ITimeSharedResource {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index 4d9c309..ea8ede3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -18,10 +18,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.comm.FrameConstants;
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
import edu.uci.ics.hyracks.context.HyracksContext;
@@ -41,6 +41,12 @@
private final SelectionKey key;
+ private UUID jobId;
+
+ private UUID stageId;
+
+ private boolean aborted;
+
public ConnectionEntry(HyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
this.socketChannel = socketChannel;
readBuffer = ctx.getResourceManager().allocateFrame();
@@ -55,42 +61,46 @@
}
public boolean dispatch(SelectionKey key) throws IOException {
- if (key.isReadable()) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
- }
- int bytesRead = socketChannel.read(readBuffer);
- if (bytesRead < 0) {
- recvListener.eos(this);
- return true;
- }
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
- }
+ if (aborted) {
recvListener.dataReceived(this);
- } else if (key.isWritable()) {
- synchronized (this) {
- writeBuffer.flip();
+ } else {
+ if (key.isReadable()) {
if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
}
- int bytesWritten = socketChannel.write(writeBuffer);
- if (bytesWritten < 0) {
+ int bytesRead = socketChannel.read(readBuffer);
+ if (bytesRead < 0) {
+ recvListener.eos(this);
return true;
}
if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
}
- if (writeBuffer.remaining() <= 0) {
- int ops = key.interestOps();
- key.interestOps(ops & ~SelectionKey.OP_WRITE);
+ recvListener.dataReceived(this);
+ } else if (key.isWritable()) {
+ synchronized (this) {
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ }
+ int bytesWritten = socketChannel.write(writeBuffer);
+ if (bytesWritten < 0) {
+ return true;
+ }
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ }
+ if (writeBuffer.remaining() <= 0) {
+ int ops = key.interestOps();
+ key.interestOps(ops & ~SelectionKey.OP_WRITE);
+ }
+ writeBuffer.compact();
+ notifyAll();
}
- writeBuffer.compact();
- notifyAll();
+ } else {
+ LOGGER.warning("Spurious event triggered: " + key.readyOps());
+ return true;
}
- } else {
- LOGGER.warning("Spurious event triggered: " + key.readyOps());
- return true;
}
return false;
}
@@ -135,12 +145,46 @@
}
@Override
- public void close() throws IOException {
- socketChannel.close();
+ public void close() {
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
@Override
public SelectionKey getSelectionKey() {
return key;
}
+
+ @Override
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public void setJobId(UUID jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public UUID getStageId() {
+ return stageId;
+ }
+
+ @Override
+ public void setStageId(UUID stageId) {
+ this.stageId = stageId;
+ }
+
+ @Override
+ public void abort() {
+ aborted = true;
+ }
+
+ @Override
+ public boolean aborted() {
+ return aborted;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index bfca6ad..55e2962 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -26,9 +26,11 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -62,6 +64,8 @@
private final IDataReceiveListener initialDataReceiveListener;
+ private final Set<IConnectionEntry> connections;
+
private volatile boolean stopped;
private ByteBuffer emptyFrame;
@@ -76,7 +80,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Connection manager listening on " + serverSocket.getInetAddress() + ":"
- + serverSocket.getLocalPort());
+ + serverSocket.getLocalPort());
}
pendingConnectionReceivers = new HashMap<UUID, IDataReceiveListenerFactory>();
@@ -85,6 +89,7 @@
initialDataReceiveListener = new InitialDataReceiveListener();
emptyFrame = ctx.getResourceManager().allocateFrame();
emptyFrame.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+ connections = new HashSet<IConnectionEntry>();
}
public synchronized void dumpStats() {
@@ -116,7 +121,7 @@
public IFrameWriter connect(NetworkAddress address, UUID id, int senderId) throws HyracksDataException {
try {
SocketChannel channel = SocketChannel
- .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
+ .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
byte[] initialFrame = new byte[INITIAL_MESSAGE_LEN];
ByteBuffer buffer = ByteBuffer.wrap(initialFrame);
buffer.clear();
@@ -173,6 +178,18 @@
pendingConnectionReceivers.remove(id);
}
+ public synchronized void abortConnections(UUID jobId, UUID stageId) {
+ List<IConnectionEntry> abortConnections = new ArrayList<IConnectionEntry>();
+ synchronized (this) {
+ for (IConnectionEntry ce : connections) {
+ if (ce.getJobId().equals(jobId) && ce.getStageId().equals(stageId)) {
+ abortConnections.add(ce);
+ }
+ }
+ }
+ dataListenerThread.addPendingAbortConnections(abortConnections);
+ }
+
private final class NetworkFrameWriter implements IFrameWriter {
private SocketChannel channel;
@@ -237,7 +254,8 @@
private final class DataListenerThread extends Thread {
private Selector selector;
- private List<SocketChannel> pendingSockets;
+ private List<SocketChannel> pendingNewSockets;
+ private List<IConnectionEntry> pendingAbortConnections;
public DataListenerThread() {
super("Hyracks Data Listener Thread");
@@ -246,12 +264,17 @@
} catch (IOException e) {
throw new RuntimeException(e);
}
- pendingSockets = new ArrayList<SocketChannel>();
+ pendingNewSockets = new ArrayList<SocketChannel>();
+ pendingAbortConnections = new ArrayList<IConnectionEntry>();
}
synchronized void addSocketChannel(SocketChannel sc) throws IOException {
- LOGGER.info("Connection received");
- pendingSockets.add(sc);
+ pendingNewSockets.add(sc);
+ selector.wakeup();
+ }
+
+ synchronized void addPendingAbortConnections(List<IConnectionEntry> abortConnections) {
+ pendingAbortConnections.addAll(abortConnections);
selector.wakeup();
}
@@ -264,8 +287,8 @@
}
int n = selector.select();
synchronized (this) {
- if (!pendingSockets.isEmpty()) {
- for (SocketChannel sc : pendingSockets) {
+ if (!pendingNewSockets.isEmpty()) {
+ for (SocketChannel sc : pendingNewSockets) {
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
ConnectionEntry entry = new ConnectionEntry(ctx, sc, scKey);
@@ -275,7 +298,20 @@
LOGGER.fine("Woke up selector");
}
}
- pendingSockets.clear();
+ pendingNewSockets.clear();
+ }
+ if (!pendingAbortConnections.isEmpty()) {
+ for (IConnectionEntry ce : pendingAbortConnections) {
+ SelectionKey key = ce.getSelectionKey();
+ ce.abort();
+ ((ConnectionEntry) ce).dispatch(key);
+ key.cancel();
+ ce.close();
+ synchronized (ConnectionManager.this) {
+ connections.remove(ce);
+ }
+ }
+ pendingAbortConnections.clear();
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Selector: " + n);
@@ -295,11 +331,14 @@
if (close) {
key.cancel();
entry.close();
+ synchronized (ConnectionManager.this) {
+ connections.remove(entry);
+ }
}
}
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
@@ -331,6 +370,11 @@
newListener = connectionReceiver.getDataReceiveListener(endpointID, entry, senderId);
entry.setDataReceiveListener(newListener);
+ entry.setJobId(connectionReceiver.getJobId());
+ entry.setStageId(connectionReceiver.getStageId());
+ synchronized (ConnectionManager.this) {
+ connections.add(entry);
+ }
byte[] ack = new byte[4];
ByteBuffer ackBuffer = ByteBuffer.wrap(ack);
ackBuffer.clear();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 3eb2280..694e61a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -32,7 +32,7 @@
import edu.uci.ics.hyracks.context.HyracksContext;
public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
- IDataReceiveListener {
+ IDataReceiveListener {
private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
private final NonDeterministicFrameReader frameReader;
@@ -40,10 +40,14 @@
private final BitSet readyBits;
private IConnectionEntry senders[];
private int openSenderCount;
+ private UUID jobId;
+ private UUID stageId;
- public DemuxDataReceiveListenerFactory(HyracksContext ctx) {
+ public DemuxDataReceiveListenerFactory(HyracksContext ctx, UUID jobId, UUID stageId) {
frameReader = new NonDeterministicFrameReader(ctx, this);
this.ctx = ctx;
+ this.jobId = jobId;
+ this.stageId = stageId;
readyBits = new BitSet();
senders = null;
openSenderCount = 0;
@@ -66,13 +70,15 @@
ByteBuffer buffer = entry.getReadBuffer();
buffer.flip();
int dataLen = buffer.remaining();
- if (dataLen >= ctx.getFrameSize()) {
+ if (dataLen >= ctx.getFrameSize() || entry.aborted()) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
}
SelectionKey key = entry.getSelectionKey();
- int ops = key.interestOps();
- key.interestOps(ops & ~SelectionKey.OP_READ);
+ if (key.isValid()) {
+ int ops = key.interestOps();
+ key.interestOps(ops & ~SelectionKey.OP_READ);
+ }
readyBits.set(senderIndex);
notifyAll();
return;
@@ -139,4 +145,14 @@
public synchronized int getSenderCount() {
return senders.length;
}
+
+ @Override
+ public UUID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public UUID getStageId() {
+ return stageId;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
index 3f2d449..91350735 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -55,6 +55,10 @@
}
while (true) {
IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+ if (entry.aborted()) {
+ eos = true;
+ return false;
+ }
lastReadSender = (Integer) entry.getAttachment();
ByteBuffer netBuffer = entry.getReadBuffer();
int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
index 2ae094e..70f0248 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -131,7 +131,7 @@
int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
- accessor.getFieldStartOffset(tIndex, fields[i]);
System.arraycopy(accessor.getBuffer().array(), fSrcStart, buffer.array(), tupleDataEndOffset
- + fSrcSlotsLength + fStartOffset, fLen);
+ + fTargetSlotsLength + fStartOffset, fLen);
fEndOffset += fLen;
buffer.putShort(tupleDataEndOffset + i * 2, (short) fEndOffset);
fStartOffset = fEndOffset;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
index 950569d..3d3652a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -22,4 +22,13 @@
@Option(name = "-http-port", usage = "Sets the http port for the admin console")
public int httpPort;
+
+ @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+ public int heartbeatPeriod = 10000;
+
+ @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+ public int maxHeartbeatLapsePeriods = 5;
+
+ @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
+ public boolean useJOL = false;
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
deleted file mode 100644
index 0461499..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.ContextHandler;
-
-import edu.uci.ics.hyracks.api.controller.IClusterController;
-import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.job.JobManager;
-import edu.uci.ics.hyracks.web.WebServer;
-
-public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
- private static final long serialVersionUID = 1L;
-
- private CCConfig ccConfig;
-
- private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
-
- private final Map<String, INodeController> nodeRegistry;
-
- private WebServer webServer;
-
- private final JobManager jobManager;
-
- private final Executor taskExecutor;
-
- public ClusterControllerService(CCConfig ccConfig) throws Exception {
- this.ccConfig = ccConfig;
- nodeRegistry = new LinkedHashMap<String, INodeController>();
- jobManager = new JobManager(this);
- taskExecutor = Executors.newCachedThreadPool();
- webServer = new WebServer(new Handler[] { getAdminConsoleHandler() });
- }
-
- @Override
- public void start() throws Exception {
- LOGGER.log(Level.INFO, "Starting ClusterControllerService");
- Registry registry = LocateRegistry.createRegistry(ccConfig.port);
- registry.rebind(IClusterController.class.getName(), this);
- webServer.setPort(ccConfig.httpPort);
- webServer.start();
- LOGGER.log(Level.INFO, "Started ClusterControllerService");
- }
-
- @Override
- public void stop() throws Exception {
- LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
- webServer.stop();
- LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
- }
-
- @Override
- public UUID createJob(JobSpecification jobSpec) throws Exception {
- return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
- }
-
- @Override
- public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- return jobManager.createJob(jobSpec, jobFlags);
- }
-
- @Override
- public void registerNode(INodeController nodeController) throws Exception {
- String id = nodeController.getId();
- synchronized (this) {
- if (nodeRegistry.containsKey(id)) {
- throw new Exception("Node with this name already registered.");
- }
- nodeRegistry.put(id, nodeController);
- }
- nodeController.notifyRegistration(this);
- LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
- }
-
- @Override
- public void unregisterNode(INodeController nodeController) throws Exception {
- String id = nodeController.getId();
- synchronized (this) {
- nodeRegistry.remove(id);
- }
- LOGGER.log(Level.INFO, "Unregistered INodeController");
- }
-
- @Override
- public INodeController lookupNode(String id) throws Exception {
- return nodeRegistry.get(id);
- }
-
- public Executor getExecutor() {
- return taskExecutor;
- }
-
- public synchronized void notifyJobComplete(final UUID jobId) {
- for (final INodeController nc : nodeRegistry.values()) {
- taskExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- nc.cleanUpJob(jobId);
- } catch (Exception e) {
- }
- }
-
- });
- }
- }
-
- @Override
- public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
- throws Exception {
- jobManager.notifyStageletComplete(jobId, stageId, nodeId, statistics);
- }
-
- @Override
- public JobStatus getJobStatus(UUID jobId) throws Exception {
- return jobManager.getJobStatus(jobId);
- }
-
- @Override
- public void start(UUID jobId) throws Exception {
- jobManager.start(jobId);
- }
-
- @Override
- public JobStatistics waitForCompletion(UUID jobId) throws Exception {
- return jobManager.waitForCompletion(jobId);
- }
-
- private Handler getAdminConsoleHandler() {
- ContextHandler handler = new ContextHandler("/admin");
- handler.setHandler(new AbstractHandler() {
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
- if (!"/".equals(target)) {
- return;
- }
- response.setContentType("text/html;charset=utf-8");
- response.setStatus(HttpServletResponse.SC_OK);
- baseRequest.setHandled(true);
- PrintWriter writer = response.getWriter();
- writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
- writer.println("<h1>Hyracks Admin Console</h1>");
- writer.println("<h2>Node Controllers</h2>");
- writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
- synchronized (ClusterControllerService.this) {
- for (Map.Entry<String, INodeController> e : nodeRegistry.entrySet()) {
- try {
- INodeController n = e.getValue();
- writer.print("<tr><td>");
- writer.print(e.getKey());
- writer.print("</td><td>");
- writer.print("</td></tr>");
- } catch (Exception ex) {
- }
- }
- }
- writer.println("</table>");
- writer.println("</body></html>");
- writer.flush();
- }
- });
- return handler;
- }
-
- @Override
- public Map<String, INodeController> getRegistry() throws Exception {
- return nodeRegistry;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
deleted file mode 100644
index dbdf1c9..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller;
-
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.controller.IClusterController;
-import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.controller.NodeCapability;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.comm.ConnectionManager;
-import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
-import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.context.HyracksContext;
-import edu.uci.ics.hyracks.job.Joblet;
-import edu.uci.ics.hyracks.job.Stagelet;
-import edu.uci.ics.hyracks.runtime.OperatorRunnable;
-
-public class NodeControllerService extends AbstractRemoteService implements INodeController {
- private static final long serialVersionUID = 1L;
-
- private NCConfig ncConfig;
-
- private final String id;
-
- private final HyracksContext ctx;
-
- private final NodeCapability nodeCapability;
-
- private final ConnectionManager connectionManager;
-
- private IClusterController ccs;
-
- private Map<UUID, Joblet> jobletMap;
-
- private Executor executor;
-
- public NodeControllerService(NCConfig ncConfig) throws Exception {
- this.ncConfig = ncConfig;
- id = ncConfig.nodeId;
- this.ctx = new HyracksContext(ncConfig.frameSize);
- if (id == null) {
- throw new Exception("id not set");
- }
- nodeCapability = computeNodeCapability();
- connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
- jobletMap = new HashMap<UUID, Joblet>();
- executor = Executors.newCachedThreadPool();
- }
-
- private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
-
- @Override
- public void start() throws Exception {
- LOGGER.log(Level.INFO, "Starting NodeControllerService");
- connectionManager.start();
- Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
- IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
- cc.registerNode(this);
- LOGGER.log(Level.INFO, "Started NodeControllerService");
- }
-
- @Override
- public void stop() throws Exception {
- LOGGER.log(Level.INFO, "Stopping NodeControllerService");
- connectionManager.stop();
- LOGGER.log(Level.INFO, "Stopped NodeControllerService");
- }
-
- @Override
- public String getId() throws Exception {
- return id;
- }
-
- @Override
- public NodeCapability getNodeCapability() throws Exception {
- return nodeCapability;
- }
-
- public ConnectionManager getConnectionManager() {
- return connectionManager;
- }
-
- private static NodeCapability computeNodeCapability() {
- NodeCapability nc = new NodeCapability();
- nc.setCPUCount(Runtime.getRuntime().availableProcessors());
- return nc;
- }
-
- private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
- String ipaddrStr = ncConfig.dataIPAddress;
- ipaddrStr = ipaddrStr.trim();
- Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
- Matcher m = pattern.matcher(ipaddrStr);
- if (!m.matches()) {
- throw new Exception(MessageFormat.format(
- "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
- }
- byte[] ipBytes = new byte[4];
- ipBytes[0] = (byte) Integer.parseInt(m.group(1));
- ipBytes[1] = (byte) Integer.parseInt(m.group(2));
- ipBytes[2] = (byte) Integer.parseInt(m.group(3));
- ipBytes[3] = (byte) Integer.parseInt(m.group(4));
- return InetAddress.getByAddress(ipBytes);
- }
-
- @Override
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
- throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
- + "]: Initializing Joblet Phase 1");
-
- final Joblet joblet = getLocalJoblet(jobId);
-
- Stagelet stagelet = new Stagelet(joblet, stage.getId(), id);
- joblet.setStagelet(stage.getId(), stagelet);
-
- final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
- Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
-
- List<Endpoint> endpointList = new ArrayList<Endpoint>();
-
- for (ActivityNodeId hanId : stage.getTasks()) {
- IActivityNode han = plan.getActivityNodeMap().get(hanId);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Initializing " + hanId + " -> " + han);
- }
- IOperatorDescriptor op = han.getOwner();
- List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
- String[] partitions = op.getPartitions();
- for (int i = 0; i < partitions.length; ++i) {
- String part = partitions[i];
- if (id.equals(part)) {
- IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
- OperatorRunnable or = new OperatorRunnable(ctx, hon);
- stagelet.setOperator(op.getOperatorId(), i, or);
- if (inputs != null) {
- for (int j = 0; j < inputs.size(); ++j) {
- if (j >= 1) {
- throw new IllegalStateException();
- }
- IConnectorDescriptor conn = inputs.get(j);
- Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
- endpointList.add(endpoint);
- DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
- connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
- PortInstanceId piId = new PortInstanceId(op.getOperatorId(),
- PortInstanceId.Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
- }
- portMap.put(piId, endpoint);
- IFrameReader reader = createReader(conn, drlf, i, plan, stagelet);
- or.setFrameReader(reader);
- }
- }
- honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
- }
- }
- }
-
- stagelet.setEndpointList(endpointList);
-
- return portMap;
- }
-
- private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
- final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
- final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex);
-
- return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
- private int frameCount;
-
- @Override
- public void open() throws HyracksDataException {
- frameCount = 0;
- reader.open();
- }
-
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- boolean status = reader.nextFrame(buffer);
- if (status) {
- ++frameCount;
- }
- return status;
- }
-
- @Override
- public void close() throws HyracksDataException {
- reader.close();
- stagelet.getStatistics().getStatisticsMap().put(
- "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
- String.valueOf(frameCount));
- }
- } : reader;
- }
-
- @Override
- public void initializeJobletPhase2(UUID jobId, final JobPlan plan, JobStage stage,
- final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
- + "]: Initializing Joblet Phase 2");
- final Joblet ji = getLocalJoblet(jobId);
- Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
- final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
-
- final Stagelet stagelet = (Stagelet) ji.getStagelet(stage.getId());
-
- final JobSpecification spec = plan.getJobSpecification();
-
- for (ActivityNodeId hanId : stage.getTasks()) {
- IActivityNode han = plan.getActivityNodeMap().get(hanId);
- IOperatorDescriptor op = han.getOwner();
- List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
- String[] partitions = op.getPartitions();
- for (int i = 0; i < partitions.length; ++i) {
- String part = partitions[i];
- if (id.equals(part)) {
- OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
- if (outputs != null) {
- for (int j = 0; j < outputs.size(); ++j) {
- final IConnectorDescriptor conn = outputs.get(j);
- final int senderIndex = i;
- IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
- PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
- PortInstanceId.Direction.INPUT, spec.getConsumerInputIndex(conn), index);
- Endpoint ep = globalPortMap.get(piId);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
- }
- return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
- .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
- }
- };
- or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i));
- }
- }
- stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
- }
- }
- }
- }
-
- private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
- final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
- return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
- private int frameCount;
-
- @Override
- public void open() throws HyracksDataException {
- frameCount = 0;
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ++frameCount;
- writer.nextFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- writer.close();
- stagelet.getStatistics().getStatisticsMap().put(
- "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
- String.valueOf(frameCount));
- }
- } : writer;
- }
-
- @Override
- public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception {
- final Joblet ji = getLocalJoblet(jobId);
- Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
- for (Endpoint e : si.getEndpointList()) {
- connectionManager.unacceptConnection(e.getEndpointId());
- }
- si.setEndpointList(null);
- }
-
- private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
- Joblet ji = jobletMap.get(jobId);
- if (ji == null) {
- ji = new Joblet(this, jobId);
- jobletMap.put(jobId, ji);
- }
- return ji;
- }
-
- public Executor getExecutor() {
- return executor;
- }
-
- @Override
- public synchronized void cleanUpJob(UUID jobId) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Cleaning up after job: " + jobId);
- }
- jobletMap.remove(jobId);
- connectionManager.dumpStats();
- }
-
- @Override
- public void startStage(UUID jobId, UUID stageId) throws Exception {
- Joblet ji = jobletMap.get(jobId);
- if (ji != null) {
- Stagelet s = ji.getStagelet(stageId);
- if (s != null) {
- s.start();
- }
- }
- }
-
- public void notifyStageComplete(UUID jobId, UUID stageId, StageletStatistics stats) throws Exception {
- ccs.notifyStageletComplete(jobId, stageId, id, stats);
- }
-
- @Override
- public void notifyRegistration(IClusterController ccs) throws Exception {
- this.ccs = ccs;
- }
-
- @Override
- public NCConfig getConfiguration() throws Exception {
- return ncConfig;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
new file mode 100644
index 0000000..af60240
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -0,0 +1,561 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import jol.core.Runtime;
+import jol.core.Runtime.DebugLevel;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.controller.IClusterController;
+import edu.uci.ics.hyracks.api.controller.INodeController;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.config.CCConfig;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
+import edu.uci.ics.hyracks.web.WebServer;
+
+public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
+ private static final long serialVersionUID = 1L;
+
+ private CCConfig ccConfig;
+
+ private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
+
+ private final Map<String, NodeControllerState> nodeRegistry;
+
+ private WebServer webServer;
+
+ private final IJobManager jobManager;
+
+ private final Executor taskExecutor;
+
+ private final Timer timer;
+
+ private Runtime jolRuntime;
+
+ public ClusterControllerService(CCConfig ccConfig) throws Exception {
+ this.ccConfig = ccConfig;
+ nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+ Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
+ jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
+ jobManager = new JOLJobManagerImpl(this, jolRuntime);
+ taskExecutor = Executors.newCachedThreadPool();
+ webServer = new WebServer(new Handler[] {
+ getAdminConsoleHandler(), getApplicationDataHandler()
+ });
+ this.timer = new Timer(true);
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.log(Level.INFO, "Starting ClusterControllerService");
+ Registry registry = LocateRegistry.createRegistry(ccConfig.port);
+ registry.rebind(IClusterController.class.getName(), this);
+ webServer.setPort(ccConfig.httpPort);
+ webServer.start();
+ timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
+ LOGGER.log(Level.INFO, "Started ClusterControllerService");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+ webServer.stop();
+ LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
+ }
+
+ @Override
+ public UUID createJob(JobSpecification jobSpec) throws Exception {
+ return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
+ }
+
+ @Override
+ public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ return jobManager.createJob(jobSpec, jobFlags);
+ }
+
+ @Override
+ public NodeParameters registerNode(INodeController nodeController) throws Exception {
+ String id = nodeController.getId();
+ NodeControllerState state = new NodeControllerState(nodeController);
+ synchronized (this) {
+ if (nodeRegistry.containsKey(id)) {
+ throw new Exception("Node with this name already registered.");
+ }
+ nodeRegistry.put(id, state);
+ }
+ nodeController.notifyRegistration(this);
+ jobManager.registerNode(id);
+ LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+ NodeParameters params = new NodeParameters();
+ params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+ return params;
+ }
+
+ @Override
+ public void unregisterNode(INodeController nodeController) throws Exception {
+ String id = nodeController.getId();
+ synchronized (this) {
+ nodeRegistry.remove(id);
+ }
+ LOGGER.log(Level.INFO, "Unregistered INodeController");
+ }
+
+ public synchronized NodeControllerState lookupNode(String id) throws Exception {
+ return nodeRegistry.get(id);
+ }
+
+ public Executor getExecutor() {
+ return taskExecutor;
+ }
+
+ public synchronized void notifyJobComplete(final UUID jobId) {
+ for (final NodeControllerState ns : nodeRegistry.values()) {
+ taskExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ns.getNodeController().cleanUpJob(jobId);
+ } catch (Exception e) {
+ }
+ }
+
+ });
+ }
+ }
+
+ @Override
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception {
+ jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
+ }
+
+ @Override
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+ jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
+ }
+
+ @Override
+ public JobStatus getJobStatus(UUID jobId) throws Exception {
+ return jobManager.getJobStatus(jobId);
+ }
+
+ @Override
+ public void start(UUID jobId) throws Exception {
+ jobManager.start(jobId);
+ }
+
+ @Override
+ public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+ return jobManager.waitForCompletion(jobId);
+ }
+
+ private Handler getAdminConsoleHandler() {
+ ContextHandler handler = new ContextHandler("/admin");
+ handler.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ if (!"/".equals(target)) {
+ return;
+ }
+ response.setContentType("text/html;charset=utf-8");
+ response.setStatus(HttpServletResponse.SC_OK);
+ baseRequest.setHandled(true);
+ PrintWriter writer = response.getWriter();
+ writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
+ writer.println("<h1>Hyracks Admin Console</h1>");
+ writer.println("<h2>Node Controllers</h2>");
+ writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
+ synchronized (ClusterControllerService.this) {
+ for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+ try {
+ writer.print("<tr><td>");
+ writer.print(e.getKey());
+ writer.print("</td><td>");
+ writer.print(e.getValue().getLastHeartbeatDuration());
+ writer.print("</td></tr>");
+ } catch (Exception ex) {
+ }
+ }
+ }
+ writer.println("</table>");
+ writer.println("</body></html>");
+ writer.flush();
+ }
+ });
+ return handler;
+ }
+
+ private Handler getApplicationDataHandler() {
+ ContextHandler handler = new ContextHandler("/applications");
+ handler.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ }
+ });
+ return handler;
+ }
+
+ @Override
+ public Map<String, INodeController> getRegistry() throws Exception {
+ Map<String, INodeController> map = new HashMap<String, INodeController>();
+ for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+ map.put(e.getKey(), e.getValue().getNodeController());
+ }
+ return map;
+ }
+
+ @Override
+ public synchronized void nodeHeartbeat(String id) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Heartbeat from: " + id);
+ }
+ NodeControllerState ns = nodeRegistry.get(id);
+ if (ns != null) {
+ ns.notifyHeartbeat();
+ }
+ }
+
+ private void killNode(String nodeId) throws Exception {
+ nodeRegistry.remove(nodeId);
+ jobManager.notifyNodeFailure(nodeId);
+ }
+
+ private class DeadNodeSweeper extends TimerTask {
+ @Override
+ public void run() {
+ Set<String> deadNodes = new HashSet<String>();
+ synchronized (ClusterControllerService.this) {
+ for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+ NodeControllerState state = e.getValue();
+ if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+ deadNodes.add(e.getKey());
+ }
+ }
+ for (String deadNode : deadNodes) {
+ try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Killing node: " + deadNode);
+ }
+ killNode(deadNode);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ interface RemoteOp<T> {
+ public String getNodeId();
+
+ public T execute(INodeController node) throws Exception;
+ }
+
+ interface Accumulator<T, R> {
+ public void accumulate(T o);
+
+ public R getResult();
+ }
+
+ <T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
+ final Semaphore installComplete = new Semaphore(remoteOps.length);
+ final List<Exception> errors = new Vector<Exception>();
+ for (final RemoteOp<T> remoteOp : remoteOps) {
+ NodeControllerState nodeState = lookupNode(remoteOp.getNodeId());
+ final INodeController node = nodeState.getNodeController();
+
+ installComplete.acquire();
+ Runnable remoteRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ T t = remoteOp.execute(node);
+ if (accumulator != null) {
+ synchronized (accumulator) {
+ accumulator.accumulate(t);
+ }
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ installComplete.release();
+ }
+ }
+ };
+
+ getExecutor().execute(remoteRunner);
+ }
+ installComplete.acquire(remoteOps.length);
+ if (!errors.isEmpty()) {
+ throw errors.get(0);
+ }
+ return accumulator == null ? null : accumulator.getResult();
+ }
+
+ static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
+ private String nodeId;
+ private UUID jobId;
+ private JobPlan plan;
+ private UUID stageId;
+ private int attempt;
+ private Map<ActivityNodeId, Set<Integer>> tasks;
+ private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+
+ public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.plan = plan;
+ this.stageId = stageId;
+ this.attempt = attempt;
+ this.tasks = tasks;
+ this.opPartitions = opPartitions;
+ }
+
+ @Override
+ public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
+ return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Distribution Phase 1";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ static class Phase2Installer implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private JobPlan plan;
+ private UUID stageId;
+ private Map<ActivityNodeId, Set<Integer>> tasks;
+ private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+ private Map<PortInstanceId, Endpoint> globalPortMap;
+
+ public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+ Map<PortInstanceId, Endpoint> globalPortMap) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.plan = plan;
+ this.stageId = stageId;
+ this.tasks = tasks;
+ this.opPartitions = opPartitions;
+ this.globalPortMap = globalPortMap;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Distribution Phase 2";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ static class Phase3Installer implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private UUID stageId;
+
+ public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.commitJobletInitialization(jobId, stageId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Distribution Phase 3";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ static class StageStarter implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private UUID stageId;
+
+ public StageStarter(String nodeId, UUID jobId, UUID stageId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.startStage(jobId, stageId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Started Stage: " + stageId;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ static class JobletAborter implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private UUID stageId;
+
+ public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.abortJoblet(jobId, stageId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Aborting";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ static class JobCompleteNotifier implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+
+ public JobCompleteNotifier(String nodeId, UUID jobId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.cleanUpJob(jobId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Cleaning Up";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ static class PortMapMergingAccumulator implements
+ Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+ Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+
+ @Override
+ public void accumulate(Map<PortInstanceId, Endpoint> o) {
+ portMap.putAll(o);
+ }
+
+ @Override
+ public Map<PortInstanceId, Endpoint> getResult() {
+ return portMap;
+ }
+ }
+
+ @Override
+ public void createApplication(String appName) throws Exception {
+
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
new file mode 100644
index 0000000..e364423
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public interface IJobManager {
+ public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ public void start(UUID jobId) throws Exception;
+
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception;
+
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+ public JobStatus getJobStatus(UUID jobId);
+
+ public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+
+ public void notifyNodeFailure(String nodeId) throws Exception;
+
+ public void registerNode(String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
new file mode 100644
index 0000000..9d7862a
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -0,0 +1,1089 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import jol.core.Runtime;
+import jol.types.basic.BasicTupleSet;
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+import jol.types.exception.BadKeyException;
+import jol.types.exception.UpdateException;
+import jol.types.table.BasicTable;
+import jol.types.table.EventTable;
+import jol.types.table.Function;
+import jol.types.table.Key;
+import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public class JOLJobManagerImpl implements IJobManager {
+ private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
+
+ public static final String JOL_SCOPE = "hyrackscc";
+
+ private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
+
+ private final Runtime jolRuntime;
+
+ private final LinkedBlockingQueue<Runnable> jobQueue;
+
+ private final JobTable jobTable;
+
+ private final JobQueueThread jobQueueThread;
+
+ private final OperatorDescriptorTable odTable;
+
+ private final OperatorLocationTable olTable;
+
+ private final OperatorCloneCountTable ocTable;
+
+ private final ConnectorDescriptorTable cdTable;
+
+ private final ActivityNodeTable anTable;
+
+ private final ActivityConnectionTable acTable;
+
+ private final ActivityBlockedTable abTable;
+
+ private final JobStartTable jobStartTable;
+
+ private final JobCleanUpTable jobCleanUpTable;
+
+ private final JobCleanUpCompleteTable jobCleanUpCompleteTable;
+
+ private final StartMessageTable startMessageTable;
+
+ private final StageletCompleteTable stageletCompleteTable;
+
+ private final StageletFailureTable stageletFailureTable;
+
+ private final AvailableNodesTable availableNodesTable;
+
+ private final RankedAvailableNodesTable rankedAvailableNodesTable;
+
+ private final FailedNodesTable failedNodesTable;
+
+ private final AbortMessageTable abortMessageTable;
+
+ private final AbortNotifyTable abortNotifyTable;
+
+ private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+
+ private final List<String> rankedAvailableNodes;
+
+ public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
+ this.jolRuntime = jolRuntime;
+ jobQueue = new LinkedBlockingQueue<Runnable>();
+ jobQueueThread = new JobQueueThread();
+ jobQueueThread.start();
+
+ this.jobTable = new JobTable(jolRuntime);
+ this.odTable = new OperatorDescriptorTable(jolRuntime);
+ this.olTable = new OperatorLocationTable(jolRuntime);
+ this.ocTable = new OperatorCloneCountTable(jolRuntime);
+ this.cdTable = new ConnectorDescriptorTable(jolRuntime);
+ this.anTable = new ActivityNodeTable(jolRuntime);
+ this.acTable = new ActivityConnectionTable(jolRuntime);
+ this.abTable = new ActivityBlockedTable(jolRuntime);
+ this.jobStartTable = new JobStartTable();
+ this.jobCleanUpTable = new JobCleanUpTable(jolRuntime);
+ this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable();
+ this.startMessageTable = new StartMessageTable(jolRuntime);
+ this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+ this.stageletFailureTable = new StageletFailureTable(jolRuntime);
+ this.availableNodesTable = new AvailableNodesTable(jolRuntime);
+ this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime);
+ this.failedNodesTable = new FailedNodesTable(jolRuntime);
+ this.abortMessageTable = new AbortMessageTable(jolRuntime);
+ this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
+ this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+ this.rankedAvailableNodes = new ArrayList<String>();
+
+ jolRuntime.catalog().register(jobTable);
+ jolRuntime.catalog().register(odTable);
+ jolRuntime.catalog().register(olTable);
+ jolRuntime.catalog().register(ocTable);
+ jolRuntime.catalog().register(cdTable);
+ jolRuntime.catalog().register(anTable);
+ jolRuntime.catalog().register(acTable);
+ jolRuntime.catalog().register(abTable);
+ jolRuntime.catalog().register(jobStartTable);
+ jolRuntime.catalog().register(jobCleanUpTable);
+ jolRuntime.catalog().register(jobCleanUpCompleteTable);
+ jolRuntime.catalog().register(startMessageTable);
+ jolRuntime.catalog().register(stageletCompleteTable);
+ jolRuntime.catalog().register(stageletFailureTable);
+ jolRuntime.catalog().register(availableNodesTable);
+ jolRuntime.catalog().register(rankedAvailableNodesTable);
+ jolRuntime.catalog().register(failedNodesTable);
+ jolRuntime.catalog().register(abortMessageTable);
+ jolRuntime.catalog().register(abortNotifyTable);
+ jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
+
+ jobTable.register(new JobTable.Callback() {
+ @Override
+ public void deletion(TupleSet arg0) {
+ jobTable.notifyAll();
+ }
+
+ @Override
+ public void insertion(TupleSet arg0) {
+ jobTable.notifyAll();
+ }
+ });
+
+ startMessageTable.register(new StartMessageTable.Callback() {
+ @Override
+ public void deletion(TupleSet tuples) {
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void insertion(TupleSet tuples) {
+ for (final Tuple t : tuples) {
+ jobQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ JobPlan plan = (JobPlan) data[3];
+ Set<List> ts = (Set<List>) data[4];
+ Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+ if (opParts == null) {
+ opParts = new HashSet<Integer>();
+ opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+ }
+ opParts.add((Integer) lData[1]);
+ }
+ }
+ ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+ .size()];
+ int i = 0;
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
+ p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+ plan, stageId, attempt, tasks, opPartitions);
+ }
+ LOGGER.info("Stage start - Phase 1");
+ Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+ new ClusterControllerService.PortMapMergingAccumulator());
+
+ ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+ .size()];
+ ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+ .size()];
+ ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+ .size()];
+ i = 0;
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
+ p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+ plan, stageId, tasks, opPartitions, globalPortMap);
+ p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+ stageId);
+ ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
+ stageId);
+ ++i;
+ }
+ LOGGER.info("Stage start - Phase 2");
+ ccs.runRemote(p2is, null);
+ LOGGER.info("Stage start - Phase 3");
+ ccs.runRemote(p3is, null);
+ LOGGER.info("Stage start");
+ ccs.runRemote(ss, null);
+ LOGGER.info("Stage started");
+ } catch (Exception e) {
+ }
+ }
+ });
+ }
+ }
+ });
+
+ jobCleanUpTable.register(new JobCleanUpTable.Callback() {
+ @Override
+ public void deletion(TupleSet tuples) {
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void insertion(TupleSet tuples) {
+ for (final Tuple t : tuples) {
+ jobQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ Set<String> ts = (Set<String>) data[1];
+ ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+ .size()];
+ int i = 0;
+ for (String n : ts) {
+ jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+ }
+ try {
+ ccs.runRemote(jcns, null);
+ } finally {
+ BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
+ .createTuple(jobId));
+ jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+ jolRuntime.evaluate();
+ }
+ } catch (Exception e) {
+ }
+ }
+ });
+ }
+ }
+ });
+
+ abortMessageTable.register(new AbortMessageTable.Callback() {
+ @Override
+ public void deletion(TupleSet tuples) {
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void insertion(TupleSet tuples) {
+ for (final Tuple t : tuples) {
+ jobQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ Set<List> ts = (Set<List>) data[4];
+ ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+ .size()];
+ int i = 0;
+ BasicTupleSet notificationTuples = new BasicTupleSet();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ String nodeId = (String) t2Data[0];
+ jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
+ attempt);
+ notificationTuples.add(AbortNotifyTable
+ .createTuple(jobId, stageId, nodeId, attempt));
+ }
+ try {
+ ccs.runRemote(jas, null);
+ } finally {
+ jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
+ null);
+ jolRuntime.evaluate();
+ }
+ } catch (Exception e) {
+ }
+ }
+ });
+ }
+ }
+ });
+
+ jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ final UUID jobId = UUID.randomUUID();
+
+ final JobPlanBuilder builder = new JobPlanBuilder();
+ builder.init(jobSpec, jobFlags);
+
+ final BasicTupleSet anTuples = new BasicTupleSet();
+ final BasicTupleSet acTuples = new BasicTupleSet();
+ final BasicTupleSet abTuples = new BasicTupleSet();
+ IActivityGraphBuilder gBuilder = new IActivityGraphBuilder() {
+ @Override
+ public void addTask(IActivityNode task) {
+ anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+ builder.addTask(task);
+ }
+
+ @Override
+ public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+ acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
+ taskOutputIndex));
+ builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
+ }
+
+ @Override
+ public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+ acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
+ taskInputIndex));
+ builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
+ }
+
+ @Override
+ public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+ abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+ builder.addBlockingEdge(blocker, blocked);
+ }
+ };
+
+ BasicTupleSet odTuples = new BasicTupleSet();
+ BasicTupleSet olTuples = new BasicTupleSet();
+ BasicTupleSet ocTuples = new BasicTupleSet();
+ for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
+ IOperatorDescriptor od = e.getValue();
+ int nPartitions = addPartitionConstraintTuples(jobId, od, olTuples, ocTuples);
+ odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
+ od.contributeTaskGraph(gBuilder);
+ }
+
+ BasicTupleSet cdTuples = new BasicTupleSet();
+ for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : jobSpec.getConnectorMap().entrySet()) {
+ cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
+ }
+
+ BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+
+ jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, ocTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
+
+ jolRuntime.evaluate();
+
+ return jobId;
+ }
+
+ private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
+ BasicTupleSet ocTuples) {
+ PartitionConstraint pc = od.getPartitionConstraint();
+
+ switch (pc.getPartitionConstraintType()) {
+ case COUNT:
+ int count = ((PartitionCountConstraint) pc).getCount();
+ ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count));
+ return count;
+
+ case EXPLICIT:
+ LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints();
+ for (int i = 0; i < locationConstraints.length; ++i) {
+ addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0);
+ }
+ return locationConstraints.length;
+ }
+ throw new IllegalArgumentException();
+ }
+
+ private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
+ LocationConstraint locationConstraint, int benefit) {
+ switch (locationConstraint.getConstraintType()) {
+ case ABSOLUTE:
+ String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
+ olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i, benefit));
+ break;
+
+ case CHOICE:
+ int index = 0;
+ for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
+ addLocationConstraintTuple(olTuples, jobId, opId, i, lc, benefit - index);
+ index++;
+ }
+ }
+ }
+
+ @Override
+ public JobStatus getJobStatus(UUID jobId) {
+ synchronized (jobTable) {
+ try {
+ Tuple jobTuple = jobTable.lookupJob(jobId);
+ if (jobTuple == null) {
+ return null;
+ }
+ return (JobStatus) jobTuple.value(1);
+ } catch (BadKeyException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+ int len = rankedAvailableNodes.size();
+ int delIndex = -1;
+ for (int i = 0; i < len; ++i) {
+ if (nodeId.equals(rankedAvailableNodes.get(i))) {
+ delIndex = i;
+ break;
+ }
+ }
+ if (delIndex < 0) {
+ return;
+ }
+ BasicTupleSet delRANTuples = new BasicTupleSet();
+ delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex));
+
+ BasicTupleSet insRANTuples = new BasicTupleSet();
+ for (int i = delIndex + 1; i < len; ++i) {
+ insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i - 1));
+ }
+
+ rankedAvailableNodes.remove(delIndex);
+
+ jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples);
+
+ BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
+
+ jolRuntime.evaluate();
+
+ BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ StageletStatistics statistics) throws Exception {
+ BasicTupleSet scTuples = new BasicTupleSet();
+ scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
+
+ jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+ BasicTupleSet sfTuples = new BasicTupleSet();
+ sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
+
+ jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, sfTuples, null);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public void start(UUID jobId) throws Exception {
+ BasicTupleSet jsTuples = new BasicTupleSet();
+ jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
+
+ jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, jsTuples, null);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public synchronized void registerNode(String nodeId) throws Exception {
+ rankedAvailableNodes.add(nodeId);
+ BasicTupleSet insRANTuples = new BasicTupleSet();
+ insRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, rankedAvailableNodes.size() - 1));
+
+ jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, null);
+
+ BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
+
+ jolRuntime.evaluate();
+
+ BasicTupleSet unfailedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, unfailedTuples);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
+ public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+ synchronized (jobTable) {
+ Tuple jobTuple = null;
+ while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+ jobTable.wait();
+ }
+ return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+ }
+ }
+
+ /*
+ * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan})
+ */
+ private static class JobTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
+ };
+
+ public JobTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ @SuppressWarnings("unchecked")
+ static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
+ return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+ }
+
+ @SuppressWarnings("unchecked")
+ JobStatistics buildJobStatistics(Tuple jobTuple) {
+ Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+ JobStatistics stats = new JobStatistics();
+ if (statsSet != null) {
+ for (Set<StageletStatistics> stageStatsSet : statsSet) {
+ StageStatistics stageStats = new StageStatistics();
+ for (StageletStatistics stageletStats : stageStatsSet) {
+ stageStats.addStageletStatistics(stageletStats);
+ }
+ stats.addStageStatistics(stageStats);
+ }
+ }
+ return stats;
+ }
+
+ Tuple lookupJob(UUID jobId) throws BadKeyException {
+ TupleSet set = primary().lookupByKey(jobId);
+ if (set.isEmpty()) {
+ return null;
+ }
+ return (Tuple) set.toArray()[0];
+ }
+ }
+
+ /*
+ * declare(operatordescriptor, keys(0, 1), {JobId, ODId, OperatorDescriptor})
+ */
+ private static class OperatorDescriptorTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor");
+
+ private static Key PRIMARY_KEY = new Key(0, 1);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class, IOperatorDescriptor.class
+ };
+
+ public OperatorDescriptorTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) {
+ return new Tuple(jobId, od.getOperatorId(), nPartitions, od);
+ }
+ }
+
+ /*
+ * declare(operatorlocation, keys(0, 1), {JobId, ODId, NodeId})
+ */
+ private static class OperatorLocationTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
+
+ private static Key PRIMARY_KEY = new Key();
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, String.class, Integer.class, Integer.class
+ };
+
+ public OperatorLocationTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) {
+ return new Tuple(jobId, opId, nodeId, partition, benefit);
+ }
+ }
+
+ /*
+ * declare(operatorclonecount, keys(0, 1), {JobId, ODId, Count})
+ */
+ private static class OperatorCloneCountTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount");
+
+ private static Key PRIMARY_KEY = new Key();
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class
+ };
+
+ public OperatorCloneCountTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) {
+ return new Tuple(jobId, opId, cloneCount);
+ }
+ }
+
+ /*
+ * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
+ */
+ private static class ConnectorDescriptorTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor");
+
+ private static Key PRIMARY_KEY = new Key(0, 1);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class,
+ ConnectorDescriptorId.class,
+ OperatorDescriptorId.class,
+ Integer.class,
+ OperatorDescriptorId.class,
+ Integer.class,
+ IConnectorDescriptor.class
+ };
+
+ public ConnectorDescriptorTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) {
+ IOperatorDescriptor srcOD = jobSpec.getProducer(conn);
+ int srcPort = jobSpec.getProducerOutputIndex(conn);
+ IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
+ int destPort = jobSpec.getConsumerInputIndex(conn);
+ Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, destOD
+ .getOperatorId(), destPort, conn);
+ return cdTuple;
+ }
+ }
+
+ /*
+ * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode})
+ */
+ private static class ActivityNodeTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
+ };
+
+ public ActivityNodeTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IActivityNode aNode) {
+ return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
+ aNode);
+ }
+ }
+
+ /*
+ * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, Integer, Direction, ActivityNodeId, Integer})
+ */
+ private static class ActivityConnectionTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
+ };
+
+ public ActivityConnectionTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
+ return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, aNode
+ .getActivityNodeId(), activityPort);
+ }
+ }
+
+ /*
+ * declare(activityblocked, keys(0, 1, 2, 3), {JobId, OperatorId, BlockerActivityId, BlockedActivityId})
+ */
+ private static class ActivityBlockedTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class,
+ OperatorDescriptorId.class,
+ ActivityNodeId.class,
+ OperatorDescriptorId.class,
+ ActivityNodeId.class
+ };
+
+ public ActivityBlockedTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
+ ActivityNodeId blockerANId = blocker.getActivityNodeId();
+ OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId();
+ ActivityNodeId blockedANId = blocked.getActivityNodeId();
+ OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId();
+ return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId);
+ }
+ }
+
+ /*
+ * declare(jobstart, keys(0), {JobId, SubmitTime})
+ */
+ private static class JobStartTable extends EventTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, Long.class
+ };
+
+ public JobStartTable() {
+ super(TABLE_NAME, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, long submitTime) {
+ return new Tuple(jobId, submitTime);
+ }
+ }
+
+ /*
+ * declare(startmessage, keys(0, 1), {JobId, StageId, JobPlan, TupleSet})
+ */
+ private static class StartMessageTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
+ };
+
+ public StartMessageTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+ }
+
+ /*
+ * declare(jobcleanup, keys(0), {JobId, Set<NodeId>})
+ */
+ private static class JobCleanUpTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, Set.class
+ };
+
+ public JobCleanUpTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+ }
+
+ /*
+ * declare(jobcleanupcomplete, keys(0), {JobId})
+ */
+ private static class JobCleanUpCompleteTable extends EventTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class
+ };
+
+ public JobCleanUpCompleteTable() {
+ super(TABLE_NAME, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId) {
+ return new Tuple(jobId);
+ }
+ }
+
+ /*
+ * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+ */
+ private static class StageletCompleteTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, String.class, Integer.class, StageletStatistics.class
+ };
+
+ public StageletCompleteTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
+ StageletStatistics statistics) {
+ return new Tuple(jobId, stageId, nodeId, attempt, statistics);
+ }
+ }
+
+ /*
+ * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+ */
+ private static class StageletFailureTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, String.class, Integer.class
+ };
+
+ public StageletFailureTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+ return new Tuple(jobId, stageId, nodeId, attempt);
+ }
+ }
+
+ /*
+ * declare(availablenodes, keys(0), {NodeId})
+ */
+ private static class AvailableNodesTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ String.class
+ };
+
+ public AvailableNodesTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(String nodeId) {
+ return new Tuple(nodeId);
+ }
+ }
+
+ /*
+ * declare(rankedavailablenodes, keys(0), {NodeId, Integer})
+ */
+ private static class RankedAvailableNodesTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ String.class, Integer.class
+ };
+
+ public RankedAvailableNodesTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(String nodeId, int rank) {
+ return new Tuple(nodeId, rank);
+ }
+ }
+
+ /*
+ * declare(failednodes, keys(0), {NodeId})
+ */
+ private static class FailedNodesTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ String.class
+ };
+
+ public FailedNodesTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(String nodeId) {
+ return new Tuple(nodeId);
+ }
+ }
+
+ /*
+ * declare(abortmessage, keys(0, 1), {JobId, StageId, Attempt, JobPlan, TupleSet})
+ */
+ private static class AbortMessageTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
+ };
+
+ public AbortMessageTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+ }
+
+ /*
+ * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+ */
+ private static class AbortNotifyTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, String.class, Integer.class
+ };
+
+ public AbortNotifyTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+ return new Tuple(jobId, stageId, nodeId, attempt);
+ }
+ }
+
+ private static class ExpandPartitionCountConstraintTableFunction extends Function {
+ private static final String TABLE_NAME = "expandpartitioncountconstraint";
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class, Integer.class
+ };
+
+ public ExpandPartitionCountConstraintTableFunction() {
+ super(TABLE_NAME, SCHEMA);
+ }
+
+ @Override
+ public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException {
+ TupleSet result = new BasicTupleSet();
+ int counter = 0;
+ for (Tuple t : tuples) {
+ int nPartitions = (Integer) t.value(2);
+ for (int i = 0; i < nPartitions; ++i) {
+ result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+ }
+ }
+ return result;
+ }
+ }
+
+ private class JobQueueThread extends Thread {
+ public JobQueueThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ Runnable r;
+ while (true) {
+ try {
+ r = jobQueue.take();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ try {
+ r.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
new file mode 100644
index 0000000..d748a13
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobPlanBuilder implements IActivityGraphBuilder {
+ private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
+
+ private JobPlan plan;
+
+ @Override
+ public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+ addToValueSet(plan.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
+ addToValueSet(plan.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
+ }
+
+ @Override
+ public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
+ + task.getActivityNodeId() + ":" + taskInputIndex);
+ }
+ insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
+ insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
+ .getActivityNodeId());
+ }
+
+ @Override
+ public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
+ + task.getActivityNodeId() + ":" + taskOutputIndex);
+ }
+ insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
+ insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
+ .getActivityNodeId());
+ }
+
+ @Override
+ public void addTask(IActivityNode task) {
+ plan.getActivityNodeMap().put(task.getActivityNodeId(), task);
+ addToValueSet(plan.getOperatorTaskMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
+ }
+
+ private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
+ Set<V> targets = map.get(n1);
+ if (targets == null) {
+ targets = new HashSet<V>();
+ map.put(n1, targets);
+ }
+ targets.add(n2);
+ }
+
+ private <T> void extend(List<T> list, int index) {
+ int n = list.size();
+ for (int i = n; i <= index; ++i) {
+ list.add(null);
+ }
+ }
+
+ public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+ plan = new JobPlan(jobSpec, jobFlags);
+ }
+
+ private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+ List<V> vList = map.get(key);
+ if (vList == null) {
+ vList = new ArrayList<V>();
+ map.put(key, vList);
+ }
+ extend(vList, index);
+ vList.set(index, value);
+ }
+
+ public JobPlan getPlan() {
+ return plan;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
similarity index 66%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
index 61cf70f..f992de1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
@@ -12,9 +12,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -25,7 +24,6 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -38,60 +36,8 @@
import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
-public class JobPlanBuilder implements IActivityGraphBuilder {
- private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
-
- private JobPlan plan;
-
- @Override
- public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
- addToValueSet(plan.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
- addToValueSet(plan.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
- }
-
- @Override
- public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
- + task.getActivityNodeId() + ":" + taskInputIndex);
- }
- insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
- insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
- .getActivityNodeId());
- }
-
- @Override
- public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
- + task.getActivityNodeId() + ":" + taskOutputIndex);
- }
- insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
- insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
- .getActivityNodeId());
- }
-
- @Override
- public void addTask(IActivityNode task) {
- plan.getActivityNodeMap().put(task.getActivityNodeId(), task);
- addToValueSet(plan.getOperatorTaskMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
- }
-
- private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
- Set<V> targets = map.get(n1);
- if (targets == null) {
- targets = new HashSet<V>();
- map.put(n1, targets);
- }
- targets.add(n2);
- }
-
- private <T> void extend(List<T> list, int index) {
- int n = list.size();
- for (int i = n; i <= index; ++i) {
- list.add(null);
- }
- }
+public class JobPlanner {
+ private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
@@ -127,12 +73,11 @@
return null;
}
- private JobStage inferStages() throws Exception {
+ private JobStage inferStages(JobPlan plan) throws Exception {
JobSpecification spec = plan.getJobSpecification();
/*
- * Build initial equivalence sets map. We create a map such that for
- * each IOperatorTask, t -> { t }
+ * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
*/
Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
Set<JobStage> stages = new HashSet<JobStage>();
@@ -185,22 +130,8 @@
return endStage;
}
- public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
- plan = new JobPlan(jobSpec, jobFlags);
- }
-
- private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
- List<V> vList = map.get(key);
- if (vList == null) {
- vList = new ArrayList<V>();
- map.put(key, vList);
- }
- extend(vList, index);
- vList.set(index, value);
- }
-
private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
- ActivityNodeId t2) {
+ ActivityNodeId t2) {
JobStage stage1 = eqSetMap.get(t1);
Set<ActivityNodeId> s1 = stage1.getTasks();
JobStage stage2 = eqSetMap.get(t2);
@@ -220,14 +151,17 @@
}
}
- public JobPlan plan() throws Exception {
- PlanUtils.visit(plan.getJobSpecification(), new IOperatorDescriptorVisitor() {
+ public JobPlan plan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ final JobPlanBuilder builder = new JobPlanBuilder();
+ builder.init(jobSpec, jobFlags);
+ PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) throws Exception {
- op.contributeTaskGraph(JobPlanBuilder.this);
+ op.contributeTaskGraph(builder);
}
});
- JobStage endStage = inferStages();
+ JobPlan plan = builder.getPlan();
+ JobStage endStage = inferStages(plan);
plan.setEndStage(endStage);
return plan;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
new file mode 100644
index 0000000..e209515
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import edu.uci.ics.hyracks.api.controller.INodeController;
+
+public class NodeControllerState {
+ private final INodeController nodeController;
+
+ private int lastHeartbeatDuration;
+
+ public NodeControllerState(INodeController nodeController) {
+ this.nodeController = nodeController;
+ }
+
+ void notifyHeartbeat() {
+ lastHeartbeatDuration = 0;
+ }
+
+ int incrementLastHeartbeatDuration() {
+ return lastHeartbeatDuration++;
+ }
+
+ int getLastHeartbeatDuration() {
+ return lastHeartbeatDuration;
+ }
+
+ public INodeController getNodeController() {
+ return nodeController;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
similarity index 96%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
index 3357012..69f7380 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.HashSet;
import java.util.Set;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
similarity index 86%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 5f38e56..7f0a44e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
import java.util.HashMap;
import java.util.Map;
@@ -23,7 +23,6 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
public class Joblet {
private static final long serialVersionUID = 1L;
@@ -88,8 +87,17 @@
return nodeController.getExecutor();
}
- public synchronized void notifyStageletComplete(UUID stageId, StageletStatistics stats) throws Exception {
+ public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats) throws Exception {
stageletMap.remove(stageId);
- nodeController.notifyStageComplete(jobId, stageId, stats);
+ nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
+ }
+
+ public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
+ stageletMap.remove(stageId);
+ nodeController.notifyStageFailed(jobId, stageId, attempt);
+ }
+
+ public NodeControllerService getNodeController() {
+ return nodeController;
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
new file mode 100644
index 0000000..195fe00
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -0,0 +1,427 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.nodecontroller;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.controller.IClusterController;
+import edu.uci.ics.hyracks.api.controller.INodeController;
+import edu.uci.ics.hyracks.api.controller.NodeCapability;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.comm.ConnectionManager;
+import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.config.NCConfig;
+import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
+import edu.uci.ics.hyracks.runtime.OperatorRunnable;
+
+public class NodeControllerService extends AbstractRemoteService implements INodeController {
+ private static final long serialVersionUID = 1L;
+
+ private NCConfig ncConfig;
+
+ private final String id;
+
+ private final HyracksContext ctx;
+
+ private final NodeCapability nodeCapability;
+
+ private final ConnectionManager connectionManager;
+
+ private final Timer timer;
+
+ private IClusterController ccs;
+
+ private Map<UUID, Joblet> jobletMap;
+
+ private Executor executor;
+
+ private NodeParameters nodeParameters;
+
+ public NodeControllerService(NCConfig ncConfig) throws Exception {
+ this.ncConfig = ncConfig;
+ id = ncConfig.nodeId;
+ this.ctx = new HyracksContext(ncConfig.frameSize);
+ if (id == null) {
+ throw new Exception("id not set");
+ }
+ nodeCapability = computeNodeCapability();
+ connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
+ jobletMap = new HashMap<UUID, Joblet>();
+ executor = Executors.newCachedThreadPool();
+ timer = new Timer(true);
+ }
+
+ private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ connectionManager.start();
+ Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
+ IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
+ this.nodeParameters = cc.registerNode(this);
+
+ // Schedule heartbeat generator.
+ timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+
+ LOGGER.log(Level.INFO, "Started NodeControllerService");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ connectionManager.stop();
+ LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public NodeCapability getNodeCapability() throws Exception {
+ return nodeCapability;
+ }
+
+ public ConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+
+ private static NodeCapability computeNodeCapability() {
+ NodeCapability nc = new NodeCapability();
+ nc.setCPUCount(Runtime.getRuntime().availableProcessors());
+ return nc;
+ }
+
+ private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+ String ipaddrStr = ncConfig.dataIPAddress;
+ ipaddrStr = ipaddrStr.trim();
+ Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+ Matcher m = pattern.matcher(ipaddrStr);
+ if (!m.matches()) {
+ throw new Exception(MessageFormat.format(
+ "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+ }
+ byte[] ipBytes = new byte[4];
+ ipBytes[0] = (byte) Integer.parseInt(m.group(1));
+ ipBytes[1] = (byte) Integer.parseInt(m.group(2));
+ ipBytes[2] = (byte) Integer.parseInt(m.group(3));
+ ipBytes[3] = (byte) Integer.parseInt(m.group(4));
+ return InetAddress.getByAddress(ipBytes);
+ }
+
+ @Override
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
+
+ final Joblet joblet = getLocalJoblet(jobId);
+
+ Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
+ joblet.setStagelet(stageId, stagelet);
+
+ final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+ Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
+
+ List<Endpoint> endpointList = new ArrayList<Endpoint>();
+
+ for (ActivityNodeId hanId : tasks.keySet()) {
+ IActivityNode han = plan.getActivityNodeMap().get(hanId);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Initializing " + hanId + " -> " + han);
+ }
+ IOperatorDescriptor op = han.getOwner();
+ List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
+ for (int i : tasks.get(hanId)) {
+ IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
+ OperatorRunnable or = new OperatorRunnable(ctx, hon);
+ stagelet.setOperator(op.getOperatorId(), i, or);
+ if (inputs != null) {
+ for (int j = 0; j < inputs.size(); ++j) {
+ if (j >= 1) {
+ throw new IllegalStateException();
+ }
+ IConnectorDescriptor conn = inputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+ endpointList.add(endpoint);
+ DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
+ connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+ PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+ .getTaskInputMap().get(hanId).get(j), i);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+ }
+ portMap.put(piId, endpoint);
+ IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
+ .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+ or.setFrameReader(reader);
+ }
+ }
+ honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+ }
+ }
+
+ stagelet.setEndpointList(endpointList);
+
+ return portMap;
+ }
+
+ private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
+ final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+ throws HyracksDataException {
+ final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
+ nConsumerCount);
+
+ return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
+ private int frameCount;
+
+ @Override
+ public void open() throws HyracksDataException {
+ frameCount = 0;
+ reader.open();
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ boolean status = reader.nextFrame(buffer);
+ if (status) {
+ ++frameCount;
+ }
+ return status;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ reader.close();
+ stagelet.getStatistics().getStatisticsMap().put(
+ "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+ String.valueOf(frameCount));
+ }
+ } : reader;
+ }
+
+ @Override
+ public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+ final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
+ final Joblet ji = getLocalJoblet(jobId);
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
+ final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
+
+ final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+
+ final JobSpecification spec = plan.getJobSpecification();
+
+ for (ActivityNodeId hanId : tasks.keySet()) {
+ IActivityNode han = plan.getActivityNodeMap().get(hanId);
+ IOperatorDescriptor op = han.getOwner();
+ List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
+ for (int i : tasks.get(hanId)) {
+ OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+ if (outputs != null) {
+ for (int j = 0; j < outputs.size(); ++j) {
+ final IConnectorDescriptor conn = outputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ final int senderIndex = i;
+ IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+ PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Endpoint ep = globalPortMap.get(piId);
+ if (ep == null) {
+ LOGGER.info("Got null Endpoint for " + piId);
+ throw new NullPointerException();
+ }
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
+ }
+ return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
+ .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ }
+ };
+ or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i, opPartitions.get(
+ producerOpId).size(), opPartitions.get(consumerOpId).size()));
+ }
+ }
+ stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
+ }
+ }
+ }
+
+ private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
+ final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+ return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
+ private int frameCount;
+
+ @Override
+ public void open() throws HyracksDataException {
+ frameCount = 0;
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ++frameCount;
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ stagelet.getStatistics().getStatisticsMap().put(
+ "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
+ String.valueOf(frameCount));
+ }
+ } : writer;
+ }
+
+ @Override
+ public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception {
+ final Joblet ji = getLocalJoblet(jobId);
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
+ for (Endpoint e : si.getEndpointList()) {
+ connectionManager.unacceptConnection(e.getEndpointId());
+ }
+ si.setEndpointList(null);
+ }
+
+ private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null) {
+ ji = new Joblet(this, jobId);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ @Override
+ public synchronized void cleanUpJob(UUID jobId) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cleaning up after job: " + jobId);
+ }
+ jobletMap.remove(jobId);
+ connectionManager.dumpStats();
+ }
+
+ @Override
+ public void startStage(UUID jobId, UUID stageId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Stagelet s = ji.getStagelet(stageId);
+ if (s != null) {
+ s.start();
+ }
+ }
+ }
+
+ public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+ ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
+ }
+
+ public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
+ ccs.notifyStageletFailure(jobId, stageId, attempt, id);
+ }
+
+ @Override
+ public void notifyRegistration(IClusterController ccs) throws Exception {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public NCConfig getConfiguration() throws Exception {
+ return ncConfig;
+ }
+
+ private class HeartbeatTask extends TimerTask {
+ private IClusterController cc;
+
+ public HeartbeatTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ cc.nodeHeartbeat(id);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Stagelet stagelet = ji.getStagelet(stageId);
+ if (stagelet != null) {
+ stagelet.abort();
+ connectionManager.abortConnections(jobId, stageId);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
similarity index 78%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index e82fa5b..e16cf30 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
import java.rmi.RemoteException;
import java.util.Date;
@@ -40,19 +40,24 @@
private final UUID stageId;
+ private final int attempt;
+
private final Map<OperatorInstanceId, OperatorRunnable> honMap;
private List<Endpoint> endpointList;
private boolean started;
+ private volatile boolean abort;
+
private final Set<OperatorInstanceId> pendingOperators;
private final StageletStatistics stats;
- public Stagelet(Joblet joblet, UUID stageId, String nodeId) throws RemoteException {
+ public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
this.joblet = joblet;
this.stageId = stageId;
+ this.attempt = attempt;
pendingOperators = new HashSet<OperatorInstanceId>();
started = false;
honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
@@ -85,6 +90,13 @@
notifyAll();
}
+ public synchronized void abort() {
+ this.abort = true;
+ for (OperatorRunnable r : honMap.values()) {
+ r.abort();
+ }
+ }
+
public void installRunnable(final OperatorInstanceId opIId) {
pendingOperators.add(opIId);
final OperatorRunnable hon = honMap.get(opIId);
@@ -97,16 +109,22 @@
e.printStackTrace();
return;
}
+ if (abort) {
+ return;
+ }
try {
LOGGER.log(Level.INFO, "Starting runnable for operator: " + joblet.getJobId() + ":" + stageId + ":"
- + opIId.getOperatorId() + ":" + opIId.getPartition());
+ + opIId.getOperatorId() + ":" + opIId.getPartition());
} catch (Exception e) {
e.printStackTrace();
+ // notifyOperatorFailure(opIId);
}
try {
hon.run();
- } finally {
notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
}
}
});
@@ -117,15 +135,24 @@
if (pendingOperators.isEmpty()) {
stats.setEndTime(new Date());
try {
- joblet.notifyStageletComplete(stageId, stats);
+ joblet.notifyStageletComplete(stageId, attempt, stats);
} catch (Exception e) {
e.printStackTrace();
}
}
}
+ protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+ abort();
+ try {
+ joblet.notifyStageletFailed(stageId, attempt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
private synchronized void waitUntilStarted() throws InterruptedException {
- while (!started) {
+ while (!started && !abort) {
wait();
}
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
index 937e0a4..5013812 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
@@ -37,17 +37,16 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
- .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+ final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+ .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
return hashWriter;
}
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
index 9f36730..28cd1fa 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
@@ -36,7 +36,7 @@
private final IBinaryComparatorFactory[] comparatorFactories;
public MToNHashPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+ int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
super(spec);
this.tpcf = tpcf;
this.sortFields = sortFields;
@@ -45,19 +45,18 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
- .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+ final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+ .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
return hashWriter;
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux, int index)
- throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for(int i = 0; i < comparatorFactories.length; ++i) {
+ for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
JobSpecification spec = plan.getJobSpecification();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
index f455f01..d3a8af9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
@@ -41,7 +41,7 @@
private final FrameTupleAccessor tupleAccessor;
public RangeDataWriter(HyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
- FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
+ FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
this.consumerPartitionCount = consumerPartitionCount;
this.epWriters = epWriters;
this.appenders = appenders;
@@ -106,12 +106,11 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
- final FrameTupleAppender[] appenders = new FrameTupleAppender[consumerPartitionCount];
- for (int i = 0; i < consumerPartitionCount; ++i) {
+ final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+ final FrameTupleAppender[] appenders = new FrameTupleAppender[nConsumerPartitions];
+ for (int i = 0; i < nConsumerPartitions; ++i) {
try {
epWriters[i] = edwFactory.createFrameWriter(i);
appenders[i] = new FrameTupleAppender(ctx);
@@ -120,14 +119,14 @@
throw new HyracksDataException(e);
}
}
- final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, consumerPartitionCount, epWriters, appenders, spec
- .getConnectorRecordDescriptor(this));
+ final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders, spec
+ .getConnectorRecordDescriptor(this));
return rangeWriter;
}
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
index 50d804a..324c39f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
@@ -36,11 +36,9 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
- JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
- for (int i = 0; i < consumerPartitionCount; ++i) {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+ for (int i = 0; i < nConsumerPartitions; ++i) {
epWriters[i] = edwFactory.createFrameWriter(i);
}
return new IFrameWriter() {
@@ -73,7 +71,7 @@
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
index c46026a..fff3aa6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
@@ -34,13 +34,13 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return edwFactory.createFrameWriter(index);
}
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
index b411020..2e610a9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
@@ -74,16 +74,6 @@
}
@Override
- public String[] getPartitions() {
- return partitions;
- }
-
- @Override
- public void setPartitions(String[] partitions) {
- this.partitions = partitions;
- }
-
- @Override
public RecordDescriptor[] getOutputRecordDescriptors() {
return recordDescriptors;
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
index 86c247d..b2dc234 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
@@ -34,8 +34,7 @@
class GroupingHashTable {
/**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ * The pointers in the link store 3 int values for each entry in the hashtable: (bufferIdx, tIndex, accumulatorIdx).
*
* @author vinayakb
*/
@@ -81,8 +80,8 @@
private final FrameTupleAccessor storedKeysAccessor;
GroupingHashTable(HyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+ ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
this.ctx = ctx;
appender = new FrameTupleAppender(ctx);
buffers = new ArrayList<ByteBuffer>();
@@ -161,7 +160,7 @@
}
int saIndex = accumulatorSize++;
aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(inRecordDescriptor,
- outRecordDescriptor);
+ outRecordDescriptor);
aggregator.init(accessor, tIndex);
link.add(sbIndex, stIndex, saIndex);
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
index c1b917d..7ec0eab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.controller.IClusterController;
@@ -61,7 +62,7 @@
private Object value;
public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
- String inputFormatClassName, Reporter reporter) {
+ String inputFormatClassName, Reporter reporter) {
try {
JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
FileSystem fileSystem = null;
@@ -74,7 +75,7 @@
Class inputFormatClass = Class.forName(inputFormatClassName);
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
- reporter);
+ reporter);
if (hadoopRecordReader instanceof SequenceFileRecordReader) {
inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
@@ -130,27 +131,27 @@
private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
- hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
+ hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
return fileSplit;
}
}
public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
- HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
super(spec, splits, recordDescriptor);
this.inputFormatClassName = inputFormatClassName;
this.jobConfMap = jobConfMap;
}
public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
- JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
super(spec, null, recordDescriptor);
this.inputFormatClassName = inputFormatClassName;
this.jobConfMap = jobConfMap;
}
public HadoopReadOperatorDescriptor(IClusterController clusterController, Map<String, String> jobConfMap,
- JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
super(spec, null, recordDescriptor);
HadoopAdapter hadoopAdapter = HadoopAdapter.getInstance(fileSystemURL);
String inputPathString = jobConfMap.get("mapred.input.dir");
@@ -170,7 +171,7 @@
}
private void configurePartitionConstraints(IClusterController clusterController,
- Map<String, List<HadoopFileSplit>> blocksToRead) {
+ Map<String, List<HadoopFileSplit>> blocksToRead) {
List<LocationConstraint> locationConstraints = new ArrayList<LocationConstraint>();
Map<String, INodeController> registry = null;
try {
@@ -223,8 +224,8 @@
}
}
- PartitionConstraint partitionConstraint = new PartitionConstraint(locationConstraints
- .toArray(new LocationConstraint[] {}));
+ PartitionConstraint partitionConstraint = new ExplicitPartitionConstraint(locationConstraints
+ .toArray(new LocationConstraint[] {}));
this.setPartitionConstraint(partitionConstraint);
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
index 425f967..bb1ba84 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
@@ -17,7 +17,7 @@
import org.kohsuke.args4j.CmdLineParser;
import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
public class CCDriver {
public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
index 657b304..3c856ab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
@@ -19,7 +19,7 @@
import edu.uci.ics.dcache.client.DCacheClient;
import edu.uci.ics.dcache.client.DCacheClientConfig;
import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
public class NCDriver {
public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
deleted file mode 100644
index 3259649..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.job;
-
-import java.rmi.RemoteException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
-public class JobControl {
- private static final long serialVersionUID = 1L;
-
- private final JobManager jobManager;
-
- private final JobPlan jobPlan;
-
- private final UUID jobId;
-
- private final Map<UUID, StageProgress> stageProgressMap;
-
- private final Set<UUID> completeStages;
-
- private JobStatus jobStatus;
-
- private JobStatistics jobStatistics;
-
- public JobControl(JobManager jobManager, JobPlan jobPlan) throws RemoteException {
- this.jobManager = jobManager;
- this.jobPlan = jobPlan;
- jobId = UUID.randomUUID();
- stageProgressMap = new HashMap<UUID, StageProgress>();
- completeStages = new HashSet<UUID>();
- jobStatus = JobStatus.INITIALIZED;
- jobStatistics = new JobStatistics();
- }
-
- public JobPlan getJobPlan() {
- return jobPlan;
- }
-
- public UUID getJobId() {
- return jobId;
- }
-
- public synchronized JobStatus getJobStatus() {
- return jobStatus;
- }
-
- public Set<UUID> getCompletedStages() {
- return completeStages;
- }
-
- public void setStatus(JobStatus status) {
- jobStatus = status;
- }
-
- public StageProgress getStageProgress(int stageId) {
- return stageProgressMap.get(stageId);
- }
-
- public void setStageProgress(UUID stageId, StageProgress stageProgress) {
- stageProgressMap.put(stageId, stageProgress);
- }
-
- public synchronized void notifyStageletComplete(UUID stageId, String nodeId, StageletStatistics ss)
- throws Exception {
- StageProgress stageProgress = stageProgressMap.get(stageId);
- stageProgress.markNodeComplete(nodeId);
- StageStatistics stageStatistics = stageProgress.getStageStatistics();
- stageStatistics.addStageletStatistics(ss);
- if (stageProgress.stageComplete()) {
- jobStatistics.addStageStatistics(stageStatistics);
- stageProgressMap.remove(stageId);
- completeStages.add(stageId);
- jobManager.advanceJob(this);
- }
- }
-
- public synchronized JobStatistics waitForCompletion() throws Exception {
- while (jobStatus != JobStatus.TERMINATED) {
- wait();
- }
- return jobStatistics;
- }
-
- public synchronized void notifyJobComplete() {
- jobStatus = JobStatus.TERMINATED;
- notifyAll();
- }
-
- public JobStatistics getJobStatistics() {
- return jobStatistics;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
deleted file mode 100644
index f976bb9..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.job;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
-import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
-
-public class JobManager {
- private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
- private ClusterControllerService ccs;
-
- private final Map<UUID, JobControl> jobMap;
-
- public JobManager(ClusterControllerService ccs) {
- this.ccs = ccs;
- jobMap = new HashMap<UUID, JobControl>();
- }
-
- public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- JobPlanBuilder builder = new JobPlanBuilder();
- builder.init(jobSpec, jobFlags);
- JobControl jc = new JobControl(this, builder.plan());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(jc.getJobPlan().toString());
- }
- jobMap.put(jc.getJobId(), jc);
-
- return jc.getJobId();
- }
-
- public synchronized void start(UUID jobId) throws Exception {
- JobControl jobControlImpl = jobMap.get(jobId);
- LOGGER
- .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: "
- + jobControlImpl.getJobStatus());
- if (jobControlImpl.getJobStatus() != JobStatus.INITIALIZED) {
- return;
- }
- jobControlImpl.getJobStatistics().setStartTime(new Date());
- jobControlImpl.setStatus(JobStatus.RUNNING);
- schedule(jobControlImpl);
- }
-
- public synchronized void advanceJob(JobControl jobControlImpl) throws Exception {
- schedule(jobControlImpl);
- }
-
- private void schedule(JobControl jobControlImpl) throws Exception {
- JobPlan plan = jobControlImpl.getJobPlan();
- JobStage endStage = plan.getEndStage();
-
- Set<UUID> completedStages = jobControlImpl.getCompletedStages();
- List<JobStage> runnableStages = new ArrayList<JobStage>();
- findRunnableStages(endStage, runnableStages, completedStages, new HashSet<UUID>());
- if (runnableStages.size() == 1 && runnableStages.get(0).getTasks().isEmpty()) {
- LOGGER.info("Job " + jobControlImpl.getJobId() + " complete");
- jobControlImpl.getJobStatistics().setEndTime(new Date());
- cleanUp(jobControlImpl);
- jobControlImpl.notifyJobComplete();
- } else {
- for (JobStage s : runnableStages) {
- if (s.isStarted()) {
- continue;
- }
- startStage(jobControlImpl, s);
- }
- }
- }
-
- private void cleanUp(JobControl jc) {
- jobMap.remove(jc.getJobId());
- ccs.notifyJobComplete(jc.getJobId());
- }
-
- private void startStage(JobControl jc, JobStage stage) throws Exception {
- stage.setStarted();
- Set<String> candidateNodes = deploy(jc, stage);
- for (String nodeId : candidateNodes) {
- ccs.lookupNode(nodeId).startStage(jc.getJobId(), stage.getId());
- }
- }
-
- private void findRunnableStages(JobStage s, List<JobStage> runnableStages, Set<UUID> completedStages, Set<UUID> seen) {
- boolean runnable = true;
- if (seen.contains(s.getId())) {
- return;
- }
- seen.add(s.getId());
- for (JobStage dep : s.getDependencies()) {
- boolean depComplete = completedStages.contains(dep.getId());
- runnable = runnable && depComplete;
- if (!depComplete) {
- findRunnableStages(dep, runnableStages, completedStages, seen);
- }
- }
- if (runnable) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Runnable stage: " + s);
- }
- runnableStages.add(s);
- }
- }
-
- private Set<String> deploy(JobControl jc, JobStage stage) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Deploying: " + stage);
- }
- Set<String> candidateNodes = plan(jc, stage);
- StageProgress stageProgress = new StageProgress(stage.getId());
- stageProgress.addPendingNodes(candidateNodes);
- Map<PortInstanceId, Endpoint> globalPortMap = runRemote(new Phase1Installer(jc, stage),
- new PortMapMergingAccumulator(), candidateNodes);
- runRemote(new Phase2Installer(jc, stage, globalPortMap), null, candidateNodes);
- runRemote(new Phase3Installer(jc, stage), null, candidateNodes);
- jc.setStageProgress(stage.getId(), stageProgress);
- return candidateNodes;
- }
-
- private interface RemoteOp<T> {
- public T execute(INodeController node) throws Exception;
- }
-
- private interface Accumulator<T, R> {
- public void accumulate(T o);
-
- public R getResult();
- }
-
- private static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
- private JobControl jc;
- private JobStage stage;
-
- public Phase1Installer(JobControl jc, JobStage stage) {
- this.jc = jc;
- this.stage = stage;
- }
-
- @Override
- public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
- return node.initializeJobletPhase1(jc.getJobId(), jc.getJobPlan(), stage);
- }
-
- @Override
- public String toString() {
- return jc.getJobId() + " Distribution Phase 1";
- }
- }
-
- private static class Phase2Installer implements RemoteOp<Void> {
- private JobControl jc;
- private JobStage stage;
- private Map<PortInstanceId, Endpoint> globalPortMap;
-
- public Phase2Installer(JobControl jc, JobStage stage, Map<PortInstanceId, Endpoint> globalPortMap) {
- this.jc = jc;
- this.stage = stage;
- this.globalPortMap = globalPortMap;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.initializeJobletPhase2(jc.getJobId(), jc.getJobPlan(), stage, globalPortMap);
- return null;
- }
-
- @Override
- public String toString() {
- return jc.getJobId() + " Distribution Phase 2";
- }
- }
-
- private static class Phase3Installer implements RemoteOp<Void> {
- private JobControl jc;
- private JobStage stage;
-
- public Phase3Installer(JobControl jc, JobStage stage) {
- this.jc = jc;
- this.stage = stage;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.commitJobletInitialization(jc.getJobId(), jc.getJobPlan(), stage);
- return null;
- }
-
- @Override
- public String toString() {
- return jc.getJobId() + " Distribution Phase 3";
- }
- }
-
- private static class PortMapMergingAccumulator implements
- Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
- Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-
- @Override
- public void accumulate(Map<PortInstanceId, Endpoint> o) {
- portMap.putAll(o);
- }
-
- @Override
- public Map<PortInstanceId, Endpoint> getResult() {
- return portMap;
- }
- }
-
- private <T, R> R runRemote(final RemoteOp<T> remoteOp, final Accumulator<T, R> accumulator,
- Set<String> candidateNodes) throws Exception {
- LOGGER.log(Level.INFO, remoteOp + " : " + candidateNodes);
-
- final Semaphore installComplete = new Semaphore(candidateNodes.size());
- final List<Exception> errors = new Vector<Exception>();
- for (final String nodeId : candidateNodes) {
- final INodeController node = ccs.lookupNode(nodeId);
-
- installComplete.acquire();
- Runnable remoteRunner = new Runnable() {
- @Override
- public void run() {
- try {
- T t = remoteOp.execute(node);
- if (accumulator != null) {
- synchronized (accumulator) {
- accumulator.accumulate(t);
- }
- }
- } catch (Exception e) {
- errors.add(e);
- } finally {
- installComplete.release();
- }
- }
- };
-
- ccs.getExecutor().execute(remoteRunner);
- }
- installComplete.acquire(candidateNodes.size());
- if (!errors.isEmpty()) {
- throw errors.get(0);
- }
- return accumulator == null ? null : accumulator.getResult();
- }
-
- private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
-
- final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
- for (ActivityNodeId t : stage.getTasks()) {
- opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
- }
-
- final Set<String> candidateNodes = new HashSet<String>();
-
- IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) throws Exception {
- if (!opSet.contains(op.getOperatorId())) {
- return;
- }
- String[] partitions = op.getPartitions();
- if (partitions == null) {
- PartitionConstraint pc = op.getPartitionConstraint();
- LocationConstraint[] lcs = pc.getLocationConstraints();
- String[] assignment = new String[lcs.length];
- for (int i = 0; i < lcs.length; ++i) {
- String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
- assignment[i] = nodeId;
- }
- op.setPartitions(assignment);
- partitions = assignment;
- }
- for (String p : partitions) {
- candidateNodes.add(p);
- }
- }
- };
-
- PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(stage + " Candidate nodes: " + candidateNodes);
- }
- return candidateNodes;
- }
-
- public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
- StageletStatistics statistics) throws Exception {
- JobControl jc = jobMap.get(jobId);
- if (jc != null) {
- jc.notifyStageletComplete(stageId, nodeId, statistics);
- }
- }
-
- public synchronized JobStatus getJobStatus(UUID jobId) {
- JobControl jc = jobMap.get(jobId);
- return jc.getJobStatus();
- }
-
- public JobStatistics waitForCompletion(UUID jobId) throws Exception {
- JobControl jc;
- synchronized (this) {
- jc = jobMap.get(jobId);
- }
- if (jc != null) {
- return jc.waitForCompletion();
- }
- return null;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
index 5964d07..149f867 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -25,6 +25,7 @@
private IOperatorNodePushable opNode;
private IFrameReader reader;
private ByteBuffer buffer;
+ private volatile boolean abort;
public OperatorRunnable(HyracksContext ctx, IOperatorNodePushable opNode) {
this.opNode = opNode;
@@ -39,6 +40,10 @@
this.reader = reader;
}
+ public void abort() {
+ abort = true;
+ }
+
@Override
public void run() {
try {
@@ -46,6 +51,9 @@
if (reader != null) {
reader.open();
while (reader.nextFrame(buffer)) {
+ if (abort) {
+ break;
+ }
buffer.flip();
opNode.nextFrame(buffer);
buffer.compact();
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
new file mode 100644
index 0000000..3f03526
--- /dev/null
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -0,0 +1,310 @@
+program hyrackscc;
+
+import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
+ activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+ StageNumber2 <= StageNumber1
+ {
+ StageNumber := StageNumber1 + 1;
+ };
+
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+ activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+ activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+ activitystage(JobId, _, _, StageNumber)
+ {
+ StageId := java.util.UUID.randomUUID();
+ };
+
+watch(jobstage, a);
+
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+ jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+ jobattempt(JobId, Attempt),
+ stagestart(JobId, _, Attempt),
+ abortcomplete(JobId, _, Attempt)
+ {
+ NextAttempt := Attempt + 1;
+ };
+
+define(stagestart, keys(), {UUID, Integer, Integer});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+ jobattempt#insert(JobId, Attempt);
+
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+ jobstart(JobId, _);
+
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+ stagestart(JobId, StageNumber, Attempt),
+ stagefinish#insert(StageId, StageNumber, Attempt, _)
+ {
+ NextStageNumber := StageNumber + 1;
+ };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+ operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+ availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+ availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+ operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+ maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+ rankedavailablenodes(NodeId, NodeRank),
+ availablenodecount(_, NodeCount),
+ NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+ operatorclonecount(JobId, OperatorId, NPartitions);
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+ expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ operatordescriptor(JobId, OperatorId, _, _),
+ activitystage(JobId, OperatorId, ActivityId, StageNumber),
+ jobstage(JobId, StageNumber, StageId),
+ attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
+
+watch(activitystart, a);
+
+define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+ activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+ {
+ ActivityInfo := [ActivityId, Partition];
+ };
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+ availablenodes(NodeId),
+ ActivityInfoSet.size() != 0
+ {
+ Tuple := [NodeId, ActivityInfoSet];
+ };
+
+startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+ stageletfailure(JobId, StageId, NodeId, Attempt),
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
+
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
+ stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
+ failednodes#insert(NodeId),
+ notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+ stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ availablenodes(NodeId)
+ {
+ Tuple := [NodeId, ActivityIdSet];
+ };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+ abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+ TSet.size() != 0;
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ abortnotify(JobId, StageId, NodeId, Attempt);
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+ notin availablenodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+ stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+ stageabort(JobId, StageId, Attempt, NodeIdSet2),
+ NodeIdSet1.size() == NodeIdSet2.size();
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+ stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
+
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
+ startmessage_agg(JobId, StageId, Attempt, _, TSet),
+ stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+ jobstage(JobId, StageNumber, StageId),
+ TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, SSet),
+ notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, _),
+ attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+ notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+ jobcleanup_agg(JobId, NodeIdSet);
\ No newline at end of file
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 94bc57a..8505531 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -24,8 +24,8 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.config.CCConfig;
import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
public abstract class AbstractIntegrationTest {
public static final String NC1_ID = "nc1";
@@ -39,6 +39,7 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.port = 39001;
+ ccConfig.useJOL = true;
cc = new ClusterControllerService(ccConfig);
cc.start();
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index a3beb11..b7c2e57 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -49,57 +50,85 @@
public void countOfCountsSingleNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
- PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+ });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
group.setPartitionConstraint(groupPartitionConstraint);
- InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -116,59 +145,91 @@
public void countOfCountsMultiNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
- PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+ });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
group.setPartitionConstraint(groupPartitionConstraint);
- InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -185,59 +246,91 @@
public void countOfCountsExternalSortMultiNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
- PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+ 0
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+ });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
group.setPartitionConstraint(groupPartitionConstraint);
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index d619bba..24d45fa 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,19 +37,23 @@
public void scanPrint01() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/words.txt")),
- new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index cd254e2..0b19cea 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,39 +44,57 @@
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
- PartitionConstraint sortersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, ordersDesc);
+ PartitionConstraint sortersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter.setPartitionConstraint(sortersPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNHashPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- new int[] { 1 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 1 }, new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }), sorter, 0,
- printer, 0);
-
+ new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }), new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }), sorter, 0, printer, 0);
+
runTest(spec);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 593eac1..6770fd8 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -19,8 +19,10 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -42,83 +44,94 @@
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
/*
- * TPCH Customer table:
- * CREATE TABLE CUSTOMER (
- * C_CUSTKEY INTEGER NOT NULL,
- * C_NAME VARCHAR(25) NOT NULL,
- * C_ADDRESS VARCHAR(40) NOT NULL,
- * C_NATIONKEY INTEGER NOT NULL,
- * C_PHONE CHAR(15) NOT NULL,
- * C_ACCTBAL DECIMAL(15,2) NOT NULL,
- * C_MKTSEGMENT CHAR(10) NOT NULL,
- * C_COMMENT VARCHAR(117) NOT NULL
- * );
- * TPCH Orders table:
- * CREATE TABLE ORDERS (
- * O_ORDERKEY INTEGER NOT NULL,
- * O_CUSTKEY INTEGER NOT NULL,
- * O_ORDERSTATUS CHAR(1) NOT NULL,
- * O_TOTALPRICE DECIMAL(15,2) NOT NULL,
- * O_ORDERDATE DATE NOT NULL,
- * O_ORDERPRIORITY CHAR(15) NOT NULL,
- * O_CLERK CHAR(15) NOT NULL,
- * O_SHIPPRIORITY INTEGER NOT NULL,
- * O_COMMENT VARCHAR(79) NOT NULL
- * );
+ * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@Test
public void customerOrderCIDJoin() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl"))
+ };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -139,67 +152,213 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+ };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDJoinAutoExpand() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+ };
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
+
+ CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
+ custScanner.setPartitionConstraint(custPartitionConstraint);
+
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ join.setPartitionConstraint(new PartitionCountConstraint(2));
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -214,75 +373,114 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+ };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
custScanner.setPartitionConstraint(custPartitionConstraint);
MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
- ordMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+ ordMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ }));
MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
- custMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+ custMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ }));
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(custPartConn, custScanner, 0, custMat, 0);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);