Skip to content

Commit 0c88007

Browse files
committed
Send structured data over the streaming log api.
1 parent c5dfa20 commit 0c88007

11 files changed

+185
-113
lines changed

api/client.go

+37-14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os"
1313
"strconv"
1414
"strings"
15+
"time"
1516

1617
"github.com/juju/errors"
1718
"github.com/juju/loggo"
@@ -593,25 +594,47 @@ func (args DebugLogParams) URLQuery() url.Values {
593594
return attrs
594595
}
595596

596-
// WatchDebugLog returns a ReadCloser that the caller can read the log
597-
// lines from. Only log lines that match the filtering specified in
598-
// the DebugLogParams are returned. It returns an error that satisfies
599-
// errors.IsNotImplemented when the API server does not support the
600-
// end-point.
601-
func (c *Client) WatchDebugLog(args DebugLogParams) (io.ReadCloser, error) {
602-
// The websocket connection just hangs if the server doesn't have the log
603-
// end point. So do a version check, as version was added at the same time
604-
// as the remote end point.
605-
_, err := c.AgentVersion()
606-
if err != nil {
607-
return nil, errors.NotSupportedf("WatchDebugLog")
608-
}
597+
// LogMessage is a structured logging entry.
598+
type LogMessage struct {
599+
Entity string
600+
Timestamp time.Time
601+
Severity string
602+
Module string
603+
Location string
604+
Message string
605+
}
606+
607+
// WatchDebugLog returns a channel of structured Log Messages. Only log entries
608+
// that match the filtering specified in the DebugLogParams are returned.
609+
func (c *Client) WatchDebugLog(args DebugLogParams) (<-chan LogMessage, error) {
609610
// Prepare URL query attributes.
610611
attrs := args.URLQuery()
611612

612613
connection, err := c.st.ConnectStream("/log", attrs)
613614
if err != nil {
614615
return nil, errors.Trace(err)
615616
}
616-
return connection, nil
617+
618+
messages := make(chan LogMessage)
619+
go func() {
620+
defer close(messages)
621+
622+
for {
623+
var msg params.LogMessage
624+
err := connection.ReadJSON(&msg)
625+
if err != nil {
626+
return
627+
}
628+
messages <- LogMessage{
629+
Entity: msg.Entity,
630+
Timestamp: msg.Timestamp,
631+
Severity: msg.Severity,
632+
Module: msg.Module,
633+
Location: msg.Location,
634+
Message: msg.Message,
635+
}
636+
}
637+
}()
638+
639+
return messages, nil
617640
}

api/client_test.go

+22-31
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package api_test
55

66
import (
7-
"bufio"
87
"bytes"
98
"fmt"
109
"io"
@@ -357,10 +356,9 @@ func (s *clientSuite) TestWatchDebugLogConnected(c *gc.C) {
357356
// Use the no tail option so we don't try to start a tailing cursor
358357
// on the oplog when there is no oplog configured in mongo as the tests
359358
// don't set up mongo in replicaset mode.
360-
reader, err := client.WatchDebugLog(api.DebugLogParams{NoTail: true})
359+
messages, err := client.WatchDebugLog(api.DebugLogParams{NoTail: true})
361360
c.Assert(err, jc.ErrorIsNil)
362-
c.Assert(reader, gc.NotNil)
363-
reader.Close()
361+
c.Assert(messages, gc.NotNil)
364362
}
365363

366364
func (s *clientSuite) TestConnectStreamRequiresSlashPathPrefix(c *gc.C) {
@@ -407,7 +405,8 @@ func (s *clientSuite) TestConnectStreamErrorReadError(c *gc.C) {
407405
}
408406

409407
func (s *clientSuite) TestWatchDebugLogParamsEncoded(c *gc.C) {
410-
s.PatchValue(api.WebsocketDialConfig, echoURL(c))
408+
catcher := urlCatcher{}
409+
s.PatchValue(api.WebsocketDialConfig, catcher.recordLocation)
411410

412411
params := api.DebugLogParams{
413412
IncludeEntity: []string{"a", "b"},
@@ -422,10 +421,10 @@ func (s *clientSuite) TestWatchDebugLogParamsEncoded(c *gc.C) {
422421
}
423422

424423
client := s.APIState.Client()
425-
reader, err := client.WatchDebugLog(params)
424+
_, err := client.WatchDebugLog(params)
426425
c.Assert(err, jc.ErrorIsNil)
427426

428-
connectURL := connectURLFromReader(c, reader)
427+
connectURL := catcher.location
429428
values := connectURL.Query()
430429
c.Assert(values, jc.DeepEquals, url.Values{
431430
"includeEntity": params.IncludeEntity,
@@ -441,18 +440,18 @@ func (s *clientSuite) TestWatchDebugLogParamsEncoded(c *gc.C) {
441440
}
442441

443442
func (s *clientSuite) TestConnectStreamAtUUIDPath(c *gc.C) {
444-
s.PatchValue(api.WebsocketDialConfig, echoURL(c))
445-
// If the server supports it, we should log at "/model/UUID/log"
443+
catcher := urlCatcher{}
444+
s.PatchValue(api.WebsocketDialConfig, catcher.recordLocation)
446445
environ, err := s.State.Model()
447446
c.Assert(err, jc.ErrorIsNil)
448447
info := s.APIInfo(c)
449448
info.ModelTag = environ.ModelTag()
450449
apistate, err := api.Open(info, api.DialOpts{})
451450
c.Assert(err, jc.ErrorIsNil)
452451
defer apistate.Close()
453-
reader, err := apistate.ConnectStream("/path", nil)
452+
_, err = apistate.ConnectStream("/path", nil)
454453
c.Assert(err, jc.ErrorIsNil)
455-
connectURL := connectURLFromReader(c, reader)
454+
connectURL := catcher.location
456455
c.Assert(connectURL.Path, gc.Matches, fmt.Sprintf("/model/%s/path", environ.UUID()))
457456
}
458457

@@ -529,25 +528,17 @@ func (r *badReader) Read(p []byte) (n int, err error) {
529528
return 0, r.err
530529
}
531530

532-
func echoURL(c *gc.C) func(*websocket.Config) (base.Stream, error) {
533-
return func(config *websocket.Config) (base.Stream, error) {
534-
pr, pw := io.Pipe()
535-
go func() {
536-
fmt.Fprintf(pw, "null\n")
537-
fmt.Fprintf(pw, "%s\n", config.Location)
538-
}()
539-
return fakeStreamReader{pr}, nil
540-
}
531+
type urlCatcher struct {
532+
location *url.URL
541533
}
542534

543-
func connectURLFromReader(c *gc.C, rc io.ReadCloser) *url.URL {
544-
bufReader := bufio.NewReader(rc)
545-
location, err := bufReader.ReadString('\n')
546-
c.Assert(err, jc.ErrorIsNil)
547-
connectURL, err := url.Parse(strings.TrimSpace(location))
548-
c.Assert(err, jc.ErrorIsNil)
549-
rc.Close()
550-
return connectURL
535+
func (u *urlCatcher) recordLocation(config *websocket.Config) (base.Stream, error) {
536+
u.location = config.Location
537+
pr, pw := io.Pipe()
538+
go func() {
539+
fmt.Fprintf(pw, "null\n")
540+
}()
541+
return fakeStreamReader{pr}, nil
551542
}
552543

553544
type fakeStreamReader struct {
@@ -562,13 +553,13 @@ func (s fakeStreamReader) Close() error {
562553
}
563554

564555
func (s fakeStreamReader) Write([]byte) (int, error) {
565-
panic("not implemented")
556+
return 0, errors.NotImplementedf("Write")
566557
}
567558

568559
func (s fakeStreamReader) ReadJSON(v interface{}) error {
569-
panic("not implemented")
560+
return errors.NotImplementedf("ReadJSON")
570561
}
571562

572563
func (s fakeStreamReader) WriteJSON(v interface{}) error {
573-
panic("not implemented")
564+
return errors.NotImplementedf("WriteJSON")
574565
}

api/state_macaroon_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ func (s *macaroonLoginSuite) TestUnknownUserLogin(c *gc.C) {
6060
}
6161

6262
func (s *macaroonLoginSuite) TestConnectStream(c *gc.C) {
63-
s.PatchValue(api.WebsocketDialConfig, echoURL(c))
63+
catcher := urlCatcher{}
64+
s.PatchValue(api.WebsocketDialConfig, catcher.recordLocation)
6465

6566
dischargeCount := 0
6667
s.DischargerLogin = func() string {
@@ -77,13 +78,14 @@ func (s *macaroonLoginSuite) TestConnectStream(c *gc.C) {
7778
conn, err := s.client.ConnectStream("/path", nil)
7879
c.Assert(err, gc.IsNil)
7980
defer conn.Close()
80-
connectURL := connectURLFromReader(c, conn)
81+
connectURL := catcher.location
8182
c.Assert(connectURL.Path, gc.Equals, "/model/"+s.State.ModelTag().Id()+"/path")
8283
c.Assert(dischargeCount, gc.Equals, 1)
8384
}
8485

8586
func (s *macaroonLoginSuite) TestConnectStreamWithoutLogin(c *gc.C) {
86-
s.PatchValue(api.WebsocketDialConfig, echoURL(c))
87+
catcher := urlCatcher{}
88+
s.PatchValue(api.WebsocketDialConfig, catcher.recordLocation)
8789

8890
conn, err := s.client.ConnectStream("/path", nil)
8991
c.Assert(err, gc.ErrorMatches, `cannot use ConnectStream without logging in`)

apiserver/debuglog.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package apiserver
55

66
import (
7-
"io"
87
"net"
98
"net/http"
109
"net/url"
@@ -71,7 +70,7 @@ func (h *debugLogHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
7170
server := websocket.Server{
7271
Handler: func(conn *websocket.Conn) {
7372
socket := &debugLogSocketImpl{conn}
74-
defer socket.Close()
73+
defer conn.Close()
7574

7675
logger.Infof("debug log handler starting")
7776
// Validate before authenticate because the authentication is
@@ -111,20 +110,21 @@ func isBrokenPipe(err error) bool {
111110
// debugLogSocket describes the functionality required for the
112111
// debuglog handlers to send logs to the client.
113112
type debugLogSocket interface {
114-
io.Writer
115-
116113
// sendOk sends a nil error response, indicating there were no errors.
117114
sendOk()
118115

119116
// sendError sends a JSON-encoded error response.
120117
sendError(err error)
118+
119+
// sendLogRecord sends record JSON encoded.
120+
sendLogRecord(record *params.LogMessage) error
121121
}
122122

123123
// debugLogSocketImpl implements the debugLogSocket interface. It
124124
// wraps a websocket.Conn and provides a few debug-log specific helper
125125
// methods.
126126
type debugLogSocketImpl struct {
127-
*websocket.Conn
127+
conn *websocket.Conn
128128
}
129129

130130
// sendOk implements debugLogSocket.
@@ -134,11 +134,15 @@ func (s *debugLogSocketImpl) sendOk() {
134134

135135
// sendError implements debugLogSocket.
136136
func (s *debugLogSocketImpl) sendError(err error) {
137-
sendJSON(s.Conn, &params.ErrorResult{
137+
sendJSON(s.conn, &params.ErrorResult{
138138
Error: common.ServerError(err),
139139
})
140140
}
141141

142+
func (s *debugLogSocketImpl) sendLogRecord(record *params.LogMessage) error {
143+
return sendJSON(s.conn, record)
144+
}
145+
142146
// debugLogParams contains the parsed debuglog API request parameters.
143147
type debugLogParams struct {
144148
maxLines uint

apiserver/debuglog_db.go

+11-18
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@
44
package apiserver
55

66
import (
7-
"fmt"
87
"net/http"
9-
"time"
108

119
"github.com/juju/errors"
1210

11+
"github.com/juju/juju/apiserver/params"
1312
"github.com/juju/juju/state"
1413
)
1514

@@ -43,9 +42,7 @@ func handleDebugLogDBRequest(
4342
return errors.Annotate(tailer.Err(), "tailer stopped")
4443
}
4544

46-
line := formatLogRecord(rec)
47-
_, err := socket.Write([]byte(line))
48-
if err != nil {
45+
if err := socket.sendLogRecord(formatLogRecord(rec)); err != nil {
4946
return errors.Annotate(err, "sending failed")
5047
}
5148

@@ -73,19 +70,15 @@ func makeLogTailerParams(reqParams *debugLogParams) *state.LogTailerParams {
7370
return params
7471
}
7572

76-
func formatLogRecord(r *state.LogRecord) string {
77-
return fmt.Sprintf("%s: %s %s %s %s %s\n",
78-
r.Entity,
79-
formatTime(r.Time),
80-
r.Level.String(),
81-
r.Module,
82-
r.Location,
83-
r.Message,
84-
)
85-
}
86-
87-
func formatTime(t time.Time) string {
88-
return t.In(time.UTC).Format("2006-01-02 15:04:05")
73+
func formatLogRecord(r *state.LogRecord) *params.LogMessage {
74+
return &params.LogMessage{
75+
Entity: r.Entity.String(),
76+
Timestamp: r.Time,
77+
Severity: r.Level.String(),
78+
Module: r.Module,
79+
Location: r.Location,
80+
Message: r.Message,
81+
}
8982
}
9083

9184
var newLogTailer = _newLogTailer // For replacing in tests

apiserver/debuglog_db_internal_test.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
gc "gopkg.in/check.v1"
1313
"gopkg.in/juju/names.v2"
1414

15+
"github.com/juju/juju/apiserver/params"
1516
"github.com/juju/juju/state"
1617
coretesting "github.com/juju/juju/testing"
1718
)
@@ -244,7 +245,17 @@ func (s *fakeDebugLogSocket) sendError(err error) {
244245
s.writes <- fmt.Sprintf("err: %v", err)
245246
}
246247

247-
func (s *fakeDebugLogSocket) Write(buf []byte) (int, error) {
248-
s.writes <- string(buf)
249-
return len(buf), nil
248+
func (s *fakeDebugLogSocket) sendLogRecord(r *params.LogMessage) error {
249+
s.writes <- fmt.Sprintf("%s: %s %s %s %s %s\n",
250+
r.Entity,
251+
s.formatTime(r.Timestamp),
252+
r.Severity,
253+
r.Module,
254+
r.Location,
255+
r.Message)
256+
return nil
257+
}
258+
259+
func (c *fakeDebugLogSocket) formatTime(t time.Time) string {
260+
return t.In(time.UTC).Format("2006-01-02 15:04:05")
250261
}

apiserver/httpcontext.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,15 @@ func (ctxt *httpContext) stop() <-chan struct{} {
180180

181181
// sendJSON writes a JSON-encoded response value
182182
// to the given writer along with a trailing newline.
183-
func sendJSON(w io.Writer, response interface{}) {
183+
func sendJSON(w io.Writer, response interface{}) error {
184184
body, err := json.Marshal(response)
185185
if err != nil {
186186
logger.Errorf("cannot marshal JSON result %#v: %v", response, err)
187-
return
187+
return err
188188
}
189189
body = append(body, '\n')
190-
w.Write(body)
190+
_, err = w.Write(body)
191+
return err
191192
}
192193

193194
// sendStatusAndJSON sends an HTTP status code and

apiserver/params/internal.go

+10
Original file line numberDiff line numberDiff line change
@@ -707,3 +707,13 @@ type GUIVersionRequest struct {
707707
// Version holds the Juju GUI version number.
708708
Version version.Number `json:"version"`
709709
}
710+
711+
// LogMessage is a structured logging entry.
712+
type LogMessage struct {
713+
Entity string `json:"tag"`
714+
Timestamp time.Time `json:"ts"`
715+
Severity string `json:"sev"`
716+
Module string `json:"mod"`
717+
Location string `json:"loc"`
718+
Message string `json:"msg"`
719+
}

0 commit comments

Comments
 (0)