7
7
package main
8
8
9
9
import (
10
+ "bytes"
10
11
"context"
12
+ "encoding/json"
11
13
"fmt"
12
14
"io"
13
15
"log/syslog"
16
+ "net/http"
14
17
"os"
15
18
"path/filepath"
16
19
"strings"
@@ -38,11 +41,12 @@ const (
38
41
type PluginConfig struct {
39
42
Events []string `toml:"events"`
40
43
41
- ServerPath string `toml:"server_path"`
42
- PersistDir string `toml:"persist_dir"`
43
- Readable bool `toml:"readable"`
44
- Timeout int `toml:"timeout"`
45
- Overwrite bool `toml:"overwrite"`
44
+ ServerPath string `toml:"server_path"`
45
+ PersistDir string `toml:"persist_dir"`
46
+ Readable bool `toml:"readable"`
47
+ Timeout int `toml:"timeout"`
48
+ Overwrite bool `toml:"overwrite"`
49
+ PrefetchDistributionEndpoint string `toml:"prefetch_distribution_endpoint"`
46
50
}
47
51
48
52
type PluginArgs struct {
@@ -104,6 +108,11 @@ func buildFlags(args *PluginArgs) []cli.Flag {
104
108
Usage : "whether to overwrite the existed persistent files" ,
105
109
Destination : & args .Config .Overwrite ,
106
110
},
111
+ & cli.StringFlag {
112
+ Name : "prefetch-distribution-endpoint" ,
113
+ Usage : "The service endpoint of prefetch distribution, for example: http://localhost:1323/api/v1/prefetch/upload" ,
114
+ Destination : & args .Config .PrefetchDistributionEndpoint ,
115
+ },
107
116
}
108
117
}
109
118
@@ -129,7 +138,8 @@ var (
129
138
)
130
139
131
140
const (
132
- imageNameLabel = "io.kubernetes.cri.image-name"
141
+ imageNameLabel = "io.kubernetes.cri.image-name"
142
+ containerNameLabel = "io.kubernetes.cri.container-name"
133
143
)
134
144
135
145
func (p * plugin ) Configure (config , runtime , version string ) (stub.EventMask , error ) {
@@ -156,11 +166,26 @@ func (p *plugin) Configure(config, runtime, version string) (stub.EventMask, err
156
166
return p .mask , nil
157
167
}
158
168
169
+ type PrefetchFile struct {
170
+ Path string
171
+ }
172
+
173
+ type CacheItem struct {
174
+ ImageName string
175
+ ContainerName string
176
+ PrefetchFiles []PrefetchFile
177
+ }
178
+
179
+ type Cache struct {
180
+ Items map [string ]* CacheItem
181
+ }
182
+
159
183
func (p * plugin ) StartContainer (_ * api.PodSandbox , container * api.Container ) error {
160
- dir , imageName , err := GetImageName (container .Annotations )
184
+ dir , imageName , imageRepo , err := GetImageName (container .Annotations )
161
185
if err != nil {
162
186
return err
163
187
}
188
+ containerName := container .Annotations [containerNameLabel ]
164
189
165
190
persistDir := filepath .Join (cfg .PersistDir , dir )
166
191
if err := os .MkdirAll (persistDir , os .ModePerm ); err != nil {
@@ -172,37 +197,127 @@ func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) err
172
197
persistFile = fmt .Sprintf ("%s.timeout%ds" , persistFile , cfg .Timeout )
173
198
}
174
199
175
- fanotifyServer := fanotify .NewServer (cfg .ServerPath , container .Pid , imageName , persistFile , cfg .Readable , cfg .Overwrite , time .Duration (cfg .Timeout )* time .Second , logWriter )
200
+ var hasSentPrefetchList = false
201
+
202
+ fanotifyServer := fanotify .NewServer (cfg .ServerPath , container .Pid , imageName , persistFile , cfg .Readable , cfg .Overwrite , time .Duration (cfg .Timeout )* time .Second , logWriter , containerName , hasSentPrefetchList )
176
203
177
204
if err := fanotifyServer .RunServer (); err != nil {
178
205
return err
179
206
}
180
207
208
+ go func () {
209
+ time .Sleep (10 * time .Minute )
210
+ fanotifyServer .Mu .Lock ()
211
+ if ! fanotifyServer .IsSent {
212
+ data , err := getPrefetchList (persistFile )
213
+ if err != nil {
214
+ log .WithError (err ).Error ("error reading file" )
215
+ }
216
+ if err = sendToServer (imageRepo , containerName , cfg .PrefetchDistributionEndpoint , data ); err != nil {
217
+ log .WithError (err ).Error ("failed to send prefetch to http server" )
218
+ }
219
+ fanotifyServer .IsSent = true
220
+ }
221
+ fanotifyServer .Mu .Unlock ()
222
+ }()
223
+
181
224
globalFanotifyServer [imageName ] = fanotifyServer
182
225
183
226
return nil
184
227
}
185
228
229
+ func sendToServer (imageName , containerName , serverURL string , data []byte ) error {
230
+ filePaths := strings .Split (string (data ), "\n " )
231
+
232
+ var prefetchFiles []PrefetchFile
233
+ for _ , path := range filePaths {
234
+ if path != "" {
235
+ prefetchFiles = append (prefetchFiles , PrefetchFile {Path : path })
236
+ }
237
+ }
238
+
239
+ item := CacheItem {
240
+ ImageName : imageName ,
241
+ ContainerName : containerName ,
242
+ PrefetchFiles : prefetchFiles ,
243
+ }
244
+
245
+ err := postRequest (item , serverURL )
246
+ if err != nil {
247
+ return errors .Wrap (err , "error uploading to server" )
248
+ }
249
+
250
+ return nil
251
+ }
252
+
253
+ func postRequest (item CacheItem , endpoint string ) error {
254
+ data , err := json .Marshal (item )
255
+ if err != nil {
256
+ return err
257
+ }
258
+
259
+ resp , err := http .Post (endpoint , "application/json" , bytes .NewBuffer (data ))
260
+ if err != nil {
261
+ return err
262
+ }
263
+ defer resp .Body .Close ()
264
+
265
+ if resp .StatusCode != http .StatusOK {
266
+ return errors .Wrap (fmt .Errorf ("server returned a non-OK status code: %d" , resp .StatusCode ), "HTTP Status Error" )
267
+ }
268
+
269
+ body , err := io .ReadAll (resp .Body )
270
+ if err != nil {
271
+ return errors .Wrap (err , "failed to read response body" )
272
+ }
273
+
274
+ log .Info ("Server Response:" , string (body ))
275
+
276
+ return nil
277
+ }
278
+
279
+ func getPrefetchList (prefetchListPath string ) ([]byte , error ) {
280
+ data , err := os .ReadFile (prefetchListPath )
281
+ if err != nil {
282
+ return nil , err
283
+ }
284
+ return data , nil
285
+ }
286
+
186
287
func (p * plugin ) StopContainer (_ * api.PodSandbox , container * api.Container ) ([]* api.ContainerUpdate , error ) {
187
288
var update = []* api.ContainerUpdate {}
188
- _ , imageName , err := GetImageName (container .Annotations )
289
+ _ , imageName , imageRepo , err := GetImageName (container .Annotations )
189
290
if err != nil {
190
291
return update , err
191
292
}
293
+
192
294
if fanotifyServer , ok := globalFanotifyServer [imageName ]; ok {
193
- fanotifyServer .StopServer ()
295
+ fanotifyServer .Mu .Lock ()
296
+ if ! fanotifyServer .IsSent {
297
+ data , err := getPrefetchList (fanotifyServer .PersistFile )
298
+ if err != nil {
299
+ return update , err
300
+ }
301
+ if err = sendToServer (imageRepo , fanotifyServer .ContainerName , cfg .PrefetchDistributionEndpoint , data ); err != nil {
302
+ log .WithError (err ).Error ("failed to send prefetch to http server" )
303
+ }
304
+ fanotifyServer .IsSent = true
305
+
306
+ fanotifyServer .StopServer ()
307
+ }
308
+ fanotifyServer .Mu .Unlock ()
194
309
} else {
195
310
return nil , errors .New ("can not find fanotify server for container image " + imageName )
196
311
}
197
-
198
312
return update , nil
199
313
}
200
314
201
- func GetImageName (annotations map [string ]string ) (string , string , error ) {
315
+ func GetImageName (annotations map [string ]string ) (string , string , string , error ) {
202
316
named , err := docker .ParseDockerRef (annotations [imageNameLabel ])
203
317
if err != nil {
204
- return "" , "" , err
318
+ return "" , "" , "" , err
205
319
}
320
+ imageRepo := docker .Named .String (named )
206
321
nameTagged := named .(docker.NamedTagged )
207
322
repo := docker .Path (nameTagged )
208
323
@@ -211,7 +326,7 @@ func GetImageName(annotations map[string]string) (string, string, error) {
211
326
212
327
imageName := image + ":" + nameTagged .Tag ()
213
328
214
- return dir , imageName , nil
329
+ return dir , imageName , imageRepo , nil
215
330
}
216
331
217
332
func (p * plugin ) onClose () {
0 commit comments