temporarily altered defaults for testing
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 06ef109..addfb2e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -33,118 +33,136 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class AlgebricksMetaOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- // array of factories for building the local runtime pipeline
- private final AlgebricksPipeline pipeline;
+ // array of factories for building the local runtime pipeline
+ private final AlgebricksPipeline pipeline;
- public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
- IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
- super(spec, inputArity, outputArity);
- if (outputArity == 1) {
- this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
- }
- this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
- }
+ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ int inputArity, int outputArity,
+ IPushRuntimeFactory[] runtimeFactories,
+ RecordDescriptor[] internalRecordDescriptors) {
+ super(spec, inputArity, outputArity);
+ if (outputArity == 1) {
+ this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+ }
+ this.pipeline = new AlgebricksPipeline(runtimeFactories,
+ internalRecordDescriptors);
+ }
- public AlgebricksPipeline getPipeline() {
- return pipeline;
- }
+ public AlgebricksPipeline getPipeline() {
+ return pipeline;
+ }
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject json = super.toJSON();
- json.put("micro-operators", pipeline.getRuntimeFactories());
- return json;
- }
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = super.toJSON();
+ json.put("micro-operators", pipeline.getRuntimeFactories());
+ return json;
+ }
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Asterix { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
- }
- sb.append("}");
- // sb.append(super.getInputArity());
- // sb.append(";");
- // sb.append(super.getOutputArity());
- // sb.append(";");
- return sb.toString();
- }
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Asterix { \n");
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" " + f.toString() + ";\n");
+ }
+ sb.append("}");
+ // sb.append(super.getInputArity());
+ // sb.append(";");
+ // sb.append(super.getOutputArity());
+ // sb.append(";");
+ return sb.toString();
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- if (inputArity == 0) {
- return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- } else {
- return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- }
- }
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ if (inputArity == 0) {
+ return createSourceInputPushRuntime(ctx, recordDescProvider,
+ partition, nPartitions);
+ } else {
+ return createOneInputOneOutputPushRuntime(ctx, recordDescProvider,
+ partition, nPartitions);
+ }
+ }
- private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private IOperatorNodePushable createSourceInputPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
- public void initialize() throws HyracksDataException {
- IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
+ public void initialize() throws HyracksDataException {
+ IFrameWriter startOfPipeline;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
- pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- startOfPipeline.open();
- startOfPipeline.close();
- }
- };
- }
+ PipelineAssembler pa = new PipelineAssembler(pipeline,
+ inputArity, outputArity, null,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ startOfPipeline.open();
+ startOfPipeline.close();
+ }
+ };
+ }
- private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private IOperatorNodePushable createOneInputOneOutputPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private IFrameWriter startOfPipeline;
+ private IFrameWriter startOfPipeline;
- @Override
- public void open() throws HyracksDataException {
- if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
- pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- startOfPipeline.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ if (startOfPipeline == null) {
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
+ RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+ .getInputRecordDescriptor(
+ AlgebricksMetaOperatorDescriptor.this
+ .getActivityId(), 0);
+ PipelineAssembler pa = new PipelineAssembler(pipeline,
+ inputArity, outputArity,
+ pipelineInputRecordDescriptor,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ startOfPipeline.open();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- startOfPipeline.nextFrame(buffer);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ startOfPipeline.nextFrame(buffer);
+ }
- @Override
- public void close() throws HyracksDataException {
- startOfPipeline.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ startOfPipeline.close();
+ }
- @Override
- public void fail() throws HyracksDataException {
- startOfPipeline.fail();
- }
- };
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ startOfPipeline.fail();
+ }
+ };
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index c7244fb..8e5b672 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -38,10 +38,10 @@
public int httpPort = 16001;
@Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
- public int heartbeatPeriod = 10000;
+ public int heartbeatPeriod = 5000;
@Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
- public int maxHeartbeatLapsePeriods = 5;
+ public int maxHeartbeatLapsePeriods = 1;
@Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
public int profileDumpPeriod = 0;