Merged hyracks_asterix_stabilization_yfix r2529, r2551, r2564, r2575, r2576, r2577, r2584, and r2597
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging_bigmerge_target@2748 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index f1e7acb..dde4443 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -44,6 +45,7 @@
protected ITypeTraitProvider typeTraitProvider;
protected ISerializerDeserializerProvider serializerDeserializerProvider;
protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+ protected IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
protected IBinaryComparatorFactoryProvider comparatorFactoryProvider;
protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
@@ -94,6 +96,14 @@
return hashFunctionFactoryProvider;
}
+ public void setHashFunctionFamilyProvider(IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider) {
+ this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+ }
+
+ public IBinaryHashFunctionFamilyProvider getHashFunctionFamilyProvider() {
+ return hashFunctionFamilyProvider;
+ }
+
public void setComparatorFactoryProvider(IBinaryComparatorFactoryProvider comparatorFactoryProvider) {
this.comparatorFactoryProvider = comparatorFactoryProvider;
}
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 1d21463..edc1b66 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -84,11 +84,12 @@
public JobSpecification createJob(Object appContext) throws AlgebricksException {
AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
JobGenContext context = new JobGenContext(null, metadata, appContext,
- serializerDeserializerProvider, hashFunctionFactoryProvider, comparatorFactoryProvider,
- typeTraitProvider, binaryBooleanInspectorFactory, binaryIntegerInspectorFactory,
- printerProvider, nullWriterFactory, normalizedKeyComputerFactoryProvider,
- expressionRuntimeProvider, expressionTypeComputer, nullableTypeComputer, oc,
- expressionEvalSizeComputer, partialAggregationTypeComputer, frameSize, clusterLocations);
+ serializerDeserializerProvider, hashFunctionFactoryProvider,
+ hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider,
+ binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
+ nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
+ expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
+ partialAggregationTypeComputer, frameSize, clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null);
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index c737cc4..6da42b4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -32,14 +32,21 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
@@ -90,6 +97,8 @@
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
keysLeftBranch, env, context);
+ IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
+ keysLeftBranch, env, context);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
int i = 0;
IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -97,33 +106,75 @@
Object t = env.getVarType(v);
comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
}
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc = null;
- try {
- switch (kind) {
- case INNER: {
- opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
- maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor);
- break;
- }
- case LEFT_OUTER: {
- INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
- for (int j = 0; j < nullWriterFactories.length; j++) {
- nullWriterFactories[j] = context.getNullWriterFactory();
- }
- opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
- maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
- break;
- }
- default: {
- throw new NotImplementedException();
- }
+
+ boolean optimizedHashJoin = true;
+ for (IBinaryHashFunctionFamily family : hashFunFamilies) {
+ if (family == null) {
+ optimizedHashJoin = false;
+ break;
}
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
+ }
+
+ if (!optimizedHashJoin) {
+ try {
+ switch (kind) {
+ case INNER: {
+ opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+ hashFunFactories, comparatorFactories, recDescriptor);
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+ hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ } else {
+ try {
+ switch (kind) {
+ case INNER: {
+ opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+ keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+ keysRight, keysLeft));
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+ keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+ keysRight, keysLeft), true, nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
}
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
@@ -140,3 +191,72 @@
}
}
+
+/**
+ * {@ ITuplePairComparatorFactory} implementation for optimized hybrid hash join.
+ */
+class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryComparatorFactory[] binaryComparatorFactories;
+ private final int[] keysLeft;
+ private final int[] keysRight;
+
+ public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
+ int[] keysRight) {
+ this.binaryComparatorFactories = binaryComparatorFactory;
+ this.keysLeft = keysLeft;
+ this.keysRight = keysRight;
+ }
+
+ @Override
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+ IBinaryComparator[] binaryComparators = new IBinaryComparator[binaryComparatorFactories.length];
+ for (int i = 0; i < binaryComparators.length; i++) {
+ binaryComparators[i] = binaryComparatorFactories[i].createBinaryComparator();
+ }
+ return new JoinMultiComparator(binaryComparators, keysLeft, keysRight);
+ }
+}
+
+/**
+ * {@ ITuplePairComparator} implementation for optimized hybrid hash join.
+ * The comparator applies multiple binary comparators, one for each key pairs
+ */
+class JoinMultiComparator implements ITuplePairComparator {
+ private final IBinaryComparator[] binaryComparators;
+ private final int[] keysLeft;
+ private final int[] keysRight;
+
+ public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+ this.binaryComparators = bComparator;
+ this.keysLeft = keysLeft;
+ this.keysRight = keysRight;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ for (int i = 0; i < binaryComparators.length; i++) {
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, keysLeft[i]);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, keysLeft[i]);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, keysRight[i]);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, keysRight[i]);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = binaryComparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+ accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 22a1a81..365d1a5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -42,151 +43,167 @@
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
public class JobGenContext {
- private final IOperatorSchema outerFlowSchema;
- private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
- private final ISerializerDeserializerProvider serializerDeserializerProvider;
- private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
- private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
- private final IPrinterFactoryProvider printerFactoryProvider;
- private final ITypeTraitProvider typeTraitProvider;
- private final IMetadataProvider<?, ?> metadataProvider;
- private final INullWriterFactory nullWriterFactory;
- private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
- private final Object appContext;
- private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
- private final IBinaryIntegerInspectorFactory integerInspectorFactory;
- private final IExpressionRuntimeProvider expressionRuntimeProvider;
- private final IExpressionTypeComputer expressionTypeComputer;
- private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
- private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
- private final int frameSize;
- private AlgebricksPartitionConstraint clusterLocations;
- private int varCounter;
- private final ITypingContext typingContext;
+ private final IOperatorSchema outerFlowSchema;
+ private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+ private final ISerializerDeserializerProvider serializerDeserializerProvider;
+ private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+ private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+ private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+ private final IPrinterFactoryProvider printerFactoryProvider;
+ private final ITypeTraitProvider typeTraitProvider;
+ private final IMetadataProvider<?, ?> metadataProvider;
+ private final INullWriterFactory nullWriterFactory;
+ private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+ private final Object appContext;
+ private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+ private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+ private final IExpressionRuntimeProvider expressionRuntimeProvider;
+ private final IExpressionTypeComputer expressionTypeComputer;
+ private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+ private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+ private final int frameSize;
+ private AlgebricksPartitionConstraint clusterLocations;
+ private int varCounter;
+ private final ITypingContext typingContext;
- public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
- ISerializerDeserializerProvider serializerDeserializerProvider,
- IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
- IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
- IBinaryBooleanInspectorFactory booleanInspectorFactory,
- IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
- INullWriterFactory nullWriterFactory,
- INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
- IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
- INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
- IExpressionEvalSizeComputer expressionEvalSizeComputer,
- IPartialAggregationTypeComputer partialAggregationTypeComputer, int frameSize,
- AlgebricksPartitionConstraint clusterLocations) {
- this.outerFlowSchema = outerFlowSchema;
- this.metadataProvider = metadataProvider;
- this.appContext = appContext;
- this.serializerDeserializerProvider = serializerDeserializerProvider;
- this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
- this.comparatorFactoryProvider = comparatorFactoryProvider;
- this.typeTraitProvider = typeTraitProvider;
- this.booleanInspectorFactory = booleanInspectorFactory;
- this.integerInspectorFactory = integerInspectorFactory;
- this.printerFactoryProvider = printerFactoryProvider;
- this.clusterLocations = clusterLocations;
- this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
- this.nullWriterFactory = nullWriterFactory;
- this.expressionRuntimeProvider = expressionRuntimeProvider;
- this.expressionTypeComputer = expressionTypeComputer;
- this.typingContext = typingContext;
- this.expressionEvalSizeComputer = expressionEvalSizeComputer;
- this.partialAggregationTypeComputer = partialAggregationTypeComputer;
- this.frameSize = frameSize;
- this.varCounter = 0;
- }
+ public JobGenContext(
+ IOperatorSchema outerFlowSchema,
+ IMetadataProvider<?, ?> metadataProvider,
+ Object appContext,
+ ISerializerDeserializerProvider serializerDeserializerProvider,
+ IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+ IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider,
+ ITypeTraitProvider typeTraitProvider,
+ IBinaryBooleanInspectorFactory booleanInspectorFactory,
+ IBinaryIntegerInspectorFactory integerInspectorFactory,
+ IPrinterFactoryProvider printerFactoryProvider,
+ INullWriterFactory nullWriterFactory,
+ INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+ IExpressionRuntimeProvider expressionRuntimeProvider,
+ IExpressionTypeComputer expressionTypeComputer,
+ INullableTypeComputer nullableTypeComputer,
+ ITypingContext typingContext,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IPartialAggregationTypeComputer partialAggregationTypeComputer,
+ int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+ this.outerFlowSchema = outerFlowSchema;
+ this.metadataProvider = metadataProvider;
+ this.appContext = appContext;
+ this.serializerDeserializerProvider = serializerDeserializerProvider;
+ this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+ this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+ this.comparatorFactoryProvider = comparatorFactoryProvider;
+ this.typeTraitProvider = typeTraitProvider;
+ this.booleanInspectorFactory = booleanInspectorFactory;
+ this.integerInspectorFactory = integerInspectorFactory;
+ this.printerFactoryProvider = printerFactoryProvider;
+ this.clusterLocations = clusterLocations;
+ this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+ this.nullWriterFactory = nullWriterFactory;
+ this.expressionRuntimeProvider = expressionRuntimeProvider;
+ this.expressionTypeComputer = expressionTypeComputer;
+ this.typingContext = typingContext;
+ this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+ this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+ this.frameSize = frameSize;
+ this.varCounter = 0;
+ }
- public IOperatorSchema getOuterFlowSchema() {
- return outerFlowSchema;
- }
+ public IOperatorSchema getOuterFlowSchema() {
+ return outerFlowSchema;
+ }
- public AlgebricksPartitionConstraint getClusterLocations() {
- return clusterLocations;
- }
+ public AlgebricksPartitionConstraint getClusterLocations() {
+ return clusterLocations;
+ }
- public IMetadataProvider<?, ?> getMetadataProvider() {
- return metadataProvider;
- }
+ public IMetadataProvider<?, ?> getMetadataProvider() {
+ return metadataProvider;
+ }
- public Object getAppContext() {
- return appContext;
- }
+ public Object getAppContext() {
+ return appContext;
+ }
- public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
- return serializerDeserializerProvider;
- }
+ public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+ return serializerDeserializerProvider;
+ }
- public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
- return hashFunctionFactoryProvider;
- }
+ public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+ return hashFunctionFactoryProvider;
+ }
- public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
- return comparatorFactoryProvider;
- }
+ public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+ return hashFunctionFamilyProvider;
+ }
- public ITypeTraitProvider getTypeTraitProvider() {
- return typeTraitProvider;
- }
+ public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+ return comparatorFactoryProvider;
+ }
- public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
- return booleanInspectorFactory;
- }
+ public ITypeTraitProvider getTypeTraitProvider() {
+ return typeTraitProvider;
+ }
- public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
- return integerInspectorFactory;
- }
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return booleanInspectorFactory;
+ }
- public IPrinterFactoryProvider getPrinterFactoryProvider() {
- return printerFactoryProvider;
- }
+ public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+ return integerInspectorFactory;
+ }
- public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
- return expressionRuntimeProvider;
- }
+ public IPrinterFactoryProvider getPrinterFactoryProvider() {
+ return printerFactoryProvider;
+ }
- public IOperatorSchema getSchema(ILogicalOperator op) {
- return schemaMap.get(op);
- }
+ public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+ return expressionRuntimeProvider;
+ }
- public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
- schemaMap.put(op, schema);
- }
+ public IOperatorSchema getSchema(ILogicalOperator op) {
+ return schemaMap.get(op);
+ }
- public LogicalVariable createNewVar() {
- varCounter++;
- LogicalVariable var = new LogicalVariable(-varCounter);
- return var;
- }
+ public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+ schemaMap.put(op, schema);
+ }
- public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
- return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
- }
+ public LogicalVariable createNewVar() {
+ varCounter++;
+ LogicalVariable var = new LogicalVariable(-varCounter);
+ return var;
+ }
- public INullWriterFactory getNullWriterFactory() {
- return nullWriterFactory;
- }
+ public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env)
+ throws AlgebricksException {
+ return expressionTypeComputer.getType(expr,
+ typingContext.getMetadataProvider(), env);
+ }
- public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
- return normalizedKeyComputerFactoryProvider;
- }
+ public INullWriterFactory getNullWriterFactory() {
+ return nullWriterFactory;
+ }
- public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
- return expressionEvalSizeComputer;
- }
+ public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+ return normalizedKeyComputerFactoryProvider;
+ }
- public int getFrameSize() {
- return frameSize;
- }
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+ return expressionEvalSizeComputer;
+ }
- public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
- return partialAggregationTypeComputer;
- }
+ public int getFrameSize() {
+ return frameSize;
+ }
- public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
- return typingContext.getOutputTypeEnvironment(op);
- }
+ public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+ return partialAggregationTypeComputer;
+ }
+
+ public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+ return typingContext.getOutputTypeEnvironment(op);
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index 790fb93..530d19c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -31,6 +32,7 @@
import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -42,8 +44,8 @@
@SuppressWarnings("rawtypes")
public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
- JobGenContext context) throws AlgebricksException {
- ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+ JobGenContext context) throws AlgebricksException {
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
ITypeTraitProvider ttp = context.getTypeTraitProvider();
@@ -59,7 +61,7 @@
}
return new RecordDescriptor(fields, typeTraits);
}
-
+
public static IPrinterFactory[] mkPrinterFactories(IOperatorSchema opSchema, IVariableTypeEnvironment env,
JobGenContext context, int[] printColumns) throws AlgebricksException {
IPrinterFactory[] pf = new IPrinterFactory[printColumns.length];
@@ -94,7 +96,20 @@
}
return funFactories;
}
-
+
+ public static IBinaryHashFunctionFamily[] variablesToBinaryHashFunctionFamilies(
+ Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+ throws AlgebricksException {
+ IBinaryHashFunctionFamily[] funFamilies = new IBinaryHashFunctionFamily[varLogical.size()];
+ int i = 0;
+ IBinaryHashFunctionFamilyProvider bhffProvider = context.getBinaryHashFunctionFamilyProvider();
+ for (LogicalVariable var : varLogical) {
+ Object type = env.getVarType(var);
+ funFamilies[i++] = bhffProvider.getBinaryHashFunctionFamily(type);
+ }
+ return funFamilies;
+ }
+
public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
throws AlgebricksException {
@@ -107,15 +122,14 @@
}
return compFactories;
}
-
- public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
- List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
- throws AlgebricksException {
+
+ public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(List<LogicalVariable> varLogical,
+ int start, int size, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[size];
IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
for (int i = 0; i < size; i++) {
- Object type = env.getVarType(varLogical.get(start + i));
- compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+ Object type = env.getVarType(varLogical.get(start + i));
+ compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
}
return compFactories;
}
@@ -144,15 +158,14 @@
}
return typeTraits;
}
-
- public static ITypeTraits[] variablesToTypeTraits(
- List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
- throws AlgebricksException {
+
+ public static ITypeTraits[] variablesToTypeTraits(List<LogicalVariable> varLogical, int start, int size,
+ IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
ITypeTraits[] typeTraits = new ITypeTraits[size];
ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
for (int i = 0; i < size; i++) {
- Object type = env.getVarType(varLogical.get(start + i));
- typeTraits[i] = typeTraitProvider.getTypeTrait(type);
+ Object type = env.getVarType(varLogical.get(start + i));
+ typeTraits[i] = typeTraitProvider.getTypeTrait(type);
}
return typeTraits;
}
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..8a992b3
--- /dev/null
+++ b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.data;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public interface IBinaryHashFunctionFamilyProvider {
+
+ public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type)
+ throws AlgebricksException;
+}
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/RawUTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/RawUTF8StringPointable.java
new file mode 100644
index 0000000..c90ce5a
--- /dev/null
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/RawUTF8StringPointable.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.data.std.primitive;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+/**
+ * This class provides the raw bytes-based comparison and hash function for UTF8 strings.
+ * Note that the comparison may not deliver the correct ordering for certain languages that include 2 or 3 bytes characters.
+ * But it works for single-byte character languages.
+ */
+public final class RawUTF8StringPointable extends AbstractPointable implements IHashable, IComparable {
+ public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return 0;
+ }
+ };
+
+ public static final IPointableFactory FACTORY = new IPointableFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IPointable createPointable() {
+ return new RawUTF8StringPointable();
+ }
+
+ @Override
+ public ITypeTraits getTypeTraits() {
+ return TYPE_TRAITS;
+ }
+ };
+
+ @Override
+ public int compareTo(IPointable pointer) {
+ return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+ }
+
+ @Override
+ public int compareTo(byte[] bytes, int start, int length) {
+ int utflen1 = UTF8StringPointable.getUTFLength(this.bytes, this.start);
+ int utflen2 = UTF8StringPointable.getUTFLength(bytes, start);
+
+ int c1 = 0;
+ int c2 = 0;
+
+ int s1Start = this.start + 2;
+ int s2Start = start + 2;
+
+ while (c1 < utflen1 && c2 < utflen2) {
+ char ch1 = (char) this.bytes[s1Start + c1];
+ char ch2 = (char) bytes[s2Start + c2];
+
+ if (ch1 != ch2) {
+ return ch1 - ch2;
+ }
+ c1++;
+ c2++;
+ }
+ return utflen1 - utflen2;
+ }
+
+ @Override
+ public int hash() {
+ int h = 0;
+ int utflen = UTF8StringPointable.getUTFLength(bytes, start);
+ int sStart = start + 2;
+ int c = 0;
+
+ while (c < utflen) {
+ char ch = (char) bytes[sStart + c];
+ h = 31 * h + ch;
+ c++;
+ }
+ return h;
+ }
+
+ public void toString(StringBuilder buffer) {
+ UTF8StringPointable.toString(buffer, bytes, start);
+ }
+}
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java
index f6d6093..866ebb0 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -216,4 +216,4 @@
public void toString(StringBuilder buffer) {
toString(buffer, bytes, start);
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..e95e9c2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class DoubleNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ int prefix = IntegerSerializerDeserializer.getInt(bytes, start);
+ if (prefix >= 0) {
+ return prefix ^ Integer.MIN_VALUE;
+ } else {
+ return (int) ((long) 0xffffffff - (long) prefix);
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..d58afc1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class FloatNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ int prefix = IntegerSerializerDeserializer.getInt(bytes, start);
+ if (prefix >= 0) {
+ return prefix ^ Integer.MIN_VALUE;
+ } else {
+ return (int) ((long) 0xffffffff - (long) prefix);
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..4589909
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -0,0 +1,55 @@
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+
+public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+ private static final long serialVersionUID = 8735044913496854551L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private static final int POSTIVE_LONG_MASK = (3 << 30);
+ private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+ private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ long value = Integer64SerializerDeserializer.getLong(bytes, start);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /**
+ * larger than Integer.MAX
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSTIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /**
+ * smaller than Integer.MAX but >=0
+ */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /**
+ * less than 0: have not optimized for that
+ */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ }
+
+ private int getKey(int value) {
+ return value ^ Integer.MIN_VALUE;
+ }
+
+ };
+ }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 2f7a778..6a01842 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -27,8 +27,7 @@
@Override
public int normalize(byte[] bytes, int start, int length) {
int value = IntegerSerializerDeserializer.getInt(bytes, start);
- long unsignedValue = (long) value;
- return (int) ((unsignedValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+ return value ^Integer.MIN_VALUE;
}
};
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index 6a9dab7..d9191c7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -32,7 +32,7 @@
return new IValueParser() {
@Override
public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
- long n = 0;
+ int n = 0;
int sign = 1;
int i = 0;
boolean pre = true;
@@ -102,7 +102,7 @@
throw new HyracksDataException("Encountered " + ch);
}
}
-
+
try {
out.writeLong(n * sign);
} catch (IOException e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 2905574..6e2b16a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -81,6 +81,8 @@
private int[] buildPSizeInFrames; //Used for partition tuning
private int freeFramesCounter; //Used for partition tuning
+
+ private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -89,10 +91,10 @@
this.memForJoin = memForJoin;
this.buildRd = buildRd;
this.probeRd = probeRd;
- this.buildHpc = probeHpc;
- this.probeHpc = buildHpc;
- this.buildKeys = keys0;
- this.probeKeys = keys1;
+ this.buildHpc = buildHpc;
+ this.probeHpc = probeHpc;
+ this.buildKeys = keys1;
+ this.probeKeys = keys0;
this.comparators = comparators;
this.rel0Name = rel0Name;
this.rel1Name = rel1Name;
@@ -117,10 +119,10 @@
this.memForJoin = memForJoin;
this.buildRd = buildRd;
this.probeRd = probeRd;
- this.buildHpc = probeHpc;
- this.probeHpc = buildHpc;
- this.buildKeys = keys0;
- this.probeKeys = keys1;
+ this.buildHpc = buildHpc;
+ this.probeHpc = probeHpc;
+ this.buildKeys = keys1;
+ this.probeKeys = keys0;
this.comparators = comparators;
this.rel0Name = rel0Name;
this.rel1Name = rel1Name;
@@ -171,6 +173,12 @@
public void build(ByteBuffer buffer) throws HyracksDataException {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
+
+ boolean print = false;
+ if(print){
+ accessorBuild.prettyPrint();
+ }
+
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
processTuple(i, pid);
@@ -338,6 +346,7 @@
createInMemoryJoiner(inMemTupCount);
cacheInMemJoin();
+ this.isTableEmpty = (inMemTupCount == 0);
}
private void partitionTune() throws HyracksDataException {
@@ -457,10 +466,14 @@
}
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
+ boolean print = false;
+ if(print){
+ accessorProbe.prettyPrint();
+ }
+
if (numOfSpilledParts == 0) {
inMemJoiner.join(buffer, writer);
return;
@@ -604,4 +617,8 @@
+ freeFramesCounter;
return s;
}
+
+ public boolean isTableEmpty(){
+ return this.isTableEmpty;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 3a7ee2c..4842ad7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -167,10 +167,10 @@
ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid);
builder.addActivity(this, phase1);
- builder.addSourceEdge(0, phase1, 0);
+ builder.addSourceEdge(1, phase1, 0);
builder.addActivity(this, phase2);
- builder.addSourceEdge(1, phase2, 0);
+ builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -253,14 +253,7 @@
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
-
- final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
- if (isLeftOuter) {
- for (int i = 0; i < nullWriterFactories1.length; i++) {
- nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
- }
- }
-
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
@@ -278,9 +271,17 @@
state.memForJoin = memsize - 2;
state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
nPartitions);
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
- PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
- buildHpc);
+ if(!isLeftOuter){
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+ buildHpc);
+ }
+ else{
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+ buildHpc, isLeftOuter, nullWriterFactories1);
+ }
+
state.hybridHJ.initBuild();
}
@@ -368,7 +369,9 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.hybridHJ.probe(buffer, writer);
+ if(!state.hybridHJ.isTableEmpty()){
+ state.hybridHJ.probe(buffer, writer);
+ }
}
@Override
diff --git a/hyracks/hyracks-dist/pom.xml b/hyracks/hyracks-dist/pom.xml
new file mode 100755
index 0000000..4f9fc20
--- /dev/null
+++ b/hyracks/hyracks-dist/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>hyracks</artifactId>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>hyracks-dist</artifactId>
+ <name>hyracks-dist</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.5</version>
+ <executions>
+ <execution>
+ <id>copy-scripts</id>
+ <!-- here the phase you need -->
+ <phase>package</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/appassembler/</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <directoryMode>0755</directoryMode>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.6</version>
+ <executions>
+ <execution>
+ <id>process-test-classes</id>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <chmod file="target/appassembler/bin/*)" perm="755" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/getip.sh b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
new file mode 100755
index 0000000..e0cdf73
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
@@ -0,0 +1,21 @@
+#get the OS
+OS_NAME=`uname -a|awk '{print $1}'`
+LINUX_OS='Linux'
+
+if [ $OS_NAME = $LINUX_OS ];
+then
+ #Get IP Address
+ IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ if [ "$IPADDR" = "" ]
+ then
+ IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ fi
+else
+ IPADDR=`/sbin/ifconfig en1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ if [ "$IPADDR" = "" ]
+ then
+ IPADDR=`/sbin/ifconfig lo0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ fi
+
+fi
+echo $IPADDR
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startAllNCs.sh b/hyracks/hyracks-dist/src/main/resources/bin/startAllNCs.sh
new file mode 100755
index 0000000..629bd90
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startAllNCs.sh
@@ -0,0 +1,6 @@
+PREGELIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+ ssh $i "cd ${PREGELIX_PATH}; bin/startnc.sh"
+done
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startCluster.sh b/hyracks/hyracks-dist/src/main/resources/bin/startCluster.sh
new file mode 100755
index 0000000..a0c2063
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startCluster.sh
@@ -0,0 +1,3 @@
+bin/startcc.sh
+sleep 5
+bin/startAllNCs.sh
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
new file mode 100755
index 0000000..fe6cf27
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
@@ -0,0 +1,50 @@
+hostname
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CURRENT_PATH=`pwd`
+CCHOST=`ssh ${CCHOST_NAME} "cd ${CURRENT_PATH}; bin/getip.sh"`
+
+#Import cluster properties
+. conf/cluster.properties
+. conf/debugnc.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR2
+mkdir $NCTMP_DIR2
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR2
+mkdir $NCLOGS_DIR2
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS2 | tr "," "\n")
+for io_dir in $io_dirs
+do
+ rm -rf $io_dir
+ mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+#Get OS
+IPADDR=`bin/getip.sh`
+
+#Get node ID
+NODEID=`hostname | cut -d '.' -f 1`
+NODEID=${NODEID}2
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS2
+
+cd $HYRACKS_HOME
+HYRACKS_HOME=`pwd`
+
+#Enter the temp dir
+cd $NCTMP_DIR2
+
+#Launch hyracks nc
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
new file mode 100755
index 0000000..fe2551d
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+hostname
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`bin/getip.sh`
+
+#Remove the temp dir
+rm -rf $CCTMP_DIR
+mkdir $CCTMP_DIR
+
+#Remove the logs dir
+rm -rf $CCLOGS_DIR
+mkdir $CCLOGS_DIR
+
+#Export JAVA_HOME and JAVA_OPTS
+export JAVA_HOME=$JAVA_HOME
+export JAVA_OPTS=$CCJAVA_OPTS
+
+#Launch hyracks cc script
+chmod -R 755 $HYRACKS_HOME
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
new file mode 100755
index 0000000..6e0f90e
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -0,0 +1,49 @@
+hostname
+
+MY_NAME=`hostname`
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CURRENT_PATH=`pwd`
+CCHOST=`ssh ${CCHOST_NAME} "cd ${CURRENT_PATH}; bin/getip.sh"`
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR
+mkdir $NCTMP_DIR
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR
+mkdir $NCLOGS_DIR
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+ rm -rf $io_dir
+ mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+IPADDR=`bin/getip.sh`
+#echo $IPADDR
+
+#Get node ID
+NODEID=`hostname | cut -d '.' -f 1`
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS
+
+cd $HYRACKS_HOME
+HYRACKS_HOME=`pwd`
+
+#Enter the temp dir
+cd $NCTMP_DIR
+
+#Launch hyracks nc
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopAllNCs.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopAllNCs.sh
new file mode 100755
index 0000000..12367c1
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopAllNCs.sh
@@ -0,0 +1,6 @@
+PREGELIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+ ssh $i "cd ${PREGELIX_PATH}; bin/stopnc.sh"
+done
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopCluster.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopCluster.sh
new file mode 100755
index 0000000..4889934
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopCluster.sh
@@ -0,0 +1,3 @@
+bin/stopAllNCs.sh
+sleep 2
+bin/stopcc.sh
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopcc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopcc.sh
new file mode 100755
index 0000000..c2f525a
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopcc.sh
@@ -0,0 +1,10 @@
+hostname
+. conf/cluster.properties
+
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep hyracks|awk '{print $2}'`
+echo $PID
+kill -9 $PID
+
+#Clean up CC temp dir
+rm -rf $CCTMP_DIR/*
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
new file mode 100755
index 0000000..03ce4e7
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -0,0 +1,23 @@
+hostname
+. conf/cluster.properties
+
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+ USERID=`id | sed 's/^uid=//;s/(.*$//'`
+ PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+fi
+
+echo $PID
+kill -9 $PID
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+ rm -rf $io_dir/*
+done
+
+#Clean up NC temp dir
+rm -rf $NCTMP_DIR/*
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/cluster.properties b/hyracks/hyracks-dist/src/main/resources/conf/cluster.properties
new file mode 100755
index 0000000..3b382f7
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=../../../
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/debugnc.properties b/hyracks/hyracks-dist/src/main/resources/conf/debugnc.properties
new file mode 100755
index 0000000..27afa26
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/debugnc.properties
@@ -0,0 +1,12 @@
+#The tmp directory for nc to install jars
+NCTMP_DIR2=/tmp/t-1
+
+#The directory to put nc logs
+NCLOGS_DIR2=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS2="/tmp/t-2,/tmp/t-3"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS2="-Xdebug -Xrunjdwp:transport=dt_socket,address=7003,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/master b/hyracks/hyracks-dist/src/main/resources/conf/master
new file mode 100755
index 0000000..2fbb50c
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/slaves b/hyracks/hyracks-dist/src/main/resources/conf/slaves
new file mode 100755
index 0000000..2fbb50c
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/hyracks/pom.xml b/hyracks/pom.xml
index 26bd647..1d4e094 100644
--- a/hyracks/pom.xml
+++ b/hyracks/pom.xml
@@ -1,4 +1,4 @@
-
+<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
@@ -8,7 +8,7 @@
<name>hyracks</name>
<properties>
- <jvm.extraargs />
+ <jvm.extraargs/>
</properties>
<build>
@@ -101,5 +101,6 @@
<module>hyracks-hadoop-compat</module>
<!--module>hyracks-yarn</module-->
<module>hyracks-maven-plugins</module>
+ <module>hyracks-dist</module>
</modules>
-</project>
+</project>
\ No newline at end of file