Merge branch 'master' into zheilbron/hyracks_msr_demo
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index b35c6f9..a2e697d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -57,13 +57,8 @@
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
/**
- * <<<<<<< .working
- * Left input is broadcast and preserves its local properties. Right input can
- * be partitioned in any way.
- * =======
* Left input is broadcast and preserves its local properties.
* Right input can be partitioned in any way.
- * >>>>>>> .merge-right.r3014
*/
public class NLJoinPOperator extends AbstractJoinPOperator {
@@ -290,4 +285,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 09354a5..af4bff2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -128,8 +128,8 @@
}
@Override
- public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 8973c75..918be11 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -239,7 +239,7 @@
@Override
public void close() throws HyracksDataException {
- if(outputAppender.getTupleCount() > 0){
+ if (outputAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outputFrame, outputWriter);
outputAppender.reset(outputFrame, true);
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 9b638b4..11a7a5c 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -119,8 +119,8 @@
}
@Override
- public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index addfb2e..06ef109 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -33,136 +33,118 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class AlgebricksMetaOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- // array of factories for building the local runtime pipeline
- private final AlgebricksPipeline pipeline;
+ // array of factories for building the local runtime pipeline
+ private final AlgebricksPipeline pipeline;
- public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
- int inputArity, int outputArity,
- IPushRuntimeFactory[] runtimeFactories,
- RecordDescriptor[] internalRecordDescriptors) {
- super(spec, inputArity, outputArity);
- if (outputArity == 1) {
- this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
- }
- this.pipeline = new AlgebricksPipeline(runtimeFactories,
- internalRecordDescriptors);
- }
+ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
+ IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
+ super(spec, inputArity, outputArity);
+ if (outputArity == 1) {
+ this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+ }
+ this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
+ }
- public AlgebricksPipeline getPipeline() {
- return pipeline;
- }
+ public AlgebricksPipeline getPipeline() {
+ return pipeline;
+ }
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject json = super.toJSON();
- json.put("micro-operators", pipeline.getRuntimeFactories());
- return json;
- }
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = super.toJSON();
+ json.put("micro-operators", pipeline.getRuntimeFactories());
+ return json;
+ }
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Asterix { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
- }
- sb.append("}");
- // sb.append(super.getInputArity());
- // sb.append(";");
- // sb.append(super.getOutputArity());
- // sb.append(";");
- return sb.toString();
- }
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Asterix { \n");
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" " + f.toString() + ";\n");
+ }
+ sb.append("}");
+ // sb.append(super.getInputArity());
+ // sb.append(";");
+ // sb.append(super.getOutputArity());
+ // sb.append(";");
+ return sb.toString();
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- if (inputArity == 0) {
- return createSourceInputPushRuntime(ctx, recordDescProvider,
- partition, nPartitions);
- } else {
- return createOneInputOneOutputPushRuntime(ctx, recordDescProvider,
- partition, nPartitions);
- }
- }
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ if (inputArity == 0) {
+ return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ } else {
+ return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ }
+ }
- private IOperatorNodePushable createSourceInputPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
- public void initialize() throws HyracksDataException {
- IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
+ public void initialize() throws HyracksDataException {
+ IFrameWriter startOfPipeline;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
- PipelineAssembler pa = new PipelineAssembler(pipeline,
- inputArity, outputArity, null,
- pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- startOfPipeline.open();
- startOfPipeline.close();
- }
- };
- }
+ PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ startOfPipeline.open();
+ startOfPipeline.close();
+ }
+ };
+ }
- private IOperatorNodePushable createOneInputOneOutputPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private IFrameWriter startOfPipeline;
+ private IFrameWriter startOfPipeline;
- @Override
- public void open() throws HyracksDataException {
- if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
- .getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this
- .getActivityId(), 0);
- PipelineAssembler pa = new PipelineAssembler(pipeline,
- inputArity, outputArity,
- pipelineInputRecordDescriptor,
- pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- startOfPipeline.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ if (startOfPipeline == null) {
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
+ RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
+ AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+ PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
+ pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ startOfPipeline.open();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- startOfPipeline.nextFrame(buffer);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ startOfPipeline.nextFrame(buffer);
+ }
- @Override
- public void close() throws HyracksDataException {
- startOfPipeline.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ startOfPipeline.close();
+ }
- @Override
- public void fail() throws HyracksDataException {
- startOfPipeline.fail();
- }
- };
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ startOfPipeline.fail();
+ }
+ };
+ }
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 131eea0..54ed192 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -57,8 +57,8 @@
@Override
public void open() throws HyracksDataException {
if (frameSorter == null) {
- frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
- outputRecordDesc);
+ frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, outputRecordDesc);
}
frameSorter.reset();
writer.open();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index 048dbc9..d19dd34 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -41,11 +41,11 @@
public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
public static int NO_DEFAULT_BRANCH = -1;
-
+
private final ICopyEvaluatorFactory[] evalFactories;
private final IBinaryBooleanInspector boolInspector;
private final int defaultBranchIndex;
-
+
public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
@@ -66,14 +66,15 @@
private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
- private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
+ 0);
private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
private final FrameTupleReference frameTuple = new FrameTupleReference();
-
+
private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
private final DataOutput tupleDos = tupleBuilder.getDataOutput();
-
+
@Override
public void close() throws HyracksDataException {
// Flush (possibly not full) buffers that have data, and close writers.
@@ -102,28 +103,28 @@
boolean found = false;
for (int j = 0; j < evals.length; j++) {
try {
- evalBuf.reset();
+ evalBuf.reset();
evals[j].evaluate(frameTuple);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
if (found) {
- copyAndAppendTuple(j);
- break;
+ copyAndAppendTuple(j);
+ break;
}
}
// Optionally write to default output branch.
if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
- copyAndAppendTuple(defaultBranchIndex);
+ copyAndAppendTuple(defaultBranchIndex);
}
}
}
private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
- // Copy tuple into tuple builder.
+ // Copy tuple into tuple builder.
try {
- tupleBuilder.reset();
+ tupleBuilder.reset();
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
frameTuple.getFieldLength(i));
@@ -134,15 +135,17 @@
}
// Append to frame.
tupleAppender.reset(writeBuffers[outputIndex], false);
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
tupleAppender.reset(writeBuffers[outputIndex], true);
- if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
throw new IllegalStateException();
}
}
}
-
+
@Override
public void open() throws HyracksDataException {
for (IFrameWriter writer : writers) {
@@ -155,13 +158,13 @@
tupleAppender.reset(writeBuffers[i], true);
}
// Create evaluators for partitioning.
- try {
- for (int i = 0; i < evalFactories.length; i++) {
- evals[i] = evalFactories[i].createEvaluator(evalBuf);
- }
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
+ try {
+ for (int i = 0; i < evalFactories.length; i++) {
+ evals[i] = evalFactories[i].createEvaluator(evalBuf);
+ }
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
}
@Override
@@ -171,4 +174,3 @@
};
}
}
-
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index 015e3a9..1ca4c6d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -58,5 +58,5 @@
* @return The Cluster Controller Context.
*/
public ICCContext getCCContext();
-
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
index 7225969..5a567b4 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
@@ -3,15 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
-<<<<<<< HEAD
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
-=======
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
->>>>>>> master
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index 83aa0f2..8e35dda 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -19,8 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
- * {@link IFrameWriter} is the interface implemented by a stream consumer. An
- * {@link IFrameWriter} could be in one of the following states:
+ * {@link IFrameWriter} is the interface implemented by a stream consumer. An {@link IFrameWriter} could be in one of the following states:
* <ul>
* <li>INITIAL</li>
* <li>OPENED</li>
@@ -29,33 +28,21 @@
* </ul>
* A producer follows the following protocol when using an {@link IFrameWriter}.
* Initially, the {@link IFrameWriter} is in the INITIAL state.
- * The first valid call to an {@link IFrameWriter} is always the
- * {@link IFrameWriter#open()}. This call provides the opportunity for the
- * {@link IFrameWriter} implementation to allocate any resources for its
+ * The first valid call to an {@link IFrameWriter} is always the {@link IFrameWriter#open()}. This call provides the opportunity for the {@link IFrameWriter} implementation to allocate any resources for its
* processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
* state. If an error occurs
- * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException}
- * is thrown and it stays in the INITIAL state.
+ * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException} is thrown and it stays in the INITIAL state.
* While the {@link IFrameWriter} is in the OPENED state, the producer can call
* one of:
* <ul>
- * <li> {@link IFrameWriter#close()} to give up any resources owned by the
- * {@link IFrameWriter} and enter the CLOSED state.</li>
- * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the
- * {@link IFrameWriter}. The call returns normally on success and the
- * {@link IFrameWriter} remains in the OPENED state. On failure, the call throws
- * a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR
- * state.</li>
- * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The
- * {@link IFrameWriter} enters the FAILED state.</li>
+ * <li> {@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR state.</li>
+ * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
* </ul>
- * In the FAILED state, the only call allowed is the
- * {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
+ * In the FAILED state, the only call allowed is the {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
* state and give up all resources.
* No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
- *
- * Note: If the call to {@link IFrameWriter#open()} failed, the
- * {@link IFrameWriter#close()} is not called by the producer. So an exceptional
+ * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} is not called by the producer. So an exceptional
* return from the {@link IFrameWriter#open()} call must clean up all partially
* allocated resources.
*
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index 1af32de..965ade7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,6 +25,6 @@
public IIOManager getIOManager();
public ByteBuffer allocateFrame() throws HyracksDataException;
-
+
public void deallocateFrames(int frameCount);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ILinearizeComparatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ILinearizeComparatorFactory.java
index a06b311..d4b158b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ILinearizeComparatorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ILinearizeComparatorFactory.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.api.dataflow.value;
-
public interface ILinearizeComparatorFactory extends IBinaryComparatorFactory {
public ILinearizeComparator createBinaryComparator();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
index 575472c..d42d714 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -21,5 +21,5 @@
* Compares two tuples to make sure that records, whose comparison keys are NULL do not pass comparator filter
*/
public interface IPredicateEvaluator {
- public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
+ public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
index bc2f339..41cd6f6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -22,5 +22,5 @@
*/
public interface IPredicateEvaluatorFactory extends Serializable {
- public IPredicateEvaluator createPredicateEvaluator();
+ public IPredicateEvaluator createPredicateEvaluator();
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
index 029ab21..09127b5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -21,6 +21,6 @@
* Provides PredicateEvaluatorFactory based on (equi-join) keys
*/
-public interface IPredicateEvaluatorFactoryProvider extends Serializable{
- public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
+public interface IPredicateEvaluatorFactoryProvider extends Serializable {
+ public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
index 8d8d728..6eab7a7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -21,7 +21,6 @@
* The representation of a deployment id
*
* @author yingyib
- *
*/
public class DeploymentId implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
index 4eda148..2270c6a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
@@ -34,13 +34,13 @@
}
public File getFile() {
- return file;
+ return file;
}
public IODeviceHandle getDeviceHandle() {
- return dev;
+ return dev;
}
-
+
@Override
public String toString() {
return file.getAbsolutePath();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index a105953..dcd8860 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -60,7 +60,6 @@
public Class<?> loadClass(String className) throws HyracksException;
/**
- *
* @param binaryURLs
* @throws HyracksException
*/
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
index 812a32a..7d3230e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -1,72 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.api.job;
import java.io.Serializable;
-import java.util.List;
import java.util.Map;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-public class JobInfo implements Serializable{
+public class JobInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private final JobId jobId;
- private JobStatus status;
+ private final JobStatus status;
- private List<Exception> exceptions;
+ private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
- private JobStatus pendingStatus;
-
- private List<Exception> pendingExceptions;
-
- private Map<OperatorDescriptorId, List<String>> operatorLocations;
-
- public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
+ public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations) {
this.jobId = jobId;
this.operatorLocations = operatorLocations;
- this.status = status;
+ this.status = jobStatus;
+ }
+
+ public JobId getJobId() {
+ return jobId;
}
public JobStatus getStatus() {
return status;
}
- public void setStatus(JobStatus status) {
- this.status = status;
- }
-
- public List<Exception> getExceptions() {
- return exceptions;
- }
-
- public void setExceptions(List<Exception> exceptions) {
- this.exceptions = exceptions;
- }
-
- public JobStatus getPendingStatus() {
- return pendingStatus;
- }
-
- public void setPendingStatus(JobStatus pendingStatus) {
- this.pendingStatus = pendingStatus;
- }
-
- public List<Exception> getPendingExceptions() {
- return pendingExceptions;
- }
-
- public void setPendingExceptions(List<Exception> pendingExceptions) {
- this.pendingExceptions = pendingExceptions;
- }
-
- public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ public Map<OperatorDescriptorId, Map<Integer, String>> getOperatorLocations() {
return operatorLocations;
}
- public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
- this.operatorLocations = operatorLocations;
- }
-
- public JobId getJobId() {
- return jobId;
- }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
index e9ddd7e..7f9b5ea 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
@@ -18,7 +18,6 @@
/**
* @author rico
- *
*/
public interface IMessage extends Serializable {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
index 7b33eb4..f4f770f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
@@ -16,7 +16,6 @@
/**
* @author rico
- *
*/
public interface IMessageBroker {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java
index 21f8d98..0c2e0cc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/ResultSetPartitionId.java
@@ -25,7 +25,7 @@
private final JobId jobId;
private final ResultSetId resultSetId;
-
+
private final int partition;
public ResultSetPartitionId(JobId jobId, ResultSetId resultSetId, int partition) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/AbstractPage.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/AbstractPage.java
index ecae1cd..dd4642d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/AbstractPage.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/AbstractPage.java
@@ -20,7 +20,7 @@
public class AbstractPage extends WebPage {
private static final long serialVersionUID = 1L;
-
+
public HyracksAdminConsoleApplication getAdminConsoleApplication() {
return (HyracksAdminConsoleApplication) getApplication();
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index dab05f7..de3c1c0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -89,7 +89,7 @@
private List<Exception> pendingExceptions;
- private Map<OperatorDescriptorId, List<String>> operatorLocations;
+ private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
@@ -105,7 +105,7 @@
cleanupPendingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
+ operatorLocations = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
}
public DeploymentId getDeploymentId() {
@@ -183,13 +183,13 @@
this.endTime = endTime;
}
- public void registerOperatorLocation(OperatorDescriptorId op, String location) {
- List<String> locations = operatorLocations.get(op);
+ public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
+ Map<Integer, String> locations = operatorLocations.get(op);
if (locations == null) {
- locations = new ArrayList<String>();
+ locations = new HashMap<Integer, String>();
operatorLocations.put(op, locations);
}
- locations.add(location);
+ locations.put(partition, location);
}
@Override
@@ -393,7 +393,7 @@
return result;
}
- public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ public Map<OperatorDescriptorId, Map<Integer, String>> getOperatorLocations() {
return operatorLocations;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index d2c018f..ad4744b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -329,7 +329,7 @@
taskAttemptMap.put(nodeId, tads);
}
OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
- jobRun.registerOperatorLocation(opId, nodeId);
+ jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index d54444c..ef82862 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -71,7 +71,7 @@
nodeIds.add(nc);
}
final DeploymentRun dRun = new DeploymentRun(nodeIds);
-
+
/** The following call prevents a user to undeploy with the same deployment id simultaneously. */
ccs.addDeploymentRun(deploymentId, dRun);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
index c2e08f2..72f8811 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -39,8 +39,8 @@
if (run == null) {
run = ccs.getRunMapArchive().get(jobId);
}
- JobInfo info = run == null ? null
- : new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations());
+ JobInfo info = (run != null) ? new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations())
+ : null;
callback.setValue(info);
} catch (Exception e) {
callback.setException(e);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index c7244fb..f6c1e54 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -113,4 +113,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 1d4daf7..01525e4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -124,4 +124,3 @@
}
}
-