Skip to content

Commit 1937957

Browse files
committed
Cleanup and used existing caretta architecture
1 parent c857a1d commit 1937957

File tree

4 files changed

+104
-128
lines changed

4 files changed

+104
-128
lines changed

pkg/caretta/caretta.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (caretta *Caretta) Start() {
7373
pollingTicker := time.NewTicker(time.Duration(caretta.config.pollingIntervalSeconds) * time.Second)
7474

7575
pastLinks := make(map[NetworkLink]uint64)
76-
pastConnections := []ConnectionLink{}
76+
pastTcpConnections := make(map[TcpConnection]uint64)
7777

7878
go func() {
7979
for {
@@ -82,20 +82,20 @@ func (caretta *Caretta) Start() {
8282
return
8383
case <-pollingTicker.C:
8484
var links map[NetworkLink]uint64
85-
var connections []ConnectionLink
85+
var tcpConnections map[TcpConnection]uint64
8686

8787
if err != nil {
8888
log.Printf("Error updating snapshot of cluster state, skipping iteration")
8989
continue
9090
}
9191

92-
pastLinks, links, pastConnections, connections = caretta.tracer.TracesPollingIteration(pastLinks, pastConnections)
92+
pastLinks, links, pastTcpConnections, tcpConnections = caretta.tracer.TracesPollingIteration(pastLinks, pastTcpConnections)
9393
for link, throughput := range links {
9494
caretta.handleLink(&link, throughput)
9595
}
9696

97-
for _, connection := range connections {
98-
caretta.handleConnection(&connection)
97+
for connection, throughput := range tcpConnections {
98+
caretta.handleTcpConnection(&connection, throughput)
9999
}
100100
}
101101
}
@@ -133,10 +133,20 @@ func (caretta *Caretta) handleLink(link *NetworkLink, throughput uint64) {
133133
}).Set(float64(throughput))
134134
}
135135

136-
func (caretta *Caretta) handleConnection(connection *ConnectionLink) {
136+
func (caretta *Caretta) handleTcpConnection(connection *TcpConnection, throughput uint64) {
137+
tcpState := "unknown"
138+
switch connection.State {
139+
case TcpConnectionOpenState:
140+
tcpState = "open"
141+
case TcpConnectionAcceptState:
142+
tcpState = "accept"
143+
case TcpConnectionClosedState:
144+
tcpState = "closed"
145+
}
146+
137147
connectionsMetrics.With(prometheus.Labels{
138148
"connection_id": strconv.Itoa(int(fnvHash(connection.Client.Name+connection.Client.Namespace+connection.Server.Name+connection.Server.Namespace) + connection.Role)),
139-
"connection_state": connection.State,
149+
"connection_state": tcpState,
140150
"client_id": strconv.Itoa(int(fnvHash(connection.Client.Name + connection.Client.Namespace))),
141151
"client_name": connection.Client.Name,
142152
"client_namespace": connection.Client.Namespace,
@@ -148,7 +158,7 @@ func (caretta *Caretta) handleConnection(connection *ConnectionLink) {
148158
"server_kind": connection.Server.Kind,
149159
"server_port": strconv.Itoa(int(connection.ServerPort)),
150160
"role": strconv.Itoa(int(connection.Role)),
151-
}).Set(1)
161+
}).Set(float64(throughput))
152162
}
153163

154164
func (caretta *Caretta) getClientSet() (*kubernetes.Clientset, error) {

pkg/caretta/links_tracer.go

+57-47
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,15 @@ func (tracer *LinksTracer) Stop() error {
8989

9090
// a single polling from the eBPF maps
9191
// iterating the traces from the kernel-space, summing each network link
92-
func (tracer *LinksTracer) TracesPollingIteration(pastLinks map[NetworkLink]uint64, pastConnections []ConnectionLink) (map[NetworkLink]uint64, map[NetworkLink]uint64, []ConnectionLink, []ConnectionLink) {
92+
func (tracer *LinksTracer) TracesPollingIteration(pastLinks map[NetworkLink]uint64, pastTcpConnections map[TcpConnection]uint64) (map[NetworkLink]uint64, map[NetworkLink]uint64, map[TcpConnection]uint64, map[TcpConnection]uint64) {
9393
// outline of an iteration -
9494
// filter unwanted connections, sum all connections as links, add past links, and return the new map
9595
pollsMade.Inc()
9696
unroledCounter := 0
9797
loopbackCounter := 0
9898

99-
currentConnections := []ConnectionLink{}
100-
10199
currentLinks := make(map[NetworkLink]uint64)
100+
currentTcpConnections := make(map[TcpConnection]uint64)
102101
var connectionsToDelete []ConnectionIdentifier
103102

104103
var conn ConnectionIdentifier
@@ -128,49 +127,14 @@ func (tracer *LinksTracer) TracesPollingIteration(pastLinks map[NetworkLink]uint
128127
continue
129128
}
130129

131-
currentLinks[link] += throughput.BytesSent
132-
133-
// Update observed connections
134-
state := "open"
135-
if throughput.IsActive == 0 {
136-
state = "close"
137-
} else if conn.Role == ServerConnectionRole {
138-
state = "accept"
139-
}
140-
141-
connectionLink := ConnectionLink{
142-
Client: link.Client,
143-
Server: link.Server,
144-
ServerPort: link.ServerPort,
145-
Role: conn.Role,
146-
State: state,
147-
}
148-
149-
// Try to find the connection in the past connections
150-
addToConnections := true
151-
for i, pastConnection := range pastConnections {
152-
// If the client, server and port are the same, and the state is close,
153-
// remove the connection from the past connections and don't add it to the current connections
154-
if pastConnection.Client == connectionLink.Client &&
155-
pastConnection.Server == connectionLink.Server &&
156-
pastConnection.ServerPort == connectionLink.ServerPort &&
157-
pastConnection.Role == connectionLink.Role {
158-
159-
if pastConnection.State == "close" && state == "close" {
160-
pastConnections = append(pastConnections[:i], pastConnections[i+1:]...)
161-
addToConnections = false
162-
break
163-
}
164-
}
130+
tcpConn, err := tracer.reduceConnectionToTcp(conn, throughput)
131+
if err != nil {
132+
unroledCounter++
133+
continue
165134
}
166135

167-
if addToConnections {
168-
currentConnections = append(currentConnections, connectionLink)
169-
170-
if throughput.IsActive == 0 {
171-
pastConnections = append(pastConnections, connectionLink)
172-
}
173-
}
136+
currentLinks[link] += throughput.BytesSent
137+
currentTcpConnections[tcpConn] += throughput.BytesSent
174138
}
175139

176140
mapSize.Set(float64(itemsCounter))
@@ -182,16 +146,21 @@ func (tracer *LinksTracer) TracesPollingIteration(pastLinks map[NetworkLink]uint
182146
currentLinks[pastLink] += pastThroughput
183147
}
184148

149+
// add past connections
150+
for pastConn, pastThroughput := range pastTcpConnections {
151+
currentTcpConnections[pastConn] += pastThroughput
152+
}
153+
185154
// delete connections marked to delete
186155
for _, conn := range connectionsToDelete {
187-
tracer.deleteAndStoreConnection(&conn, pastLinks)
156+
tracer.deleteAndStoreConnection(&conn, pastLinks, pastTcpConnections)
188157
}
189158

190-
return pastLinks, currentLinks, pastConnections, currentConnections
159+
return pastLinks, currentLinks, pastTcpConnections, currentTcpConnections
191160

192161
}
193162

194-
func (tracer *LinksTracer) deleteAndStoreConnection(conn *ConnectionIdentifier, pastLinks map[NetworkLink]uint64) {
163+
func (tracer *LinksTracer) deleteAndStoreConnection(conn *ConnectionIdentifier, pastLinks map[NetworkLink]uint64, pastTcpConnections map[TcpConnection]uint64) {
195164
// newer kernels introduce batch map operation, but it might not be available so we delete item-by-item
196165
var throughput ConnectionThroughputStats
197166
err := tracer.connections.Lookup(conn, &throughput)
@@ -212,7 +181,17 @@ func (tracer *LinksTracer) deleteAndStoreConnection(conn *ConnectionIdentifier,
212181
log.Printf("Error reducing connection to link when deleting: %v", err)
213182
return
214183
}
184+
185+
// if deletion is successful, add it to past tcp connections
186+
tcpConn, err := tracer.reduceConnectionToTcp(*conn, throughput)
187+
if err != nil {
188+
log.Printf("Error reducing connection to tcp connection when deleting: %v", err)
189+
return
190+
}
191+
215192
pastLinks[link] += throughput.BytesSent
193+
pastTcpConnections[tcpConn] += throughput.BytesSent
194+
216195
mapDeletions.Inc()
217196
}
218197

@@ -240,6 +219,37 @@ func (tracer *LinksTracer) reduceConnectionToLink(connection ConnectionIdentifie
240219
return link, nil
241220
}
242221

222+
// reduce a specific connection to a general tcp connection
223+
func (tracer *LinksTracer) reduceConnectionToTcp(connection ConnectionIdentifier, throughput ConnectionThroughputStats) (TcpConnection, error) {
224+
var tcpConn TcpConnection
225+
tcpConn.Role = connection.Role
226+
227+
srcWorkload := tracer.resolver.ResolveIP(IP(connection.Tuple.SrcIp).String())
228+
dstWorkload := tracer.resolver.ResolveIP(IP(connection.Tuple.DstIp).String())
229+
230+
if connection.Role == ClientConnectionRole {
231+
// Src is Client, Dst is Server, Port is DstPort
232+
tcpConn.Client = srcWorkload
233+
tcpConn.Server = dstWorkload
234+
tcpConn.ServerPort = connection.Tuple.DstPort
235+
tcpConn.State = TcpConnectionOpenState
236+
} else if connection.Role == ServerConnectionRole {
237+
// Dst is Client, Src is Server, Port is SrcPort
238+
tcpConn.Client = dstWorkload
239+
tcpConn.Server = srcWorkload
240+
tcpConn.ServerPort = connection.Tuple.SrcPort
241+
tcpConn.State = TcpConnectionAcceptState
242+
} else {
243+
return TcpConnection{}, errors.New("connection's role is unknown")
244+
}
245+
246+
if throughput.IsActive == 0 {
247+
tcpConn.State = TcpConnectionClosedState
248+
}
249+
250+
return tcpConn, nil
251+
}
252+
243253
func isAddressLoopback(ip uint32) bool {
244254
ipAddr := make(net.IP, 4)
245255
binary.LittleEndian.PutUint32(ipAddr, ip)

pkg/caretta/links_tracer_test.go

+21-68
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ func TestAggregations(t *testing.T) {
357357

358358
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
359359
pastLinks := make(map[caretta.NetworkLink]uint64)
360-
pastConnections := []caretta.ConnectionLink{}
360+
pastConnections := make(map[caretta.TcpConnection]uint64)
361361
var currentLinks map[caretta.NetworkLink]uint64
362362
for _, connection := range test.connections {
363363
m.Update(connection.connId, connection.throughput)
@@ -388,7 +388,7 @@ func TestDeletion_ActiveConnection_NotDeleted(t *testing.T) {
388388
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
389389

390390
pastLinks := make(map[caretta.NetworkLink]uint64)
391-
pastConnections := []caretta.ConnectionLink{}
391+
pastConnections := make(map[caretta.TcpConnection]uint64)
392392

393393
// Act
394394
m.Update(conn1, throughput1)
@@ -423,7 +423,7 @@ func TestDeletion_InactiveConnection_AddedToPastLinksAndRemovedFromMap(t *testin
423423
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
424424

425425
pastLinks := make(map[caretta.NetworkLink]uint64)
426-
pastConnections := []caretta.ConnectionLink{}
426+
pastConnections := make(map[caretta.TcpConnection]uint64)
427427

428428
pastLinks, _, _, _ = tracer.TracesPollingIteration(pastLinks, pastConnections)
429429

@@ -462,7 +462,7 @@ func TestDeletion_InactiveConnection_NewConnectionAfterDeletionUpdatesCorrectly(
462462
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
463463

464464
pastLinks := make(map[caretta.NetworkLink]uint64)
465-
pastConnections := []caretta.ConnectionLink{}
465+
pastConnections := make(map[caretta.TcpConnection]uint64)
466466

467467
// update the throughput so the connection is inactive
468468
throughput2 := inactiveThroughput
@@ -497,15 +497,19 @@ func TestConnectionState_Open(t *testing.T) {
497497
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
498498

499499
pastLinks := make(map[caretta.NetworkLink]uint64)
500-
pastConnections := []caretta.ConnectionLink{}
500+
pastConnections := make(map[caretta.TcpConnection]uint64)
501501

502502
// Act
503503
m.Update(conn1, throughput1)
504504
_, _, _, currentConnections := tracer.TracesPollingIteration(pastLinks, pastConnections)
505505

506506
// Assert
507507
assert.Equal(1, len(currentConnections))
508-
assert.Equal("open", currentConnections[0].State)
508+
// Get the first element of the map
509+
for tcp := range currentConnections {
510+
assert.Equal(uint32(caretta.TcpConnectionOpenState), tcp.State)
511+
break
512+
}
509513
}
510514

511515
func TestConnectionState_Close(t *testing.T) {
@@ -525,15 +529,18 @@ func TestConnectionState_Close(t *testing.T) {
525529
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
526530

527531
pastLinks := make(map[caretta.NetworkLink]uint64)
528-
pastConnections := []caretta.ConnectionLink{}
532+
pastConnections := make(map[caretta.TcpConnection]uint64)
529533

530534
// Act
531535
m.Update(conn1, throughput1)
532536
_, _, _, currentConnections := tracer.TracesPollingIteration(pastLinks, pastConnections)
533537

534538
// Assert
535539
assert.Equal(1, len(currentConnections))
536-
assert.Equal("close", currentConnections[0].State)
540+
for tcp := range currentConnections {
541+
assert.Equal(uint32(caretta.TcpConnectionClosedState), tcp.State)
542+
break
543+
}
537544
}
538545

539546
func TestConnectionState_Accept(t *testing.T) {
@@ -553,15 +560,18 @@ func TestConnectionState_Accept(t *testing.T) {
553560
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
554561

555562
pastLinks := make(map[caretta.NetworkLink]uint64)
556-
pastConnections := []caretta.ConnectionLink{}
563+
pastConnections := make(map[caretta.TcpConnection]uint64)
557564

558565
// Act
559566
m.Update(conn1, throughput1)
560567
_, _, _, currentConnections := tracer.TracesPollingIteration(pastLinks, pastConnections)
561568

562569
// Assert
563570
assert.Equal(1, len(currentConnections))
564-
assert.Equal("accept", currentConnections[0].State)
571+
for tcp := range currentConnections {
572+
assert.Equal(uint32(caretta.TcpConnectionAcceptState), tcp.State)
573+
break
574+
}
565575
}
566576

567577
func TestConnectionState_UnknownRole(t *testing.T) {
@@ -581,7 +591,7 @@ func TestConnectionState_UnknownRole(t *testing.T) {
581591
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
582592

583593
pastLinks := make(map[caretta.NetworkLink]uint64)
584-
pastConnections := []caretta.ConnectionLink{}
594+
pastConnections := make(map[caretta.TcpConnection]uint64)
585595

586596
// Act
587597
m.Update(conn1, throughput1)
@@ -590,60 +600,3 @@ func TestConnectionState_UnknownRole(t *testing.T) {
590600
// Assert
591601
assert.Equal(0, len(currentConnections))
592602
}
593-
594-
func TestConnectionRemoved_AfterClosed(t *testing.T) {
595-
assert := assert.New(t)
596-
597-
// Arrange mock map, initial connection
598-
m := NewMockConnectionsMap()
599-
600-
conn1 := caretta.ConnectionIdentifier{
601-
Id: 1,
602-
Pid: 1,
603-
Tuple: serverTuple,
604-
Role: caretta.ServerConnectionRole,
605-
}
606-
throughput1 := activeThroughput
607-
608-
tracer := caretta.NewTracerWithObjs(&MockResolver{}, m, nil)
609-
610-
pastLinks := make(map[caretta.NetworkLink]uint64)
611-
pastConnections := []caretta.ConnectionLink{}
612-
613-
// Act
614-
m.Update(conn1, throughput1)
615-
_, _, pastConnections, currentConnections := tracer.TracesPollingIteration(pastLinks, pastConnections)
616-
617-
// Assert
618-
assert.Equal(1, len(currentConnections))
619-
assert.Equal(0, len(pastConnections))
620-
621-
// Act
622-
throughput2 := inactiveThroughput
623-
m.Update(conn1, throughput2)
624-
_, _, pastConnections, currentConnections = tracer.TracesPollingIteration(pastLinks, pastConnections)
625-
626-
// Assert
627-
assert.Equal(1, len(currentConnections))
628-
assert.Equal(1, len(pastConnections))
629-
630-
// Act
631-
throughput3 := inactiveThroughput
632-
m.Update(conn1, throughput3)
633-
_, _, pastConnections, currentConnections = tracer.TracesPollingIteration(pastLinks, pastConnections)
634-
635-
// Assert
636-
assert.Equal(0, len(currentConnections))
637-
assert.Equal(0, len(pastConnections))
638-
639-
// Attempt to re-add the connection
640-
throughput4 := activeThroughput
641-
642-
// Act
643-
m.Update(conn1, throughput4)
644-
_, _, pastConnections, currentConnections = tracer.TracesPollingIteration(pastLinks, pastConnections)
645-
646-
// Assert
647-
assert.Equal(1, len(currentConnections))
648-
assert.Equal(0, len(pastConnections))
649-
}

0 commit comments

Comments
 (0)