forked from guardicore/monkey
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpost_breach_handler.py
45 lines (35 loc) · 1.23 KB
/
post_breach_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import logging
from multiprocessing.dummy import Pool
from typing import Sequence
from infection_monkey.post_breach.pba import PBA
from infection_monkey.utils.environment import is_windows_os
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
__author__ = 'VakarisZ'
# Dotted prefix, presumably of the package holding the post breach action
# modules. NOTE(review): not referenced in the visible part of this file -
# confirm whether it is still used elsewhere.
PATH_TO_ACTIONS = "infection_monkey.post_breach.actions."
class PostBreach:
    """
    This class handles post breach actions (PBAs) execution.

    The PBA objects are collected once, at construction time, through
    :meth:`config_to_pba_list`; :meth:`execute_all_configured` then runs
    every one of them.
    """

    def __init__(self):
        # Any OS that is not Windows is treated as Linux.
        self.os_is_linux = not is_windows_os()
        self.pba_list = self.config_to_pba_list()

    def execute_all_configured(self):
        """
        Executes all post breach actions.

        The actions are dispatched through a pool of 5 workers
        (``multiprocessing.dummy.Pool``, i.e. threads). ``map`` blocks until
        every PBA has been handled, so the summary line is logged only after
        all of them are done.
        """
        with Pool(5) as pool:
            pool.map(self.run_pba, self.pba_list)
            LOG.info("All PBAs executed. Total {} executed.".format(len(self.pba_list)))

    @staticmethod
    def config_to_pba_list() -> Sequence[PBA]:
        """
        :return: A list of PBA objects, as returned by ``PBA.get_instances()``.
        """
        return PBA.get_instances()

    def run_pba(self, pba):
        """
        Runs a single post breach action and logs its outcome.

        An exception raised while running the PBA is logged and not
        re-raised, so it does not propagate out of ``pool.map`` in
        :meth:`execute_all_configured`.

        :param pba: Object exposing a ``name`` attribute and a ``run()`` method.
        """
        try:
            LOG.debug("Executing PBA: '{}'".format(pba.name))
            pba.run()
            LOG.debug(f"Execution of {pba.name} finished")
        except Exception as e:
            # exc_info=True keeps the traceback in the log; the PBA runs on a
            # pool worker thread, so the message alone is rarely enough to
            # locate the failure.
            LOG.error("PBA {} failed. Error info: {}".format(pba.name, e), exc_info=True)