-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathtimeout_handler.go
197 lines (174 loc) · 6.26 KB
/
timeout_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package reconciler
import (
"sync"
"time"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
var (
defaultFunc = func(i interface{}) {}
)
const (
defaultTimeout = 10 * time.Minute
)
// StatusKey interface to be implemented by Taskrun Pipelinerun types
type StatusKey interface {
GetRunKey() string
}
// TimeoutSet contains required k8s interfaces to handle build timeouts
type TimeoutSet struct {
logger *zap.SugaredLogger
kubeclientset kubernetes.Interface
pipelineclientset clientset.Interface
taskRunCallbackFunc func(interface{})
pipelineRunCallbackFunc func(interface{})
stopCh <-chan struct{}
done map[string]chan bool
doneMut sync.Mutex
}
// NewTimeoutHandler returns TimeoutSet filled structure
func NewTimeoutHandler(
kubeclientset kubernetes.Interface,
pipelineclientset clientset.Interface,
stopCh <-chan struct{},
logger *zap.SugaredLogger,
) *TimeoutSet {
return &TimeoutSet{
kubeclientset: kubeclientset,
pipelineclientset: pipelineclientset,
stopCh: stopCh,
done: make(map[string]chan bool),
doneMut: sync.Mutex{},
logger: logger,
}
}
// SetTaskRunCallbackFunc sets the callback function when timeout occurs for taskrun objects
func (t *TimeoutSet) SetTaskRunCallbackFunc(f func(interface{})) {
t.taskRunCallbackFunc = f
}
// SetPipelineRunCallbackFunc sets the callback function when timeout occurs for pipelinerun objects
func (t *TimeoutSet) SetPipelineRunCallbackFunc(f func(interface{})) {
t.pipelineRunCallbackFunc = f
}
// Release function deletes key from timeout map
func (t *TimeoutSet) Release(runObj StatusKey) {
key := runObj.GetRunKey()
t.doneMut.Lock()
defer t.doneMut.Unlock()
if finished, ok := t.done[key]; ok {
delete(t.done, key)
close(finished)
}
}
func (t *TimeoutSet) getOrCreateFinishedChan(runObj StatusKey) chan bool {
var finished chan bool
key := runObj.GetRunKey()
t.doneMut.Lock()
defer t.doneMut.Unlock()
if existingfinishedChan, ok := t.done[key]; ok {
finished = existingfinishedChan
} else {
finished = make(chan bool)
}
t.done[key] = finished
return finished
}
// GetTimeout takes a kubernetes Duration representing the timeout period for a
// resource and returns it as a time.Duration. If the provided duration is nil
// then fallback behaviour is to return a default timeout period.
func GetTimeout(d *metav1.Duration) time.Duration {
timeout := defaultTimeout
if d != nil {
timeout = d.Duration
}
return timeout
}
// checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
// finish/timeout in a given namespace
func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
pipelineRuns, err := t.pipelineclientset.TektonV1alpha1().PipelineRuns(namespace).List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get pipelinerun list in namespace %s: %s", namespace, err)
return
}
for _, pipelineRun := range pipelineRuns.Items {
pipelineRun := pipelineRun
if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
continue
}
if pipelineRun.HasStarted() {
go t.WaitPipelineRun(&pipelineRun, pipelineRun.Status.StartTime)
}
}
}
// CheckTimeouts function iterates through all namespaces and calls corresponding
// taskrun/pipelinerun timeout functions
func (t *TimeoutSet) CheckTimeouts() {
namespaces, err := t.kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get namespaces list: %s", err)
return
}
for _, namespace := range namespaces.Items {
t.checkTaskRunTimeouts(namespace.GetName())
t.checkPipelineRunTimeouts(namespace.GetName())
}
}
// checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to
// finish/timeout in a given namespace
func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) {
taskruns, err := t.pipelineclientset.TektonV1alpha1().TaskRuns(namespace).List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get taskrun list in namespace %s: %s", namespace, err)
return
}
for _, taskrun := range taskruns.Items {
taskrun := taskrun
if taskrun.IsDone() || taskrun.IsCancelled() {
continue
}
if taskrun.HasStarted() {
go t.WaitTaskRun(&taskrun, taskrun.Status.StartTime)
}
}
}
// WaitTaskRun function creates a blocking function for taskrun to wait for
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun, startTime *metav1.Time) {
t.waitRun(tr, GetTimeout(tr.Spec.Timeout), startTime, t.taskRunCallbackFunc)
}
// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
// determined by checking if the tr's timeout has occurred since the startTime
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun, startTime *metav1.Time) {
t.waitRun(pr, GetTimeout(pr.Spec.Timeout), startTime, t.pipelineRunCallbackFunc)
}
func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
if startTime == nil {
t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
return
}
runtime := time.Since(startTime.Time)
finished := t.getOrCreateFinishedChan(runObj)
defer t.Release(runObj)
t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)
select {
case <-t.stopCh:
t.logger.Infof("Stopping timeout timer for %s", runObj.GetRunKey())
return
case <-finished:
t.logger.Infof("%s finished, stopping the timeout timer", runObj.GetRunKey())
return
case <-time.After(timeout - runtime):
t.logger.Infof("Timeout timer for %s has timed out (started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime, timeout, time.Since(startTime.Time))
if callback != nil {
callback(runObj)
} else {
defaultFunc(runObj)
}
}
}