Skip to content

Commit 5525660

Browse files
authored
Merge pull request #125 from ipfs/fix/cleanup-query
cleanup and optimize naive query filters
2 parents bce485c + b086f25 commit 5525660

File tree

6 files changed

+100
-104
lines changed

6 files changed

+100
-104
lines changed

batch.go

-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package datastore
22

3-
type verb int
4-
53
type op struct {
64
delete bool
75
value []byte

key_test.go

-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package datastore_test
22

33
import (
44
"bytes"
5-
"math/rand"
65
"path"
76
"strings"
87
"testing"
@@ -14,16 +13,6 @@ import (
1413
// Hook up gocheck into the "go test" runner.
1514
func Test(t *testing.T) { TestingT(t) }
1615

17-
func randomString() string {
18-
chars := "abcdefghijklmnopqrstuvwxyz1234567890"
19-
var buf bytes.Buffer
20-
l := rand.Intn(50)
21-
for j := 0; j < l; j++ {
22-
buf.WriteByte(chars[rand.Intn(len(chars))])
23-
}
24-
return buf.String()
25-
}
26-
2716
type KeySuite struct{}
2817

2918
var _ = Suite(&KeySuite{})

query/filter_test.go

-6
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ import (
55
"testing"
66
)
77

8-
type filterTestCase struct {
9-
filter Filter
10-
keys []string
11-
expect []string
12-
}
13-
148
func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) {
159
e := make([]Entry, len(keys))
1610
for i, k := range keys {

query/order_test.go

-6
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ import (
55
"testing"
66
)
77

8-
type orderTestCase struct {
9-
order Order
10-
keys []string
11-
expect []string
12-
}
13-
148
func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) {
159
e := make([]Entry, len(keys))
1610
for i, k := range keys {

query/query.go

+19-7
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ type Entry struct {
7575
}
7676

7777
// Result is a special entry that includes an error, so that the client
78-
// may be warned about internal errors.
78+
// may be warned about internal errors. If Error is non-nil, Entry must be
79+
// empty.
7980
type Result struct {
8081
Entry
8182

@@ -203,12 +204,12 @@ func NewResultBuilder(q Query) *ResultBuilder {
203204
}
204205

205206
// ResultsWithChan returns a Results object from a channel
206-
// of Result entries. Respects its own Close()
207+
// of Result entries.
208+
//
209+
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
210+
// will leave anything trying to write to the result channel hanging.
207211
func ResultsWithChan(q Query, res <-chan Result) Results {
208-
b := NewResultBuilder(q)
209-
210-
// go consume all the entries and add them to the results.
211-
b.Process.Go(func(worker goprocess.Process) {
212+
return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) {
212213
for {
213214
select {
214215
case <-worker.Closing(): // client told us to close early
@@ -219,13 +220,24 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
219220
}
220221

221222
select {
222-
case b.Output <- e:
223+
case out <- e:
223224
case <-worker.Closing(): // client told us to close early
224225
return
225226
}
226227
}
227228
}
228229
})
230+
}
231+
232+
// ResultsWithProcess returns a Results object with the results generated by the
233+
// passed subprocess.
234+
func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results {
235+
b := NewResultBuilder(q)
236+
237+
// go consume all the entries and add them to the results.
238+
b.Process.Go(func(worker goprocess.Process) {
239+
proc(worker, b.Output)
240+
})
229241

230242
go b.Process.CloseAfterChildren()
231243
return b.Results()

query/query_impl.go

+81-72
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,79 @@
11
package query
22

3-
import "sort"
3+
import (
4+
"sort"
45

5-
func DerivedResults(qr Results, ch <-chan Result) Results {
6-
return &results{
7-
query: qr.Query(),
8-
proc: qr.Process(),
9-
res: ch,
10-
}
11-
}
6+
goprocess "github.com/jbenet/goprocess"
7+
)
128

139
// NaiveFilter applies a filter to the results.
1410
func NaiveFilter(qr Results, filter Filter) Results {
15-
ch := make(chan Result)
16-
go func() {
17-
defer close(ch)
18-
defer qr.Close()
19-
20-
for e := range qr.Next() {
21-
if e.Error != nil || filter.Filter(e.Entry) {
22-
ch <- e
11+
return ResultsFromIterator(qr.Query(), Iterator{
12+
Next: func() (Result, bool) {
13+
for {
14+
e, ok := qr.NextSync()
15+
if !ok {
16+
return Result{}, false
17+
}
18+
if e.Error != nil || filter.Filter(e.Entry) {
19+
return e, true
20+
}
2321
}
24-
}
25-
}()
26-
27-
return ResultsWithChan(qr.Query(), ch)
22+
},
23+
Close: func() error {
24+
return qr.Close()
25+
},
26+
})
2827
}
2928

3029
// NaiveLimit truncates the results to a given int limit
3130
func NaiveLimit(qr Results, limit int) Results {
32-
ch := make(chan Result)
33-
go func() {
34-
defer close(ch)
35-
defer qr.Close()
36-
37-
l := 0
38-
for e := range qr.Next() {
39-
if e.Error != nil {
40-
ch <- e
41-
continue
31+
if limit == 0 {
32+
// 0 means no limit
33+
return qr
34+
}
35+
closed := false
36+
return ResultsFromIterator(qr.Query(), Iterator{
37+
Next: func() (Result, bool) {
38+
if limit == 0 {
39+
if !closed {
40+
closed = true
41+
err := qr.Close()
42+
if err != nil {
43+
return Result{Error: err}, true
44+
}
45+
}
46+
return Result{}, false
4247
}
43-
ch <- e
44-
l++
45-
if limit > 0 && l >= limit {
46-
break
48+
limit--
49+
return qr.NextSync()
50+
},
51+
Close: func() error {
52+
if closed {
53+
return nil
4754
}
48-
}
49-
}()
50-
51-
return ResultsWithChan(qr.Query(), ch)
55+
closed = true
56+
return qr.Close()
57+
},
58+
})
5259
}
5360

5461
// NaiveOffset skips a given number of results
5562
func NaiveOffset(qr Results, offset int) Results {
56-
ch := make(chan Result)
57-
go func() {
58-
defer close(ch)
59-
defer qr.Close()
60-
61-
sent := 0
62-
for e := range qr.Next() {
63-
if e.Error != nil {
64-
ch <- e
65-
}
66-
67-
if sent < offset {
68-
sent++
69-
continue
63+
return ResultsFromIterator(qr.Query(), Iterator{
64+
Next: func() (Result, bool) {
65+
for ; offset > 0; offset-- {
66+
res, ok := qr.NextSync()
67+
if !ok || res.Error != nil {
68+
return res, ok
69+
}
7070
}
71-
ch <- e
72-
}
73-
}()
74-
75-
return ResultsWithChan(qr.Query(), ch)
71+
return qr.NextSync()
72+
},
73+
Close: func() error {
74+
return qr.Close()
75+
},
76+
})
7677
}
7778

7879
// NaiveOrder reorders results according to given orders.
@@ -83,29 +84,37 @@ func NaiveOrder(qr Results, orders ...Order) Results {
8384
return qr
8485
}
8586

86-
ch := make(chan Result)
87-
var entries []Entry
88-
go func() {
89-
defer close(ch)
87+
return ResultsWithProcess(qr.Query(), func(worker goprocess.Process, out chan<- Result) {
9088
defer qr.Close()
91-
92-
for e := range qr.Next() {
93-
if e.Error != nil {
94-
ch <- e
89+
var entries []Entry
90+
collect:
91+
for {
92+
select {
93+
case <-worker.Closing():
94+
return
95+
case e, ok := <-qr.Next():
96+
if !ok {
97+
break collect
98+
}
99+
if e.Error != nil {
100+
out <- e
101+
continue
102+
}
103+
entries = append(entries, e.Entry)
95104
}
96-
97-
entries = append(entries, e.Entry)
98105
}
106+
99107
sort.Slice(entries, func(i int, j int) bool {
100108
return Less(orders, entries[i], entries[j])
101109
})
102-
103110
for _, e := range entries {
104-
ch <- Result{Entry: e}
111+
select {
112+
case <-worker.Closing():
113+
return
114+
case out <- Result{Entry: e}:
115+
}
105116
}
106-
}()
107-
108-
return DerivedResults(qr, ch)
117+
})
109118
}
110119

111120
func NaiveQueryApply(q Query, qr Results) Results {

0 commit comments

Comments
 (0)