Skip to content

Commit d0582fa

Browse files
committed
unawareness prefetch implementation on snapshotter side
1. send post request to optimizer 2. store prefetchlist 3. add prefetchlist in nydusd xxx
1 parent 2331e91 commit d0582fa

File tree

8 files changed

+133
-61
lines changed

8 files changed

+133
-61
lines changed

config/global.go

+6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type GlobalConfig struct {
3838
DaemonThreadsNum int
3939
CacheGCPeriod time.Duration
4040
MirrorsConfig MirrorsConfig
41+
PrefetchRoot string
4142
}
4243

4344
func IsFusedevSharedModeEnabled() bool {
@@ -64,6 +65,10 @@ func GetConfigRoot() string {
6465
return globalConfig.ConfigRoot
6566
}
6667

68+
func GetPrefetchRoot() string {
69+
return globalConfig.PrefetchRoot
70+
}
71+
6772
func GetMirrorsConfigDir() string {
6873
return globalConfig.MirrorsConfig.Dir
6974
}
@@ -181,6 +186,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
181186
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
182187
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
183188
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")
189+
globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch")
184190

185191
globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig
186192

pkg/daemon/config.go

+15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/containerd/nydus-snapshotter/config"
1818
"github.com/containerd/nydus-snapshotter/internal/constant"
19+
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
1920
)
2021

2122
// Build runtime nydusd daemon object, which might be persisted later
@@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt {
3132
}
3233
}
3334

35+
func WithPrefetchDir(dir, imageID string) NewDaemonOpt {
36+
return func(d *Daemon) error {
37+
s := filepath.Join(dir, d.ID())
38+
prefetchDir, err := prefetch.GetPrefetchList(s, imageID)
39+
if err != nil && !errors.Is(err, prefetch.UdsError) {
40+
return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s)
41+
}
42+
if prefetchDir != "" {
43+
d.States.PrefetchDir = prefetchDir
44+
}
45+
return nil
46+
}
47+
}
48+
3449
func WithRef(ref int32) NewDaemonOpt {
3550
return func(d *Daemon) error {
3651
d.ref = ref

pkg/daemon/daemon.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ type ConfigState struct {
5454
SupervisorPath string
5555
ThreadNum int
5656
// Where the configuration file resides, all rafs instances share the same configuration template
57-
ConfigDir string
57+
ConfigDir string
58+
PrefetchDir string
5859
}
5960

6061
// TODO: Record queried nydusd state

pkg/filesystem/fs.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
285285
if err != nil {
286286
return err
287287
}
288-
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0)
288+
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID)
289289
// if daemon already exists for snapshotID, just return
290290
if err != nil && !errdefs.IsAlreadyExists(err) {
291291
return err
@@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
578578
return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver)
579579
}
580580

581-
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0)
581+
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "")
582582
if err != nil {
583583
return errors.Wrap(err, "initialize shared daemon")
584584
}
@@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
612612

613613
// createDaemon create new nydus daemon by snapshotID and imageID
614614
func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode,
615-
mountpoint string, ref int32) (d *daemon.Daemon, err error) {
615+
mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) {
616616
opts := []daemon.NewDaemonOpt{
617617
daemon.WithRef(ref),
618618
daemon.WithSocketDir(config.GetSocketRoot()),
@@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
631631
opts = append(opts, daemon.WithMountpoint(mountpoint))
632632
}
633633

634+
if imageID != "" {
635+
opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID))
636+
}
637+
634638
d, err = daemon.NewDaemon(opts...)
635639
if err != nil {
636640
return nil, errors.Wrapf(err, "new daemon")

pkg/manager/daemon_adaptor.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
2424
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
2525
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
26-
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
2726
)
2827

2928
const endpointGetBackend string = "/api/v1/daemons/%s/backend"
@@ -122,7 +121,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
122121
// Build commandline according to nydusd daemon configuration.
123122
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
124123
var cmdOpts []command.Opt
125-
var imageReference string
124+
// var imageReference string
126125

127126
nydusdThreadNum := d.NydusdThreadNum()
128127

@@ -148,7 +147,7 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
148147
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
149148
}
150149

151-
imageReference = rafs.ImageID
150+
// imageReference = rafs.ImageID
152151

