Skip to content

Commit

Permalink
Issue hibernating-rhinos#13: Test and fix for loss of final rows by S…
Browse files Browse the repository at this point in the history
…ortMergeJoinOperation.
  • Loading branch information
ggeurts committed Dec 12, 2013
1 parent 4f5e74d commit a2abf60
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
12 changes: 8 additions & 4 deletions Rhino.Etl.Core/Operations/SortMergeJoinOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
rightRows.MoveNext();
Row rightRow = (Row) rightRows.Current;

while (leftRow != null && rightRow != null)
while (leftRow != null || rightRow != null)
{
var match = MatchJoinCondition(leftRow, rightRow);
Row mergedRow = null;

var match = CompareRows(leftRow, rightRow);
if (match == 0)
{
mergedRow = MergeRows(leftRow, rightRow);
Expand Down Expand Up @@ -101,9 +101,13 @@ public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
if (mergedRow != null)
yield return mergedRow;
}
}

if (leftRow == null && rightRow != null && (JoinType & JoinType.Right) != 0)
yield return MergeRows(new Row(), rightRow);
private int CompareRows(Row leftRow, Row rightRow)
{
return leftRow == null
? (rightRow == null ? 0 : 1)
: (rightRow == null ? -1 : MatchJoinCondition(leftRow, rightRow));
}

/// <summary>
Expand Down
1 change: 0 additions & 1 deletion Rhino.Etl.Tests/Joins/BaseJoinFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public BaseJoinFixture()

AddPerson(3, "foo@example.org");
AddPerson(5, "silver@exaple.org");

}

protected void AddPerson(int id, string email)
Expand Down
48 changes: 47 additions & 1 deletion Rhino.Etl.Tests/Joins/JoinFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,52 @@ public void SortMergeFullJoin()
Assert.Null(items[2]["name"]);
Assert.Equal(5, items[2]["person_id"]);
}
}
}

[Fact]
public void SortMergeFullJoinMergesMultipleNonMatchingLeftRowsAtEnd()
{
AddUser("toby", "toby@test.org");
AddUser("tom", "tom@test.org");

using (FullMergeJoinUsersToPeopleByEmail join = new FullMergeJoinUsersToPeopleByEmail())
{
join.Left(new GenericEnumerableOperation(left))
.Right(new GenericEnumerableOperation(right));
join.PrepareForExecution(new SingleThreadedPipelineExecuter());
IEnumerable<Row> result = join.Execute(null);
List<Row> items = new List<Row>(result);

Assert.Equal(5, items.Count);
Assert.Null(items[3]["person_id"]);
Assert.Equal("toby", items[3]["name"]);

Assert.Null(items[4]["person_id"]);
Assert.Equal("tom", items[4]["name"]);
}
}

[Fact]
public void SortMergeFullJoinMergesMultipleNonMatchingRightRowsAtEnd()
{
AddPerson(8, "toby@test.org");
AddPerson(10, "tom@test.org");

using (FullMergeJoinUsersToPeopleByEmail join = new FullMergeJoinUsersToPeopleByEmail())
{
join.Left(new GenericEnumerableOperation(left))
.Right(new GenericEnumerableOperation(right));
join.PrepareForExecution(new SingleThreadedPipelineExecuter());
IEnumerable<Row> result = join.Execute(null);
List<Row> items = new List<Row>(result);

Assert.Equal(5, items.Count);
Assert.Equal(8, items[3]["person_id"]);
Assert.Null(items[3]["name"]);

Assert.Equal(10, items[4]["person_id"]);
Assert.Null(items[4]["name"]);
}
}
}
}

0 comments on commit a2abf60

Please sign in to comment.