Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ai): add hardware info from Orchestrators and expand network information available. #3246

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
be613ee
feat(info): add hardware information to orchestrator info
ad-astra-video Oct 30, 2024
c9acc36
feat(info): add capabilities prices and summary OrchestratorInf
ad-astra-video Nov 6, 2024
4a2dab6
add struct to /getOrchestratorInfo handler
ad-astra-video Nov 12, 2024
bd4f485
fix tests
ad-astra-video Nov 12, 2024
127c1ba
add test for nil Hardware and CapabilitiesPrices with caps incl in Ge…
ad-astra-video Nov 12, 2024
9f42c49
add db test when updating orch
ad-astra-video Nov 12, 2024
9641099
fix line format (spaces > tab)
ad-astra-video Nov 12, 2024
37ecfba
update TestServeAIWorker to include HardwareInfo test
ad-astra-video Nov 12, 2024
089b8d2
add test for orch.GetCapabilities and refactor GetCapabilitiesPrices
ad-astra-video Nov 12, 2024
23c9d6f
fix test
ad-astra-video Nov 12, 2024
feaff86
refactor getNetworkCapabilitiesHandler to enable test
ad-astra-video Nov 13, 2024
679657a
add getNetworkCapabilitiesHandler test
ad-astra-video Nov 13, 2024
6ed5841
fix formatting
ad-astra-video Nov 13, 2024
d97e09d
add comment to WorkerHardware
ad-astra-video Nov 28, 2024
881222e
updates for rebase
ad-astra-video Mar 7, 2025
bf1e436
switch to store network capabilities in memory
ad-astra-video Mar 7, 2025
0b832e7
add grpc messages for hardware info
ad-astra-video Mar 7, 2025
da6d0bf
switch to using net.HardwareInformation
ad-astra-video Mar 7, 2025
1cf5d63
update to not save network capabilities in db
ad-astra-video Mar 7, 2025
ff063c2
update cli webserver endpoint to use new data location
ad-astra-video Mar 7, 2025
e316696
db version back to 1
ad-astra-video Mar 7, 2025
1dd50d5
add upgrade block to db.go back in
ad-astra-video Mar 9, 2025
48a793c
remove test not needed
ad-astra-video Mar 9, 2025
a6db6f5
refactor
ad-astra-video Mar 9, 2025
f82848f
get service uri and on chain address from dborch
ad-astra-video Mar 9, 2025
5489f41
fix spacing
ad-astra-video Mar 9, 2025
d88f09e
remove orch info handler
ad-astra-video Mar 9, 2025
920355e
cleanup
ad-astra-video Mar 9, 2025
839e3ec
move network capabilities to common
ad-astra-video Mar 10, 2025
3c6c9b6
move NetworkCapabilities add/get/modify to under lock, typo fix, log …
ad-astra-video Mar 10, 2025
c1a0061
add data to kafka events
ad-astra-video Mar 10, 2025
d3e4ea6
Merge branch 'master' into av-add-hardware-info
ad-astra-video Mar 11, 2025
e9efb85
add test to check worker.HardwareInformation to net.HardwareInformation
ad-astra-video Mar 11, 2025
561646c
remove test functions not used
ad-astra-video Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,16 @@ type OrchestratorStore interface {
type RoundsManager interface {
LastInitializedRound() *big.Int
}

type NetworkCapabilities struct {
Orchestrators []*OrchNetworkCapabilities `json:"orchestrators"`
}
type OrchNetworkCapabilities struct {
Address string `json:"address"`
LocalAddress string `json:"local_address"`
OrchURI string `json:"orch_uri"`
ServiceURI string `json:"service_uri"`
Capabilities *net.Capabilities `json:"capabilities"`
CapabilitiesPrices []*net.PriceInfo `json:"capabilities_prices"`
Hardware []*net.HardwareInformation `json:"hardware"`
}
1 change: 1 addition & 0 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type AI interface {
Stop(context.Context) error
HasCapacity(string, string) bool
EnsureImageAvailable(context.Context, string, string) error
HardwareInformation() []worker.HardwareInformation
}

// Custom type to parse a big.Rat from a JSON number.
Expand Down
47 changes: 40 additions & 7 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
manager *RemoteAIWorkerManager
stream net.AIWorker_RegisterAIWorkerServer
capabilities *Capabilities
hardware []worker.HardwareInformation
eof chan struct{}
addr string
}
Expand All @@ -64,13 +65,14 @@
requestSessions map[string]*RemoteAIWorker
}

