Several major changes in hyracks:
-- reduced CC/NC communications for reporting partition request and availability; partition request/availability are only reported for the case of send-side materialized (without pipelining) policies in case of task re-attempt.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCFunctions to use non-Java serde (hand-written serialization/deserialization instead of Java serialization)
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- make external sort a stable sort
Numbering the Hyracks changes above in order: 1, 3, and 4 reduce the per-job overhead.
2 reduces unnecessary NC resource consumption, such as memory and files.
5 and 6 are improvements to runtime operators.
One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator
Several important changes in pregelix:
-- remove static states in vertex
-- check the halt bit directly, without deserialization
-- optimize the sort algorithm by packing yet another 2-byte normalized key into the tPointers array
Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/hyracks/hyracks-api/pom.xml b/hyracks/hyracks-api/pom.xml
index 734d671..99f728f 100644
--- a/hyracks/hyracks-api/pom.xml
+++ b/hyracks/hyracks-api/pom.xml
@@ -47,7 +47,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
- <version>4.1-alpha2</version>
+ <version>4.3</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 48d7275..e71f3c9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -74,6 +74,7 @@
acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+ acg.setReportTaskDetails(spec.isReportTaskDetails());
final Set<Constraint> constraints = new HashSet<Constraint>();
final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index e93ebeb..3bb7d22 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -14,15 +14,30 @@
*/
package edu.uci.ics.hyracks.api.comm;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
-public final class NetworkAddress implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class NetworkAddress implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final byte[] ipAddress;
+ private byte[] ipAddress;
- private final int port;
+ private int port;
+
+ public static NetworkAddress create(DataInput dis) throws IOException {
+ NetworkAddress networkAddress = new NetworkAddress();
+ networkAddress.readFields(dis);
+ return networkAddress;
+ }
+
+ private NetworkAddress() {
+
+ }
public NetworkAddress(byte[] ipAddress, int port) {
this.ipAddress = ipAddress;
@@ -55,4 +70,19 @@
NetworkAddress on = (NetworkAddress) o;
return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeInt(ipAddress.length);
+ output.write(ipAddress);
+ output.writeInt(port);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int size = input.readInt();
+ ipAddress = new byte[size];
+ input.readFully(ipAddress);
+ port = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index af63632..68560e1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -14,12 +14,27 @@
*/
package edu.uci.ics.hyracks.api.dataflow;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public final class ActivityId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class ActivityId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final OperatorDescriptorId odId;
- private final int id;
+ private OperatorDescriptorId odId;
+ private int id;
+
+ public static ActivityId create(DataInput dis) throws IOException {
+ ActivityId activityId = new ActivityId();
+ activityId.readFields(dis);
+ return activityId;
+ }
+
+ private ActivityId() {
+
+ }
public ActivityId(OperatorDescriptorId odId, int id) {
this.odId = odId;
@@ -64,4 +79,16 @@
}
throw new IllegalArgumentException("Unable to parse: " + str);
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ odId.writeFields(output);
+ output.writeInt(id);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ odId = OperatorDescriptorId.create(input);
+ id = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index b363556..5190cae 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -14,13 +14,28 @@
*/
package edu.uci.ics.hyracks.api.dataflow;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public final class ConnectorDescriptorId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class ConnectorDescriptorId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
private int id;
+ public static ConnectorDescriptorId create(DataInput dis) throws IOException {
+ ConnectorDescriptorId connectorDescriptorId = new ConnectorDescriptorId();
+ connectorDescriptorId.readFields(dis);
+ return connectorDescriptorId;
+ }
+
+ private ConnectorDescriptorId() {
+
+ }
+
public ConnectorDescriptorId(int id) {
this.id = id;
}
@@ -50,4 +65,14 @@
public String toString() {
return "CDID:" + id;
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeInt(id);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ id = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index 0c23465..5351c78 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -14,12 +14,27 @@
*/
package edu.uci.ics.hyracks.api.dataflow;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public final class OperatorDescriptorId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class OperatorDescriptorId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final int id;
+ private int id;
+
+ public static OperatorDescriptorId create(DataInput dis) throws IOException {
+ OperatorDescriptorId operatorDescriptorId = new OperatorDescriptorId();
+ operatorDescriptorId.readFields(dis);
+ return operatorDescriptorId;
+ }
+
+ private OperatorDescriptorId() {
+
+ }
public OperatorDescriptorId(int id) {
this.id = id;
@@ -57,4 +72,14 @@
}
throw new IllegalArgumentException("Unable to parse: " + str);
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeInt(id);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ id = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 65fa2e5..2355e98 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -14,14 +14,29 @@
*/
package edu.uci.ics.hyracks.api.dataflow;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public final class TaskAttemptId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class TaskAttemptId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final TaskId taskId;
+ private TaskId taskId;
- private final int attempt;
+ private int attempt;
+
+ public static TaskAttemptId create(DataInput dis) throws IOException {
+ TaskAttemptId taskAttemptId = new TaskAttemptId();
+ taskAttemptId.readFields(dis);
+ return taskAttemptId;
+ }
+
+ private TaskAttemptId() {
+
+ }
public TaskAttemptId(TaskId taskId, int attempt) {
this.taskId = taskId;
@@ -63,4 +78,16 @@
}
throw new IllegalArgumentException("Unable to parse: " + str);
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ taskId.writeFields(output);
+ output.writeInt(attempt);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ taskId = TaskId.create(input);
+ attempt = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 6d58bd9..6b9eecc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -14,14 +14,29 @@
*/
package edu.uci.ics.hyracks.api.dataflow;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public final class TaskId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class TaskId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final ActivityId activityId;
+ private ActivityId activityId;
- private final int partition;
+ private int partition;
+
+ public static TaskId create(DataInput dis) throws IOException {
+ TaskId taskId = new TaskId();
+ taskId.readFields(dis);
+ return taskId;
+ }
+
+ private TaskId() {
+
+ }
public TaskId(ActivityId activityId, int partition) {
this.activityId = activityId;
@@ -63,4 +78,16 @@
}
throw new IllegalArgumentException("Unable to parse: " + str);
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ activityId.writeFields(output);
+ output.writeInt(partition);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ activityId = ActivityId.create(input);
+ partition = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java
new file mode 100644
index 0000000..8b416da
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author yingyib
+ */
+public class ConnectorPolicyFactory {
+ public static ConnectorPolicyFactory INSTANCE = new ConnectorPolicyFactory();
+
+ private ConnectorPolicyFactory() {
+
+ }
+
+ public IConnectorPolicy getConnectorPolicy(DataInput input) throws IOException {
+ int kind = input.readInt();
+ switch (kind) {
+ case 0:
+ return new PipeliningConnectorPolicy();
+ case 1:
+ return new SendSideMaterializedBlockingConnectorPolicy();
+ case 2:
+ return new SendSideMaterializedPipeliningConnectorPolicy();
+ case 3:
+ return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+ case 4:
+ return new SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy();
+ case 5:
+ return new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
+ }
+ return null;
+ }
+
+ public void writeConnectorPolicy(IConnectorPolicy policy, DataOutput output) throws IOException {
+ if (policy instanceof PipeliningConnectorPolicy) {
+ output.writeInt(0);
+ } else if (policy instanceof SendSideMaterializedBlockingConnectorPolicy) {
+ output.writeInt(1);
+ } else if (policy instanceof SendSideMaterializedPipeliningConnectorPolicy) {
+ output.writeInt(2);
+ } else if (policy instanceof SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy) {
+ output.writeInt(3);
+ } else if (policy instanceof SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy) {
+ output.writeInt(4);
+ } else if (policy instanceof SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy) {
+ output.writeInt(5);
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..8beb2f6
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public class SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return true;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return true;
+ }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
index 6eab7a7..f461e52 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -15,17 +15,32 @@
package edu.uci.ics.hyracks.api.deployment;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.io.IWritable;
+
/**
* The representation of a deployment id
*
* @author yingyib
*/
-public class DeploymentId implements Serializable {
+public class DeploymentId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final String deploymentKey;
+ private String deploymentKey;
+
+ public static DeploymentId create(DataInput dis) throws IOException {
+ DeploymentId deploymentId = new DeploymentId();
+ deploymentId.readFields(dis);
+ return deploymentId;
+ }
+
+ private DeploymentId() {
+
+ }
public DeploymentId(String deploymentKey) {
this.deploymentKey = deploymentKey;
@@ -50,4 +65,14 @@
public String toString() {
return deploymentKey;
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeUTF(deploymentKey);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ deploymentKey = input.readUTF();
+ }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java
new file mode 100644
index 0000000..9e7e8c8
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author yingyib
+ */
+public interface IWritable {
+
+ public void writeFields(DataOutput output) throws IOException;
+
+ public void readFields(DataInput input) throws IOException;
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index e80168b..12c4e6e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -17,6 +17,7 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.json.JSONArray;
@@ -25,6 +26,7 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
public class ActivityClusterGraph implements Serializable {
@@ -50,12 +52,15 @@
private boolean useConnectorPolicyForScheduling;
+ private boolean reportTaskDetails;
+
public ActivityClusterGraph() {
version = 0;
activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
activityMap = new HashMap<ActivityId, ActivityCluster>();
connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
frameSize = 32768;
+ reportTaskDetails = true;
}
public Map<ActivityId, ActivityCluster> getActivityMap() {
@@ -135,6 +140,24 @@
this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
}
+ public boolean isReportTaskDetails() {
+ return reportTaskDetails;
+ }
+
+ public void setReportTaskDetails(boolean reportTaskDetails) {
+ this.reportTaskDetails = reportTaskDetails;
+ }
+
+ public List<IConnectorDescriptor> getActivityInputs(ActivityId activityId) {
+ ActivityCluster ac = activityMap.get(activityId);
+ return ac.getActivityInputMap().get(activityId);
+ }
+
+ public ActivityId getProducerActivity(ConnectorDescriptorId cid) {
+ ActivityCluster ac = connectorMap.get(cid);
+ return ac.getProducerActivity(cid);
+ }
+
public JSONObject toJSON() throws JSONException {
JSONObject acgj = new JSONObject();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
index b8eb61b..c9027ba 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
@@ -14,12 +14,26 @@
*/
package edu.uci.ics.hyracks.api.job;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public final class JobId implements Serializable {
- private static final long serialVersionUID = 1L;
+import edu.uci.ics.hyracks.api.io.IWritable;
- private final long id;
+public final class JobId implements IWritable, Serializable {
+ private static final long serialVersionUID = 1L;
+ private long id;
+
+ public static JobId create(DataInput dis) throws IOException {
+ JobId jobId = new JobId();
+ jobId.readFields(dis);
+ return jobId;
+ }
+
+ private JobId() {
+
+ }
public JobId(long id) {
this.id = id;
@@ -57,4 +71,14 @@
}
throw new IllegalArgumentException();
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeLong(id);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ id = input.readLong();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 128978b..19904dd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -75,6 +75,8 @@
private boolean useConnectorPolicyForScheduling;
+ private boolean reportTaskDetails;
+
private transient int operatorIdCounter;
private transient int connectorIdCounter;
@@ -98,7 +100,8 @@
operatorIdCounter = 0;
connectorIdCounter = 0;
maxReattempts = 2;
- useConnectorPolicyForScheduling = true;
+ useConnectorPolicyForScheduling = false;
+ reportTaskDetails = true;
setFrameSize(frameSize);
}
@@ -288,6 +291,14 @@
this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
}
+ public boolean isReportTaskDetails() {
+ return reportTaskDetails;
+ }
+
+ public void setReportTaskDetails(boolean reportTaskDetails) {
+ this.reportTaskDetails = reportTaskDetails;
+ }
+
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
index 2ff71d5..c7e01e6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
@@ -14,21 +14,35 @@
*/
package edu.uci.ics.hyracks.api.partitions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.io.IWritable;
import edu.uci.ics.hyracks.api.job.JobId;
-public final class PartitionId implements Serializable {
+public final class PartitionId implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
+ private JobId jobId;
- private final ConnectorDescriptorId cdId;
+ private ConnectorDescriptorId cdId;
- private final int senderIndex;
+ private int senderIndex;
- private final int receiverIndex;
+ private int receiverIndex;
+
+ public static PartitionId create(DataInput dis) throws IOException {
+ PartitionId partitionId = new PartitionId();
+ partitionId.readFields(dis);
+ return partitionId;
+ }
+
+ private PartitionId() {
+
+ }
public PartitionId(JobId jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
this.jobId = jobId;
@@ -94,4 +108,20 @@
public String toString() {
return jobId.toString() + ":" + cdId + ":" + senderIndex + ":" + receiverIndex;
}
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ cdId.writeFields(output);
+ jobId.writeFields(output);
+ output.writeInt(receiverIndex);
+ output.writeInt(senderIndex);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ cdId = ConnectorDescriptorId.create(input);
+ jobId = JobId.create(input);
+ receiverIndex = input.readInt();
+ senderIndex = input.readInt();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
index da30e20..08284cc 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
@@ -22,6 +22,8 @@
public static final String MEMORY_USAGE = "heap-used-sizes";
+ public static final String MEMORY_MAX = "heap-max-sizes";
+
public static final String NETWORK_IO_READ = "net-payload-bytes-read";
public static final String NETWORK_IO_WRITE = "net-payload-bytes-written";
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index c39cba7..62bb943 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.client.stats.impl;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -39,7 +40,7 @@
*/
public class ClientCounterContext implements IClusterCounterContext {
private static String[] RESET_COUNTERS = { Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
- Counters.MEMORY_USAGE, Counters.DISK_READ, Counters.DISK_WRITE, Counters.NUM_PROCESSOR };
+ Counters.MEMORY_USAGE, Counters.MEMORY_MAX, Counters.DISK_READ, Counters.DISK_WRITE, Counters.NUM_PROCESSOR };
private static String[] AGG_COUNTERS = { Counters.SYSTEM_LOAD };
private static int UPDATE_INTERVAL = 10000;
@@ -135,7 +136,7 @@
}
}
} catch (Exception e) {
- throw new IllegalStateException(e);
+ //ignore
}
}
@@ -173,16 +174,24 @@
} else if (counterObject instanceof JSONArray) {
JSONArray jArray = (JSONArray) counterObject;
Object[] values = jArray.toArray();
+ /**
+ * use the last non-zero value as the counter value
+ */
for (Object value : values) {
if (value instanceof Double) {
Double dValue = (Double) value;
- counterValue += dValue.doubleValue();
+ double currentVal = dValue.doubleValue();
+ if (currentVal != 0) {
+ counterValue = (long) currentVal;
+ }
} else if (value instanceof Long) {
Long lValue = (Long) value;
- counterValue += lValue.longValue();
+ long currentVal = lValue.longValue();
+ if (currentVal != 0) {
+ counterValue = lValue.longValue();
+ }
}
}
- counterValue /= values.length;
} else {
Long val = (Long) counterObject;
counterValue = val.longValue();
@@ -215,7 +224,11 @@
in.close();
return response.toString();
} catch (Exception e) {
- throw new IllegalStateException(e);
+ if (!(e instanceof java.net.ConnectException || e instanceof IOException)) {
+ throw new IllegalStateException(e);
+ } else {
+ return "";
+ }
}
}
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
index bbf212f..2ba2631 100644
--- a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
@@ -33,8 +33,9 @@
synchronized (this) {
wait(20000);
}
- String[] counters = { Counters.MEMORY_USAGE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
- Counters.SYSTEM_LOAD, Counters.NUM_PROCESSOR, Counters.DISK_READ, Counters.DISK_WRITE };
+ String[] counters = { Counters.MEMORY_USAGE, Counters.MEMORY_MAX, Counters.NETWORK_IO_READ,
+ Counters.NETWORK_IO_WRITE, Counters.SYSTEM_LOAD, Counters.NUM_PROCESSOR, Counters.DISK_READ,
+ Counters.DISK_WRITE };
for (String counterName : counters) {
ICounter counter = ccContext.getCounter(counterName, false);
System.out.println(counterName + ": " + counter.get());
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
index ffb1ace..c5cb7d0 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
@@ -95,9 +95,7 @@
}
ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
- for (int i = 0; i < nBuffers; ++i) {
- ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
- }
+ ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getFrameSize());
ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
writeBuffer.putLong(partitionId.getJobId().getId());
writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
index af1f8f6..b573b73 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
@@ -32,6 +32,10 @@
private boolean aborted;
+ private int frameSize = 32768;
+
+ private int allocateCounter = 0;
+
public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
this.ccb = ccb;
this.nBuffers = nBuffers;
@@ -40,9 +44,7 @@
}
public void setFrameSize(int frameSize) {
- for (int i = 0; i < nBuffers; ++i) {
- emptyStack.push(ByteBuffer.allocateDirect(frameSize));
- }
+ this.frameSize = frameSize;
}
@Override
@@ -58,6 +60,10 @@
throw new HyracksDataException("Connection has been aborted");
}
destBuffer = emptyStack.poll();
+ if (destBuffer == null && allocateCounter < nBuffers) {
+ destBuffer = ByteBuffer.allocateDirect(frameSize);
+ allocateCounter++;
+ }
if (destBuffer != null) {
break;
}
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
new file mode 100644
index 0000000..c59398c
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IBufferFactory;
+
+/**
+ * Buffer factory that creates heap buffers of the context's frame size on
+ * demand, up to a fixed number per factory instance.
+ * <p>
+ * NOTE(review): {@code counter} is not synchronized; this presumably relies on
+ * a single thread calling {@link #createBuffer()} per factory -- confirm.
+ *
+ * @author yingyib
+ */
+public class ReadBufferFactory implements IBufferFactory {
+
+    /** Maximum number of buffers this factory will ever create. */
+    private final int limit;
+    /** Size in bytes of every created buffer, read from the context at construction time. */
+    private final int frameSize;
+    /** Number of buffers created so far. */
+    private int counter = 0;
+
+    /**
+     * @param limit
+     *            maximum number of buffers to create
+     * @param ctx
+     *            context whose frame size is used for every buffer; it is not retained
+     */
+    public ReadBufferFactory(int limit, IHyracksCommonContext ctx) {
+        this.limit = limit;
+        this.frameSize = ctx.getFrameSize();
+    }
+
+    /**
+     * Creates the next buffer.
+     *
+     * @return a newly allocated buffer of {@code frameSize} bytes, or {@code null}
+     *         once {@code limit} buffers have already been created
+     */
+    @Override
+    public ByteBuffer createBuffer() {
+        if (counter >= limit) {
+            return null;
+        }
+        // ByteBuffer.allocate declares no checked exceptions, so the former
+        // catch-all that rewrapped everything as IllegalStateException was dropped.
+        ByteBuffer frame = ByteBuffer.allocate(frameSize);
+        counter++;
+        return frame;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index c994dfb..e0bc9e2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -84,8 +84,8 @@
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
-import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index ad4744b..dbd64a7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -30,11 +30,13 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -338,29 +340,45 @@
tcAttempt.initializePendingTaskCounter();
tcAttempts.add(tcAttempt);
- /* TODO - Further improvement for reducing messages -- not yet complete.
+ /**
+ * Improvement for reducing master/slave message communications, for each TaskAttemptDescriptor,
+ * we set the NetworkAddress[][] partitionLocations, in which each row is for an incoming connector descriptor
+ * and each column is for an input channel of the connector.
+ */
for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
List<TaskAttemptDescriptor> tads = e.getValue();
for (TaskAttemptDescriptor tad : tads) {
- TaskId tid = tad.getTaskAttemptId().getTaskId();
+ TaskAttemptId taid = tad.getTaskAttemptId();
+ int attempt = taid.getAttempt();
+ TaskId tid = taid.getTaskId();
ActivityId aid = tid.getActivityId();
- List<IConnectorDescriptor> inConnectors = jag.getActivityInputConnectorDescriptors(aid);
+ List<IConnectorDescriptor> inConnectors = acg.getActivityInputs(aid);
int[] inPartitionCounts = tad.getInputPartitionCounts();
- NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][];
- for (int i = 0; i < inPartitionCounts.length; ++i) {
- ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId();
- ActivityId producerAid = jag.getProducerActivity(cdId);
- partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]];
- for (int j = 0; j < inPartitionCounts[i]; ++j) {
- TaskId producerTaskId = new TaskId(producerAid, j);
- String nodeId = findTaskLocation(producerTaskId);
- partitionLocations[i][j] = ccs.getNodeMap().get(nodeId).getDataPort();
+ if (inPartitionCounts != null) {
+ NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][];
+ for (int i = 0; i < inPartitionCounts.length; ++i) {
+ ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId();
+ IConnectorPolicy policy = jobRun.getConnectorPolicyMap().get(cdId);
+ /**
+ * Carry sender location information into a task,
+ * except when this is a re-attempt and the connector is
+ * send-side materialized and blocking.
+ */
+ if (!(attempt > 0 && policy.materializeOnSendSide() && policy
+ .consumerWaitsForProducerToFinish())) {
+ ActivityId producerAid = acg.getProducerActivity(cdId);
+ partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]];
+ for (int j = 0; j < inPartitionCounts[i]; ++j) {
+ TaskId producerTaskId = new TaskId(producerAid, j);
+ String nodeId = findTaskLocation(producerTaskId);
+ partitionLocations[i][j] = ccs.getNodeMap().get(nodeId).getDataPort();
+ }
+ }
}
+ tad.setInputPartitionLocations(partitionLocations);
}
- tad.setInputPartitionLocations(partitionLocations);
}
}
- */
tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
tcAttempt.setStartTime(System.currentTimeMillis());
@@ -442,24 +460,25 @@
final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
jobRun.getConnectorPolicyMap());
- for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
- String nodeId = entry.getKey();
- final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
- final NodeControllerState node = ccs.getNodeMap().get(nodeId);
- if (node != null) {
- node.getActiveJobIds().add(jobRun.getJobId());
- boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
- }
- try {
- byte[] jagBytes = changed ? JavaSerializationUtils.serialize(acg) : null;
+ try {
+ byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+ for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+ String nodeId = entry.getKey();
+ final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
+ final NodeControllerState node = ccs.getNodeMap().get(nodeId);
+ if (node != null) {
+ node.getActiveJobIds().add(jobRun.getJobId());
+ boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
+ }
+ byte[] jagBytes = changed ? acgBytes : null;
node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
connectorPolicies, jobRun.getFlags());
- } catch (Exception e) {
- e.printStackTrace();
}
}
+ } catch (Exception e) {
+ throw new HyracksException(e);
}
}
@@ -702,10 +721,16 @@
ccs.getActiveRunMap().remove(jobId);
ccs.getRunMapArchive().put(jobId, run);
ccs.getRunHistory().put(jobId, run.getExceptions());
- try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ if (run.getActivityClusterGraph().isReportTaskDetails()) {
+ /**
+ * log job details when task-profiling is enabled
+ */
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 340134e..5c9aad5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -105,10 +105,16 @@
ccs.getActiveRunMap().remove(jobId);
ccs.getRunMapArchive().put(jobId, run);
ccs.getRunHistory().put(jobId, run.getExceptions());
- try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ if (run.getActivityClusterGraph().isReportTaskDetails()) {
+ /**
+ * log job details when profiling is enabled
+ */
+ try {
+ ccs.getJobLogFile().log(createJobLogObject(run));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 2d6bdea..9f1c4b2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -18,11 +18,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -71,24 +67,6 @@
ccs.getActiveRunMap().remove(jobId);
ccs.getRunMapArchive().put(jobId, run);
ccs.getRunHistory().put(jobId, run.getExceptions());
- try {
- ccs.getJobLogFile().log(createJobLogObject(run));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
}
}
-
- private JSONObject createJobLogObject(final JobRun run) {
- JSONObject jobLogObject = new JSONObject();
- try {
- ActivityClusterGraph acg = run.getActivityClusterGraph();
- jobLogObject.put("activity-cluster-graph", acg.toJSON());
- jobLogObject.put("job-run", run.toJSON());
- } catch (JSONException e) {
- throw new RuntimeException(e);
- }
- return jobLogObject;
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 01525e4..74e9710 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -49,6 +49,9 @@
@Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
public int nNetThreads = 1;
+ @Option(name = "-net-buffer-count", usage = "Number of network buffers per input/output channel (default:1)", required = false)
+ public int nNetBuffers = 1;
+
@Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
public int maxMemory = -1;
@@ -84,6 +87,8 @@
cList.add(ioDevices);
cList.add("-net-thread-count");
cList.add(String.valueOf(nNetThreads));
+ cList.add("-net-buffer-count");
+ cList.add(String.valueOf(nNetBuffers));
cList.add("-max-memory");
cList.add(String.valueOf(maxMemory));
cList.add("-result-time-to-live");
@@ -113,6 +118,7 @@
configuration.put("data-ip-address", dataIPAddress);
configuration.put("iodevices", ioDevices);
configuration.put("net-thread-count", String.valueOf(nNetThreads));
+ configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
configuration.put("max-memory", String.valueOf(maxMemory));
configuration.put("result-time-to-live", String.valueOf(resultTTL));
configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 2407c10..a39a159 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -28,7 +28,7 @@
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -200,7 +200,7 @@
String filePath = deploymentDir + File.separator + fileName;
File targetFile = new File(filePath);
if (isNC) {
- HttpClient hc = new DefaultHttpClient();
+ HttpClient hc = HttpClientBuilder.create().build();
HttpGet get = new HttpGet(url.toString());
HttpResponse response = hc.execute(get);
InputStream is = response.getEntity().getContent();
@@ -216,6 +216,7 @@
}
return downloadedFileURLs;
} catch (Exception e) {
+ e.printStackTrace();
trace = e;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index e999913..d574435 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -14,10 +14,11 @@
*/
package edu.uci.ics.hyracks.control.common.heartbeat;
-import java.io.Serializable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
-public class HeartbeatData implements Serializable {
- private static final long serialVersionUID = 1L;
+public class HeartbeatData {
public long heapInitSize;
public long heapUsedSize;
@@ -47,4 +48,83 @@
public long ipcMessageBytesReceived;
public long diskReads;
public long diskWrites;
+
+ /**
+  * Restores every field of this heartbeat from {@code dis}.
+  * <p>
+  * Values are consumed in exactly the order in which {@link #write(DataOutput)}
+  * emits them: the scalar counters first, then the two GC arrays, each preceded
+  * by its length.
+  *
+  * @param dis
+  *            source to read from
+  * @throws IOException
+  *             if reading from {@code dis} fails
+  */
+ public void readFields(DataInput dis) throws IOException {
+     heapInitSize = dis.readLong();
+     heapUsedSize = dis.readLong();
+     heapCommittedSize = dis.readLong();
+     heapMaxSize = dis.readLong();
+     nonheapInitSize = dis.readLong();
+     nonheapUsedSize = dis.readLong();
+     nonheapCommittedSize = dis.readLong();
+     nonheapMaxSize = dis.readLong();
+     threadCount = dis.readInt();
+     peakThreadCount = dis.readInt();
+     totalStartedThreadCount = dis.readLong();
+     systemLoadAverage = dis.readDouble();
+     netPayloadBytesRead = dis.readLong();
+     netPayloadBytesWritten = dis.readLong();
+     netSignalingBytesRead = dis.readLong();
+     netSignalingBytesWritten = dis.readLong();
+     // This slot carries datasetNetPayloadBytesRead in write(). It used to be read
+     // into netSignalingBytesWritten a second time, which overwrote that counter
+     // and left datasetNetPayloadBytesRead unset.
+     datasetNetPayloadBytesRead = dis.readLong();
+     datasetNetPayloadBytesWritten = dis.readLong();
+     datasetNetSignalingBytesRead = dis.readLong();
+     datasetNetSignalingBytesWritten = dis.readLong();
+     ipcMessagesSent = dis.readLong();
+     ipcMessageBytesSent = dis.readLong();
+     ipcMessagesReceived = dis.readLong();
+     ipcMessageBytesReceived = dis.readLong();
+     diskReads = dis.readLong();
+     diskWrites = dis.readLong();
+
+     // Each GC array is stored as a length followed by that many longs.
+     int gcCounts = dis.readInt();
+     gcCollectionCounts = new long[gcCounts];
+     for (int i = 0; i < gcCollectionCounts.length; i++) {
+         gcCollectionCounts[i] = dis.readLong();
+     }
+     int gcTimeCounts = dis.readInt();
+     gcCollectionTimes = new long[gcTimeCounts];
+     for (int i = 0; i < gcCollectionTimes.length; i++) {
+         gcCollectionTimes[i] = dis.readLong();
+     }
+ }
+
+ /**
+  * Serializes every field of this heartbeat to {@code dos}: the scalar counters
+  * first, then the GC collection counts and GC collection times, each array
+  * preceded by its length.
+  *
+  * @param dos
+  *            sink to write to
+  * @throws IOException
+  *             if writing to {@code dos} fails
+  */
+ public void write(DataOutput dos) throws IOException {
+     // heap / non-heap memory
+     dos.writeLong(heapInitSize);
+     dos.writeLong(heapUsedSize);
+     dos.writeLong(heapCommittedSize);
+     dos.writeLong(heapMaxSize);
+     dos.writeLong(nonheapInitSize);
+     dos.writeLong(nonheapUsedSize);
+     dos.writeLong(nonheapCommittedSize);
+     dos.writeLong(nonheapMaxSize);
+     // threads and load
+     dos.writeInt(threadCount);
+     dos.writeInt(peakThreadCount);
+     dos.writeLong(totalStartedThreadCount);
+     dos.writeDouble(systemLoadAverage);
+     // network counters
+     dos.writeLong(netPayloadBytesRead);
+     dos.writeLong(netPayloadBytesWritten);
+     dos.writeLong(netSignalingBytesRead);
+     dos.writeLong(netSignalingBytesWritten);
+     dos.writeLong(datasetNetPayloadBytesRead);
+     dos.writeLong(datasetNetPayloadBytesWritten);
+     dos.writeLong(datasetNetSignalingBytesRead);
+     dos.writeLong(datasetNetSignalingBytesWritten);
+     // IPC and disk counters
+     dos.writeLong(ipcMessagesSent);
+     dos.writeLong(ipcMessageBytesSent);
+     dos.writeLong(ipcMessagesReceived);
+     dos.writeLong(ipcMessageBytesReceived);
+     dos.writeLong(diskReads);
+     dos.writeLong(diskWrites);
+
+     // length-prefixed GC arrays
+     dos.writeInt(gcCollectionCounts.length);
+     for (long collectionCount : gcCollectionCounts) {
+         dos.writeLong(collectionCount);
+     }
+     dos.writeInt(gcCollectionTimes.length);
+     for (long collectionTime : gcCollectionTimes) {
+         dos.writeLong(collectionTime);
+     }
+ }
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 6be2294..1417df9 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -23,9 +23,12 @@
import java.io.Serializable;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,6 +39,7 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -176,10 +180,10 @@
public static class NotifyTaskCompleteFunction extends Function {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
- private final TaskAttemptId taskId;
- private final String nodeId;
- private final TaskProfile statistics;
+ private JobId jobId;
+ private TaskAttemptId taskId;
+ private String nodeId;
+ private TaskProfile statistics;
public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
this.jobId = jobId;
@@ -208,6 +212,26 @@
public TaskProfile getStatistics() {
return statistics;
}
+
+ /**
+  * Rebuilds a {@code NotifyTaskCompleteFunction} from its binary form.
+  * Reads, in order: job id, node id, task attempt id, task profile. The bytes
+  * are taken from the buffer's backing array, starting at its current position.
+  */
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+     DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+             length));
+     JobId id = JobId.create(in);
+     String node = in.readUTF();
+     TaskAttemptId attemptId = TaskAttemptId.create(in);
+     TaskProfile profile = TaskProfile.create(in);
+     return new NotifyTaskCompleteFunction(id, attemptId, node, profile);
+ }
+
+ /**
+  * Writes a {@code NotifyTaskCompleteFunction} to {@code out} as: job id,
+  * node id (UTF), task attempt id, task profile.
+  */
+ public static void serialize(OutputStream out, Object object) throws Exception {
+     DataOutputStream data = new DataOutputStream(out);
+     NotifyTaskCompleteFunction function = (NotifyTaskCompleteFunction) object;
+     function.jobId.writeFields(data);
+     data.writeUTF(function.nodeId);
+     function.taskId.writeFields(data);
+     function.statistics.writeFields(data);
+ }
}
public static class NotifyTaskFailureFunction extends Function {
@@ -270,6 +294,23 @@
public String getNodeId() {
return nodeId;
}
+
+ /**
+  * Rebuilds a {@code NotifyJobletCleanupFunction} from its binary form:
+  * a job id followed by the node id (UTF).
+  */
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+     DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+             length));
+     JobId id = JobId.create(in);
+     String node = in.readUTF();
+     return new NotifyJobletCleanupFunction(id, node);
+ }
+
+ /**
+  * Writes a {@code NotifyJobletCleanupFunction} to {@code out} as a job id
+  * followed by the node id (UTF).
+  */
+ public static void serialize(OutputStream out, Object object) throws Exception {
+     DataOutputStream data = new DataOutputStream(out);
+     NotifyJobletCleanupFunction function = (NotifyJobletCleanupFunction) object;
+     function.jobId.writeFields(data);
+     data.writeUTF(function.nodeId);
+ }
}
public static class NodeHeartbeatFunction extends Function {
@@ -295,6 +336,23 @@
public HeartbeatData getHeartbeatData() {
return hbData;
}
+
+ /**
+  * Rebuilds a {@code NodeHeartbeatFunction} from its binary form: the node id
+  * (UTF) followed by the heartbeat payload read via {@code HeartbeatData.readFields}.
+  */
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+     DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+             length));
+     String node = in.readUTF();
+     HeartbeatData heartbeat = new HeartbeatData();
+     heartbeat.readFields(in);
+     return new NodeHeartbeatFunction(node, heartbeat);
+ }
+
+ /**
+  * Writes a {@code NodeHeartbeatFunction} to {@code out} as the node id (UTF)
+  * followed by the heartbeat payload written via {@code HeartbeatData.write}.
+  */
+ public static void serialize(OutputStream out, Object object) throws Exception {
+     DataOutputStream data = new DataOutputStream(out);
+     NodeHeartbeatFunction function = (NodeHeartbeatFunction) object;
+     data.writeUTF(function.nodeId);
+     function.hbData.write(data);
+ }
}
public static class ReportProfileFunction extends Function {
@@ -650,6 +708,90 @@
public EnumSet<JobFlag> getFlags() {
return flags;
}
+
+ /**
+  * Rebuilds a {@code StartTasksFunction} from its binary form. The layout is:
+  * job id; a boolean flag followed by the deployment id when the flag is true;
+  * the plan length followed by the plan bytes (a negative length means no plan);
+  * the task attempt descriptors; the connector policies; the job flags as
+  * ordinals. Each of the last three is preceded by its element count.
+  */
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+     ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+     DataInputStream dis = new DataInputStream(bais);
+
+     // read jobId and deploymentId
+     JobId jobId = JobId.create(dis);
+     DeploymentId deploymentId = null;
+     boolean hasDeployed = dis.readBoolean();
+     if (hasDeployed) {
+         deploymentId = DeploymentId.create(dis);
+     }
+
+     // read plan bytes
+     int planBytesSize = dis.readInt();
+     byte[] planBytes = null;
+     if (planBytesSize >= 0) {
+         planBytes = new byte[planBytesSize];
+         // readFully fills the whole array or throws EOFException; a plain read()
+         // may return fewer bytes and leave the tail of the plan zeroed.
+         dis.readFully(planBytes, 0, planBytesSize);
+     }
+
+     // read task attempt descriptors
+     int tadSize = dis.readInt();
+     List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+     for (int i = 0; i < tadSize; i++) {
+         TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
+         taskDescriptors.add(tad);
+     }
+
+     // read connector policies
+     int cpSize = dis.readInt();
+     Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+     for (int i = 0; i < cpSize; i++) {
+         ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
+         IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
+         connectorPolicies.put(cid, policy);
+     }
+
+     // read flags (stored as enum ordinals); values() copies its array, so call it once
+     int flagSize = dis.readInt();
+     JobFlag[] allFlags = JobFlag.values();
+     EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
+     for (int i = 0; i < flagSize; i++) {
+         flags.add(allFlags[dis.readInt()]);
+     }
+
+     return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+ }
+
+ /**
+  * Writes a {@code StartTasksFunction} to {@code out}. The layout is: job id;
+  * a boolean flag followed by the deployment id when one is set; the plan
+  * length followed by the plan bytes (-1 and no bytes when there is no plan);
+  * the task attempt descriptors; the connector policies; the job flags as
+  * ordinals. Each of the last three is preceded by its element count.
+  */
+ public static void serialize(OutputStream out, Object object) throws Exception {
+     DataOutputStream data = new DataOutputStream(out);
+     StartTasksFunction function = (StartTasksFunction) object;
+
+     // job id, then the optional deployment id behind a presence flag
+     function.jobId.writeFields(data);
+     boolean hasDeployment = function.deploymentId != null;
+     data.writeBoolean(hasDeployment);
+     if (hasDeployment) {
+         function.deploymentId.writeFields(data);
+     }
+
+     // plan bytes, with -1 standing for "no plan"
+     if (function.planBytes == null) {
+         data.writeInt(-1);
+     } else {
+         data.writeInt(function.planBytes.length);
+         data.write(function.planBytes, 0, function.planBytes.length);
+     }
+
+     // task descriptors
+     data.writeInt(function.taskDescriptors.size());
+     for (TaskAttemptDescriptor descriptor : function.taskDescriptors) {
+         descriptor.writeFields(data);
+     }
+
+     // connector policies
+     data.writeInt(function.connectorPolicies.size());
+     for (Entry<ConnectorDescriptorId, IConnectorPolicy> policyEntry : function.connectorPolicies.entrySet()) {
+         policyEntry.getKey().writeFields(data);
+         ConnectorPolicyFactory.INSTANCE.writeConnectorPolicy(policyEntry.getValue(), data);
+     }
+
+     // flags, stored as enum ordinals
+     data.writeInt(function.flags.size());
+     for (JobFlag jobFlag : function.flags) {
+         data.writeInt(jobFlag.ordinal());
+     }
+ }
}
public static class AbortTasksFunction extends Function {
@@ -700,6 +842,23 @@
public JobStatus getStatus() {
return status;
}
+
+ /**
+  * Rebuilds a {@code CleanupJobletFunction} from its binary form: a job id
+  * followed by the job status stored as its enum ordinal.
+  */
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+     DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+             length));
+     JobId id = JobId.create(in);
+     int statusOrdinal = in.readInt();
+     JobStatus jobStatus = JobStatus.values()[statusOrdinal];
+     return new CleanupJobletFunction(id, jobStatus);
+ }
+
+ /**
+  * Writes a {@code CleanupJobletFunction} to {@code out} as a job id followed
+  * by the ordinal of the job status.
+  */
+ public static void serialize(OutputStream out, Object object) throws Exception {
+     DataOutputStream data = new DataOutputStream(out);
+     CleanupJobletFunction function = (CleanupJobletFunction) object;
+     function.jobId.writeFields(data);
+     data.writeInt(function.status.ordinal());
+ }
}
public static class GetNodeControllersInfoFunction extends Function {
@@ -978,6 +1137,25 @@
case REPORT_PARTITION_AVAILABILITY:
ReportPartitionAvailabilityFunction.serialize(out, object);
return;
+
+ case NODE_HEARTBEAT:
+ NodeHeartbeatFunction.serialize(out, object);
+ return;
+
+ case START_TASKS:
+ StartTasksFunction.serialize(out, object);
+ return;
+
+ case NOTIFY_TASK_COMPLETE:
+ NotifyTaskCompleteFunction.serialize(out, object);
+ return;
+
+ case NOTIFY_JOBLET_CLEANUP:
+ NotifyJobletCleanupFunction.serialize(out, object);
+ return;
+ case CLEANUP_JOBLET:
+ CleanupJobletFunction.serialize(out, object);
+ return;
}
JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
}
@@ -992,6 +1170,21 @@
case REPORT_PARTITION_AVAILABILITY:
return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+
+ case NODE_HEARTBEAT:
+ return NodeHeartbeatFunction.deserialize(buffer, length);
+
+ case START_TASKS:
+ return StartTasksFunction.deserialize(buffer, length);
+
+ case NOTIFY_TASK_COMPLETE:
+ return NotifyTaskCompleteFunction.deserialize(buffer, length);
+
+ case NOTIFY_JOBLET_CLEANUP:
+ return NotifyJobletCleanupFunction.deserialize(buffer, length);
+
+ case CLEANUP_JOBLET:
+ return CleanupJobletFunction.deserialize(buffer, length);
}
return javaSerde.deserializeObject(buffer, length);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
index 6018132..43b39d3 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -14,25 +14,39 @@
*/
package edu.uci.ics.hyracks.control.common.job;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.io.IWritable;
-public class TaskAttemptDescriptor implements Serializable {
+public class TaskAttemptDescriptor implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final TaskAttemptId taId;
+ private TaskAttemptId taId;
- private final int nPartitions;
+ private int nPartitions;
- private final int[] nInputPartitions;
+ private int[] nInputPartitions;
- private final int[] nOutputPartitions;
+ private int[] nOutputPartitions;
private NetworkAddress[][] inputPartitionLocations;
+ /**
+  * Creates a descriptor and fills it by reading its fields from {@code dis}.
+  */
+ public static TaskAttemptDescriptor create(DataInput dis) throws IOException {
+     TaskAttemptDescriptor descriptor = new TaskAttemptDescriptor();
+     descriptor.readFields(dis);
+     return descriptor;
+ }
+
+ // Used by create(DataInput): builds an empty descriptor whose fields are then filled in by readFields.
+ private TaskAttemptDescriptor() {
+
+ }
+
public TaskAttemptDescriptor(TaskAttemptId taId, int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
this.taId = taId;
this.nPartitions = nPartitions;
@@ -70,4 +84,74 @@
+ Arrays.toString(nInputPartitions) + ", nOutputPartitions = " + Arrays.toString(nOutputPartitions)
+ "]";
}
+
+ /**
+  * Writes this descriptor to {@code output}: the task attempt id, the partition
+  * count, the input and output partition counts, and the input partition
+  * locations. Every array is preceded by its length, with -1 standing for a
+  * {@code null} array (or a {@code null} row of the location matrix).
+  */
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+     taId.writeFields(output);
+     output.writeInt(nPartitions);
+
+     if (nInputPartitions == null) {
+         output.writeInt(-1);
+     } else {
+         output.writeInt(nInputPartitions.length);
+         for (int inputCount : nInputPartitions) {
+             output.writeInt(inputCount);
+         }
+     }
+
+     if (nOutputPartitions == null) {
+         output.writeInt(-1);
+     } else {
+         output.writeInt(nOutputPartitions.length);
+         for (int outputCount : nOutputPartitions) {
+             output.writeInt(outputCount);
+         }
+     }
+
+     if (inputPartitionLocations == null) {
+         output.writeInt(-1);
+         return;
+     }
+     output.writeInt(inputPartitionLocations.length);
+     for (NetworkAddress[] row : inputPartitionLocations) {
+         if (row == null) {
+             // a row without locations is encoded as length -1
+             output.writeInt(-1);
+             continue;
+         }
+         output.writeInt(row.length);
+         for (NetworkAddress address : row) {
+             address.writeFields(output);
+         }
+     }
+ }
+
+ /**
+  * Fills this descriptor from {@code input}, reading the fields in the order
+  * writeFields emits them. A negative length leaves the corresponding array
+  * (or row of the location matrix) untouched.
+  */
+ @Override
+ public void readFields(DataInput input) throws IOException {
+     taId = TaskAttemptId.create(input);
+     nPartitions = input.readInt();
+
+     int nInputs = input.readInt();
+     if (nInputs >= 0) {
+         nInputPartitions = new int[nInputs];
+         for (int i = 0; i < nInputs; i++) {
+             nInputPartitions[i] = input.readInt();
+         }
+     }
+
+     int nOutputs = input.readInt();
+     if (nOutputs >= 0) {
+         nOutputPartitions = new int[nOutputs];
+         for (int i = 0; i < nOutputs; i++) {
+             nOutputPartitions[i] = input.readInt();
+         }
+     }
+
+     int rows = input.readInt();
+     if (rows < 0) {
+         return;
+     }
+     inputPartitionLocations = new NetworkAddress[rows][];
+     for (int i = 0; i < rows; i++) {
+         int columns = input.readInt();
+         if (columns < 0) {
+             continue;
+         }
+         NetworkAddress[] row = new NetworkAddress[columns];
+         inputPartitionLocations[i] = row;
+         for (int j = 0; j < columns; j++) {
+             row[j] = NetworkAddress.create(input);
+         }
+     }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
index 2080718..b8b90cb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -14,12 +14,17 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
-public class MultiResolutionEventProfiler implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public class MultiResolutionEventProfiler implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final int[] times;
+ private int[] times;
private long offset;
@@ -29,6 +34,16 @@
private int eventCounter;
+    /**
+     * Builds a profiler whose state is read from {@code dis}.
+     *
+     * @param dis
+     *            the stream holding a profiler written by {@code writeFields}
+     * @return the deserialized profiler
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    public static MultiResolutionEventProfiler create(DataInput dis) throws IOException {
+        MultiResolutionEventProfiler profiler = new MultiResolutionEventProfiler();
+        profiler.readFields(dis);
+        return profiler;
+    }
+
+ /**
+ * Creates an uninitialized profiler for {@code create(DataInput)}, which fills in every field
+ * (including the sample array) through {@code readFields(DataInput)}.
+ */
+ private MultiResolutionEventProfiler() {
+
+ }
+
public MultiResolutionEventProfiler(int nSamples) {
times = new int[nSamples];
offset = -1;
@@ -78,4 +93,29 @@
public long getOffset() {
return offset;
}
+
+    /**
+     * Writes the profiler state to {@code output}: event counter, offset, pointer, resolution,
+     * then the number of samples followed by each sample value.
+     *
+     * @param output
+     *            the stream to write to
+     * @throws IOException
+     *             if the stream cannot be written
+     */
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(eventCounter);
+        output.writeLong(offset);
+        output.writeInt(ptr);
+        output.writeInt(resolution);
+        output.writeInt(times.length);
+        for (int sample : times) {
+            output.writeInt(sample);
+        }
+    }
+
+    /**
+     * Reads the profiler state from {@code input} in the order used by {@code writeFields}:
+     * event counter, offset, pointer, resolution, sample count, samples.
+     *
+     * @param input
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        eventCounter = input.readInt();
+        offset = input.readLong();
+        ptr = input.readInt();
+        resolution = input.readInt();
+        int sampleCount = input.readInt();
+        times = new int[sampleCount];
+        int slot = 0;
+        while (slot < sampleCount) {
+            times[slot] = input.readInt();
+            slot++;
+        }
+    }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
index 2cb4191..12cd4b1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
@@ -14,18 +14,24 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-public abstract class AbstractProfile implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public abstract class AbstractProfile implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- protected final Map<String, Long> counters;
+ protected Map<String, Long> counters;
public AbstractProfile() {
counters = new HashMap<String, Long>();
@@ -51,4 +57,24 @@
protected void merge(AbstractProfile profile) {
counters.putAll(profile.counters);
}
+
+    /**
+     * Writes the counters to {@code output}: the number of counters, then one
+     * (UTF name, long value) pair per counter.
+     *
+     * @param output
+     *            the stream to write to
+     * @throws IOException
+     *             if the stream cannot be written
+     */
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(counters.size());
+        for (Entry<String, Long> counter : counters.entrySet()) {
+            String counterName = counter.getKey();
+            output.writeUTF(counterName);
+            output.writeLong(counter.getValue());
+        }
+    }
+
+    /**
+     * Replaces the counters with the ones read from {@code input}: a count followed by that many
+     * (UTF name, long value) pairs.
+     *
+     * @param input
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int remaining = input.readInt();
+        counters = new HashMap<String, Long>();
+        while (remaining-- > 0) {
+            String counterName = input.readUTF();
+            counters.put(counterName, input.readLong());
+        }
+    }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
index a941187..a3f7e41 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -14,8 +14,12 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.json.JSONArray;
import org.json.JSONException;
@@ -26,9 +30,19 @@
public class JobProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
+ private JobId jobId;
- private final Map<String, JobletProfile> jobletProfiles;
+ private Map<String, JobletProfile> jobletProfiles;
+
+    /**
+     * Builds a job profile whose state is read from {@code dis}.
+     *
+     * @param dis
+     *            the stream holding a profile written by {@code writeFields}
+     * @return the deserialized profile
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    public static JobProfile create(DataInput dis) throws IOException {
+        JobProfile profile = new JobProfile();
+        profile.readFields(dis);
+        return profile;
+    }
+
+ /**
+ * Creates an uninitialized profile for {@code create(DataInput)}, which fills in the job id and
+ * the joblet profiles through {@code readFields(DataInput)}.
+ */
+ private JobProfile() {
+
+ }
public JobProfile(JobId jobId) {
this.jobId = jobId;
@@ -68,4 +82,26 @@
}
}
}
+
+    /**
+     * Restores this profile from {@code input}: the inherited counters first, then the job id,
+     * then the number of joblet profiles followed by that many (UTF key, joblet profile) pairs.
+     * <p>
+     * The counters are delegated to the superclass, as {@code JobletProfile} and
+     * {@code TaskProfile} do; without that call job-level counters would be lost on the wire.
+     *
+     * @param input
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        jobId = JobId.create(input);
+        int size = input.readInt();
+        jobletProfiles = new HashMap<String, JobletProfile>();
+        for (int i = 0; i < size; i++) {
+            String key = input.readUTF();
+            JobletProfile value = JobletProfile.create(input);
+            jobletProfiles.put(key, value);
+        }
+    }
+
+    /**
+     * Writes this profile to {@code output} in the layout expected by
+     * {@link #readFields(DataInput)}: inherited counters, job id, joblet profile count, and the
+     * (UTF key, joblet profile) pairs.
+     *
+     * @param output
+     *            the stream to write to
+     * @throws IOException
+     *             if the stream cannot be written
+     */
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        super.writeFields(output);
+        jobId.writeFields(output);
+        output.writeInt(jobletProfiles.size());
+        for (Entry<String, JobletProfile> entry : jobletProfiles.entrySet()) {
+            output.writeUTF(entry.getKey());
+            entry.getValue().writeFields(output);
+        }
+    }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
index 16d08d7..a879873 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
@@ -14,8 +14,12 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.json.JSONArray;
import org.json.JSONException;
@@ -26,9 +30,19 @@
public class JobletProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
- private final String nodeId;
+ private String nodeId;
- private final Map<TaskAttemptId, TaskProfile> taskProfiles;
+ private Map<TaskAttemptId, TaskProfile> taskProfiles;
+
+    /**
+     * Builds a joblet profile whose state is read from {@code dis}.
+     *
+     * @param dis
+     *            the stream holding a profile written by {@code writeFields}
+     * @return the deserialized profile
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    public static JobletProfile create(DataInput dis) throws IOException {
+        JobletProfile profile = new JobletProfile();
+        profile.readFields(dis);
+        return profile;
+    }
+
+ /**
+ * Creates an uninitialized profile for {@code create(DataInput)}, which fills in the node id and
+ * the task profiles through {@code readFields(DataInput)}.
+ */
+ private JobletProfile() {
+
+ }
public JobletProfile(String nodeId) {
this.nodeId = nodeId;
@@ -68,4 +82,28 @@
}
}
}
+
+    /**
+     * Restores this profile from {@code input}: the inherited counters, the node id, then the
+     * number of task profiles followed by that many (task attempt id, task profile) pairs.
+     *
+     * @param input
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        nodeId = input.readUTF();
+        int remaining = input.readInt();
+        taskProfiles = new HashMap<TaskAttemptId, TaskProfile>();
+        while (remaining-- > 0) {
+            TaskAttemptId attemptId = TaskAttemptId.create(input);
+            taskProfiles.put(attemptId, TaskProfile.create(input));
+        }
+    }
+
+    /**
+     * Writes this profile to {@code output}: the inherited counters, the node id, the number of
+     * task profiles, and one (task attempt id, task profile) pair per entry.
+     *
+     * @param output
+     *            the stream to write to
+     * @throws IOException
+     *             if the stream cannot be written
+     */
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        super.writeFields(output);
+        output.writeUTF(nodeId);
+        output.writeInt(taskProfiles.size());
+        for (Entry<TaskAttemptId, TaskProfile> task : taskProfiles.entrySet()) {
+            TaskAttemptId attemptId = task.getKey();
+            TaskProfile profile = task.getValue();
+            attemptId.writeFields(output);
+            profile.writeFields(output);
+        }
+    }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
index a9cc979..3fc456d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -14,21 +14,35 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.io.IWritable;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
-public class PartitionProfile implements Serializable {
+public class PartitionProfile implements IWritable, Serializable {
private static final long serialVersionUID = 1L;
- private final PartitionId pid;
+ private PartitionId pid;
- private final long openTime;
+ private long openTime;
- private final long closeTime;
+ private long closeTime;
- private final MultiResolutionEventProfiler mrep;
+ private MultiResolutionEventProfiler mrep;
+
+    /**
+     * Builds a partition profile whose state is read from {@code dis}.
+     *
+     * @param dis
+     *            the stream holding a profile written by {@code writeFields}
+     * @return the deserialized profile
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    public static PartitionProfile create(DataInput dis) throws IOException {
+        PartitionProfile profile = new PartitionProfile();
+        profile.readFields(dis);
+        return profile;
+    }
+
+ /**
+ * Creates an uninitialized profile for {@code create(DataInput)}, which fills in all fields
+ * through {@code readFields(DataInput)}.
+ */
+ private PartitionProfile() {
+
+ }
public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
this.pid = pid;
@@ -52,4 +66,20 @@
public MultiResolutionEventProfiler getSamples() {
return mrep;
}
+
+ /**
+ * Writes this profile to {@code output} as: close time, open time, the sample profiler, and
+ * finally the partition id. {@code readFields(DataInput)} consumes the fields in the same order.
+ *
+ * @param output
+ *            the stream to write to
+ * @throws IOException
+ *             if the stream cannot be written
+ */
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeLong(closeTime);
+ output.writeLong(openTime);
+ mrep.writeFields(output);
+ pid.writeFields(output);
+ }
+
+ /**
+ * Restores this profile from {@code input}, reading close time, open time, the sample profiler
+ * and the partition id, i.e. the order in which {@code writeFields(DataOutput)} emits them.
+ *
+ * @param input
+ *            the stream to read from
+ * @throws IOException
+ *             if the stream cannot be read
+ */
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ closeTime = input.readLong();
+ openTime = input.readLong();
+ mrep = MultiResolutionEventProfiler.create(input);
+ pid = PartitionId.create(input);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
index 6918af4..8774a50 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -14,8 +14,12 @@
*/
package edu.uci.ics.hyracks.control.common.job.profiling.om;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.json.JSONArray;
import org.json.JSONException;
@@ -28,9 +32,19 @@
public class TaskProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
- private final TaskAttemptId taskAttemptId;
+ private TaskAttemptId taskAttemptId;
- private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+ private Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+    /**
+     * Builds a task profile whose state is read from {@code dis}.
+     *
+     * @param dis
+     *            the stream holding a profile written by {@code writeFields}
+     * @return the deserialized profile
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    public static TaskProfile create(DataInput dis) throws IOException {
+        TaskProfile profile = new TaskProfile();
+        profile.readFields(dis);
+        return profile;
+    }
+
+ /**
+ * Creates an uninitialized profile for {@code create(DataInput)}, which fills in the task attempt
+ * id and the partition send profiles through {@code readFields(DataInput)}.
+ */
+ private TaskProfile() {
+
+ }
public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
this.taskAttemptId = taskAttemptId;
@@ -84,4 +98,28 @@
return json;
}
+
+    /**
+     * Restores this profile from {@code input}: the inherited counters, the task attempt id, then
+     * the number of partition profiles followed by that many (partition id, partition profile)
+     * pairs.
+     *
+     * @param input
+     *            the stream to read from
+     * @throws IOException
+     *             if the stream cannot be read
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        taskAttemptId = TaskAttemptId.create(input);
+        int remaining = input.readInt();
+        partitionSendProfile = new HashMap<PartitionId, PartitionProfile>();
+        while (remaining-- > 0) {
+            PartitionId partitionId = PartitionId.create(input);
+            partitionSendProfile.put(partitionId, PartitionProfile.create(input));
+        }
+    }
+
+    /**
+     * Writes this profile to {@code output}: the inherited counters, the task attempt id, the
+     * number of partition profiles, and one (partition id, partition profile) pair per entry.
+     *
+     * @param output
+     *            the stream to write to
+     * @throws IOException
+     *             if the stream cannot be written
+     */
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        super.writeFields(output);
+        taskAttemptId.writeFields(output);
+        output.writeInt(partitionSendProfile.size());
+        for (Entry<PartitionId, PartitionProfile> sent : partitionSendProfile.entrySet()) {
+            PartitionId partitionId = sent.getKey();
+            PartitionProfile profile = sent.getValue();
+            partitionId.writeFields(output);
+            profile.writeFields(output);
+        }
+    }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 56b6654..89c5b75 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -267,6 +267,10 @@
return globalJobData;
}
+ /**
+ * @return the value of this joblet's {@code jobletEventListener} field
+ */
+ public IJobletEventListener getJobletEventListener() {
+ return jobletEventListener;
+ }
+
public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
IPartitionCollector collector, PartitionState minState) throws Exception {
for (PartitionId pid : pids) {
@@ -283,10 +287,6 @@
}
}
- public IJobletEventListener getJobletEventListener() {
- return jobletEventListener;
- }
-
public void cleanup(JobStatus status) {
cleanupStatus = status;
cleanupPending = true;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f0b570e..5eec7bb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -168,7 +168,8 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+ netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads,
+ ncConfig.nNetBuffers);
lccm = new LifeCycleComponentManager();
queue = new WorkQueue();
@@ -243,7 +244,7 @@
datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
ncConfig.resultTTL, ncConfig.resultSweepThreshold);
datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
- datasetPartitionManager, ncConfig.nNetThreads);
+ datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index fa9b6b3..3014024 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -89,7 +90,10 @@
private NodeControllerService ncs;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
+ private List<List<PartitionChannel>> inputChannelsFromConnectors;
+
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs,
+ List<List<PartitionChannel>> inputChannelsFromConnectors) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
@@ -102,6 +106,7 @@
pendingThreads = new LinkedHashSet<Thread>();
exceptions = new ArrayList<>();
this.ncs = ncs;
+ this.inputChannelsFromConnectors = inputChannelsFromConnectors;
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -113,7 +118,7 @@
public ByteBuffer allocateFrame() throws HyracksDataException {
return joblet.allocateFrame();
}
-
+
@Override
public void deallocateFrames(int frameCount) {
joblet.deallocateFrames(frameCount);
@@ -242,7 +247,7 @@
final int cIdx = i;
executor.execute(new Runnable() {
@Override
- public void run() {
+ public void run() {
if (aborted) {
return;
}
@@ -252,7 +257,7 @@
thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
thread.setPriority(Thread.MIN_PRIORITY);
try {
- pushFrames(collector, writer);
+ pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
} catch (HyracksDataException e) {
synchronized (Task.this) {
exceptions.add(e);
@@ -266,7 +271,7 @@
});
}
try {
- pushFrames(collectors[0], operator.getInputFrameWriter(0));
+ pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
} finally {
sem.acquire(collectors.length - 1);
}
@@ -293,15 +298,20 @@
}
}
- private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+ private void pushFrames(IPartitionCollector collector, List<PartitionChannel> inputChannels, IFrameWriter writer)
+ throws HyracksDataException {
if (aborted) {
return;
}
try {
collector.open();
try {
- joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
- PartitionState.STARTED);
+ if (inputChannels.size() <= 0) {
+ joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+ PartitionState.STARTED);
+ } else {
+ collector.addPartitions(inputChannels);
+ }
IFrameReader reader = collector.getReader();
reader.open();
try {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
index 1e8baa1..804f61d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
@@ -16,17 +16,18 @@
package edu.uci.ics.hyracks.control.nc.io.profiling;
import java.io.BufferedReader;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
public class IOCounterLinux implements IIOCounter {
public static final String COMMAND = "iostat";
- public static final String COMMAND2 = "cat /proc/self/io";
+ public static final String STATFILE = "/proc/self/io";
public static final int PAGE_SIZE = 4096;
- private final long baseReads;
- private final long baseWrites;
+ private long baseReads = 0;
+ private long baseWrites = 0;
public IOCounterLinux() {
baseReads = getReads();
@@ -36,12 +37,12 @@
@Override
public long getReads() {
try {
- long reads = extractColumn(4);
- return reads - baseReads;
+ long reads = extractRow(4);
+ return reads;
} catch (IOException e) {
try {
- long reads = extractRow(4);
- return reads / PAGE_SIZE;
+ long reads = extractColumn(4) * PAGE_SIZE;
+ return reads - baseReads;
} catch (IOException e2) {
return 0;
}
@@ -51,13 +52,13 @@
@Override
public long getWrites() {
try {
- long writes = extractColumn(5);
- return writes - baseWrites;
+ long writes = extractRow(5);
+ long cancelledWrites = extractRow(6);
+ return (writes - cancelledWrites);
} catch (IOException e) {
try {
- long writes = extractRow(5);
- long cancelledWrites = extractRow(6);
- return (writes - cancelledWrites) / PAGE_SIZE;
+ long writes = extractColumn(5) * PAGE_SIZE;
+ return writes - baseWrites;
} catch (IOException e2) {
return 0;
}
@@ -92,7 +93,7 @@
}
private long extractRow(int rowIndex) throws IOException {
- BufferedReader reader = exec(COMMAND2);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(STATFILE)));
String line = null;
long ios = 0;
int i = 0;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 130c967..348a37c5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -48,11 +48,14 @@
private final MuxDemux md;
+ private final int nBuffers;
+
private NetworkAddress networkAddress;
- public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
- throws IOException {
+ public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads,
+ int nBuffers) throws IOException {
this.partitionManager = partitionManager;
+ this.nBuffers = nBuffers;
md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
MAX_CONNECTION_ATTEMPTS);
}
@@ -102,7 +105,7 @@
LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
+ partition + " on channel: " + ccb);
}
- noc = new NetworkOutputChannel(ccb, 1);
+ noc = new NetworkOutputChannel(ccb, nBuffers);
try {
partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
} catch (HyracksException e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 4d9cd22..8791aa1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -47,12 +47,16 @@
private final PartitionManager partitionManager;
+ private final int nBuffers;
+
private final MuxDemux md;
private NetworkAddress networkAddress;
- public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads) throws IOException {
+ public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads, int nBuffers)
+ throws IOException {
this.partitionManager = partitionManager;
+ this.nBuffers = nBuffers;
md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
MAX_CONNECTION_ATTEMPTS);
}
@@ -99,10 +103,11 @@
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
}
- noc = new NetworkOutputChannel(ccb, 1);
+ noc = new NetworkOutputChannel(ccb, nBuffers);
try {
partitionManager.registerPartitionRequest(pid, noc);
} catch (HyracksException e) {
+ e.printStackTrace();
noc.abort();
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index 433c45a..6c4570d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -50,7 +50,9 @@
@Override
public void deallocate() {
- partitionFile.delete();
+ if (partitionFile != null) {
+ partitionFile.delete();
+ }
}
@Override
@@ -59,6 +61,11 @@
@Override
public void run() {
try {
+ if (partitionFile == null) {
+ writer.open();
+ writer.close();
+ return;
+ }
IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
try {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 0e9005a..b94d714 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -65,15 +65,17 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + pid + " by " + taId);
}
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
- handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- size = 0;
failed = false;
}
+ /**
+ * Appends {@code buffer} to the partition file at offset {@code size} and advances
+ * {@code size} by the value returned from {@code syncWrite}.
+ */
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // The workspace file is created and opened on the first frame rather than in open(), so a
+ // partition that never receives a frame never creates a file (handle and fRef stay null).
+ if (handle == null) {
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ }
size += ctx.getIOManager().syncWrite(handle, size, buffer);
}
@@ -87,11 +89,14 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("close(" + pid + " by " + taId);
}
- ctx.getIOManager().close(handle);
+ if (handle != null) {
+ ctx.getIOManager().close(handle);
+ }
if (!failed) {
manager.registerPartition(pid, taId,
new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
- PartitionState.COMMITTED);
+ PartitionState.COMMITTED, taId.getAttempt() == 0 ? false : true);
+
}
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 0e63485..6393979 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -73,7 +73,9 @@
@Override
public void deallocate() {
- fRef.delete();
+ if (fRef != null) {
+ fRef.delete();
+ }
}
@Override
@@ -82,47 +84,56 @@
@Override
public void run() {
try {
- IFileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ synchronized (MaterializingPipelinedPartition.this) {
+ while (fRef == null && eos == false) {
+ MaterializingPipelinedPartition.this.wait();
+ }
+ }
+ IFileHandle fh = fRef == null ? null : ioManager.open(fRef,
+ IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
try {
writer.open();
try {
- long offset = 0;
- ByteBuffer buffer = ctx.allocateFrame();
- boolean fail = false;
- boolean done = false;
- while (!fail && !done) {
- synchronized (MaterializingPipelinedPartition.this) {
- while (offset >= size && !eos && !failed) {
- try {
- MaterializingPipelinedPartition.this.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ if (fh != null) {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ boolean fail = false;
+ boolean done = false;
+ while (!fail && !done) {
+ synchronized (MaterializingPipelinedPartition.this) {
+ while (offset >= size && !eos && !failed) {
+ try {
+ MaterializingPipelinedPartition.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
}
+ fail = failed;
+ done = eos && offset >= size;
}
- fail = failed;
- done = eos && offset >= size;
- }
- if (fail) {
- writer.fail();
- } else if (!done) {
- buffer.clear();
- long readLen = ioManager.syncRead(fh, offset, buffer);
- if (readLen < buffer.capacity()) {
- throw new HyracksDataException("Premature end of file");
+ if (fail) {
+ writer.fail();
+ } else if (!done) {
+ buffer.clear();
+ long readLen = ioManager.syncRead(fh, offset, buffer);
+ if (readLen < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += readLen;
+ buffer.flip();
+ writer.nextFrame(buffer);
}
- offset += readLen;
- buffer.flip();
- writer.nextFrame(buffer);
}
}
} finally {
writer.close();
}
} finally {
- ioManager.close(fh);
+ if (fh != null) {
+ ioManager.close(fh);
+ }
}
- } catch (HyracksDataException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -139,17 +150,23 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + pid + " by " + taId);
}
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
- handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
eos = false;
failed = false;
- manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+ }
+
+    /**
+     * Creates the backing workspace file and opens a read-write handle on it the first time it
+     * is called; later calls do nothing because {@code fRef} is already set. The file name is
+     * the partition id with every ":" replaced by "$".
+     *
+     * @throws HyracksDataException
+     *             if the file cannot be created or opened
+     */
+    private void checkOrCreateFile() throws HyracksDataException {
+        if (fRef != null) {
+            return;
+        }
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
+        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+    }
+ /**
+ * Appends {@code buffer} to the backing file at offset {@code size}, advances {@code size} by
+ * the value returned from {@code syncWrite}, and wakes every thread waiting on this object's
+ * monitor (the reader in {@code writeTo} waits there for the file to exist and for new data).
+ */
@Override
public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // Lazily create the file on the first frame; must happen before the write below uses handle.
+ checkOrCreateFile();
size += ctx.getIOManager().syncWrite(handle, size, buffer);
notifyAll();
}
@@ -165,16 +182,13 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("close(" + pid + " by " + taId);
}
- boolean commit = false;
synchronized (this) {
eos = true;
- ctx.getIOManager().close(handle);
+ if (handle != null) {
+ ctx.getIOManager().close(handle);
+ }
handle = null;
- commit = !failed;
notifyAll();
}
- if (commit) {
- manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
- }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index ea966c7..b209cc1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -37,51 +37,68 @@
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class PartitionManager {
+
private final NodeControllerService ncs;
- private final Map<PartitionId, List<IPartition>> partitionMap;
+ private final Map<PartitionId, List<IPartition>> availablePartitionMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
private final IWorkspaceFileFactory fileFactory;
+ private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<PartitionId, NetworkOutputChannel>();
+
public PartitionManager(NodeControllerService ncs) {
this.ncs = ncs;
- partitionMap = new HashMap<PartitionId, List<IPartition>>();
- deallocatableRegistry = new DefaultDeallocatableRegistry();
- fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+ this.availablePartitionMap = new HashMap<PartitionId, List<IPartition>>();
+ this.deallocatableRegistry = new DefaultDeallocatableRegistry();
+ this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext()
+ .getIOManager());
}
- public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
- throws HyracksDataException {
- synchronized (this) {
- List<IPartition> pList = partitionMap.get(pid);
+ public synchronized void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition,
+ PartitionState state, boolean updateToCC) throws HyracksDataException {
+ try {
+ /**
+ * process pending requests
+ */
+ NetworkOutputChannel writer = partitionRequests.remove(pid);
+ if (writer != null) {
+ writer.setFrameSize(partition.getTaskContext().getFrameSize());
+ partition.writeTo(writer);
+ if (!partition.isReusable()) {
+ return;
+ }
+ }
+
+ /**
+ * put a coming available partition into the available partition map
+ */
+ List<IPartition> pList = availablePartitionMap.get(pid);
if (pList == null) {
pList = new ArrayList<IPartition>();
- partitionMap.put(pid, pList);
+ availablePartitionMap.put(pid, pList);
}
pList.add(partition);
- }
- updatePartitionState(pid, taId, partition, state);
- }
- public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
- throws HyracksDataException {
- PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
- desc.setState(state);
- try {
- ncs.getClusterController().registerPartitionProvider(desc);
+ /**
+ * update to CC only when necessary
+ */
+ if (updateToCC) {
+ updatePartitionState(pid, taId, partition, state);
+ }
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
public synchronized IPartition getPartition(PartitionId pid) {
- return partitionMap.get(pid).get(0);
+ return availablePartitionMap.get(pid).get(0);
}
public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
- for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+ for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+ .hasNext();) {
Map.Entry<PartitionId, List<IPartition>> e = i.next();
PartitionId pid = e.getKey();
if (jobId.equals(pid.getJobId())) {
@@ -95,16 +112,21 @@
public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
throws HyracksException {
- List<IPartition> pList = partitionMap.get(partitionId);
- if (pList != null && !pList.isEmpty()) {
- IPartition partition = pList.get(0);
- writer.setFrameSize(partition.getTaskContext().getFrameSize());
- partition.writeTo(writer);
- if (!partition.isReusable()) {
- partitionMap.remove(partitionId);
+ try {
+ List<IPartition> pList = availablePartitionMap.get(partitionId);
+ if (pList != null && !pList.isEmpty()) {
+ IPartition partition = pList.get(0);
+ writer.setFrameSize(partition.getTaskContext().getFrameSize());
+ partition.writeTo(writer);
+ if (!partition.isReusable()) {
+ availablePartitionMap.remove(partitionId);
+ }
+ } else {
+ //throw new HyracksException("Request for unknown partition " + partitionId);
+ partitionRequests.put(partitionId, writer);
}
- } else {
- throw new HyracksException("Request for unknown partition " + partitionId);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
}
}
@@ -115,4 +137,15 @@
public void close() {
deallocatableRegistry.close();
}
+ /** Builds a PartitionDescriptor for the partition, sets the given state on it and registers it with the cluster controller; any failure is wrapped in a HyracksDataException. */
+ public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
+ PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+ desc.setState(state);
+ try {
+ ncs.getClusterController().registerPartitionProvider(desc);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 76345bc..c1abdc3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -69,7 +69,7 @@
@Override
public void open() throws HyracksDataException {
- manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
failed = false;
pendingConnection = true;
}
@@ -108,7 +108,6 @@
public void close() throws HyracksDataException {
if (!failed) {
ensureConnected();
- manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
delegate.close();
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index b9ee504..1be5fc6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -25,6 +28,8 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -43,6 +48,7 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -89,8 +95,7 @@
public void run() {
try {
NCApplicationContext appCtx = ncs.getApplicationContext();
- final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes == null ? null
- : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx));
+ final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@@ -119,12 +124,13 @@
LOGGER.info("Initializing " + taId + " -> " + han);
}
final int partition = tid.getPartition();
- Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
+ List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+ createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
- List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
IConnectorDescriptor conn = inputs.get(i);
@@ -169,13 +175,15 @@
}
private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
- ActivityClusterGraph acg) throws Exception {
+ byte[] acgBytes) throws Exception {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
- if (acg == null) {
+ if (acgBytes == null) {
throw new NullPointerException("JobActivityGraph was null");
}
+ ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
+ appCtx);
ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
}
@@ -231,4 +239,38 @@
}
return factory;
}
+
+ /**
+ * Create a list of known channels for each input connector, built from the input partition locations carried by td.
+ *
+ * @param td
+ * the task attempt descriptor whose input partition locations are read
+ * @param inputs
+ * the input connector descriptors; dereferenced for every non-null row of input partition locations
+ * @return one list of channels per row of input partition locations; an empty list when td carries none
+ * @throws UnknownHostException if InetAddress.getByAddress rejects a location's IP address bytes
+ */
+ private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs)
+ throws UnknownHostException {
+ NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
+ List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
+ if (inputAddresses != null) {
+ for (int i = 0; i < inputAddresses.length; i++) {
+ List<PartitionChannel> channels = new ArrayList<PartitionChannel>();
+ if (inputAddresses[i] != null) { // a null row still contributes an (empty) channel list below
+ for (int j = 0; j < inputAddresses[i].length; j++) {
+ NetworkAddress networkAddress = inputAddresses[i][j]; // location j in the row paired with inputs.get(i)
+ PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
+ .getTaskAttemptId().getTaskId().getPartition());
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
+ ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+ .getIpAddress()), networkAddress.getPort()), pid, 5)); // NOTE(review): 5 is presumably the per-channel buffer count - confirm against NetworkInputChannel
+ channels.add(channel);
+ }
+ }
+ channelsForInputConnectors.add(channels);
+ }
+ }
+ return channelsForInputConnectors;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 1b26c00..f894e38 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -15,11 +15,9 @@
package edu.uci.ics.hyracks.data.std.util;
import java.io.ByteArrayOutputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Arrays;
public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
- private static final Logger LOGGER = Logger.getLogger(ByteArrayAccessibleOutputStream.class.getName());
public ByteArrayAccessibleOutputStream() {
super();
@@ -34,17 +32,45 @@
}
public void write(int b) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("write(byte) value: " + b);
- }
- super.write(b);
+ ensureCapacity(count + 1);
+ buf[count] = (byte) b;
+ count += 1;
}
@Override
- public void write(byte[] bytes, int offset, int length) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("write(byte[], int, int) offset: " + offset + " length" + length);
+ public void write(byte[] b, int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
+ throw new IndexOutOfBoundsException();
}
- super.write(bytes, offset, length);
+ ensureCapacity(count + len);
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+ /** Calls grow(minCapacity) when buf is shorter than minCapacity; otherwise leaves buf untouched. */
+ private void ensureCapacity(int minCapacity) {
+ // overflow-conscious code: the comparison is done by subtraction rather than minCapacity > buf.length
+ if (minCapacity - buf.length > 0)
+ grow(minCapacity);
+ }
+
+ /**
+ * Increases the capacity to ensure that it can hold at least the
+ * number of elements specified by the minimum capacity argument.
+ * The current capacity is doubled; minCapacity is used instead when the doubled value is smaller than it.
+ * @param minCapacity
+ * the desired minimum capacity; OutOfMemoryError is thrown when it and the computed capacity are both negative
+ */
+ private void grow(int minCapacity) {
+ // overflow-conscious code
+ int oldCapacity = buf.length;
+ int newCapacity = oldCapacity << 1;
+ if (newCapacity - minCapacity < 0)
+ newCapacity = minCapacity;
+ if (newCapacity < 0) { // the computed capacity wrapped around
+ if (minCapacity < 0) // overflow
+ throw new OutOfMemoryError();
+ newCapacity = Integer.MAX_VALUE;
+ }
+ buf = Arrays.copyOf(buf, newCapacity); // existing bytes are copied into the larger array
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java
index 9cdd692..eee4758 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java
@@ -16,11 +16,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
public class ByteBufferInputStream extends InputStream {
- private static final Logger LOGGER = Logger.getLogger(ByteBufferInputStream.class.getName());
private ByteBuffer buffer;
@@ -37,20 +34,13 @@
@Override
public int read() {
int remaining = buffer.capacity() - position;
- int value = remaining > 0 ? (buffer.get(position++) & 0xff) : -1;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
- }
+ int value = remaining > 0 ? (buffer.array()[position++] & 0xff) : -1;
return value;
}
@Override
public int read(byte[] bytes, int offset, int length) {
int remaining = buffer.capacity() - position;
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
- + length + " position: " + position);
- }
if (remaining == 0) {
return -1;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index a1d24e7..ea586fc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -32,6 +32,8 @@
private final FrameTupleAppender[] appenders;
private final FrameTupleAccessor tupleAccessor;
private final ITuplePartitionComputer tpc;
+ private final IHyracksTaskContext ctx;
+ private boolean allocated = false;
public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
@@ -42,20 +44,22 @@
try {
pWriters[i] = pwFactory.createFrameWriter(i);
appenders[i] = new FrameTupleAppender(ctx.getFrameSize());
- appenders[i].reset(ctx.allocateFrame(), true);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
this.tpc = tpc;
+ this.ctx = ctx;
}
@Override
public void close() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
- if (appenders[i].getTupleCount() > 0) {
- flushFrame(appenders[i].getBuffer(), pWriters[i]);
+ if (allocated) {
+ if (appenders[i].getTupleCount() > 0) {
+ flushFrame(appenders[i].getBuffer(), pWriters[i]);
+ }
}
pWriters[i].close();
}
@@ -71,12 +75,15 @@
public void open() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
pWriters[i].open();
- appenders[i].reset(appenders[i].getBuffer(), true);
}
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (!allocated) {
+ allocateFrames();
+ allocated = true;
+ }
tupleAccessor.reset(buffer);
int tupleCount = tupleAccessor.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
@@ -87,12 +94,23 @@
flushFrame(appenderBuffer, pWriters[h]);
appender.reset(appenderBuffer, true);
if (!appender.append(tupleAccessor, i)) {
- throw new HyracksDataException("Record size (" + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i)) + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+ throw new HyracksDataException("Record size ("
+ + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i))
+ + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
}
}
}
}
+ /** Resets every appender onto a frame newly allocated from ctx; nextFrame() calls this once, before the first frame is processed.
+ * @throws HyracksDataException presumably when a frame cannot be allocated - confirm against IHyracksTaskContext
+ */
+ private void allocateFrames() throws HyracksDataException {
+ for (int i = 0; i < appenders.length; ++i) {
+ appenders[i].reset(ctx.allocateFrame(), true);
+ }
+ }
+
@Override
public void fail() throws HyracksDataException {
for (int i = 0; i < appenders.length; ++i) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index ba9ff49..114463f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -41,6 +41,7 @@
private final ByteBuffer outFrame;
private final FrameTupleAppender appender;
private final ArrayTupleBuilder tupleBuilder;
+ private boolean outputPartial = false;
private boolean first;
@@ -48,6 +49,13 @@
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
+ IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
+ this(ctx, groupFields, comparators, aggregator, inRecordDesc, outRecordDesc, writer);
+ this.outputPartial = outputPartial;
+ }
+
+ public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+ IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
IFrameWriter writer) throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
@@ -121,10 +129,13 @@
for (int j = 0; j < groupFields.length; j++) {
tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
}
- boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+ boolean hasOutput = outputPartial ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor,
+ lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
+ lastTupleIndex, aggregateState);
- if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
+ if (hasOutput
+ && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
FrameUtils.flushFrame(outFrame, writer);
appender.reset(outFrame, true);
if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
new file mode 100644
index 0000000..2a28dea
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.sort;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterQuickSort;
+import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+
+/**
+ * Group-by aggregation is pushed before run file generation.
+ * Frames are collected in a frame sorter; once it holds framesLimit - 1 frames they are sorted, grouped and written out as a run.
+ * @author yingyib
+ */
+public class ExternalSortGroupByRunGenerator implements IFrameWriter {
+ private final IHyracksTaskContext ctx;
+ private final IFrameSorter frameSorter; // in-memory sorter that buffers incoming frames
+ private final List<IFrameReader> runs; // readers over the run files written so far
+ private final int maxSortFrames; // framesLimit - 1; reaching it makes nextFrame() spill a run
+
+ private final int[] groupFields;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final RecordDescriptor inRecordDesc;
+ private final RecordDescriptor outRecordDesc;
+ /** Creates the generator; alg selects FrameSorterMergeSort (MERGE_SORT) or FrameSorterQuickSort (anything else). */
+ public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor recordDesc,
+ int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
+ this.ctx = ctx;
+ if (alg == Algorithm.MERGE_SORT) {
+ frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ recordDesc);
+ } else {
+ frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+ recordDesc);
+ }
+ this.runs = new LinkedList<IFrameReader>();
+ this.maxSortFrames = framesLimit - 1;
+ this.groupFields = groupFields;
+ this.comparatorFactories = comparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ this.inRecordDesc = recordDesc;
+ this.outRecordDesc = outRecordDesc;
+ }
+ /** Starts a new sort: drops the recorded runs and resets the frame sorter. */
+ @Override
+ public void open() throws HyracksDataException {
+ runs.clear();
+ frameSorter.reset();
+ }
+ /** Inserts the frame into the sorter, first spilling a run if the sorter already holds maxSortFrames frames. */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (frameSorter.getFrameCount() >= maxSortFrames) {
+ flushFramesToRun();
+ }
+ frameSorter.insertFrame(buffer);
+ }
+ /** Handles the frames still buffered: sorted in memory when no run exists yet, otherwise spilled as one more run. */
+ @Override
+ public void close() throws HyracksDataException {
+ if (frameSorter.getFrameCount() > 0) {
+ if (runs.size() <= 0) {
+ frameSorter.sortFrames(); // sorted frames stay in the sorter; nothing is written here
+ } else {
+ flushFramesToRun();
+ }
+ }
+ }
+ /** Sorts the buffered frames, pushes them through a partial-result group writer into a new run file, then resets the sorter. */
+ private void flushFramesToRun() throws HyracksDataException {
+ frameSorter.sortFrames();
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortGroupByRunGenerator.class.getSimpleName());
+ RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
+
+ // create group-by comparators: one per group field, capped by the number of comparator factories
+ IBinaryComparator[] comparators = new IBinaryComparator[Math
+ .min(groupFields.length, comparatorFactories.length)];
+ for (int i = 0; i < comparators.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc,
+ groupFields, groupFields, writer);
+ PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator,
+ this.inRecordDesc, this.outRecordDesc, writer, true);
+ pgw.open();
+
+ try {
+ frameSorter.flushFrames(pgw);
+ } finally {
+ pgw.close(); // closed even when flushing fails
+ }
+ frameSorter.reset();
+ runs.add(writer.createReader());
+ }
+ /** Empty: no action is taken when a failure is signalled. */
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ /** Returns the frame sorter; after close() with no spilled run it has been sorted but not flushed or reset. */
+ public IFrameSorter getFrameSorter() {
+ return frameSorter;
+ }
+ /** Returns the readers of the runs written so far (the internal list itself, not a copy). */
+ public List<IFrameReader> getRuns() {
+ return runs;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
new file mode 100644
index 0000000..1f9b358
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+/**
+ * Group-by aggregation is pushed into multi-pass merge of external sort.
+ * <p>
+ * {@link #process()} either pushes the frames still held by the in-memory sorter through a
+ * {@link PreclusteredGroupWriter}, or merges the spilled runs (in several passes when there are
+ * more runs than merge input frames) and groups the merged stream.
+ *
+ * @author yingyib
+ */
+public class ExternalSortGroupByRunMerger {
+
+    private final IHyracksTaskContext ctx;
+    private final List<IFrameReader> runs;
+    private final RecordDescriptor inputRecordDesc;
+    private final RecordDescriptor partialAggRecordDesc;
+    private final RecordDescriptor outRecordDesc;
+    private final int framesLimit;
+    private final IFrameWriter writer;
+    private List<ByteBuffer> inFrames;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+
+    private IFrameSorter frameSorter; // Used in External sort, no replacement
+    // selection
+
+    private final int[] groupFields;
+    private final INormalizedKeyComputer firstKeyNkc;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptorFactory mergeAggregatorFactory;
+    private final IAggregatorDescriptorFactory partialAggregatorFactory;
+    private final boolean localSide;
+
+    private final int[] mergeSortFields;
+    private final int[] mergeGroupFields;
+    private IBinaryComparator[] groupByComparators;
+
+    /**
+     * Constructor for external sort, no replacement selection.
+     *
+     * @param frameSorter
+     *            sorter holding the frames that were never spilled; {@link #process()} tolerates null
+     * @param runs
+     *            the spilled runs; copied here, so the caller's list is left untouched
+     * @param framesLimit
+     *            frame budget: one output frame plus {@code framesLimit - 1} merge input frames
+     * @param localStage
+     *            selects between the partial and the merge aggregator factory in {@link #process()}
+     */
+    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
+            int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields,
+            INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
+            IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory,
+            boolean localStage) {
+        this.ctx = ctx;
+        this.frameSorter = frameSorter;
+        this.runs = new LinkedList<IFrameReader>(runs);
+        this.inputRecordDesc = inRecordDesc;
+        this.partialAggRecordDesc = partialAggRecordDesc;
+        this.outRecordDesc = outRecordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+
+        this.groupFields = groupFields;
+        this.firstKeyNkc = nmk;
+        this.comparators = comparators;
+        this.mergeAggregatorFactory = aggregatorFactory;
+        this.partialAggregatorFactory = partialAggregatorFactory;
+        this.localSide = localStage;
+
+        //create merge sort fields
+        int numSortFields = sortFields.length;
+        mergeSortFields = new int[numSortFields];
+        for (int i = 0; i < numSortFields; i++) {
+            mergeSortFields[i] = i;
+        }
+
+        //create merge group fields
+        int numGroupFields = groupFields.length;
+        mergeGroupFields = new int[numGroupFields];
+        for (int i = 0; i < numGroupFields; i++) {
+            mergeGroupFields[i] = i;
+        }
+
+        //setup comparators for grouping
+        groupByComparators = new IBinaryComparator[Math.min(mergeGroupFields.length, comparators.length)];
+        for (int i = 0; i < groupByComparators.length; i++) {
+            groupByComparators[i] = comparators[i];
+        }
+    }
+
+    /**
+     * Groups the sorted input and pushes the aggregated tuples to the output writer.
+     * <p>
+     * Without spilled runs the sorter's frames are flushed directly into the group writer. Otherwise
+     * runs are merged, at most {@code framesLimit - 1} at a time, into new run files until the
+     * remaining runs fit into one last merge that feeds the output writer.
+     *
+     * @throws HyracksDataException
+     *             if any step fails; the active group writer is failed first and the cause is rethrown
+     */
+    public void process() throws HyracksDataException {
+        IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
+        IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
+                groupFields, groupFields, writer);
+        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregator,
+                inputRecordDesc, outRecordDesc, writer, false);
+        try {
+            if (runs.size() <= 0) {
+                pgw.open();
+                if (frameSorter != null && frameSorter.getFrameCount() > 0) {
+                    frameSorter.flushFrames(pgw);
+                }
+                /** recycle sort buffer */
+                closeFrameSorter();
+            } else {
+                /** recycle sort buffer */
+                closeFrameSorter();
+
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.allocateFrame();
+                outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                outFrameAppender.reset(outFrame, true);
+                for (int i = 0; i < framesLimit - 1; ++i) {
+                    inFrames.add(ctx.allocateFrame());
+                }
+                int maxMergeWidth = framesLimit - 1;
+                while (runs.size() > maxMergeWidth) {
+                    int generationSeparator = 0;
+                    while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
+                        int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
+                                runs.size() - maxMergeWidth + 1);
+                        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortGroupByRunMerger.class
+                                .getSimpleName());
+                        IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
+
+                        aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
+                        aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc,
+                                partialAggRecordDesc, mergeGroupFields, mergeGroupFields, mergeResultWriter);
+                        pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
+                                partialAggRecordDesc, partialAggRecordDesc, mergeResultWriter, true);
+                        pgw.open();
+
+                        IFrameReader[] runCursors = new RunFileReader[mergeWidth];
+                        for (int i = 0; i < mergeWidth; i++) {
+                            runCursors[i] = runs.get(generationSeparator + i);
+                        }
+                        merge(pgw, runCursors);
+                        pgw.close();
+                        runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
+                        runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
+                    }
+                }
+                if (!runs.isEmpty()) {
+                    aggregator = mergeAggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
+                            mergeGroupFields, mergeGroupFields, writer);
+                    pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
+                            partialAggRecordDesc, outRecordDesc, writer, false);
+                    pgw.open();
+                    IFrameReader[] runCursors = new RunFileReader[runs.size()];
+                    for (int i = 0; i < runCursors.length; i++) {
+                        runCursors[i] = runs.get(i);
+                    }
+                    merge(pgw, runCursors);
+                }
+            }
+        } catch (Exception e) {
+            pgw.fail();
+            // Rethrow after failing the writer; dropping the exception here would let the caller
+            // treat a broken or truncated merge as a successful one.
+            if (e instanceof HyracksDataException) {
+                throw (HyracksDataException) e;
+            }
+            throw new HyracksDataException(e);
+        } finally {
+            pgw.close();
+        }
+    }
+
+    /**
+     * Closes the frame sorter if one was supplied; a null sorter is skipped instead of
+     * raising a NullPointerException.
+     */
+    private void closeFrameSorter() throws HyracksDataException {
+        if (frameSorter != null) {
+            frameSorter.close();
+        }
+    }
+
+    /**
+     * Reads the given runs through a {@link RunMergingFrameReader} and forwards every frame it produces
+     * to {@code mergeResultWriter}. Uses the {@code inFrames} and {@code outFrame} buffers set up by {@link #process()}.
+     */
+    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, mergeSortFields,
+                comparators, firstKeyNkc, partialAggRecordDesc);
+        merger.open();
+        try {
+            while (merger.nextFrame(outFrame)) {
+                FrameUtils.flushFrame(outFrame, mergeResultWriter);
+            }
+        } finally {
+            merger.close();
+        }
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
new file mode 100644
index 0000000..cee105b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+
+/**
+ * This Operator pushes group-by aggregation into the external sort.
+ * After the in-memory sort, it aggregates the sorted data before writing it to a run file.
+ * During the merge phase, it does an aggregation over sorted results.
+ *
+ * @author yingyib
+ */
+public class SortGroupByOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private static final int SORT_ACTIVITY_ID = 0;
+ private static final int MERGE_ACTIVITY_ID = 1;
+
+ private final int framesLimit;
+ private final int[] sortFields;
+ private final int[] groupFields;
+ private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IAggregatorDescriptorFactory mergeAggregatorFactory;
+ private final IAggregatorDescriptorFactory partialAggregatorFactory;
+ private final RecordDescriptor partialAggRecordDesc;
+ private final RecordDescriptor outputRecordDesc;
+ private final boolean finalStage;
+ private Algorithm alg = Algorithm.MERGE_SORT;
+
+ /***
+ * @param spec
+ * , the Hyracks job specification
+ * @param framesLimit
+ * , the frame limit for this operator
+ * @param sortFields
+ * , the fields to sort
+ * @param groupFields
+ * , the fields to group, which can be a prefix subset of sortFields
+ * @param firstKeyNormalizerFactory
+ * , the normalized key computer factory of the first key
+ * @param comparatorFactories
+ * , the comparator factories of sort keys
+ * @param partialAggregatorFactory
+ * , for aggregating the input of this operator
+ * @param mergeAggregatorFactory
+ * , for aggregating the intermediate data of this operator
+ * @param partialAggRecordDesc
+ * , the record descriptor of intermediate data
+ * @param outRecordDesc
+ * , the record descriptor of output data
+ * @param finalStage
+ * , whether the operator is used for final stage aggregation
+ */
+ public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
+ IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
+ RecordDescriptor outRecordDesc, boolean finalStage) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ this.sortFields = sortFields;
+ if (framesLimit <= 1) {
+ throw new IllegalStateException();// minimum of 2 frames (1 in, 1 out)
+ }
+ this.recordDescriptors[0] = outRecordDesc;
+
+ this.groupFields = groupFields;
+ this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.comparatorFactories = comparatorFactories;
+ this.mergeAggregatorFactory = mergeAggregatorFactory;
+ this.partialAggregatorFactory = partialAggregatorFactory;
+ this.partialAggRecordDesc = partialAggRecordDesc;
+ this.outputRecordDesc = outRecordDesc;
+ this.finalStage = finalStage;
+ }
+
+ /***
+ * @param spec
+ * , the Hyracks job specification
+ * @param framesLimit
+ * , the frame limit for this operator
+ * @param sortFields
+ * , the fields to sort
+ * @param groupFields
+ * , the fields to group, which can be a prefix subset of sortFields
+ * @param firstKeyNormalizerFactory
+ * , the normalized key computer factory of the first key
+ * @param comparatorFactories
+ * , the comparator factories of sort keys
+ * @param partialAggregatorFactory
+ * , for aggregating the input of this operator
+ * @param mergeAggregatorFactory
+ * , for aggregating the intermediate data of this operator
+ * @param partialAggRecordDesc
+ * , the record descriptor of intermediate data
+ * @param outRecordDesc
+ * , the record descriptor of output data
+ * @param finalStage
+ * , whether the operator is used for final stage aggregation
+ * @param alg
+ * , the in-memory sort algorithm
+ */
+ public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
+ IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
+ RecordDescriptor outRecordDesc, boolean finalStage, Algorithm alg) {
+ this(spec, framesLimit, sortFields, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+ partialAggregatorFactory, mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, finalStage);
+ this.alg = alg;
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+ MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+
+ builder.addActivity(this, sa);
+ builder.addSourceEdge(0, sa, 0);
+
+ builder.addActivity(this, ma);
+ builder.addTargetEdge(0, ma, 0);
+
+ builder.addBlockingEdge(sa, ma);
+ }
+
+ public static class SortTaskState extends AbstractStateObject {
+ private List<IFrameReader> runs;
+ private IFrameSorter frameSorter;
+
+ public SortTaskState() {
+ }
+
+ private SortTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+ }
+
+ private class SortActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public SortActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private ExternalSortGroupByRunGenerator runGen;
+
+ @Override
+ public void open() throws HyracksDataException {
+ runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
+ recordDescProvider.getInputRecordDescriptor(SortActivity.this.getActivityId(), 0),
+ framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+ partialAggregatorFactory, partialAggRecordDesc, alg);
+ runGen.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ runGen.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
+ getActivityId(), partition));
+ runGen.close();
+ state.runs = runGen.getRuns();
+ state.frameSorter = runGen.getFrameSorter();
+ ctx.setStateObject(state);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ runGen.fail();
+ }
+ };
+ return op;
+ }
+ }
+
+ private class MergeActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public MergeActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+ SORT_ACTIVITY_ID), partition));
+ List<IFrameReader> runs = state.runs;
+ IFrameSorter frameSorter = state.frameSorter;
+ int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
+
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparators.length; i++) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
+ .createNormalizedKeyComputer();
+
+ ExternalSortGroupByRunMerger merger = new ExternalSortGroupByRunMerger(ctx, frameSorter, runs,
+ sortFields, recordDescProvider.getInputRecordDescriptor(new ActivityId(odId,
+ SORT_ACTIVITY_ID), 0), partialAggRecordDesc, outputRecordDesc, necessaryFrames,
+ writer, groupFields, nkc, comparators, partialAggregatorFactory, mergeAggregatorFactory,
+ !finalStage);
+ merger.process();
+ }
+ };
+ return op;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index eaf4162..9178094 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -145,6 +145,7 @@
runCursors[i] = runs.get(generationSeparator + i);
}
merge(mergeResultWriter, runCursors);
+ mergeResultWriter.close();
runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index 00fbe9b..24c8cb9 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -149,14 +149,10 @@
return new Comparator<ReferenceEntry>() {
public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
int nmk1 = tp1.getNormalizedKey();
- int nmk2 = tp1.getNormalizedKey();
- if (nmk1 > nmk2) {
- return 1;
+ int nmk2 = tp2.getNormalizedKey();
+ if (nmk1 != nmk2) {
+ return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
}
- if (nmk1 < nmk2) {
- return -1;
- }
-
FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
byte[] b1 = fta1.getBuffer().array();
@@ -171,7 +167,9 @@
return c;
}
}
- return 0;
+ int runid1 = tp1.getRunid();
+ int runid2 = tp2.getRunid();
+ return runid1 < runid2 ? -1 : (runid1 == runid2 ? 0 : 1);
}
};
}
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index 9dd303d..e20211f 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -53,9 +53,9 @@
public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
fileMapManager = new TransientFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
- IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
- bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
- new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100, threadFactory);
+ IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
+ bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), prs, new DelayPageCleanerPolicy(1000),
+ fileMapManager, 100, threadFactory);
lcManager = new IndexLifecycleManager();
ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
localResourceRepository = localResourceRepositoryFactory.createRepository();
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index e32e409..0f76343 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -77,6 +77,7 @@
Exception exception = null;
if (message.getFlag() == Message.ERROR) {
exception = (Exception) message.getPayload();
+ exception.printStackTrace();
} else {
payload = message.getPayload();
}
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 2e95cde..6b9364d 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -108,12 +108,20 @@
private ByteBuffer currentReadBuffer;
+ private IBufferFactory bufferFactory;
+
ReadInterface() {
riEmptyStack = new ArrayDeque<ByteBuffer>();
credits = 0;
}
@Override
+ public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+ this.bufferFactory = bufferFactory;
+ cSet.addPendingCredits(channelId, limit * frameSize);
+ }
+
+ @Override
public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
fba = fullBufferAcceptor;
}
@@ -130,6 +138,11 @@
}
if (currentReadBuffer == null) {
currentReadBuffer = riEmptyStack.poll();
+ // If no recycled buffer is available on the empty stack, lazily create
+ // a new one through the buffer factory.
+ if (currentReadBuffer == null) {
+ currentReadBuffer = bufferFactory.createBuffer();
+ }
assert currentReadBuffer != null;
}
int rSize = Math.min(size, currentReadBuffer.remaining());
@@ -171,6 +184,8 @@
private boolean channelWritabilityState;
+ private IBufferFactory bufferFactory;
+
private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
@Override
public void accept(ByteBuffer buffer) {
@@ -227,6 +242,22 @@
}
@Override
+ public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+ this.bufferFactory = bufferFactory;
+ if (!channelWritabilityState) {
+ cSet.markPendingWrite(channelId);
+ }
+ channelWritabilityState = true;
+ if (eos) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Received duplicate close() on channel: " + channelId);
+ }
+ return;
+ }
+ eos = true;
+ }
+
+ @Override
public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
eba = emptyBufferAcceptor;
}
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IBufferFactory.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IBufferFactory.java
new file mode 100644
index 0000000..5abba95
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IBufferFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author yingyib
+ */
+public interface IBufferFactory {
+
+ public ByteBuffer createBuffer();
+
+}
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
index 2e961fd..eb683eb 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -39,4 +39,17 @@
* @return the empty buffer acceptor.
*/
public IBufferAcceptor getEmptyBufferAcceptor();
+
+ /**
+ * Set the buffer factory, which is in charge of creating buffers as long as a request does not
+ * make the number of allocated buffers go beyond the limit.
+ *
+ * @param bufferFactory
+ * - the buffer factory
+ * @param limit
+ * - the limit of buffers
+ * @param frameSize
+ * - the size of each buffer
+ */
+ public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
index e2fb764..42516ea 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -39,4 +39,17 @@
* @return the full buffer acceptor.
*/
public ICloseableBufferAcceptor getFullBufferAcceptor();
+
+ /**
+ * Set the buffer factory, which is in charge of creating buffers as long as a request does not
+ * make the number of allocated buffers go beyond the limit.
+ *
+ * @param bufferFactory
+ * - the buffer factory
+ * @param limit
+ * - the limit of buffers
+ * @param frameSize
+ * - the size of each buffer
+ */
+ public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 03d57f5..740c447 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -16,18 +16,15 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -46,11 +43,9 @@
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
- private final int maxOpenFiles;
-
- private final IIOManager ioManager;
private final int pageSize;
- private final int numPages;
+ private final int maxOpenFiles;
+ private final IIOManager ioManager;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IPageCleanerPolicy pageCleanerPolicy;
@@ -58,27 +53,21 @@
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
- private CachedPage[] cachedPages;
+ private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
private boolean closed;
- public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
- IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
- IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles, ThreadFactory threadFactory) {
+ public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
+ IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+ ThreadFactory threadFactory) {
this.ioManager = ioManager;
- this.pageSize = pageSize;
- this.numPages = numPages;
+ this.pageSize = pageReplacementStrategy.getPageSize();
this.maxOpenFiles = maxOpenFiles;
pageReplacementStrategy.setBufferCache(this);
- pageMap = new CacheBucket[numPages * MAP_FACTOR];
+ pageMap = new CacheBucket[pageReplacementStrategy.getMaxAllowedNumPages() * MAP_FACTOR];
for (int i = 0; i < pageMap.length; ++i) {
pageMap[i] = new CacheBucket();
}
- ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
- cachedPages = new CachedPage[buffers.length];
- for (int i = 0; i < buffers.length; ++i) {
- cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
- }
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
@@ -96,7 +85,7 @@
@Override
public int getNumPages() {
- return numPages;
+ return pageReplacementStrategy.getMaxAllowedNumPages();
}
private void pinSanityCheck(long dpid) throws HyracksDataException {
@@ -338,7 +327,8 @@
StringBuilder buffer = new StringBuilder();
buffer.append("Buffer cache state\n");
buffer.append("Page Size: ").append(pageSize).append('\n');
- buffer.append("Number of physical pages: ").append(numPages).append('\n');
+ buffer.append("Number of physical pages: ").append(pageReplacementStrategy.getMaxAllowedNumPages())
+ .append('\n');
buffer.append("Hash table size: ").append(pageMap.length).append('\n');
buffer.append("Page Map:\n");
int nCachedPages = 0;
@@ -416,88 +406,9 @@
}
}
- private class CachedPage implements ICachedPageInternal {
- private final int cpid;
- private final ByteBuffer buffer;
- private final AtomicInteger pinCount;
- private final AtomicBoolean dirty;
- private final ReadWriteLock latch;
- private final Object replacementStrategyObject;
- volatile long dpid;
- CachedPage next;
- volatile boolean valid;
-
- public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
- this.cpid = cpid;
- this.buffer = buffer;
- pinCount = new AtomicInteger();
- dirty = new AtomicBoolean();
- latch = new ReentrantReadWriteLock(true);
- replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
- dpid = -1;
- valid = false;
- }
-
- public void reset(long dpid) {
- this.dpid = dpid;
- dirty.set(false);
- valid = false;
- pageReplacementStrategy.notifyCachePageReset(this);
- }
-
- public void invalidate() {
- reset(-1);
- }
-
- @Override
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- @Override
- public Object getReplacementStrategyObject() {
- return replacementStrategyObject;
- }
-
- @Override
- public boolean pinIfGoodVictim() {
- return pinCount.compareAndSet(0, 1);
- }
-
- @Override
- public int getCachedPageId() {
- return cpid;
- }
-
- @Override
- public void acquireReadLatch() {
- latch.readLock().lock();
- }
-
- @Override
- public void acquireWriteLatch() {
- latch.writeLock().lock();
- }
-
- @Override
- public void releaseReadLatch() {
- latch.readLock().unlock();
- }
-
- @Override
- public void releaseWriteLatch(boolean markDirty) {
- if (markDirty) {
- if (dirty.compareAndSet(false, true)) {
- pinCount.incrementAndGet();
- }
- }
- latch.writeLock().unlock();
- }
- }
-
@Override
public ICachedPageInternal getPage(int cpid) {
- return cachedPages[cpid];
+ return cachedPages.get(cpid);
}
private class CleanerThread extends Thread {
@@ -564,8 +475,9 @@
try {
while (true) {
pageCleanerPolicy.notifyCleanCycleStart(this);
+ int numPages = pageReplacementStrategy.getNumPages();
for (int i = 0; i < numPages; ++i) {
- CachedPage cPage = cachedPages[i];
+ CachedPage cPage = (CachedPage) cachedPages.get(i);
cleanPage(cPage, false);
}
if (shutdownStart) {
@@ -715,7 +627,7 @@
} else {
pinCount = cPage.pinCount.get();
}
- if (pinCount != 0) {
+ if (pinCount > 0) {
throw new IllegalStateException("Page is pinned and file is being closed. Pincount is: " + pinCount);
}
cPage.invalidate();
@@ -808,7 +720,11 @@
}
@Override
+ public void addPage(ICachedPageInternal page) {
+ cachedPages.add(page);
+ }
+
public void dumpState(OutputStream os) throws IOException {
os.write(dumpState().getBytes());
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
new file mode 100644
index 0000000..d57a356
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author yingyib
+ */
+class CachedPage implements ICachedPageInternal {
+ final int cpid;
+ final ByteBuffer buffer;
+ final AtomicInteger pinCount;
+ final AtomicBoolean dirty;
+ final ReadWriteLock latch;
+ private final Object replacementStrategyObject;
+ private final IPageReplacementStrategy pageReplacementStrategy;
+ volatile long dpid;
+ CachedPage next;
+ volatile boolean valid;
+
+ public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
+ this.cpid = cpid;
+ this.buffer = buffer;
+ this.pageReplacementStrategy = pageReplacementStrategy;
+ pinCount = new AtomicInteger();
+ dirty = new AtomicBoolean();
+ latch = new ReentrantReadWriteLock(true);
+ replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
+ dpid = -1;
+ valid = false;
+ }
+
+ public void reset(long dpid) {
+ this.dpid = dpid;
+ dirty.set(false);
+ valid = false;
+ pageReplacementStrategy.notifyCachePageReset(this);
+ }
+
+ public void invalidate() {
+ reset(-1);
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public Object getReplacementStrategyObject() {
+ return replacementStrategyObject;
+ }
+
+ @Override
+ public boolean pinIfGoodVictim() {
+ return pinCount.compareAndSet(0, 1);
+ }
+
+ @Override
+ public int getCachedPageId() {
+ return cpid;
+ }
+
+ @Override
+ public void acquireReadLatch() {
+ latch.readLock().lock();
+ }
+
+ @Override
+ public void acquireWriteLatch() {
+ latch.writeLock().lock();
+ }
+
+ @Override
+ public void releaseReadLatch() {
+ latch.readLock().unlock();
+ }
+
+ @Override
+ public void releaseWriteLatch(boolean markDirty) {
+ if (markDirty) {
+ if (dirty.compareAndSet(false, true)) {
+ pinCount.incrementAndGet();
+ }
+ }
+ latch.writeLock().unlock();
+ }
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
index ec97344..611bf48 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
@@ -24,9 +24,16 @@
private final Lock lock;
private IBufferCacheInternal bufferCache;
private int clockPtr;
+ private ICacheMemoryAllocator allocator;
+ private int numPages = 0;
+ private final int pageSize;
+ private final int maxAllowedNumPages;
- public ClockPageReplacementStrategy() {
+ public ClockPageReplacementStrategy(ICacheMemoryAllocator allocator, int pageSize, int maxAllowedNumPages) {
this.lock = new ReentrantLock();
+ this.allocator = allocator;
+ this.pageSize = pageSize;
+ this.maxAllowedNumPages = maxAllowedNumPages;
clockPtr = 0;
}
@@ -53,38 +60,83 @@
@Override
public ICachedPageInternal findVictim() {
lock.lock();
+ ICachedPageInternal cachedPage = null; // result of this call; may end up null because both helpers below can return null
try {
- int startClockPtr = clockPtr;
- int cycleCount = 0;
- do {
- ICachedPageInternal cPage = bufferCache.getPage(clockPtr);
-
- /*
- * We do two things here:
- * 1. If the page has been accessed, then we skip it -- The CAS would return
- * false if the current value is false which makes the page a possible candidate
- * for replacement.
- * 2. We check with the buffer manager if it feels its a good idea to use this
- * page as a victim.
- */
- AtomicBoolean accessedFlag = getPerPageObject(cPage);
- if (!accessedFlag.compareAndSet(true, false)) {
- if (cPage.pinIfGoodVictim()) {
- return cPage;
- }
- }
- clockPtr = (clockPtr + 1) % bufferCache.getNumPages();
- if (clockPtr == startClockPtr) {
- ++cycleCount;
- }
- } while (cycleCount < MAX_UNSUCCESSFUL_CYCLE_COUNT);
+ if (numPages >= maxAllowedNumPages) { // page budget is used up: an existing page has to be reused
+ cachedPage = findVictimByEviction(); // clock sweep over the pages created so far
+ } else { // still under the budget: create a brand-new page instead of evicting one
+ cachedPage = allocatePage(); // also registers the page with the buffer cache and bumps numPages
+ }
} finally {
lock.unlock();
}
+ return cachedPage; // returned only after the lock has been released in the finally block
+ }
+
+    /**
+     * Sweeps the clock hand over the pages created so far and returns the first page that
+     * can be reused. Called by {@code findVictim()} while it holds {@code lock}.
+     *
+     * @return a page that this call has pinned through {@code pinIfGoodVictim()}, or
+     *         {@code null} if there is no page to inspect or if
+     *         {@code MAX_UNSUCCESSFUL_CYCLE_COUNT} full sweeps found no candidate
+     */
+    private ICachedPageInternal findVictimByEviction() {
+        if (numPages <= 0) {
+            // No page exists yet: findVictim() gets here with numPages == 0 when
+            // maxAllowedNumPages is not positive. There is nothing to evict, and the
+            // "% numPages" below would throw ArithmeticException.
+            return null;
+        }
+        int startClockPtr = clockPtr;
+        int cycleCount = 0;
+        do {
+            ICachedPageInternal cPage = bufferCache.getPage(clockPtr);
+
+            /*
+             * Two checks decide whether the page under the clock hand is taken:
+             * 1. compareAndSet(true, false) succeeds only if the page was marked as
+             *    accessed; the mark is then cleared and the page is skipped for this pass.
+             *    If it fails, the mark was already clear and the page is a candidate.
+             * 2. The page itself must agree, via pinIfGoodVictim().
+             */
+            AtomicBoolean accessedFlag = getPerPageObject(cPage);
+            if (!accessedFlag.compareAndSet(true, false)) {
+                if (cPage.pinIfGoodVictim()) {
+                    return cPage;
+                }
+            }
+            clockPtr = (clockPtr + 1) % numPages;
+            if (clockPtr == startClockPtr) {
+                // The hand is back where this call started: one full sweep found nothing.
+                ++cycleCount;
+            }
+        } while (cycleCount < MAX_UNSUCCESSFUL_CYCLE_COUNT);
+        return null;
+    }
+
+    /**
+     * Reads the current page count while holding {@code lock}.
+     *
+     * @return the number of pages created by this strategy so far
+     */
+    @Override
+    public int getNumPages() {
+        lock.lock();
+        try {
+            return numPages;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Creates one more page, registers it with the buffer cache and hands it out as the
+     * victim. The new page receives the current {@code numPages} as its id, and
+     * {@code numPages} is incremented after the page has been added to the buffer cache.
+     * Called by {@code findVictim()} while it holds {@code lock}.
+     *
+     * @return the new page, pinned through {@code pinIfGoodVictim()}; {@code null} if its
+     *         per-page flag was already set or the pin attempt failed
+     */
+    private ICachedPageInternal allocatePage() {
+        CachedPage freshPage = new CachedPage(numPages, allocator.allocate(pageSize, 1)[0], this);
+        bufferCache.addPage(freshPage);
+        numPages++;
+        // Same two-step test the eviction sweep applies: the per-page flag must be clear,
+        // and the page must accept the pin.
+        if (!getPerPageObject(freshPage).compareAndSet(true, false) && freshPage.pinIfGoodVictim()) {
+            return freshPage;
+        }
         return null;
     }
private AtomicBoolean getPerPageObject(ICachedPageInternal cPage) {
return (AtomicBoolean) cPage.getReplacementStrategyObject();
}
+
+    /**
+     * @return the page size this strategy was constructed with
+     */
+    @Override
+    public int getPageSize() {
+        return this.pageSize;
+    }
+
+    /**
+     * @return the page budget this strategy was constructed with
+     */
+    @Override
+    public int getMaxAllowedNumPages() {
+        return this.maxAllowedNumPages;
+    }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java
index 4c9e949..cd2e853 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java
@@ -16,4 +16,6 @@
public interface IBufferCacheInternal extends IBufferCache {
public ICachedPageInternal getPage(int cpid);
+
+    /**
+     * Registers a newly created page with the buffer cache.
+     * NOTE(review): ClockPageReplacementStrategy later fetches pages through
+     * {@code getPage(cpid)}, so implementations presumably must make the page retrievable
+     * under its cached-page id -- confirm against BufferCache.
+     *
+     * @param page
+     *            the page to add
+     */
+    public void addPage(ICachedPageInternal page);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java
index b6bfdc3..0adcf68 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java
@@ -24,4 +24,10 @@
public void notifyCachePageAccess(ICachedPageInternal cPage);
public ICachedPageInternal findVictim();
+
+    /**
+     * @return the number of pages currently managed by this strategy
+     */
+    public int getNumPages();
+
+    /**
+     * @return the size of a single page
+     */
+    public int getPageSize();
+
+    /**
+     * @return the maximum number of pages this strategy is allowed to manage
+     */
+    public int getMaxAllowedNumPages();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index dd85945..2370b84 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -79,10 +79,10 @@
public synchronized static IBufferCache getBufferCache(IHyracksTaskContext ctx) {
if (bufferCache == null) {
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
- IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+ IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages); // the strategy now receives the allocator, the page size and numPages as its page budget
IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
- bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
- (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles, threadFactory);
+ bufferCache = new BufferCache(ctx.getIOManager(), prs, new DelayPageCleanerPolicy(1000),
+ (IFileMapManager) fileMapProvider, maxOpenFiles, threadFactory); // allocator, pageSize and numPages are no longer passed to BufferCache directly
}
return bufferCache;
}