1. add deployment retry 2. support plan switch
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 0724a01..2407c10 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -181,36 +181,44 @@
* @throws HyracksException
*/
private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
- try {
- List<URL> downloadedFileURLs = new ArrayList<URL>();
- File dir = new File(deploymentDir);
- if (!dir.exists()) {
- FileUtils.forceMkdir(dir);
- }
- for (URL url : urls) {
- String urlString = url.toString();
- int slashIndex = urlString.lastIndexOf('/');
- String fileName = urlString.substring(slashIndex + 1).split("&")[1];
- String filePath = deploymentDir + File.separator + fileName;
- File targetFile = new File(filePath);
- if (isNC) {
- HttpClient hc = new DefaultHttpClient();
- HttpGet get = new HttpGet(url.toString());
- HttpResponse response = hc.execute(get);
- InputStream is = response.getEntity().getContent();
- OutputStream os = new FileOutputStream(targetFile);
- try {
- IOUtils.copyLarge(is, os);
- } finally {
- os.close();
- is.close();
- }
+ //retry 10 times at maximum for downloading binaries
+ int retryCount = 10;
+ int tried = 0;
+ Exception trace = null;
+ while (tried < retryCount) {
+ try {
+ tried++;
+ List<URL> downloadedFileURLs = new ArrayList<URL>();
+ File dir = new File(deploymentDir);
+ if (!dir.exists()) {
+ FileUtils.forceMkdir(dir);
}
- downloadedFileURLs.add(targetFile.toURI().toURL());
+ for (URL url : urls) {
+ String urlString = url.toString();
+ int slashIndex = urlString.lastIndexOf('/');
+ String fileName = urlString.substring(slashIndex + 1).split("&")[1];
+ String filePath = deploymentDir + File.separator + fileName;
+ File targetFile = new File(filePath);
+ if (isNC) {
+ HttpClient hc = new DefaultHttpClient();
+ HttpGet get = new HttpGet(url.toString());
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = new FileOutputStream(targetFile);
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ downloadedFileURLs.add(targetFile.toURI().toURL());
+ }
+ return downloadedFileURLs;
+ } catch (Exception e) {
+ trace = e;
}
- return downloadedFileURLs;
- } catch (Exception e) {
- throw new HyracksException(e);
}
+ throw new HyracksException(trace);
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index d6a6f3d..c86e563 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -47,10 +47,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver;
import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.ExceptionUtilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@@ -180,24 +177,7 @@
}
private JobGen selectJobGen(Plan planChoice, PregelixJob currentJob) {
- JobGen jobGen;
- switch (planChoice) {
- case INNER_JOIN:
- jobGen = new JobGenInnerJoin(currentJob);
- break;
- case OUTER_JOIN:
- jobGen = new JobGenOuterJoin(currentJob);
- break;
- case OUTER_JOIN_SORT:
- jobGen = new JobGenOuterJoinSort(currentJob);
- break;
- case OUTER_JOIN_SINGLE_SORT:
- jobGen = new JobGenOuterJoinSingleSort(currentJob);
- break;
- default:
- jobGen = new JobGenInnerJoin(currentJob);
- }
- return jobGen;
+ return JobGenFactory.createJobGen(planChoice, currentJob);
}
private long loadData(PregelixJob currentJob, JobGen jobGen, DeploymentId deploymentId) throws IOException,
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index f703fcc..b9f2d70 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -31,12 +31,14 @@
import java.util.UUID;
import java.util.logging.Logger;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -93,6 +95,7 @@
import edu.uci.ics.pregelix.api.job.PregelixJob;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.base.IJobGen;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
@@ -110,11 +113,14 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeSearchFunctionUpdateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.function.ExtractLiveVertexIdFunctionFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -131,7 +137,7 @@
protected PregelixJob pregelixJob;
protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = UUID.randomUUID().toString();
+ protected String jobId;
protected int frameSize = ClusterConfig.getFrameSize();
protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
@@ -149,6 +155,10 @@
}
private void init(PregelixJob job) {
+ jobId = BspUtils.getJobId(job.getConfiguration());
+ if (jobId == null) {
+ jobId = UUID.randomUUID().toString();
+ }
conf = job.getConfiguration();
pregelixJob = job;
initJobConfiguration();
@@ -779,4 +789,88 @@
/** generate clean-up job */
public abstract JobSpecification[] generateCleanup() throws HyracksException;
+ /**
+ * Switch the plan to a desired one
+ *
+ * @param iteration
+ * , the latest completed iteration number
+ * @param plan
+ * , plan choice
+ * @return the list of jobspecification for preparing plan switch and the new jobgen
+ */
+ public Pair<List<JobSpecification>, JobGen> switchPlan(int iteration, Plan plan) throws HyracksException {
+ /**
+ * bulk-load a live vertex btree
+ */
+ List<JobSpecification> list = new ArrayList<JobSpecification>();
+ list.add(bulkLoadLiveVertexBTree(iteration));
+ JobGen jobGen = new JobGenInnerJoin(pregelixJob);
+ return Pair.of(list, jobGen);
+ }
+
+ /**
+ * Build a jobspec to bulkload the live vertex btree
+ *
+ * @param iteration
+ * @return the job specification
+ * @throws HyracksException
+ */
+ private JobSpecification bulkLoadLiveVertexBTree(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search and function call update operator
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf,
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, vertexIdClass);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ conf, vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(conf, vertexIdClass.getName(),
+ MsgList.class.getName());
+ TreeSearchFunctionUpdateOperatorDescriptor scanner = new TreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
+ getIndexDataflowHelperFactory(), inputRdFactory, 1, new ExtractLiveVertexIdFunctionFactory(),
+ preHookFactory, null, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct bulk-load index operator
+ */
+ IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ int[] fieldPermutation = new int[] { 0, 1 };
+ int[] keyFields = new int[] { 0 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+ WritableComparator.get(vertexIdClass).getClass());
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, keyFields, DEFAULT_BTREE_FILL_FACTOR, getIndexDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /** connect job spec */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, btreeBulkLoad, 0);
+ spec.addRoot(btreeBulkLoad);
+
+ return spec;
+ }
+
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
new file mode 100644
index 0000000..ed580de
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+
+public class JobGenFactory {
+
+ public static JobGen createJobGen(Plan planChoice, PregelixJob currentJob) {
+ JobGen jobGen = null;
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(currentJob);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(currentJob);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(currentJob);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(currentJob);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(currentJob);
+ }
+ return jobGen;
+ }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
new file mode 100644
index 0000000..ae9463f
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ExtractLiveVertexIdFunctionFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.function;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.util.FrameTupleUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
+
+@SuppressWarnings("rawtypes")
+public class ExtractLiveVertexIdFunctionFactory implements IUpdateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IUpdateFunction createFunction() {
+ return new IUpdateFunction() {
+ // for writing intermediate data
+ private final ArrayTupleBuilder alive = new ArrayTupleBuilder(2);
+
+ // for writing out to alive message channel
+ private IFrameWriter writerAlive;
+ private FrameTupleAppender appenderAlive;
+ private ByteBuffer bufferAlive;
+
+ private MsgList dummyMessageList = new MsgList();
+ private Vertex vertex;
+
+ @Override
+ public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+ throws HyracksDataException {
+ this.writerAlive = writers[0];
+ this.bufferAlive = ctx.allocateFrame();
+ this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderAlive.reset(bufferAlive, true);
+ }
+
+ @Override
+ public void process(Object[] tuple) throws HyracksDataException {
+ try {
+ // vertex Id, vertex
+ alive.reset();
+ vertex = (Vertex) tuple[1];
+ if (!vertex.isHalted()) {
+ alive.reset();
+ DataOutput outputAlive = alive.getDataOutput();
+ vertex.getVertexId().write(outputAlive);
+ alive.addFieldEndOffset();
+ dummyMessageList.write(outputAlive);
+ alive.addFieldEndOffset();
+ FrameTupleUtils.flushTuple(appenderAlive, alive, writerAlive);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+ }
+
+ @Override
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb, IIndexCursor cursor)
+ throws HyracksDataException {
+
+ }
+ };
+ }
+}