1
1
import glob
2
2
import logging
3
3
import os
4
- from typing import Dict , Iterable
4
+ from typing import Dict , Iterable , Sequence
5
5
6
+ from common .credentials import Credentials , SSHKeypair , Username
7
+ from common .event_queue import IEventQueue
8
+ from common .events import CredentialsStolenEvent
6
9
from common .utils .attack_utils import ScanStatus
7
10
from infection_monkey .telemetry .attack .t1005_telem import T1005Telem
8
11
from infection_monkey .telemetry .attack .t1145_telem import T1145Telem
12
15
logger = logging .getLogger (__name__ )
13
16
14
17
DEFAULT_DIRS = ["/.ssh/" , "/" ]
18
+ SSH_CREDENTIAL_COLLECTOR_TAG = "ssh-credentials-collector"
19
+ T1003_ATTACK_TECHNIQUE_TAG = "attack-t1003"
20
+ T1005_ATTACK_TECHNIQUE_TAG = "attack-t1005"
21
+ T1145_ATTACK_TECHNIQUE_TAG = "attack-t1145"
15
22
23
+ SSH_COLLECTOR_EVENT_TAG = {
24
+ SSH_CREDENTIAL_COLLECTOR_TAG ,
25
+ T1003_ATTACK_TECHNIQUE_TAG ,
26
+ T1005_ATTACK_TECHNIQUE_TAG ,
27
+ T1145_ATTACK_TECHNIQUE_TAG ,
28
+ }
16
29
17
- def get_ssh_info (telemetry_messenger : ITelemetryMessenger ) -> Iterable [Dict ]:
30
+
31
+ def get_ssh_info (
32
+ telemetry_messenger : ITelemetryMessenger , event_queue : IEventQueue
33
+ ) -> Iterable [Dict ]:
18
34
# TODO: Remove this check when this is turned into a plugin.
19
35
if is_windows_os ():
20
36
logger .debug (
@@ -23,7 +39,7 @@ def get_ssh_info(telemetry_messenger: ITelemetryMessenger) -> Iterable[Dict]:
23
39
return []
24
40
25
41
home_dirs = _get_home_dirs ()
26
- ssh_info = _get_ssh_files (home_dirs , telemetry_messenger )
42
+ ssh_info = _get_ssh_files (home_dirs , telemetry_messenger , event_queue )
27
43
28
44
return ssh_info
29
45
@@ -62,9 +78,9 @@ def _get_ssh_struct(name: str, home_dir: str) -> Dict:
62
78
63
79
64
80
def _get_ssh_files (
65
- usr_info : Iterable [Dict ], telemetry_messenger : ITelemetryMessenger
81
+ user_info : Iterable [Dict ], telemetry_messenger : ITelemetryMessenger , event_queue : IEventQueue
66
82
) -> Iterable [Dict ]:
67
- for info in usr_info :
83
+ for info in user_info :
68
84
path = info ["home_dir" ]
69
85
for directory in DEFAULT_DIRS :
70
86
# TODO: Use PATH
@@ -101,6 +117,11 @@ def _get_ssh_files(
101
117
ScanStatus .USED , info ["name" ], info ["home_dir" ]
102
118
)
103
119
)
120
+
121
+ collected_credentials = to_credentials ([info ])
122
+ _publish_credentials_stolen_event (
123
+ collected_credentials , event_queue
124
+ )
104
125
else :
105
126
continue
106
127
except (IOError , OSError ):
@@ -112,5 +133,40 @@ def _get_ssh_files(
112
133
pass
113
134
except OSError :
114
135
pass
115
- usr_info = [info for info in usr_info if info ["private_key" ] or info ["public_key" ]]
116
- return usr_info
136
+ user_info = [info for info in user_info if info ["private_key" ] or info ["public_key" ]]
137
+ return user_info
138
+
139
+
140
+ def to_credentials (ssh_info : Iterable [Dict ]) -> Sequence [Credentials ]:
141
+ ssh_credentials = []
142
+
143
+ for info in ssh_info :
144
+ identity = None
145
+ secret = None
146
+
147
+ if info .get ("name" , "" ):
148
+ identity = Username (info ["name" ])
149
+
150
+ ssh_keypair = {}
151
+ for key in ["public_key" , "private_key" ]:
152
+ if info .get (key ) is not None :
153
+ ssh_keypair [key ] = info [key ]
154
+
155
+ if len (ssh_keypair ):
156
+ secret = SSHKeypair (
157
+ ssh_keypair .get ("private_key" , "" ), ssh_keypair .get ("public_key" , "" )
158
+ )
159
+
160
+ if any ([identity , secret ]):
161
+ ssh_credentials .append (Credentials (identity , secret ))
162
+
163
+ return ssh_credentials
164
+
165
+
166
+ def _publish_credentials_stolen_event (collected_credentials : Credentials , event_queue : IEventQueue ):
167
+ credentials_stolen_event = CredentialsStolenEvent (
168
+ tags = frozenset (SSH_COLLECTOR_EVENT_TAG ),
169
+ stolen_credentials = [collected_credentials ],
170
+ )
171
+
172
+ event_queue .publish (credentials_stolen_event )
0 commit comments