Skip to content

Commit b8ebc82

Browse files
authored
Merge pull request #7 from halkyon/add-retrieve-command
Add support for retrieving purged logs stored in Storj DCS
2 parents 2921fd6 + 19c6cf5 commit b8ebc82

File tree

5 files changed

+360
-1
lines changed

5 files changed

+360
-1
lines changed

cmd/oklog/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func usage() {
3333
fmt.Fprintf(os.Stderr, " query Querying commandline tool\n")
3434
fmt.Fprintf(os.Stderr, " stream Streaming commandline tool\n")
3535
fmt.Fprintf(os.Stderr, " testsvc Test service, emits log lines at a fixed rate\n")
36+
fmt.Fprintf(os.Stderr, " retrieve Retrieve commandline tool to read purged logs stored in Storj DCS")
3637
fmt.Fprintf(os.Stderr, "\n")
3738
fmt.Fprintf(os.Stderr, "VERSION\n")
3839
fmt.Fprintf(os.Stderr, " %s (%s)\n", version, runtime.Version())
@@ -61,6 +62,8 @@ func main() {
6162
run = runStream
6263
case "testsvc":
6364
run = runTestService
65+
case "retrieve":
66+
run = runRetrieve
6467
default:
6568
usage()
6669
os.Exit(1)

cmd/oklog/retrieve.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"net/url"
9+
"os"
10+
"strings"
11+
"time"
12+
13+
"github.com/oklog/oklog/pkg/store"
14+
"github.com/pkg/errors"
15+
)
16+
17+
func runRetrieve(args []string) error {
18+
flagset := flag.NewFlagSet("retrieve", flag.ExitOnError)
19+
var (
20+
storeAddr = flagset.String("store", "localhost:7650", "address of store instance to query")
21+
from = flagset.String("from", "1h", "from, as RFC3339 timestamp or duration ago")
22+
to = flagset.String("to", "now", "to, as RFC3339 timestamp or duration ago")
23+
withulid = flagset.Bool("ulid", false, "include ULID prefix with each record")
24+
withtime = flagset.Bool("time", false, "include time prefix with each record")
25+
verbose = flagset.Bool("v", false, "verbose output to stderr")
26+
)
27+
flagset.Usage = usageFor(flagset, "oklog retrieve [flags]")
28+
if err := flagset.Parse(args); err != nil {
29+
return err
30+
}
31+
32+
verbosePrintf := func(string, ...interface{}) {}
33+
if *verbose {
34+
verbosePrintf = func(format string, args ...interface{}) {
35+
fmt.Fprintf(os.Stderr, format, args...)
36+
}
37+
}
38+
39+
_, hostport, _, _, err := parseAddr(*storeAddr, defaultAPIPort)
40+
if err != nil {
41+
return errors.Wrap(err, "couldn't parse -store")
42+
}
43+
44+
fromDuration, durationErr := time.ParseDuration(*from)
45+
fromTime, timeErr := time.Parse(time.RFC3339Nano, *from)
46+
fromNow := strings.ToLower(*from) == "now"
47+
var fromStr string
48+
switch {
49+
case fromNow:
50+
fromStr = time.Now().Format(time.RFC3339)
51+
case durationErr == nil && timeErr != nil:
52+
fromStr = time.Now().Add(neg(fromDuration)).Format(time.RFC3339)
53+
case durationErr != nil && timeErr == nil:
54+
fromStr = fromTime.Format(time.RFC3339)
55+
default:
56+
return fmt.Errorf("couldn't parse -from (%q) as either duration or time", *from)
57+
}
58+
59+
toDuration, durationErr := time.ParseDuration(*to)
60+
toTime, timeErr := time.Parse(time.RFC3339, *to)
61+
toNow := strings.ToLower(*to) == "now"
62+
var toStr string
63+
switch {
64+
case toNow:
65+
toStr = time.Now().Format(time.RFC3339)
66+
case durationErr == nil && timeErr != nil:
67+
toStr = time.Now().Add(neg(toDuration)).Format(time.RFC3339)
68+
case durationErr != nil && timeErr == nil:
69+
toStr = toTime.Format(time.RFC3339)
70+
default:
71+
return fmt.Errorf("couldn't parse -to (%q) as either duration or time", *to)
72+
}
73+
74+
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(
75+
"http://%s/store%s?from=%s&to=%s",
76+
hostport,
77+
store.APIPathDCSQuery,
78+
url.QueryEscape(fromStr),
79+
url.QueryEscape(toStr),
80+
), nil)
81+
if err != nil {
82+
return err
83+
}
84+
verbosePrintf("GET %s\n", req.URL.String())
85+
resp, err := http.DefaultClient.Do(req)
86+
if err != nil {
87+
return err
88+
}
89+
if resp.StatusCode != http.StatusOK {
90+
req.URL.RawQuery = "" // for pretty print
91+
return errors.Errorf("%s %s: %s", req.Method, req.URL.String(), resp.Status)
92+
}
93+
94+
switch {
95+
case *withulid:
96+
io.Copy(os.Stdout, resp.Body)
97+
case *withtime:
98+
io.Copy(os.Stdout, parseTime(resp.Body))
99+
default:
100+
io.Copy(os.Stdout, strip(resp.Body))
101+
}
102+
103+
return nil
104+
}

