Added counters
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@167 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
index 8173785..535c925 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
@@ -14,10 +14,13 @@
*/
package edu.uci.ics.hyracks.api.context;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IResourceManager;
public interface IHyracksContext {
public IResourceManager getResourceManager();
public int getFrameSize();
+
+ public ICounterContext getCounterContext();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
index 9219ec3..2b9787d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
@@ -15,5 +15,5 @@
package edu.uci.ics.hyracks.api.job;
public enum JobFlag {
- COLLECT_FRAME_COUNTS
+ PROFILE_RUNTIME
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
new file mode 100644
index 0000000..78d8875
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+import java.io.Serializable;
+
+public interface ICounter extends Serializable {
+ /**
+ * Get the name of the counter.
+ *
+ * @return Name of the counter.
+ */
+ public String getName();
+
+ /**
+ * Update the value of the counter to be current + delta.
+ *
+ * @param delta
+ * - Amount to change the counter value by.
+ * @return the new value after update.
+ */
+ public long update(long delta);
+
+ /**
+ * Set the value of the counter.
+ *
+ * @param value
+ * - New value of the counter.
+ * @return Old value of the counter.
+ */
+ public long set(long value);
+
+ /**
+ * Get the value of the counter.
+ *
+ * @return the value of the counter.
+ */
+ public long get();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
new file mode 100644
index 0000000..2992757
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+/**
+ * A namespace that holds named counters.
+ *
+ * @author vinayakb
+ */
+public interface ICounterContext {
+ /**
+ * Returns the fully-qualified context name
+ *
+ * @return fully-qualified context name.
+ */
+ public String getContextName();
+
+ /**
+ * Get a counter with the specified name.
+ *
+ * @param name
+ * - Name of the counter to get.
+ * @param create
+ * - Create if the counter does not exist.
+ * @return An existing counter with the given name (if one exists). If a counter with the
+ * said name does not exist, a new one is created if create is set to <code>true</code>, or
+ * else returns <code>null</code>.
+ */
+ public ICounter getCounter(String name, boolean create);
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
new file mode 100644
index 0000000..440cb0d
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class Counter implements ICounter {
+ private static final long serialVersionUID = 1L;
+
+ private final String name;
+ private final AtomicLong counter;
+
+ public Counter(String name) {
+ this.name = name;
+ counter = new AtomicLong();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public long update(long delta) {
+ return counter.addAndGet(delta);
+ }
+
+ @Override
+ public long set(long value) {
+ long oldValue = counter.get();
+ counter.set(value);
+ return oldValue;
+ }
+
+ @Override
+ public long get() {
+ return counter.get();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 5677827..77c0271 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -71,14 +71,16 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.nc.comm.ConnectionManager;
import edu.uci.ics.hyracks.control.nc.comm.DemuxDataReceiveListenerFactory;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.DelegateHyracksContext;
import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
private static final long serialVersionUID = 1L;
@@ -110,7 +112,7 @@
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
- this.ctx = new HyracksContext(ncConfig.frameSize);
+ this.ctx = new RootHyracksContext(ncConfig.frameSize);
if (id == null) {
throw new Exception("id not set");
}
@@ -211,6 +213,8 @@
Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
joblet.setStagelet(stageId, stagelet);
+ IHyracksContext stageletContext = new DelegateHyracksContext(ctx, stagelet.getStageletCounterContext());
+
final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
@@ -224,9 +228,9 @@
IOperatorDescriptor op = han.getOwner();
List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
for (int i : tasks.get(hanId)) {
- IOperatorNodePushable hon = han.createPushRuntime(ctx, joblet.getEnvironment(op, i), rdp, i,
- opPartitions.get(op.getOperatorId()).size());
- OperatorRunnable or = new OperatorRunnable(ctx, hon);
+ IOperatorNodePushable hon = han.createPushRuntime(stageletContext, joblet.getEnvironment(op, i),
+ rdp, i, opPartitions.get(op.getOperatorId()).size());
+ OperatorRunnable or = new OperatorRunnable(stageletContext, hon);
stagelet.setOperator(op.getOperatorId(), i, or);
if (inputs != null) {
for (int j = 0; j < inputs.size(); ++j) {
@@ -240,8 +244,8 @@
.getOperatorId();
Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
endpointList.add(endpoint);
- DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
- stageId);
+ DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(stageletContext,
+ jobId, stageId);
connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
.getTaskInputMap().get(hanId).get(j), i);
@@ -249,7 +253,7 @@
LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
}
portMap.put(piId, endpoint);
- IFrameReader reader = createReader(conn, drlf, i, plan, stagelet,
+ IFrameReader reader = createReader(stageletContext, conn, drlf, i, plan, stagelet,
opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size());
or.setFrameReader(reader);
}
@@ -267,26 +271,31 @@
}
}
- private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
- final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
- throws HyracksDataException {
- final IFrameReader reader = conn.createReceiveSideReader(ctx, plan.getJobSpecification()
+ private IFrameReader createReader(final IHyracksContext stageletContext, final IConnectorDescriptor conn,
+ IConnectionDemultiplexer demux, final int receiverIndex, JobPlan plan, final Stagelet stagelet,
+ int nProducerCount, int nConsumerCount) throws HyracksDataException {
+ final IFrameReader reader = conn.createReceiveSideReader(stageletContext, plan.getJobSpecification()
.getConnectorRecordDescriptor(conn), demux, receiverIndex, nProducerCount, nConsumerCount);
- return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
- private int frameCount;
+ return plan.getJobFlags().contains(JobFlag.PROFILE_RUNTIME) ? new IFrameReader() {
+ private ICounter openCounter = stageletContext.getCounterContext().getCounter(
+ conn.getConnectorId().getId() + ".receiver." + receiverIndex + ".open", true);
+ private ICounter closeCounter = stageletContext.getCounterContext().getCounter(
+ conn.getConnectorId().getId() + ".receiver." + receiverIndex + ".close", true);
+ private ICounter frameCounter = stageletContext.getCounterContext().getCounter(
+ conn.getConnectorId().getId() + ".receiver." + receiverIndex + ".nextFrame", true);
@Override
public void open() throws HyracksDataException {
- frameCount = 0;
reader.open();
+ openCounter.update(1);
}
@Override
public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
boolean status = reader.nextFrame(buffer);
if (status) {
- ++frameCount;
+ frameCounter.update(1);
}
return status;
}
@@ -294,10 +303,7 @@
@Override
public void close() throws HyracksDataException {
reader.close();
- stagelet.getStatistics()
- .getStatisticsMap()
- .put("framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
- String.valueOf(frameCount));
+ closeCounter.update(1);
}
} : reader;
}
@@ -317,6 +323,9 @@
final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+ final IHyracksContext stageletContext = new DelegateHyracksContext(ctx,
+ stagelet.getStageletCounterContext());
+
final JobSpecification spec = plan.getJobSpecification();
for (ActivityNodeId hanId : tasks.keySet()) {
@@ -346,11 +355,12 @@
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
}
- return createWriter(connectionManager.connect(ep.getNetworkAddress(),
- ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ return createWriter(stageletContext, connectionManager.connect(
+ ep.getNetworkAddress(), ep.getEndpointId(), senderIndex), plan, conn,
+ senderIndex, index, stagelet);
}
};
- or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan.getJobSpecification()
+ or.setFrameWriter(j, conn.createSendSideWriter(stageletContext, plan.getJobSpecification()
.getConnectorRecordDescriptor(conn), edwFactory, i, opPartitions.get(producerOpId)
.size(), opPartitions.get(consumerOpId).size()), spec
.getConnectorRecordDescriptor(conn));
@@ -365,30 +375,35 @@
}
}
- private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
- final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
- return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
- private int frameCount;
+ private IFrameWriter createWriter(final IHyracksContext stageletContext, final IFrameWriter writer, JobPlan plan,
+ final IConnectorDescriptor conn, final int senderIndex, final int receiverIndex, final Stagelet stagelet)
+ throws HyracksDataException {
+ return plan.getJobFlags().contains(JobFlag.PROFILE_RUNTIME) ? new IFrameWriter() {
+ private ICounter openCounter = stageletContext.getCounterContext().getCounter(
+ conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex + ".open", true);
+ private ICounter closeCounter = stageletContext.getCounterContext().getCounter(
+ conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex + ".close", true);
+ private ICounter frameCounter = stageletContext.getCounterContext()
+ .getCounter(
+ conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex
+ + ".nextFrame", true);
@Override
public void open() throws HyracksDataException {
- frameCount = 0;
writer.open();
+ openCounter.update(1);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ++frameCount;
+ frameCounter.update(1);
writer.nextFrame(buffer);
}
@Override
public void close() throws HyracksDataException {
+ closeCounter.update(1);
writer.close();
- stagelet.getStatistics()
- .getStatisticsMap()
- .put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
- + receiverIndex, String.valueOf(frameCount));
}
@Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 5631d58..9b37aff 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.control.nc.job.profiling.CounterContext;
import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
public class Stagelet {
@@ -44,6 +45,8 @@
private final Map<OperatorInstanceId, OperatorRunnable> honMap;
+ private final CounterContext stageletCounterContext;
+
private List<Endpoint> endpointList;
private boolean started;
@@ -61,6 +64,7 @@
pendingOperators = new HashSet<OperatorInstanceId>();
started = false;
honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
+ stageletCounterContext = new CounterContext(joblet.getJobId() + "." + stageId + "." + nodeId);
stats = new StageletStatistics();
stats.setNodeId(nodeId);
}
@@ -73,6 +77,10 @@
return honMap;
}
+ public CounterContext getStageletCounterContext() {
+ return stageletCounterContext;
+ }
+
public void setEndpointList(List<Endpoint> endpointList) {
this.endpointList = endpointList;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
new file mode 100644
index 0000000..b883683
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.job.profiling;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+public class CounterContext implements ICounterContext {
+ private final String name;
+ private final Map<String, Counter> counterMap;
+
+ public CounterContext(String name) {
+ this.name = name;
+ counterMap = new HashMap<String, Counter>();
+ }
+
+ @Override
+ public synchronized ICounter getCounter(String name, boolean create) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
+ }
+
+ @Override
+ public String getContextName() {
+ return name;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
similarity index 61%
copy from hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
copy to hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
index f61ff8b..12c1a76 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
@@ -15,24 +15,31 @@
package edu.uci.ics.hyracks.control.nc.runtime;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IResourceManager;
-public class HyracksContext implements IHyracksContext {
- private final IResourceManager resourceManager;
- private final int frameSize;
+public class DelegateHyracksContext implements IHyracksContext {
+ private final IHyracksContext delegate;
- public HyracksContext(int frameSize) {
- resourceManager = new ResourceManager(this);
- this.frameSize = frameSize;
+ private final ICounterContext counterContext;
+
+ public DelegateHyracksContext(IHyracksContext delegate, ICounterContext counterContext) {
+ this.delegate = delegate;
+ this.counterContext = counterContext;
}
@Override
public IResourceManager getResourceManager() {
- return resourceManager;
+ return delegate.getResourceManager();
}
@Override
public int getFrameSize() {
- return frameSize;
+ return delegate.getFrameSize();
+ }
+
+ @Override
+ public ICounterContext getCounterContext() {
+ return counterContext;
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index e1fcf12..6c48ec3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -24,7 +24,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class OperatorRunnable implements Runnable {
- private IOperatorNodePushable opNode;
+ private final IOperatorNodePushable opNode;
private IFrameReader reader;
private ByteBuffer buffer;
private volatile boolean abort;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
similarity index 79%
rename from hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
rename to hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index f61ff8b..7eddfc3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -15,13 +15,14 @@
package edu.uci.ics.hyracks.control.nc.runtime;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IResourceManager;
-public class HyracksContext implements IHyracksContext {
+public class RootHyracksContext implements IHyracksContext {
private final IResourceManager resourceManager;
private final int frameSize;
- public HyracksContext(int frameSize) {
+ public RootHyracksContext(int frameSize) {
resourceManager = new ResourceManager(this);
this.frameSize = frameSize;
}
@@ -35,4 +36,9 @@
public int getFrameSize() {
return frameSize;
}
+
+ @Override
+ public ICounterContext getCounterContext() {
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index cbce6f6..a74d7a0 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -33,7 +33,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -119,7 +119,7 @@
// build search keys (which must be of type ITupleReference)
// put search keys into frame and create tuplereference factories
- IHyracksContext ctx = new HyracksContext(32768); // WARNING: make sure frame size is same as on NCs
+ IHyracksContext ctx = new RootHyracksContext(32768); // WARNING: make sure frame size is same as on NCs
ByteBuffer keyFrame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
appender.reset(keyFrame, true);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
index 46adf24..7fca1d3 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
@@ -33,7 +33,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -194,7 +194,7 @@
// put search keys into frame and create tuplereference factories
- IHyracksContext ctx = new HyracksContext(32768); // WARNING: make sure frame size is same as on NCs
+ IHyracksContext ctx = new RootHyracksContext(32768); // WARNING: make sure frame size is same as on NCs
ByteBuffer keyFrame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
appender.reset(keyFrame, true);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 56569d9..b75fae3 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -31,7 +31,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializingDataReader;
import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -47,7 +47,7 @@
private List<ByteBuffer> buffers;
public SerDeserRunner(RecordDescriptor rDes) {
- ctx = new HyracksContext(FRAME_SIZE);
+ ctx = new RootHyracksContext(FRAME_SIZE);
this.rDes = rDes;
buffers = new ArrayList<ByteBuffer>();
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index d85fb57..077c917 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -74,7 +74,7 @@
}
protected void runTest(JobSpecification spec) throws Exception {
- UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.COLLECT_FRAME_COUNTS));
+ UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
System.err.println(spec.toJSON());
hcc.start(jobId);
System.err.print(jobId);
diff --git a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java
index 7aa776a..77ef504 100644
--- a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java
+++ b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java
@@ -30,7 +30,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -85,7 +85,7 @@
private ITupleReference createTuple(int f0, int f1, int f2, boolean print) throws HyracksDataException {
if(print) System.out.println("CREATING: " + f0 + " " + f1 + " " + f2);
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer buf = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
diff --git a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java
index 5f2508b..9d4f390 100644
--- a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java
+++ b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java
@@ -28,7 +28,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -137,7 +137,7 @@
print("INSERTING INTO TREE\n");
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer frame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -343,7 +343,7 @@
print("INSERTING INTO TREE\n");
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer frame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -514,7 +514,7 @@
Random rnd = new Random();
rnd.setSeed(50);
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer frame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -682,7 +682,7 @@
Random rnd = new Random();
rnd.setSeed(50);
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer frame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -837,7 +837,7 @@
Random rnd = new Random();
rnd.setSeed(50);
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer frame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -985,7 +985,7 @@
Random rnd = new Random();
rnd.setSeed(50);
- IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);
+ IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);
ByteBuffer frame = ctx.getResourceManager().allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx);
ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());