Added counters

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@167 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
index 8173785..535c925 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
@@ -14,10 +14,13 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IResourceManager;
 
 public interface IHyracksContext {
     public IResourceManager getResourceManager();
 
     public int getFrameSize();
+
+    public ICounterContext getCounterContext();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
index 9219ec3..2b9787d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
@@ -15,5 +15,5 @@
 package edu.uci.ics.hyracks.api.job;
 
 public enum JobFlag {
-    COLLECT_FRAME_COUNTS
+    PROFILE_RUNTIME
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
new file mode 100644
index 0000000..78d8875
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+import java.io.Serializable;
+
+public interface ICounter extends Serializable {
+    /**
+     * Get the name of the counter.
+     * 
+     * @return Name of the counter.
+     */
+    public String getName();
+
+    /**
+     * Update the value of the counter to be current + delta.
+     * 
+     * @param delta
+     *            - Amount to change the counter value by.
+     * @return the new value after update.
+     */
+    public long update(long delta);
+
+    /**
+     * Set the value of the counter.
+     * 
+     * @param value
+     *            - New value of the counter.
+     * @return Old value of the counter.
+     */
+    public long set(long value);
+
+    /**
+     * Get the value of the counter.
+     * 
+     * @return the value of the counter.
+     */
+    public long get();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
new file mode 100644
index 0000000..2992757
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+/**
+ * A namespace that holds named counters.
+ * 
+ * @author vinayakb
+ */
+public interface ICounterContext {
+    /**
+     * Returns the fully-qualified context name
+     * 
+     * @return fully-qualified context name.
+     */
+    public String getContextName();
+
+    /**
+     * Get a counter with the specified name.
+     * 
+     * @param name
+     *            - Name of the counter to get.
+     * @param create
+     *            - Create if the counter does not exist.
+     * @return An existing counter with the given name (if one exists). If a counter with the
+     *         said name does not exist, a new one is created if create is set to <code>true</code>, or
+     *         else returns <code>null</code>.
+     */
+    public ICounter getCounter(String name, boolean create);
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
new file mode 100644
index 0000000..440cb0d
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class Counter implements ICounter {
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+    private final AtomicLong counter;
+
+    public Counter(String name) {
+        this.name = name;
+        counter = new AtomicLong();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public long update(long delta) {
+        return counter.addAndGet(delta);
+    }
+
+    @Override
+    public long set(long value) {
+        long oldValue = counter.get();
+        counter.set(value);
+        return oldValue;
+    }
+
+    @Override
+    public long get() {
+        return counter.get();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 5677827..77c0271 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -71,14 +71,16 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.nc.comm.ConnectionManager;
 import edu.uci.ics.hyracks.control.nc.comm.DemuxDataReceiveListenerFactory;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.DelegateHyracksContext;
 import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 
 public class NodeControllerService extends AbstractRemoteService implements INodeController {
     private static final long serialVersionUID = 1L;
@@ -110,7 +112,7 @@
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
-        this.ctx = new HyracksContext(ncConfig.frameSize);
+        this.ctx = new RootHyracksContext(ncConfig.frameSize);
         if (id == null) {
             throw new Exception("id not set");
         }
@@ -211,6 +213,8 @@
             Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
             joblet.setStagelet(stageId, stagelet);
 
+            IHyracksContext stageletContext = new DelegateHyracksContext(ctx, stagelet.getStageletCounterContext());
+
             final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
             Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
 
@@ -224,9 +228,9 @@
                 IOperatorDescriptor op = han.getOwner();
                 List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
                 for (int i : tasks.get(hanId)) {
-                    IOperatorNodePushable hon = han.createPushRuntime(ctx, joblet.getEnvironment(op, i), rdp, i,
-                            opPartitions.get(op.getOperatorId()).size());
-                    OperatorRunnable or = new OperatorRunnable(ctx, hon);
+                    IOperatorNodePushable hon = han.createPushRuntime(stageletContext, joblet.getEnvironment(op, i),
+                            rdp, i, opPartitions.get(op.getOperatorId()).size());
+                    OperatorRunnable or = new OperatorRunnable(stageletContext, hon);
                     stagelet.setOperator(op.getOperatorId(), i, or);
                     if (inputs != null) {
                         for (int j = 0; j < inputs.size(); ++j) {
@@ -240,8 +244,8 @@
                                     .getOperatorId();
                             Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
                             endpointList.add(endpoint);
-                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
-                                    stageId);
+                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(stageletContext,
+                                    jobId, stageId);
                             connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
                             PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
                                     .getTaskInputMap().get(hanId).get(j), i);
@@ -249,7 +253,7 @@
                                 LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
                             }
                             portMap.put(piId, endpoint);
-                            IFrameReader reader = createReader(conn, drlf, i, plan, stagelet,
+                            IFrameReader reader = createReader(stageletContext, conn, drlf, i, plan, stagelet,
                                     opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size());
                             or.setFrameReader(reader);
                         }
@@ -267,26 +271,31 @@
         }
     }
 
-    private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
-            final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
-            throws HyracksDataException {
-        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan.getJobSpecification()
+    private IFrameReader createReader(final IHyracksContext stageletContext, final IConnectorDescriptor conn,
+            IConnectionDemultiplexer demux, final int receiverIndex, JobPlan plan, final Stagelet stagelet,
+            int nProducerCount, int nConsumerCount) throws HyracksDataException {
+        final IFrameReader reader = conn.createReceiveSideReader(stageletContext, plan.getJobSpecification()
                 .getConnectorRecordDescriptor(conn), demux, receiverIndex, nProducerCount, nConsumerCount);
 
-        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
-            private int frameCount;
+        return plan.getJobFlags().contains(JobFlag.PROFILE_RUNTIME) ? new IFrameReader() {
+            private ICounter openCounter = stageletContext.getCounterContext().getCounter(
+                    conn.getConnectorId().getId() + ".receiver." + receiverIndex + ".open", true);
+            private ICounter closeCounter = stageletContext.getCounterContext().getCounter(
+                    conn.getConnectorId().getId() + ".receiver." + receiverIndex + ".close", true);
+            private ICounter frameCounter = stageletContext.getCounterContext().getCounter(
+                    conn.getConnectorId().getId() + ".receiver." + receiverIndex + ".nextFrame", true);
 
             @Override
             public void open() throws HyracksDataException {
-                frameCount = 0;
                 reader.open();
+                openCounter.update(1);
             }
 
             @Override
             public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 boolean status = reader.nextFrame(buffer);
                 if (status) {
-                    ++frameCount;
+                    frameCounter.update(1);
                 }
                 return status;
             }
@@ -294,10 +303,7 @@
             @Override
             public void close() throws HyracksDataException {
                 reader.close();
-                stagelet.getStatistics()
-                        .getStatisticsMap()
-                        .put("framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
-                                String.valueOf(frameCount));
+                closeCounter.update(1);
             }
         } : reader;
     }
@@ -317,6 +323,9 @@
 
             final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
 
+            final IHyracksContext stageletContext = new DelegateHyracksContext(ctx,
+                    stagelet.getStageletCounterContext());
+
             final JobSpecification spec = plan.getJobSpecification();
 
             for (ActivityNodeId hanId : tasks.keySet()) {
@@ -346,11 +355,12 @@
                                     if (LOGGER.isLoggable(Level.FINEST)) {
                                         LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
                                     }
-                                    return createWriter(connectionManager.connect(ep.getNetworkAddress(),
-                                            ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+                                    return createWriter(stageletContext, connectionManager.connect(
+                                            ep.getNetworkAddress(), ep.getEndpointId(), senderIndex), plan, conn,
+                                            senderIndex, index, stagelet);
                                 }
                             };
-                            or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan.getJobSpecification()
+                            or.setFrameWriter(j, conn.createSendSideWriter(stageletContext, plan.getJobSpecification()
                                     .getConnectorRecordDescriptor(conn), edwFactory, i, opPartitions.get(producerOpId)
                                     .size(), opPartitions.get(consumerOpId).size()), spec
                                     .getConnectorRecordDescriptor(conn));
@@ -365,30 +375,35 @@
         }
     }
 
-    private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
-            final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
-        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
-            private int frameCount;
+    private IFrameWriter createWriter(final IHyracksContext stageletContext, final IFrameWriter writer, JobPlan plan,
+            final IConnectorDescriptor conn, final int senderIndex, final int receiverIndex, final Stagelet stagelet)
+            throws HyracksDataException {
+        return plan.getJobFlags().contains(JobFlag.PROFILE_RUNTIME) ? new IFrameWriter() {
+            private ICounter openCounter = stageletContext.getCounterContext().getCounter(
+                    conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex + ".open", true);
+            private ICounter closeCounter = stageletContext.getCounterContext().getCounter(
+                    conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex + ".close", true);
+            private ICounter frameCounter = stageletContext.getCounterContext()
+                    .getCounter(
+                            conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex
+                                    + ".nextFrame", true);
 
             @Override
             public void open() throws HyracksDataException {
-                frameCount = 0;
                 writer.open();
+                openCounter.update(1);
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                ++frameCount;
+                frameCounter.update(1);
                 writer.nextFrame(buffer);
             }
 
             @Override
             public void close() throws HyracksDataException {
+                closeCounter.update(1);
                 writer.close();
-                stagelet.getStatistics()
-                        .getStatisticsMap()
-                        .put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
-                                + receiverIndex, String.valueOf(frameCount));
             }
 
             @Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index 5631d58..9b37aff 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.control.nc.job.profiling.CounterContext;
 import edu.uci.ics.hyracks.control.nc.runtime.OperatorRunnable;
 
 public class Stagelet {
@@ -44,6 +45,8 @@
 
     private final Map<OperatorInstanceId, OperatorRunnable> honMap;
 
+    private final CounterContext stageletCounterContext;
+
     private List<Endpoint> endpointList;
 
     private boolean started;
@@ -61,6 +64,7 @@
         pendingOperators = new HashSet<OperatorInstanceId>();
         started = false;
         honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
+        stageletCounterContext = new CounterContext(joblet.getJobId() + "." + stageId + "." + nodeId);
         stats = new StageletStatistics();
         stats.setNodeId(nodeId);
     }
@@ -73,6 +77,10 @@
         return honMap;
     }
 
+    public CounterContext getStageletCounterContext() {
+        return stageletCounterContext;
+    }
+
     public void setEndpointList(List<Endpoint> endpointList) {
         this.endpointList = endpointList;
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
new file mode 100644
index 0000000..b883683
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/job/profiling/CounterContext.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.job.profiling;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+
+public class CounterContext implements ICounterContext {
+    private final String name;
+    private final Map<String, Counter> counterMap;
+
+    public CounterContext(String name) {
+        this.name = name;
+        counterMap = new HashMap<String, Counter>();
+    }
+
+    @Override
+    public synchronized ICounter getCounter(String name, boolean create) {
+        Counter counter = counterMap.get(name);
+        if (counter == null && create) {
+            counter = new Counter(name);
+            counterMap.put(name, counter);
+        }
+        return counter;
+    }
+
+    @Override
+    public String getContextName() {
+        return name;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
similarity index 61%
copy from hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
copy to hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
index f61ff8b..12c1a76 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
@@ -15,24 +15,31 @@
 package edu.uci.ics.hyracks.control.nc.runtime;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IResourceManager;
 
-public class HyracksContext implements IHyracksContext {
-    private final IResourceManager resourceManager;
-    private final int frameSize;
+public class DelegateHyracksContext implements IHyracksContext {
+    private final IHyracksContext delegate;
 
-    public HyracksContext(int frameSize) {
-        resourceManager = new ResourceManager(this);
-        this.frameSize = frameSize;
+    private final ICounterContext counterContext;
+
+    public DelegateHyracksContext(IHyracksContext delegate, ICounterContext counterContext) {
+        this.delegate = delegate;
+        this.counterContext = counterContext;
     }
 
     @Override
     public IResourceManager getResourceManager() {
-        return resourceManager;
+        return delegate.getResourceManager();
     }
 
     @Override
     public int getFrameSize() {
-        return frameSize;
+        return delegate.getFrameSize();
+    }
+
+    @Override
+    public ICounterContext getCounterContext() {
+        return counterContext;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index e1fcf12..6c48ec3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -24,7 +24,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class OperatorRunnable implements Runnable {
-    private IOperatorNodePushable opNode;
+    private final IOperatorNodePushable opNode;
     private IFrameReader reader;
     private ByteBuffer buffer;
     private volatile boolean abort;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
similarity index 79%
rename from hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
rename to hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index f61ff8b..7eddfc3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/HyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -15,13 +15,14 @@
 package edu.uci.ics.hyracks.control.nc.runtime;
 
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IResourceManager;
 
-public class HyracksContext implements IHyracksContext {
+public class RootHyracksContext implements IHyracksContext {
     private final IResourceManager resourceManager;
     private final int frameSize;
 
-    public HyracksContext(int frameSize) {
+    public RootHyracksContext(int frameSize) {
         resourceManager = new ResourceManager(this);
         this.frameSize = frameSize;
     }
@@ -35,4 +36,9 @@
     public int getFrameSize() {
         return frameSize;
     }
+
+    @Override
+    public ICounterContext getCounterContext() {
+        throw new UnsupportedOperationException();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index cbce6f6..a74d7a0 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -119,7 +119,7 @@
         
         // build search keys (which must be of type ITupleReference)
         // put search keys into frame and create tuplereference factories
-        IHyracksContext ctx = new HyracksContext(32768); // WARNING: make sure frame size is same as on NCs
+        IHyracksContext ctx = new RootHyracksContext(32768); // WARNING: make sure frame size is same as on NCs
         ByteBuffer keyFrame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		appender.reset(keyFrame, true);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
index 46adf24..7fca1d3 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeOperatorsTest.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -194,7 +194,7 @@
 		
         
         // put search keys into frame and create tuplereference factories
-        IHyracksContext ctx = new HyracksContext(32768); // WARNING: make sure frame size is same as on NCs
+        IHyracksContext ctx = new RootHyracksContext(32768); // WARNING: make sure frame size is same as on NCs
         ByteBuffer keyFrame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		appender.reset(keyFrame, true);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 56569d9..b75fae3 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -31,7 +31,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializingDataReader;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -47,7 +47,7 @@
         private List<ByteBuffer> buffers;
 
         public SerDeserRunner(RecordDescriptor rDes) {
-            ctx = new HyracksContext(FRAME_SIZE);
+            ctx = new RootHyracksContext(FRAME_SIZE);
             this.rDes = rDes;
             buffers = new ArrayList<ByteBuffer>();
         }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index d85fb57..077c917 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -74,7 +74,7 @@
     }
     
     protected void runTest(JobSpecification spec) throws Exception {
-        UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.COLLECT_FRAME_COUNTS));
+        UUID jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         System.err.println(spec.toJSON());
         hcc.start(jobId);
         System.err.print(jobId);
diff --git a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java
index 7aa776a..77ef504 100644
--- a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java
+++ b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeFieldPrefixNSMTest.java
@@ -30,7 +30,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -85,7 +85,7 @@
     private ITupleReference createTuple(int f0, int f1, int f2, boolean print) throws HyracksDataException {
     	if(print) System.out.println("CREATING: " + f0 + " " + f1 + " " + f2);
     	
-    	IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+    	IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer buf = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
diff --git a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java
index 5f2508b..9d4f390 100644
--- a/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java
+++ b/hyracks-storage-am-btree/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeTest.java
@@ -28,7 +28,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.control.nc.runtime.HyracksContext;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -137,7 +137,7 @@
         
         print("INSERTING INTO TREE\n");
         
-        IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+        IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer frame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -343,7 +343,7 @@
         
         print("INSERTING INTO TREE\n");
         
-        IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+        IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer frame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -514,7 +514,7 @@
     	Random rnd = new Random();
     	rnd.setSeed(50);
 
-    	IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+    	IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer frame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -682,7 +682,7 @@
         Random rnd = new Random();
         rnd.setSeed(50);
 
-        IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+        IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer frame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -837,7 +837,7 @@
         Random rnd = new Random();
         rnd.setSeed(50);
 
-        IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+        IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer frame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
@@ -985,7 +985,7 @@
         Random rnd = new Random();
         rnd.setSeed(50);
         
-        IHyracksContext ctx = new HyracksContext(HYRACKS_FRAME_SIZE);        
+        IHyracksContext ctx = new RootHyracksContext(HYRACKS_FRAME_SIZE);        
         ByteBuffer frame = ctx.getResourceManager().allocateFrame();
 		FrameTupleAppender appender = new FrameTupleAppender(ctx);				
 		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());