Skip to content

Commit a734b96

Browse files
Krzysztof Nagrodkiewiczmagickris93
Krzysztof Nagrodkiewicz
authored andcommitted
add support for openstack swift filesystem
1 parent c24a23d commit a734b96

File tree

4 files changed

+207
-0
lines changed

4 files changed

+207
-0
lines changed

examples/swift_write.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"time"
7+
8+
"github.com/ncw/swift"
9+
"github.com/xitongsys/parquet-go/reader"
10+
"github.com/xitongsys/parquet-go/writer"
11+
12+
"github.com/xitongsys/parquet-go-source/swift"
13+
)
14+
15+
type Student struct {
16+
Name string `parquet:"name=name, type=UTF8"`
17+
Age int32 `parquet:"name=age, type=INT32"`
18+
Id int64 `parquet:"name=id, type=INT64"`
19+
Weight float32 `parquet:"name=weight, type=FLOAT"`
20+
Sex bool `parquet:"name=sex, type=BOOLEAN"`
21+
Day int32 `parquet:"name=day, type=DATE"`
22+
}
23+
24+
func main() {
25+
connection := swift.Connection{
26+
UserName: "test_user",
27+
ApiKey: "passw0rd",
28+
AuthUrl: "http://localhost:35357/v3",
29+
Domain: "Default",
30+
Tenant: "test_project",
31+
}
32+
if err := connection.Authenticate(); err != nil {
33+
log.Print("Failed to authenticate to keystone: ", err)
34+
return
35+
}
36+
37+
containerName := "swift-parquet-test"
38+
fileName := "flat.parquet"
39+
num := 100
40+
41+
log.Println("Write started")
42+
fw, err := swiftsource.NewSwiftFileWriter(containerName, fileName, &connection)
43+
if err != nil {
44+
log.Println("Failed to create file: ", err)
45+
return
46+
}
47+
48+
//write
49+
pw, err := writer.NewParquetWriter(fw, new(Student), 4)
50+
if err != nil {
51+
log.Println("Failed to create parquet writer: ", err)
52+
return
53+
}
54+
55+
for i := 0; i < num; i++ {
56+
stu := Student{
57+
Name: fmt.Sprintf("Student-%d", i),
58+
Age: int32(20 + i%5),
59+
Id: int64(i),
60+
Weight: 50.0 + float32(i)*0.1,
61+
Sex: i%2 == 0,
62+
Day: int32(time.Now().Unix() / 3600 / 24),
63+
}
64+
if err = pw.Write(stu); err != nil {
65+
log.Println("Write error: ", err)
66+
}
67+
}
68+
if err = pw.WriteStop(); err != nil {
69+
log.Println("WriteStop error: ", err)
70+
return
71+
}
72+
73+
if err = fw.Close(); err != nil {
74+
log.Println("Failed to close writer: ", err)
75+
}
76+
log.Println("Write finished")
77+
78+
///read
79+
log.Println("Read started")
80+
fr, err := swiftsource.NewSwiftFileReader(containerName, fileName, &connection)
81+
if err != nil {
82+
log.Println("Failed to open file: ", err)
83+
return
84+
}
85+
86+
pr, err := reader.NewParquetReader(fr, new(Student), 4)
87+
if err != nil {
88+
log.Println("Failed to create parquet reader: ", err)
89+
return
90+
}
91+
num = int(pr.GetNumRows())
92+
for i := 0; i < num/10; i++ {
93+
stus := make([]Student, 10) //read 10 rows
94+
if err = pr.Read(&stus); err != nil {
95+
log.Println("Read error: ", err)
96+
}
97+
log.Println(stus)
98+
}
99+
pr.ReadStop()
100+
if err = fr.Close(); err != nil {
101+
log.Println("Failed to close reader: ", err)
102+
}
103+
log.Println("Read finished")
104+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/aws/aws-sdk-go v1.30.19
1010
github.com/colinmarc/hdfs/v2 v2.1.1
1111
github.com/golang/mock v1.4.3
12+
github.com/ncw/swift v1.0.52
1213
github.com/spf13/afero v1.2.2
1314
github.com/xitongsys/parquet-go v1.5.1
1415
)

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
132132
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
133133
github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
134134
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
135+
github.com/ncw/swift v1.0.52 h1:ACF3JufDGgeKp/9mrDgQlEgS8kRYC4XKcuzj/8EJjQU=
136+
github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
135137
github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
136138
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
137139
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

swift/swift.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package swiftsource
2+
3+
import (
4+
"github.com/ncw/swift"
5+
"github.com/xitongsys/parquet-go/source"
6+
)
7+
8+
type SwiftFile struct {
9+
Connection *swift.Connection
10+
11+
Container string
12+
FilePath string
13+
14+
FileReader *swift.ObjectOpenFile
15+
FileWriter *swift.ObjectCreateFile
16+
}
17+
18+
func newSwiftFile(containerName string, filePath string, conn *swift.Connection) *SwiftFile {
19+
return &SwiftFile{
20+
Connection: conn,
21+
Container: containerName,
22+
FilePath: filePath,
23+
}
24+
}
25+
26+
func NewSwiftFileReader(container string, filePath string, conn *swift.Connection) (source.ParquetFile, error) {
27+
res := newSwiftFile(container, filePath, conn)
28+
return res.Open(filePath)
29+
}
30+
31+
func NewSwiftFileWriter(container string, filePath string, conn *swift.Connection) (source.ParquetFile, error) {
32+
res := newSwiftFile(container, filePath, conn)
33+
return res.Create(filePath)
34+
}
35+
36+
func (file *SwiftFile) Open(name string) (source.ParquetFile, error) {
37+
if name == "" {
38+
name = file.FilePath
39+
}
40+
41+
fr, _, err := file.Connection.ObjectOpen(file.Container, name, false, nil)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
res := &SwiftFile{
47+
Connection: file.Connection,
48+
Container: file.Container,
49+
FilePath: name,
50+
FileReader: fr,
51+
}
52+
53+
return res, nil
54+
}
55+
56+
func (file *SwiftFile) Create(name string) (source.ParquetFile, error) {
57+
if name == "" {
58+
name = file.FilePath
59+
}
60+
61+
fw, err := file.Connection.ObjectCreate(file.Container, name, false, "", "", nil)
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
res := &SwiftFile{
67+
Connection: file.Connection,
68+
Container: file.Container,
69+
FilePath: name,
70+
FileWriter: fw,
71+
}
72+
73+
return res, nil
74+
}
75+
76+
func (file *SwiftFile) Read(b []byte) (n int, err error) {
77+
return file.FileReader.Read(b)
78+
}
79+
80+
func (file *SwiftFile) Seek(offset int64, whence int) (int64, error) {
81+
return file.FileReader.Seek(offset, whence)
82+
}
83+
84+
func (file *SwiftFile) Write(p []byte) (n int, err error) {
85+
return file.FileWriter.Write(p)
86+
}
87+
88+
func (file *SwiftFile) Close() error {
89+
if file.FileWriter != nil {
90+
if err := file.FileWriter.Close(); err != nil {
91+
return err
92+
}
93+
}
94+
if file.FileReader != nil {
95+
if err := file.FileReader.Close(); err != nil {
96+
return err
97+
}
98+
}
99+
return nil
100+
}

0 commit comments

Comments
 (0)