func NewRemoteAIWorker(m *RemoteAIWorkerManager, stream net.AIWorker_RegisterAIWorkerServer, caps *Capabilities) *RemoteAIWorker {
func NewRemoteAIWorker(m *RemoteAIWorkerManager, stream net.AIWorker_RegisterAIWorkerServer, caps *Capabilities, hardware []worker.HardwareInformation) *RemoteAIWorker {
return &RemoteAIWorker{
manager: m,
stream: stream,
eof: make(chan struct{}, 1),
addr: common.GetConnectionAddr(stream.Context()),
capabilities: caps,
hardware: hardware,
}
}

Expand All @@ -87,13 +89,14 @@
}
}

func (orch *orchestrator) ServeAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities) {
orch.node.serveAIWorker(stream, capabilities)
func (orch *orchestrator) ServeAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities, hardware []*net.HardwareInformation) {
orch.node.serveAIWorker(stream, capabilities, hardware)

Check warning on line 93 in core/ai_orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/ai_orchestrator.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}

func (n *LivepeerNode) serveAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities) {
func (n *LivepeerNode) serveAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities, hardware []*net.HardwareInformation) {
from := common.GetConnectionAddr(stream.Context())
wkrCaps := CapabilitiesFromNetCapabilities(capabilities)
wkrHdw := hardwareInformationFromNetHardware(hardware)
if n.Capabilities.LivepeerVersionCompatibleWith(capabilities) {
glog.Infof("Worker compatible, connecting worker_version=%s orchestrator_version=%s worker_addr=%s", capabilities.Version, n.Capabilities.constraints.minVersion, from)
n.Capabilities.AddCapacity(wkrCaps)
Expand All @@ -102,17 +105,18 @@
defer n.RemoveAICapabilities(wkrCaps)

// Manage blocks while AI worker is connected
n.AIWorkerManager.Manage(stream, capabilities)
n.AIWorkerManager.Manage(stream, capabilities, wkrHdw)
glog.V(common.DEBUG).Infof("Closing aiworker=%s channel", from)
} else {
glog.Errorf("worker %s not connected, version not compatible", from)
}
}

// Manage adds aiworker to list of live aiworkers. Doesn't return until aiworker disconnects
func (rwm *RemoteAIWorkerManager) Manage(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities) {
func (rwm *RemoteAIWorkerManager) Manage(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities, hardware []worker.HardwareInformation) {
from := common.GetConnectionAddr(stream.Context())
aiworker := NewRemoteAIWorker(rwm, stream, CapabilitiesFromNetCapabilities(capabilities))

aiworker := NewRemoteAIWorker(rwm, stream, CapabilitiesFromNetCapabilities(capabilities), hardware)
go func() {
ctx := stream.Context()
<-ctx.Done()
Expand Down Expand Up @@ -407,6 +411,20 @@
}
}

func (orch *orchestrator) WorkerHardware() []worker.HardwareInformation {
if orch.node.AIWorker != nil {
return orch.node.AIWorker.HardwareInformation()
} else {
// return combined hardware information from all live remote workers from information provided by workers
// when connecting to orchestrator. Does not reach out for real-time information.
var wkrHdw []worker.HardwareInformation
for _, worker := range orch.node.AIWorkerManager.liveAIWorkers {
wkrHdw = append(wkrHdw, worker.hardware...)
}
return wkrHdw

Check warning on line 424 in core/ai_orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/ai_orchestrator.go#L414-L424

Added lines #L414 - L424 were not covered by tests
}
}

func (orch *orchestrator) AIResults(tcID int64, res *RemoteAIWorkerResult) {
orch.node.AIWorkerManager.aiResults(tcID, res)
}
Expand Down Expand Up @@ -1174,3 +1192,18 @@
}
return &tr
}

