-
Notifications
You must be signed in to change notification settings - Fork 186
/
Copy pathtest_DeleteApi.py
123 lines (92 loc) · 5.87 KB
/
test_DeleteApi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from datetime import datetime
from pytz import UTC
from influxdb_client import PermissionResource, Permission, InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from tests.base_test import BaseTest
class DeleteApiTest(BaseTest):
def setUp(self) -> None:
super(DeleteApiTest, self).setUp()
response = self.buckets_api.find_buckets()
for bucket in response.buckets:
if bucket.name.endswith("_IT"):
self.buckets_api.delete_bucket(bucket)
self.bucket = self.create_test_bucket()
self.organization = self.find_my_org()
resource = PermissionResource(type="buckets", org_id=self.organization.id, id=self.bucket.id)
read_bucket = Permission(resource=resource, action="read")
write_bucket = Permission(resource=resource, action="write")
authorization = self.client.authorizations_api().create_authorization(org_id=self.organization.id,
permissions=[read_bucket, write_bucket])
self.auth_token = authorization.token
self.client.close()
self.client = InfluxDBClient(url=self.host, token=self.auth_token, org=self.org)
self.delete_api = self.client.delete_api()
def test_delete_buckets(self):
self._write_data()
q = f'from(bucket:\"{self.bucket.name}\") |> range(start: 1970-01-01T00:00:00.000000001Z)'
flux_tables = self.client.query_api().query(query=q, org=self.organization.id)
self.assertEqual(len(flux_tables), 1)
self.assertEqual(len(flux_tables[0].records), 12)
start = "1970-01-01T00:00:00.000000001Z"
stop = "1970-01-01T00:00:00.000000012Z"
self.delete_api.delete(start, stop, "", bucket=self.bucket.id, org=self.organization.id)
flux_tables2 = self.client.query_api().query(
f'from(bucket:"{self.bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)',
org=self.organization.id)
self.assertEqual(len(flux_tables2), 0)
def test_delete_buckets_by_name(self):
self._write_data()
q = f'from(bucket:\"{self.bucket.name}\") |> range(start: 1970-01-01T00:00:00.000000001Z)'
flux_tables = self.client.query_api().query(query=q, org=self.organization.id)
self.assertEqual(len(flux_tables), 1)
self.assertEqual(len(flux_tables[0].records), 12)
start = "1970-01-01T00:00:00.000000001Z"
stop = "1970-01-01T00:00:00.000000012Z"
self._delete_and_verify(start, stop, self.organization.name)
def test_delete_org_parameters_types(self):
orgs = [
self.organization,
self.organization.id,
self.organization.name,
None
]
for org in orgs:
self._write_data()
self._delete_and_verify("1970-01-01T00:00:00.000000001Z", "1970-01-01T00:00:00.000000012Z", org)
def test_start_stop_types(self):
starts_stops = [
("1970-01-01T00:00:00.000000001Z", "1970-01-01T00:00:00.000000012Z"),
(datetime(1970, 1, 1, 0, 0, 0, 0, UTC), datetime(1970, 1, 1, 0, 0, 0, 1, UTC)),
(datetime(1970, 1, 1, 0, 0, 0, 0), datetime(1970, 1, 1, 0, 0, 0, 1))
]
for start_stop in starts_stops:
self._write_data()
self._delete_and_verify(start_stop[0], start_stop[1], self.organization.name)
def _delete_and_verify(self, start, stop, org):
self.delete_api.delete(start, stop, "", bucket=self.bucket.name, org=org)
flux_tables = self.client.query_api().query(
f'from(bucket:"{self.bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)',
org=self.organization.id)
self.assertEqual(len(flux_tables), 0)
def _write_data(self):
write_api = self.client.write_api(write_options=SYNCHRONOUS)
p1 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 7.0).time(1)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=p1)
p2 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 8.0).time(2)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=p2)
p3 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 9.0).time(3)
p4 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 10.0).time(4)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=[p3, p4])
p5 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 11.0).time(5)
p6 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 12.0).time(6)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=[p5, p6])
p7 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 8.0).time(7)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=p7)
p8 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 9.0).time(8)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=p8)
p9 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 9.0).time(9)
p10 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 11.0).time(10)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=[p9, p10])
p11 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 11.0).time(11)
p12 = Point(measurement_name="h2o").tag("location", "coyote_creek").field("watter_level", 13.0).time(12)
write_api.write(bucket=self.bucket.name, org=self.organization.name, record=[p11, p12])