Skip to content

Commit

Permalink
distsql: control the sending rate of copIteratorTaskSender if… (#11679)
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway authored and zz-jason committed Aug 9, 2019
1 parent eb6f46a commit b9a894e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
55 changes: 51 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
// Make sure that there is at least one worker.
it.concurrency = 1
}
if !it.req.KeepOrder {
if it.req.KeepOrder {
it.sendRate = newRateLimit(2 * it.concurrency)
} else {
it.respChan = make(chan *copResponse, it.concurrency)
}
it.open(ctx)
Expand Down Expand Up @@ -259,9 +261,11 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
tasks = append(tasks, &copTask{
region: region,
ranges: ranges.slice(i, nextI),
respChan: make(chan *copResponse, 1),
region: region,
ranges: ranges.slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
})
i = nextI
Expand Down Expand Up @@ -371,6 +375,9 @@ type copIterator struct {
// If keepOrder, results are stored in copTask.respChan, read them out one by one.
tasks []*copTask
curr int
// sendRate controls the sending rate of copIteratorTaskSender, if keepOrder,
// to prevent all tasks being done (aka. all of the responses are buffered)
sendRate *rateLimit

// Otherwise, results are stored in respChan.
respChan chan *copResponse
Expand Down Expand Up @@ -401,6 +408,7 @@ type copIteratorTaskSender struct {
tasks []*copTask
finishCh <-chan struct{}
respChan chan<- *copResponse
sendRate *rateLimit
}

type copResponse struct {
Expand Down Expand Up @@ -501,6 +509,7 @@ func (it *copIterator) open(ctx context.Context) {
wg: &it.wg,
tasks: it.tasks,
finishCh: it.finishCh,
sendRate: it.sendRate,
}
taskSender.respChan = it.respChan
go taskSender.run()
Expand All @@ -509,6 +518,16 @@ func (it *copIterator) open(ctx context.Context) {
func (sender *copIteratorTaskSender) run() {
// Send tasks to feed the worker goroutines.
for _, t := range sender.tasks {
// If keepOrder, we must control the sending rate to prevent all tasks
// being done (aka. all of the responses are buffered) by copIteratorWorker.
// We keep the number of inflight tasks within the number of concurrency * 2.
// It sends one more task if a task has been finished in copIterator.Next.
if sender.sendRate != nil {
exit := sender.sendRate.getToken(sender.finishCh)
if exit {
break
}
}
exit := sender.sendToTaskCh(t)
if exit {
break
Expand Down Expand Up @@ -596,6 +615,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
// Switch to next task.
it.tasks[it.curr] = nil
it.curr++
it.sendRate.putToken()
}
}

Expand Down Expand Up @@ -862,6 +882,33 @@ func (it *copIterator) Close() error {
return nil
}

type rateLimit struct {
token chan struct{}
}

func newRateLimit(n int) *rateLimit {
return &rateLimit{
token: make(chan struct{}, n),
}
}

func (r *rateLimit) getToken(done <-chan struct{}) (exit bool) {
select {
case <-done:
return true
case r.token <- struct{}{}:
return false
}
}

func (r *rateLimit) putToken() {
select {
case <-r.token:
default:
panic("put a redundant token")
}
}

// copErrorResponse returns error when calling Next()
type copErrorResponse struct{ error }

Expand Down
27 changes: 27 additions & 0 deletions store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tikv

import (
"context"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -294,6 +295,32 @@ func (s *testCoprocessorSuite) TestCopRangeSplit(c *C) {
)
}

func (s *testCoprocessorSuite) TestRateLimit(c *C) {
done := make(chan struct{}, 1)
rl := newRateLimit(1)
c.Assert(rl.putToken, PanicMatches, "put a redundant token")
exit := rl.getToken(done)
c.Assert(exit, Equals, false)
rl.putToken()
c.Assert(rl.putToken, PanicMatches, "put a redundant token")

exit = rl.getToken(done)
c.Assert(exit, Equals, false)
done <- struct{}{}
exit = rl.getToken(done) // blocked but exit
c.Assert(exit, Equals, true)

sig := make(chan int, 1)
go func() {
exit = rl.getToken(done) // blocked
c.Assert(exit, Equals, false)
close(sig)
}()
time.Sleep(200 * time.Millisecond)
rl.putToken()
<-sig
}

type splitCase struct {
key string
*copRanges
Expand Down

0 comments on commit b9a894e

Please sign in to comment.