-
Notifications
You must be signed in to change notification settings - Fork 152
/
Copy pathhelper_functions.py
287 lines (234 loc) · 8.23 KB
/
helper_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python
#
# Copyright (C) 2017-2020 Alex Manuskin, Gil Tsuker
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
""" Helper functions module with common useful functions """
import csv
import json
import logging
import os
import platform
import re
import subprocess
import sys
import time
from collections import OrderedDict
__version__ = "1.1.6"
_DEFAULT = object()
PY3 = sys.version_info[0] == 3
POSIX = os.name == "posix"
ENCODING = sys.getfilesystemencoding()
if not PY3:
ENCODING_ERRS = "replace"
else:
try:
ENCODING_ERRS = sys.getfilesystemencodeerrors() # py 3.6
except AttributeError:
ENCODING_ERRS = "surrogateescape" if POSIX else "replace"
def get_processor_name():
"""Returns the processor name in the system"""
if platform.system() == "Linux":
with open("/proc/cpuinfo", "rb") as cpuinfo:
all_info = cpuinfo.readlines()
for line in all_info:
if b"model name" in line:
return re.sub(rb".*model name.*:", b"", line, count=1)
elif platform.system() == "FreeBSD":
cmd = ["sysctl", "-n", "hw.model"]
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return process.stdout.read()
elif platform.system() == "Darwin":
cmd = ["sysctl", "-n", "machdep.cpu.brand_string"]
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return process.stdout.read()
return platform.processor()
def kill_child_processes(parent_proc):
"""Kills a process and all its children"""
logging.debug("Killing stress process")
try:
for proc in parent_proc.children(recursive=True):
logging.debug("Killing %s", proc)
proc.kill()
parent_proc.kill()
except AttributeError:
logging.debug("No such process")
logging.debug("Could not kill process")
def output_to_csv(sources, csv_writeable_file):
"""Print statistics to csv file"""
file_exists = os.path.isfile(csv_writeable_file)
with open(csv_writeable_file, "a", encoding="utf-8") as csvfile:
csv_dict = OrderedDict()
csv_dict.update({"Time": time.strftime("%Y-%m-%d_%H:%M:%S")})
summaries = [val for key, val in sources.items()]
for summarie in summaries:
update_dict = {}
for prob_, val in summarie.source.get_sensors_summary().items():
prob = summarie.source.get_source_name() + ":" + prob_
update_dict[prob] = val
csv_dict.update(update_dict)
fieldnames = [key for key, val in csv_dict.items()]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if not file_exists:
writer.writeheader() # file doesn't exist yet, write a header
writer.writerow(csv_dict)
def output_to_terminal(sources):
"""Print statistics to the terminal"""
results = OrderedDict()
for source in sources:
if source.get_is_available():
source.update()
source_name = source.get_source_name()
results[source_name] = source.get_sensors_summary()
for key, value in results.items():
sys.stdout.write(str(key) + ": ")
for skey, svalue in value.items():
sys.stdout.write(str(skey) + ": " + str(svalue) + ", ")
sys.stdout.write("\n")
sys.exit()
def output_to_json(sources):
"""Print statistics to the terminal in Json format"""
results = OrderedDict()
for source in sources:
if source.get_is_available():
source.update()
source_name = source.get_source_name()
results[source_name] = source.get_sensors_summary()
print(json.dumps(results, indent=4))
sys.exit()
def get_user_config_dir():
"""
Return the path to the user s-tui config directory
"""
user_home = os.getenv("XDG_CONFIG_HOME")
if user_home is None or not user_home:
config_path = os.path.expanduser(os.path.join("~", ".config", "s-tui"))
else:
config_path = os.path.join(user_home, "s-tui")
return config_path
def get_config_dir():
"""
Return the path to the user home config directory
"""
user_home = os.getenv("XDG_CONFIG_HOME")
if user_home is None or not user_home:
config_path = os.path.expanduser(os.path.join("~", ".config"))
else:
config_path = user_home
return config_path
def get_user_config_file():
"""
Return the path to the user s-tui config directory
"""
user_home = os.getenv("XDG_CONFIG_HOME")
if user_home is None or not user_home:
config_path = os.path.expanduser(
os.path.join("~", ".config", "s-tui", "s-tui.conf")
)
else:
config_path = os.path.join(user_home, "s-tui", "s-tui.conf")
return config_path
def user_config_dir_exists():
"""
Check whether the user s-tui config dir exists or not
"""
return os.path.isdir(get_user_config_dir())
def config_dir_exists():
"""
Check whether the home config dir exists or not
"""
return os.path.isdir(get_config_dir())
def user_config_file_exists():
"""
Check whether the user s-tui config file exists or not
"""
return os.path.isfile(get_user_config_file())
def make_user_config_dir():
"""
Create the user s-tui config directory if it doesn't exist
"""
config_dir = get_config_dir()
config_path = get_user_config_dir()
if not config_dir_exists():
try:
os.mkdir(config_dir)
except OSError:
return None
if not user_config_dir_exists():
try:
os.mkdir(config_path)
os.mkdir(os.path.join(config_path, "hooks.d"))
except OSError:
return None
return config_path
def seconds_to_text(secs):
"""Converts seconds to a string of hours:minutes:seconds"""
hours = (secs) // 3600
minutes = (secs - hours * 3600) // 60
seconds = secs - hours * 3600 - minutes * 60
return "%02d:%02d:%02d" % (hours, minutes, seconds)
def str_to_bool(string):
"""Converts a string to a boolean"""
if string == "True":
return True
if string == "False":
return False
raise ValueError
def which(program):
"""Find the path of an executable"""
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
fpath, _ = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def _open_binary(fname, **kwargs):
return open(fname, "rb", **kwargs)
def _open_text(fname, **kwargs):
"""On Python 3 opens a file in text mode by using fs encoding and
a proper en/decoding errors handler.
On Python 2 this is just an alias for open(name, 'rt').
"""
if PY3:
kwargs.setdefault("encoding", ENCODING)
kwargs.setdefault("errors", ENCODING_ERRS)
return open(fname, "rt", **kwargs)
def cat(fname, fallback=_DEFAULT, binary=True):
"""Return file content.
fallback: the value returned in case the file does not exist or
cannot be read
binary: whether to open the file in binary or text mode.
"""
try:
with _open_binary(fname) if binary else _open_text(fname) as f_d:
return f_d.read().strip()
except (IOError, OSError):
if fallback is not _DEFAULT:
return fallback
raise