Merge branch 'master' into zheilbron/hyracks_msr_demo
Conflicts:
hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 075de03..06e79de 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -80,6 +80,8 @@
public static final String RECOVERY_COUNT = "pregelix.recoveryCount";
/** the checkpoint interval */
public static final String CKP_INTERVAL = "pregelix.ckpinterval";
+ /** the configuration key of the boolean flag that turns dynamic optimization on or off */
+ public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
/** comma */
public static final String COMMA_STR = ",";
@@ -260,6 +262,15 @@
final public void setCheckpointingInterval(int ckpInterval) {
getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
}
+
+ /**
+ * Enable or disable dynamic optimization for this job. The flag is stored in
+ * the job configuration under the {@link #DYNAMIC_OPTIMIZATION} key.
+ *
+ * @param dynamicOpt
+ *            true to enable dynamic optimization; false to disable it
+ */
+ final public void setEnableDynamicOptimization(boolean dynamicOpt){
+ getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
+ }
@Override
public String toString() {
diff --git a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index a5d9cd7..f44942f 100644
--- a/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -671,6 +671,16 @@
public static int getRecoveryCount(Configuration conf) {
return conf.getInt(PregelixJob.RECOVERY_COUNT, 0);
}
+
+ /***
+ * Check whether dynamic optimization is enabled for a job.
+ *
+ * @param conf
+ *            the job configuration to read
+ * @return the boolean stored under {@link PregelixJob#DYNAMIC_OPTIMIZATION};
+ *         true (enabled) when the key has not been set
+ */
+ public static boolean getEnableDynamicOptimization(Configuration conf){
+ return conf.getBoolean(PregelixJob.DYNAMIC_OPTIMIZATION, true);
+ }
/***
* Get the user-set checkpoint interval
diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 3d1699f..9ae263c 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -358,5 +358,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-client</artifactId>
+ <version>0.2.10-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d6a6f3d..b3a90e9 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -21,6 +21,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -42,21 +43,24 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.stats.Counters;
+import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
import edu.uci.ics.pregelix.api.job.ICheckpointHook;
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.optimizer.DynamicOptimizer;
+import edu.uci.ics.pregelix.core.optimizer.IOptimizer;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@SuppressWarnings("rawtypes")
public class Driver implements IDriver {
+ /** names of the performance counters whose values are written to the log after the job loop finishes */
+ public static final String[] COUNTERS = { Counters.NUM_PROCESSOR, Counters.SYSTEM_LOAD, Counters.MEMORY_USAGE,
+ Counters.DISK_READ, Counters.DISK_WRITE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE };
private static final Log LOG = LogFactory.getLog(Driver.class);
private IHyracksClientConnection hcc;
private Class exampleClass;
@@ -93,6 +97,8 @@
PregelixJob currentJob = jobs.get(0);
PregelixJob lastJob = currentJob;
addHadoopConfiguration(currentJob, ipAddress, port, true);
+ ClientCounterContext counterContext = new ClientCounterContext(ipAddress, 16001,
+ Arrays.asList(ClusterConfig.getNCNames()));
JobGen jobGen = null;
/** prepare job -- deploy jars */
@@ -105,6 +111,7 @@
int retryCount = 0;
int maxRetryCount = 3;
jobGen = selectJobGen(planChoice, currentJob);
+ IOptimizer dynamicOptimzier = new DynamicOptimizer();
do {
try {
@@ -131,7 +138,10 @@
jobGen.reset(currentJob);
}
- /** run loop-body jobs */
+ /** run loop-body jobs with dynamic optimizer if it is enabled */
+ if (BspUtils.getEnableDynamicOptimization(currentJob.getConfiguration())) {
+ jobGen = dynamicOptimzier.optimize(counterContext, jobGen, i);
+ }
runLoopBody(deploymentId, currentJob, jobGen, i, lastSnapshotJobIndex, lastSnapshotSuperstep,
ckpHook, failed);
runClearState(deploymentId, jobGen);
@@ -156,6 +166,13 @@
}
} while (failed && retryCount < maxRetryCount);
LOG.info("job finished");
+ StringBuffer counterBuffer = new StringBuffer();
+ counterBuffer.append("performance counters\n");
+ for (String counter : COUNTERS) {
+ counterBuffer.append("\t" + counter + ": " + counterContext.getCounter(counter, false).get() + "\n");
+ }
+ LOG.info(counterBuffer.toString());
+ counterContext.stop();
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -180,24 +197,7 @@
}
+ /** Create the job generator for the given plan choice; the plan-to-generator mapping is delegated to JobGenFactory.createJobGen. */
private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
- JobGen jobGen;
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(currentJob);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(currentJob);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(currentJob);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(currentJob);
- break;
- default:
- jobGen = new JobGenInnerJoin(currentJob);
- }
- return jobGen;
+ return JobGenFactory.createJobGen(planChoice, currentJob);
}
private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index f703fcc..c1f6aae 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -31,12 +31,14 @@
import java.util.UUID;
import java.util.logging.Logger;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,6 +95,7 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.base.IJobGen;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
@@ -110,11 +113,14 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.function.ExtractLiveVertexIdFunctionFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -131,7 +137,7 @@
protected PregelixJob pregelixJob;
protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = UUID.randomUUID().toString();
+    /** id of this job: a random UUID, which the (job, jobId) constructor replaces when it is given a non-null id */
+    protected String jobId = UUID.randomUUID().toString();
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
@@ -147,6 +153,13 @@
public JobGen(PregelixJob job) {
init(job);
}
+
+    /**
+     * Create a job generator that can adopt an existing job id.
+     *
+     * @param job
+     *            the job this generator is initialized with
+     * @param jobId
+     *            the job id to adopt; when null, the id assigned by the field
+     *            initializer is kept
+     */
+    public JobGen(PregelixJob job, String jobId) {
+        // the id must be settled before init(job) runs
+        this.jobId = (jobId == null) ? this.jobId : jobId;
+        init(job);
+    }
private void init(PregelixJob job) {
conf = job.getConfiguration();
@@ -779,4 +792,88 @@
/** generate clean-up job */
public abstract JobSpecification[] generateCleanup() throws HyracksException;
+    /**
+     * Prepare a switch of the execution plan after a completed iteration.
+     * <p>
+     * This implementation builds one preparation job, which bulk-loads the live
+     * vertex btree, and pairs it with a new {@link JobGenInnerJoin} that shares
+     * this generator's job and job id. NOTE(review): the {@code plan} argument
+     * is not consulted here; the returned generator is always a JobGenInnerJoin.
+     *
+     * @param iteration
+     *            the latest completed iteration number
+     * @param plan
+     *            the desired plan choice
+     * @return the job specifications that prepare the plan switch, together
+     *         with the new job generator
+     * @throws HyracksException
+     *             if building the bulk-load job specification fails
+     */
+    public Pair<List<JobSpecification>, JobGen> switchPlan(int iteration, Plan plan) throws HyracksException {
+        // the only preparation step: bulk-load a live vertex btree
+        JobSpecification liveVertexLoad = bulkLoadLiveVertexBTree(iteration);
+        List<JobSpecification> preparationJobs = new ArrayList<JobSpecification>();
+        preparationJobs.add(liveVertexLoad);
+
+        JobGen switchedJobGen = new JobGenInnerJoin(pregelixJob, jobId);
+        return Pair.of(preparationJobs, switchedJobGen);
+    }
+
+ /**
+ * Build a jobspec to bulkload the live vertex btree. The job has three
+ * operators connected one-to-one: an empty tuple source, a search over the
+ * PRIMARY_INDEX that runs ExtractLiveVertexIdFunctionFactory, and a
+ * bulk-reload of the SECONDARY_INDEX_ODD index, which is the job's root.
+ *
+ * @param iteration
+ *            the iteration number; it is handed to JobGenUtil to obtain the
+ *            comparator factories and the getForwardScan value
+ * @return the job specification
+ * @throws HyracksException
+ */
+ private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator; it feeds the search operator below
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search and function call update operator over the
+ * PRIMARY_INDEX files of this job; rdFinal, built from the vertex id
+ * class and MsgList, is passed as the operator's last argument
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
+ MsgList.class.getName());
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+ getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
+ preHookFactory, null, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct bulk-load index operator over the SECONDARY_INDEX_ODD files,
+ * keyed on field 0 of the incoming two-field tuples.
+ * NOTE(review): this comparator is built with iteration + 1 and
+ * WritableComparator.get(vertexIdClass).getClass(), while the search
+ * operator's comparator above uses iteration and vertexIdClass directly --
+ * confirm the difference is intended
+ */
+ IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ int[] fieldPermutation = new int[] { 0, 1 };
+ int[] keyFields = new int[] { 0 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+ WritableComparator.get(vertexIdClass).getClass());
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /** connect job spec: empty tuple source -> search operator -> bulk-load, with the bulk-load as root */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, btreeBulkLoad, 0);
+ spec.addRoot(btreeBulkLoad);
+
+ return spec;
+ }
+
}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
new file mode 100644
index 0000000..ed580de
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+
+/**
+ * Factory that maps a {@link Plan} choice to the matching {@link JobGen}
+ * implementation.
+ */
+public class JobGenFactory {
+
+    /**
+     * Create the job generator for a plan choice.
+     *
+     * @param planChoice
+     *            the selected plan; a plan without a dedicated case gets a
+     *            JobGenInnerJoin
+     * @param currentJob
+     *            the job handed to the job generator's constructor
+     * @return a new job generator for {@code currentJob}
+     */
+    public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+        switch (planChoice) {
+            case OUTER_JOIN:
+                return new JobGenOuterJoin(currentJob);
+            case OUTER_JOIN_SORT:
+                return new JobGenOuterJoinSort(currentJob);
+            case OUTER_JOIN_SINGLE_SORT:
+                return new JobGenOuterJoinSingleSort(currentJob);
+            case INNER_JOIN:
+            default:
+                // inner join is both an explicit choice and the fallback
+                return new JobGenInnerJoin(currentJob);
+        }
+    }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 1bad401..7bdb069 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -96,6 +96,10 @@
public JobGenInnerJoin(PregelixJob job) {
super(job);
}
+
+ /** Create a JobGenInnerJoin for the job, forwarding the given job id to the superclass constructor. */
+ public JobGenInnerJoin(PregelixJob job, String jobId) {
+ super(job, jobId);
+ }
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index d01c069..68e6706 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -72,6 +72,10 @@
public JobGenOuterJoin(PregelixJob job) {
super(job);
}
+
+ /** Create a JobGenOuterJoin for the job, forwarding the given job id to the superclass constructor. */
+ public JobGenOuterJoin(PregelixJob job, String jobId) {
+ super(job, jobId);
+ }
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 4480b97..3e4b213 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -72,6 +72,10 @@
public JobGenOuterJoinSingleSort(PregelixJob job) {
super(job);
}
+
+ /** Create a JobGenOuterJoinSingleSort for the job, forwarding the given job id to the superclass constructor. */
+ public JobGenOuterJoinSingleSort(PregelixJob job, String jobId) {
+ super(job, jobId);
+ }
@Override
protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 89fbdcd..5c1a4b8 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -256,6 +256,10 @@
return locations;
}
+    /**
+     * Get the node controller (NC) names held in the NCs array.
+     *
+     * @return a copy of the NCs array, so that callers cannot modify the
+     *         cluster configuration through the result; null if NCs is null
+     */
+    public static String[] getNCNames() {
+        return NCs == null ? null : NCs.clone();
+    }
+
public static void addToBlackListNodes(Collection<String> nodes) {
blackListNodes.addAll(nodes);
}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
new file mode 100644
index 0000000..01fc81b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/DynamicOptimizer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+/**
+ * An {@link IOptimizer} whose optimize method hands back the job generator it
+ * was given, without consulting the counter context or the iteration.
+ */
+public class DynamicOptimizer implements IOptimizer {
+
+ @Override
+ public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration) {
+ // no plan change: return the incoming job generator as is
+ return jobGen;
+ }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
new file mode 100644
index 0000000..b5913c4
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/optimizer/IOptimizer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.optimizer;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+
+/**
+ * Hook for choosing the job generator to use for an iteration.
+ */
+public interface IOptimizer {
+
+ /**
+ * Pick the job generator to use.
+ *
+ * @param counterContext
+ *            the counter context made available to the optimizer
+ * @param jobGen
+ *            the current job generator
+ * @param iteration
+ *            an iteration number -- NOTE(review): presumably the iteration
+ *            about to run; confirm against the caller
+ * @return the job generator to use; implementations may return
+ *         {@code jobGen} itself
+ */
+ public JobGen optimize(ICounterContext counterContext, JobGen jobGen, int iteration);
+
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
new file mode 100644
index 0000000..ae9463f
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+/**
+ * Factory of update functions that emit one (vertex id, MsgList) tuple for
+ * every input vertex that is not halted. The MsgList written is a
+ * default-constructed placeholder.
+ */
+@SuppressWarnings("rawtypes")
+public class ExtractLiveVertexIdFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // builds the two-field output tuple: vertex id, message list
+            private final ArrayTupleBuilder alive = new ArrayTupleBuilder(2);
+
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+
+            // written as the second field of every output tuple
+            private final MsgList dummyMessageList = new MsgList();
+
+            /**
+             * Bind the first writer as the output channel and set up a frame
+             * and appender for it.
+             */
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.writerAlive = writers[0];
+                this.bufferAlive = ctx.allocateFrame();
+                this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderAlive.reset(bufferAlive, true);
+            }
+
+            /**
+             * Emit (vertex id, dummy message list) when the vertex in the
+             * tuple is not halted; halted vertices produce no output.
+             *
+             * @param tuple
+             *            the input fields: vertex id, vertex
+             */
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                try {
+                    Vertex vertex = (Vertex) tuple[1];
+                    if (!vertex.isHalted()) {
+                        // start a fresh tuple; one reset per emitted tuple is enough
+                        alive.reset();
+                        DataOutput outputAlive = alive.getDataOutput();
+                        vertex.getVertexId().write(outputAlive);
+                        alive.addFieldEndOffset();
+                        dummyMessageList.write(outputAlive);
+                        alive.addFieldEndOffset();
+                        FrameTupleUtils.flushTuple(appenderAlive, alive, writerAlive);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            /** Flush whatever is still buffered in the appender to the writer. */
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+            }
+
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+                    throws HyracksDataException {
+                // intentionally empty: this function performs no update
+            }
+        };
+    }
+}