1
+ import hashlib
2
+ import secrets
1
3
from functools import lru_cache
2
4
from typing import Any , Mapping
3
5
12
14
StorageError ,
13
15
UnknownRecordError ,
14
16
)
15
- from monkey_island .cc .server_utils .encryption import ILockableEncryptor
16
17
17
18
from .i_otp_repository import IOTPRepository
18
19
@@ -21,21 +22,36 @@ class MongoOTPRepository(IOTPRepository):
21
22
def __init__ (
22
23
self ,
23
24
mongo_client : MongoClient ,
24
- encryptor : ILockableEncryptor ,
25
25
):
26
- self ._encryptor = encryptor
26
+ # SECURITY: A new salt is generated for each instance of this repository. This effectively
27
+ # makes all preexisting OTPS invalid on Island startup.
28
+ self ._salt = secrets .token_bytes (16 )
29
+
27
30
self ._otp_collection = mongo_client .monkey_island .otp
28
31
self ._otp_collection .create_index ("otp" , unique = True )
29
32
30
33
def insert_otp (self , otp : OTP , expiration : float ):
31
34
try :
32
- encrypted_otp = self ._encryptor .encrypt (otp .get_secret_value ().encode ())
33
35
self ._otp_collection .insert_one (
34
- {"otp" : encrypted_otp , "expiration_time" : expiration , "used" : False }
36
+ {"otp" : self . _hash_otp ( otp ) , "expiration_time" : expiration , "used" : False }
35
37
)
36
38
except Exception as err :
37
39
raise StorageError (f"Error inserting OTP: { err } " )
38
40
41
+ def _hash_otp (self , otp : OTP ) -> bytes :
42
+ # SECURITY: A single round of salted SHA256 is usually not considered sufficient for
43
+ # protecting passwords. However, OTPs have a very short life span (2 minutes at the time of
44
+ # this writing). Additionally, they can only be used once. Finally, they are 32 bytes long.
45
+ # At the present time, we consider this to be sufficient protection. I'm unaware of any
46
+ # technology in existence that can brute force SHA256 for (roughly) 48-byte inputs in under
47
+ # 2 minutes.
48
+ #
49
+ # Note that if any of these conditions change (timeouts become very long or OTPs become very
50
+ # short), this should be revisited. For now, we prefer the significantly faster performance
51
+ # of a single round of salted SHA256 over a more secure but slower algorithm.
52
+ otp_bytes = otp .get_secret_value ().encode ()
53
+ return hashlib .sha256 (self ._salt + otp_bytes ).digest ()
54
+
39
55
def set_used (self , otp : OTP ):
40
56
try :
41
57
otp_id = self ._get_otp_object_id (otp )
@@ -71,11 +87,10 @@ def _get_otp_document(self, otp: OTP) -> Mapping[str, Any]:
71
87
72
88
@lru_cache
73
89
def _get_otp_object_id (self , otp : OTP ) -> ObjectId :
74
- otp_str = otp .get_secret_value ()
75
-
76
90
try :
77
- encrypted_otp = self ._encryptor .encrypt (otp_str .encode ())
78
- otp_dict = self ._otp_collection .find_one ({"otp" : encrypted_otp }, [MONGO_OBJECT_ID_KEY ])
91
+ otp_dict = self ._otp_collection .find_one (
92
+ {"otp" : self ._hash_otp (otp )}, [MONGO_OBJECT_ID_KEY ]
93
+ )
79
94
except Exception as err :
80
95
raise RetrievalError (f"Error retrieving OTP: { err } " )
81
96
0 commit comments