pkg/store/api.go

+25
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
APIPathInternalStream = "/_stream"
3131
APIPathReplicate = "/replicate"
3232
APIPathClusterState = "/_clusterstate"
33+
APIPathDCSQuery = "/dcsquery"
3334
)
3435

3536
// ClusterPeer models cluster.Peer.
@@ -111,6 +112,8 @@ func (a *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
111112
a.handleReplicate(w, r)
112113
case method == "GET" && path == APIPathClusterState:
113114
a.handleClusterState(w, r)
115+
case method == "GET" && path == APIPathDCSQuery:
116+
a.handleDCSQuery(w, r)
114117
default:
115118
http.NotFound(w, r)
116119
}
@@ -449,6 +452,28 @@ func (a *API) handleClusterState(w http.ResponseWriter, r *http.Request) {
449452
w.Write(buf)
450453
}
451454

455+
func (a *API) handleDCSQuery(w http.ResponseWriter, r *http.Request) {
456+
var qp QueryParams
457+
if err := qp.DecodeFrom(r.URL, rangeNotRequired); err != nil {
458+
http.Error(w, err.Error(), http.StatusBadRequest)
459+
return
460+
}
461+
462+
log, ok := a.log.(*fileLogDCS)
463+
if !ok {
464+
http.Error(w, "unsupported log type, must be fileLogDCS", http.StatusNotImplemented)
465+
return
466+
}
467+
468+
lr := log.DCSReader(qp)
469+
470+
_, err := io.Copy(w, lr)
471+
if err != nil {
472+
http.Error(w, err.Error(), http.StatusInternalServerError)
473+
return
474+
}
475+
}
476+
452477
func teeRecords(src io.Reader, dst ...io.Writer) (lo, hi ulid.ULID, n int, err error) {
453478
var (
454479
first = true

pkg/store/file_log_dcs.go

+81-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import (
1111
"storj.io/uplink"
1212
)
1313

14-
const uploadTimeout = 15 * time.Second
14+
const (
15+
// uploadTimeout is a 15-second limit.
// NOTE(review): presumably bounds segment uploads to DCS; its use is not
// visible in this hunk, so confirm against the upload path.
uploadTimeout = 15 * time.Second
16+
// downloadTimeout is the lifetime of the context DCSReader creates for one
// query; that single context is used for listing the bucket and for every
// object download in the query.
downloadTimeout = 15 * time.Second
17+
)
1518

1619
var (
1720
_ Log = (*fileLogDCS)(nil)
@@ -133,3 +136,80 @@ func (t fileTrashSegmentDCS) Purge() error {
133136

134137
return t.fileTrashSegment.Purge() // The segment is in DCS now, and we're safe to delete it.
135138
}
139+
140+
func (fl *fileLogDCS) DCSReader(qp QueryParams) io.Reader {
141+
pr, pw := io.Pipe()
142+
143+
go func() {
144+
defer pw.Close()
145+
146+
ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout)
147+
defer cancel()
148+
149+
var totalBytes int64
150+
151+
// TODO: sorting the segment files from the bucket, an equivalent to sorting in queryMatchingSegments()
152+
iterator := fl.project.ListObjects(ctx, fl.bucketName, nil)
153+
for iterator.Next() {
154+
low, high, err := parseFilename(iterator.Item().Key)
155+
if err != nil {
156+
fl.reportDCSWarning(err, iterator.Item().Key)
157+
continue
158+
}
159+
160+
if !overlap(qp.From.ULID, qp.To.ULID, low, high) {
161+
continue
162+
}
163+
164+
download, err := fl.project.DownloadObject(ctx, fl.bucketName, iterator.Item().Key, nil)
165+
if err != nil {
166+
fl.reportDCSError(err, iterator.Item().Key)
167+
continue
168+
}
169+
170+
gzDownload, err := gzip.NewReader(download)
171+
if err != nil {
172+
download.Close()
173+
fl.reportDCSError(err, iterator.Item().Key)
174+
continue
175+
}
176+
177+
n, err := io.Copy(pw, gzDownload)
178+
if err != nil {
179+
gzDownload.Close()
180+
download.Close()
181+
fl.reportDCSError(err, iterator.Item().Key)
182+
continue
183+
}
184+
185+
gzDownload.Close()
186+
download.Close()
187+
188+
totalBytes += n
189+
}
190+
191+
fl.reporter.ReportEvent(Event{
192+
Debug: true,
193+
Op: "DCSReader",
194+
Msg: fmt.Sprintf("Downloaded %d bytes from DCS", totalBytes),
195+
})
196+
}()
197+
198+
return pr
199+
}
200+
201+
func (fl *fileLogDCS) reportDCSWarning(err error, filename string) {
202+
fl.reporter.ReportEvent(Event{
203+
Op: "DCSReader",
204+
File: fmt.Sprintf("sj://%s/%s", fl.bucketName, filename),
205+
Warning: err,
206+
})
207+
}
208+
209+
func (fl *fileLogDCS) reportDCSError(err error, filename string) {
210+
fl.reporter.ReportEvent(Event{
211+
Op: "DCSReader",
212+
File: fmt.Sprintf("sj://%s/%s", fl.bucketName, filename),
213+
Error: err,
214+
})
215+
}

0 commit comments

Comments
 (0)