-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
173 lines (134 loc) · 5.67 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import pymysql.cursors
import pymysql
from request_send import get_resp_data
import time
from rich import print
# ! 表 名称
table_name = "epidemic_data"
# ! 数据库连接信息
connection = pymysql.connect(
# 地址
host="diana.mshome.net",
# 用户名
user="diana",
# 密码
passwd="max1ndex",
# 数据库名
database="demos",
# 端口
port=3306,
charset="utf8",
cursorclass=pymysql.cursors.DictCursor,
)
# 请求的返回
data = get_resp_data()
# YYYY-mm-dd 格式 今天日期
date_today = time.strftime("%Y-%m-%d", time.gmtime())
# YYYY-mm-dd 数据的最新更新时间
date_end_update = data["end_update_time"][:-1] + ":00:00"
# 统计计数使用
statistic = {"update": 0, "insert": 0}
# 执行标记
result = None
# 查找当前社区是否存在于数据库中
with connection:
with connection.cursor() as cursor:
check_if_community_exists = (
"select id from {table_name} where date = '{date}'".format(
table_name=table_name,
date=date_today,
)
)
cursor.execute(check_if_community_exists)
result = cursor.fetchone()
if result is not None:
# 今日的所有风险设置为 none
with connection.cursor() as cursor:
risk_default_set = (
"update {table_name} set rank = 'none' where date = '{date}'".format(
table_name=table_name, date=date_today
)
)
cursor.execute(risk_default_set)
connection.commit()
# 高风险地区数
high_risk_count = data["hcount"]
# 中风险地区数
mid_risk_count = data["mcount"]
# 计数 用于判断风险等级
cnt = 0
print(":red_circle: 高风险地区数 - {}".format(high_risk_count))
# * 所有的地区
for risk_city in data["highlist"] + data["middlelist"] + data["lowlist"]:
# 省份和城市名
province_name = risk_city["province"]
city_name = risk_city["city"]
# 地区名
area_name = risk_city["area_name"].replace(" ", "") # 去空格
# 区或县城
county_name = risk_city["county"]
# ! 无具体社区信息
if len(risk_city["communitys"]) == 0:
risk_city["communitys"].append(" ")
# 该地区下 所有的社区
for community_name in risk_city["communitys"]:
# ---------判断当前的风险等级-------------
if cnt >= (mid_risk_count + high_risk_count):
risk_level = 'low'
elif cnt >= (high_risk_count):
risk_level = 'mid'
else:
risk_level = 'high'
# -------------------------------------
if result == None:
# *表中无今日数据 -> 插入
with connection.cursor() as cursor:
insert_new_community = "insert into {table} (`date`, `update_date`, `rank`, `area_name`, `province`, `city`, `county`) values('{date}', '{update_date}', '{rank}', '{area_name}', '{province}', '{city}' , '{county}')".format(
table=table_name,
date=date_today,
update_date=date_end_update,
rank=risk_level,
area_name= str(area_name + community_name).rstrip(),
province=province_name,
city=city_name,
county = county_name
)
cursor.execute(insert_new_community)
statistic["insert"] += 1
else:
# *有今日的的数据 -> 更新
with connection.cursor() as cursor:
# 以 (当天日期, 地区全名) 为搜索关键字 对搜索结果地区的 (时间, 风险等级) 做更新
update_community = """
update {table} set `update_date` = '{update_date}', rank='{rank}'
where date = '{date}' and area_name = '{area_name}'
""".format(
table=table_name,
date=date_today,
update_date=date_end_update,
rank=risk_level,
area_name=str(area_name + community_name).rstrip(),
)
cursor.execute(update_community)
statistic["update"] += 1
connection.commit()
# * 输出信息
level = ""
if risk_level == "high":
level = ":small_red_triangle: [red1]高风险[/red1]"
elif risk_level == "mid":
level = ":small_orange_diamond: [orange_red1]中风险[/orange_red1]"
else:
level = ":small_blue_diamond: [blue]低风险[/blue]"
print(
":bento: {opt} - {level} - [dark_cyan]{area}[/dark_cyan] ".format(
opt="[gold1]插入[/gold1]"
if result == None
else "[orchid2]更新[/orchid2]",
area=area_name + community_name,
level= level
)
)
# 计数 用于判断风险等级
cnt += 1
print("完成: 更新 - {} 次 插入 - {} 次".format(statistic["update"], statistic["insert"]))