153152
bootstrap, err := rafs.BootstrapFile()
154153
if err != nil {
@@ -176,12 +175,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
176175
command.WithID(d.ID()))
177176
}
178177

179-
if imageReference != "" {
180-
prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference)
181-
if prefetchfiles != "" {
182-
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles))
183-
prefetch.Pm.DeleteFromPrefetchMap(imageReference)
184-
}
178+
if d.States.PrefetchDir != "" {
179+
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir))
185180
}
186181

187182
cmdOpts = append(cmdOpts,

pkg/manager/manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) {
287287
resource := []string{d.States.ConfigDir, d.States.LogDir}
288288
if !d.IsSharedDaemon() {
289289
socketDir := path.Dir(d.GetAPISock())
290+
if d.States.PrefetchDir != "" {
291+
prefetchDir := path.Dir(d.States.PrefetchDir)
292+
resource = append(resource, prefetchDir)
293+
}
290294
resource = append(resource, socketDir)
291295
}
292296

pkg/prefetch/prefetch.go

+95-30
Original file line numberDiff line numberDiff line change
@@ -7,54 +7,119 @@
77
package prefetch
88

99
import (
10+
"context"
1011
"encoding/json"
11-
"sync"
12+
"fmt"
13+
"io"
14+
"net"
15+
"net/http"
16+
"os"
17+
"path/filepath"
18+
"strings"
1219

1320
"github.com/containerd/containerd/log"
21+
"github.com/pkg/errors"
1422
)
1523

16-
type prefetchInfo struct {
17-
prefetchMap map[string]string
18-
prefetchMutex sync.Mutex
24+
type prefetchlist struct {
25+
FilePaths []string `json:"files"`
1926
}
2027

21-
var Pm prefetchInfo
28+
const (
29+
endpointPrefetch = "/api/v1/imagename"
30+
udsSocket = "/tmp/prefetch.sock"
31+
)
32+
33+
var UdsError = errors.New("failed to connect unix domain socket")
2234

23-
func (p *prefetchInfo) SetPrefetchFiles(body []byte) error {
24-
p.prefetchMutex.Lock()
25-
defer p.prefetchMutex.Unlock()
35+
func GetPrefetchList(prefetchDir, imageRepo string) (string, error) {
36+
url := fmt.Sprintf("http://unix%s", endpointPrefetch)
2637

27-
var prefetchMsg []map[string]string
28-
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
29-
return err
38+
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(imageRepo))
39+
if err != nil {
40+
return "", err
3041
}
3142

32-
if p.prefetchMap == nil {
33-
p.prefetchMap = make(map[string]string)
43+
client := &http.Client{
44+
Transport: &http.Transport{
45+
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
46+
return net.Dial("unix", udsSocket)
47+
},
48+
},
3449
}
35-
for _, item := range prefetchMsg {
36-
image := item["image"]
37-
prefetchfiles := item["prefetch"]
38-
p.prefetchMap[image] = prefetchfiles
50+
resp, err := client.Do(req)
51+
if err != nil {
52+
log.L.Infof("failed to connect unix domain socket. Skipping prefetch for image: %s\n", imageRepo)
53+
return "", UdsError
3954
}
55+
defer resp.Body.Close()
4056

41-
log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap)
42-
return nil
43-
}
57+
if resp.StatusCode != http.StatusOK {
58+
return "", fmt.Errorf("failed to send data, status code: %v", resp.StatusCode)
59+
}
4460

45-
func (p *prefetchInfo) GetPrefetchInfo(image string) string {
46-
p.prefetchMutex.Lock()
47-
defer p.prefetchMutex.Unlock()
61+
body, err := io.ReadAll(resp.Body)
62+
if err != nil {
63+
return "", err
64+
}
4865

49-
if prefetchfiles, ok := p.prefetchMap[image]; ok {
50-
return prefetchfiles
66+
if strings.Contains(string(body), "CacheItem not found") {
67+
log.L.Infof("Cache item not found for image: %s\n", imageRepo)
68+
return "", nil
5169
}
52-
return ""
70+
71+
prefetchfilePath, err := storePrefetchList(prefetchDir, body)
72+
if err != nil {
73+
return "", err
74+
}
75+
return prefetchfilePath, nil
5376
}
5477

55-
func (p *prefetchInfo) DeleteFromPrefetchMap(image string) {
56-
p.prefetchMutex.Lock()
57-
defer p.prefetchMutex.Unlock()
78+
func storePrefetchList(prefetchDir string, list []byte) (string, error) {
79+
if err := os.MkdirAll(prefetchDir, 0755); err != nil {
80+
return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir)
81+
}
82+
83+
filePath := filepath.Join(prefetchDir, "prefetchList")
84+
jsonfilePath := filepath.Join(prefetchDir, "prefetchList.json")
85+
86+
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
87+
if err != nil {
88+
fmt.Println("Error opening file:", err)
89+
return "", errors.Wrap(err, "error opening prefetch file")
90+
}
91+
defer file.Close()
92+
93+
var prefetchSlice []string
94+
err = json.Unmarshal(list, &prefetchSlice)
95+
if err != nil {
96+
return "", errors.Wrap(err, "failed to parse prefetch list")
97+
}
98+
99+
for _, path := range prefetchSlice {
100+
content := path + "\n"
101+
_, err := file.WriteString(content)
102+
if err != nil {
103+
return "", errors.Wrap(err, "error writing to prefetch file")
104+
}
105+
}
106+
107+
prefetchStruct := prefetchlist{FilePaths: prefetchSlice}
108+
jsonByte, err := json.Marshal(prefetchStruct)
109+
if err != nil {
110+
return "", errors.Wrap(err, "failed to marshal to JSON")
111+
}
112+
113+
jsonfile, err := os.Create(jsonfilePath)
114+
if err != nil {
115+
return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath)
116+
}
117+
defer jsonfile.Close()
118+
119+
_, err = jsonfile.Write(jsonByte)
120+
if err != nil {
121+
return "", errors.Wrap(err, "error writing JSON to file")
122+
}
58123

59-
delete(p.prefetchMap, image)
124+
return filePath, nil
60125
}

pkg/system/system.go

-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package system
99
import (
1010
"encoding/json"
1111
"fmt"
12-
"io"
1312
"net"
1413
"net/http"
1514
"os"
@@ -30,7 +29,6 @@ import (
3029
"github.com/containerd/nydus-snapshotter/pkg/filesystem"
3130
"github.com/containerd/nydus-snapshotter/pkg/manager"
3231
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
33-
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
3432
)
3533

3634
const (
@@ -41,7 +39,6 @@ const (
4139
// it's very helpful to check daemon's record in database.
4240
endpointDaemonRecords string = "/api/v1/daemons/records"
4341
endpointDaemonsUpgrade string = "/api/v1/daemons/upgrade"
44-
endpointPrefetch string = "/api/v1/prefetch"
4542
// Provide backend information
4643
endpointGetBackend string = "/api/v1/daemons/{id}/backend"
4744
)
@@ -172,7 +169,6 @@ func (sc *Controller) registerRouter() {
172169
sc.router.HandleFunc(endpointDaemons, sc.describeDaemons()).Methods(http.MethodGet)
173170
sc.router.HandleFunc(endpointDaemonsUpgrade, sc.upgradeDaemons()).Methods(http.MethodPut)
174171
sc.router.HandleFunc(endpointDaemonRecords, sc.getDaemonRecords()).Methods(http.MethodGet)
175-
sc.router.HandleFunc(endpointPrefetch, sc.setPrefetchConfiguration()).Methods(http.MethodPut)
176172
sc.router.HandleFunc(endpointGetBackend, sc.getBackend()).Methods(http.MethodGet)
177173
}
178174

@@ -216,20 +212,6 @@ func (sc *Controller) getBackend() func(w http.ResponseWriter, r *http.Request)
216212
}
217213
}
218214

219-
func (sc *Controller) setPrefetchConfiguration() func(w http.ResponseWriter, r *http.Request) {
220-
return func(w http.ResponseWriter, r *http.Request) {
221-
body, err := io.ReadAll(r.Body)
222-
if err != nil {
223-
log.L.Errorf("Failed to read prefetch list: %v", err)
224-
return
225-
}
226-
if err = prefetch.Pm.SetPrefetchFiles(body); err != nil {
227-
log.L.Errorf("Failed to parse request body: %v", err)
228-
return
229-
}
230-
}
231-
}
232-
233215
func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {
234216
return func(w http.ResponseWriter, r *http.Request) {
235217
info := make([]daemonInfo, 0, 10)

0 commit comments

Comments
 (0)