func hardwareInformationFromNetHardware(hdw []*net.HardwareInformation) []worker.HardwareInformation {
var netWorkerHardware []byte
netWorkerHardware, err := json.Marshal(hdw)
if err != nil {
return []worker.HardwareInformation{}
}

Check warning on line 1201 in core/ai_orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/ai_orchestrator.go#L1200-L1201

Added lines #L1200 - L1201 were not covered by tests
var workerHardware []worker.HardwareInformation
err = json.Unmarshal(netWorkerHardware, &workerHardware)
if err != nil {
return []worker.HardwareInformation{}
}

Check warning on line 1206 in core/ai_orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/ai_orchestrator.go#L1205-L1206

Added lines #L1205 - L1206 were not covered by tests

return workerHardware
}
92 changes: 78 additions & 14 deletions core/ai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package core

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"reflect"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -47,13 +49,50 @@ func TestServeAIWorker(t *testing.T) {
// test that an ai worker was created
caps := createAIWorkerCapabilities()
netCaps := caps.ToNetCapabilities()
go n.serveAIWorker(strm, netCaps)
go n.serveAIWorker(strm, netCaps, nil)
time.Sleep(1 * time.Second)

wkr, ok := n.AIWorkerManager.liveAIWorkers[strm]
if !ok {
t.Error("Unexpected transcoder type")
}
//confirm worker info
assert.Equal(t, wkr.capabilities, caps)
assert.Nil(t, wkr.hardware)

// test shutdown
wkr.eof <- struct{}{}
time.Sleep(1 * time.Second)

// stream should be removed
_, ok = n.AIWorkerManager.liveAIWorkers[strm]
if ok {
t.Error("Unexpected ai worker presence")
}

//confirm no workers connected
assert.Len(t, n.AIWorkerManager.liveAIWorkers, 0)

//connect worker with hardware information
strm1 := &StubAIWorkerServer{}
hdwDetail := net.GPUComputeInfo{Id: "gpu-1", Name: "gpu name", Major: 8, Minor: 9, MemoryFree: 1, MemoryTotal: 10}
hdwInfo := make(map[string]*net.GPUComputeInfo)
hdwInfo["0"] = &hdwDetail
hdw := net.HardwareInformation{Pipeline: "livepeer-pipeline", ModelId: "livepeer/model1", GpuInfo: hdwInfo}
var netHdwList []*net.HardwareInformation
netHdwList = append(netHdwList, &hdw)
go n.serveAIWorker(strm1, netCaps, netHdwList)
time.Sleep(1 * time.Second)

wkr, ok = n.AIWorkerManager.liveAIWorkers[strm1]
if !ok {
t.Error("Unexpected transcoder type")
}

//confirm worker attached and has hardware information
assert.Len(t, n.AIWorkerManager.liveAIWorkers, 1)
wkrHdw := hardwareInformationFromNetHardware(netHdwList)
assert.Equal(t, wkrHdw, n.AIWorkerManager.liveAIWorkers[strm1].hardware)

// test shutdown
wkr.eof <- struct{}{}
Expand All @@ -65,6 +104,7 @@ func TestServeAIWorker(t *testing.T) {
t.Error("Unexpected ai worker presence")
}
}

func TestServeAIWorker_IncompatibleVersion(t *testing.T) {
assert := assert.New(t)
n, _ := NewLivepeerNode(nil, "", nil)
Expand All @@ -76,7 +116,7 @@ func TestServeAIWorker_IncompatibleVersion(t *testing.T) {
// test that an ai worker was created
caps := createAIWorkerCapabilities()
netCaps := caps.ToNetCapabilities()
go n.serveAIWorker(strm, netCaps)
go n.serveAIWorker(strm, netCaps, nil)
time.Sleep(5 * time.Second)
assert.Zero(len(n.AIWorkerManager.liveAIWorkers))
assert.Zero(len(n.AIWorkerManager.remoteAIWorkers))
Expand All @@ -88,14 +128,14 @@ func TestRemoteAIWorkerManager(t *testing.T) {
initAIWorker := func() (*RemoteAIWorker, *StubAIWorkerServer) {
strm := &StubAIWorkerServer{manager: m}
caps := createAIWorkerCapabilities()
wkr := NewRemoteAIWorker(m, strm, caps)
wkr := NewRemoteAIWorker(m, strm, caps, nil)
return wkr, strm
}
//create worker and connect to manager
wkr, strm := initAIWorker()

go func() {
m.Manage(strm, wkr.capabilities.ToNetCapabilities())
m.Manage(strm, wkr.capabilities.ToNetCapabilities(), nil)
}()
time.Sleep(1 * time.Millisecond) // allow the workers to activate

Expand Down Expand Up @@ -156,9 +196,9 @@ func TestSelectAIWorker(t *testing.T) {

// register ai workers, which adds ai worker to liveAIWorkers and remoteAIWorkers
wg := newWg(1)
go func() { m.Manage(strm, capabilities.ToNetCapabilities()) }()
go func() { m.Manage(strm, capabilities.ToNetCapabilities(), nil) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities()); wg.Done() }()
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities(), nil); wg.Done() }()
time.Sleep(1 * time.Millisecond) // allow time for second stream to register e for third stream to register

//update worker.addr to be different
Expand Down Expand Up @@ -256,7 +296,7 @@ func TestSelectAIWorker(t *testing.T) {
assert.EqualError(err, ErrNoCompatibleWorkersAvailable.Error())

// reconnect worker and check pipeline only on second worker is available
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities()); wg.Done() }()
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities(), nil); wg.Done() }()
time.Sleep(1 * time.Millisecond)
w, err = m.selectWorker(testRequestId, "image-to-image", "livepeer/model2")
assert.NotNil(w)
Expand All @@ -280,7 +320,7 @@ func TestManageAIWorkers(t *testing.T) {

// test that transcoder is added to liveTranscoders and remoteTranscoders
wg1 := newWg(1)
go func() { m.Manage(strm, capabilities.ToNetCapabilities()); wg1.Done() }()
go func() { m.Manage(strm, capabilities.ToNetCapabilities(), nil); wg1.Done() }()
time.Sleep(1 * time.Millisecond) // allow the manager to activate

assert.NotNil(m.liveAIWorkers[strm])
Expand All @@ -291,7 +331,7 @@ func TestManageAIWorkers(t *testing.T) {

// test that additional transcoder is added to liveTranscoders and remoteTranscoders
wg2 := newWg(1)
go func() { m.Manage(strm2, capabilities.ToNetCapabilities()); wg2.Done() }()
go func() { m.Manage(strm2, capabilities.ToNetCapabilities(), nil); wg2.Done() }()
time.Sleep(1 * time.Millisecond) // allow the manager to activate

assert.NotNil(m.liveAIWorkers[strm])
Expand Down Expand Up @@ -321,7 +361,7 @@ func TestRemoteAIWorkerTimeout(t *testing.T) {
strm := &StubAIWorkerServer{manager: m}
//create capabilities and constraints the ai worker sends to orch
caps := createAIWorkerCapabilities()
wkr := NewRemoteAIWorker(m, strm, caps)
wkr := NewRemoteAIWorker(m, strm, caps, nil)
return wkr, strm
}
//create a new worker
Expand Down Expand Up @@ -483,14 +523,14 @@ func TestCheckAICapacity(t *testing.T) {
initAIWorker := func() (*RemoteAIWorker, *StubAIWorkerServer) {
strm := &StubAIWorkerServer{manager: o.node.AIWorkerManager}
caps := createAIWorkerCapabilities()
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps)
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps, nil)
return wkr, strm
}
//create worker and connect to manager
wkr2, strm := initAIWorker()

go func() {
o.node.AIWorkerManager.Manage(strm, wkr2.capabilities.ToNetCapabilities())
o.node.AIWorkerManager.Manage(strm, wkr2.capabilities.ToNetCapabilities(), nil)
}()
time.Sleep(1 * time.Millisecond) // allow the workers to activate

Expand All @@ -513,12 +553,12 @@ func TestRemoteAIWorkerProcessPipelines(t *testing.T) {
initAIWorker := func() (*RemoteAIWorker, *StubAIWorkerServer) {
strm := &StubAIWorkerServer{manager: o.node.AIWorkerManager}
caps := createAIWorkerCapabilities()
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps)
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps, nil)
return wkr, strm
}
//create worker and connect to manager
wkr, strm := initAIWorker()
go o.node.serveAIWorker(strm, wkr.capabilities.ToNetCapabilities())
go o.node.serveAIWorker(strm, wkr.capabilities.ToNetCapabilities(), nil)
time.Sleep(5 * time.Millisecond) // allow the workers to activate

//check workers connected
Expand Down Expand Up @@ -686,6 +726,10 @@ func (a *stubAIWorker) EnsureImageAvailable(ctx context.Context, pipeline string
return nil
}

func (a *stubAIWorker) HardwareInformation() []worker.HardwareInformation {
return nil
}

type StubAIWorkerServer struct {
manager *RemoteAIWorkerManager
SendError error
Expand Down Expand Up @@ -800,3 +844,23 @@ func TestParseAIModelConfigs(t *testing.T) {
})
}
}

func TestHardwareInformationFromNetHardware(t *testing.T) {
netHdwDetail := net.GPUComputeInfo{Id: "gpu-1", Name: "gpu name", Major: 8, Minor: 9, MemoryFree: 1, MemoryTotal: 10}
netHdwInfo := make(map[string]*net.GPUComputeInfo)
netHdwInfo["0"] = &netHdwDetail
netHdw := net.HardwareInformation{Pipeline: "livepeer-pipeline", ModelId: "livepeer/model1", GpuInfo: netHdwInfo}
var netHdwList []*net.HardwareInformation
netHdwList = append(netHdwList, &netHdw)
//create []worker.HardwareInformation
hdwList := hardwareInformationFromNetHardware(netHdwList)

netHdwJson, _ := json.Marshal(netHdwList)
hdwJson, _ := json.Marshal(hdwList)

var hdw1, hdw2 interface{}
json.Unmarshal(netHdwJson, &hdw1)
json.Unmarshal(hdwJson, &hdw2)
assert.True(t, reflect.DeepEqual(hdw1, hdw2))

}
Loading
Loading