forked from eth0izzle/bucket-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucket-stream.py
125 lines (96 loc) · 5.05 KB
/
bucket-stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import argparse, logging
import yaml
import boto3
import certstream
import tldextract
import requests
from requests.adapters import HTTPAdapter
from queue import Queue
from threading import Thread
# -- Module configuration -----------------------------------------------------
S3_URL = "http://s3-1-w.amazonaws.com"   # generic S3 endpoint: a HEAD with a Host header probes bucket existence
BUCKET_HOST = "%s.s3.amazonaws.com"      # template for candidate bucket hostnames
QUEUE_SIZE = 100                         # max pending bucket checks
CHECKED_BUCKETS = list()                 # every hostname already queued, to avoid re-checking
FOUND_COUNT = 0                          # running count of public buckets found

# Use context managers so the file handles are closed promptly instead of
# being leaked for the life of the process (the bare open() calls were never closed).
with open('keywords.txt') as keywords_file:
    KEYWORDS = [line.strip() for line in keywords_file]

BUCKET_QUEUE = Queue(maxsize=QUEUE_SIZE)
ARGS = argparse.Namespace()              # populated by main()'s argument parser

with open('config.yaml') as config_file:
    CONFIG = yaml.safe_load(config_file)

S3_CLIENT = boto3.client('s3', aws_access_key_id=CONFIG['aws_access_key'],
                         aws_secret_access_key=CONFIG['aws_secret'])
class BucketWorker(Thread):
    """Worker thread that pulls candidate bucket hostnames off a queue and
    probes S3 to determine whether each bucket exists and is publicly readable.
    """

    def __init__(self, q, *args, **kwargs):
        self.q = q  # queue of "<name>.s3.amazonaws.com" hostnames to probe
        self.session = requests.Session()
        # Reuse connections across checks; retries disabled so one flaky host
        # cannot stall the queue.
        self.session.mount("http://",
                           HTTPAdapter(pool_connections=ARGS.threads,
                                       pool_maxsize=QUEUE_SIZE,
                                       max_retries=0))
        super().__init__(*args, **kwargs)

    def run(self):
        global FOUND_COUNT
        while True:
            bucket_url = self.q.get()
            try:
                # A 307 from the generic S3 endpoint means the bucket exists;
                # the Location header points at its regional endpoint.
                check_response = self.session.head(
                    S3_URL, timeout=3, headers={"Host": bucket_url})
                if check_response.status_code == 307:
                    new_bucket_url = check_response.headers["Location"]
                    # GET only when we need the body to match keywords;
                    # HEAD is enough to detect public access otherwise.
                    bucket_response = requests.request(
                        'GET' if ARGS.only_interesting else 'HEAD',
                        new_bucket_url, timeout=3)
                    if bucket_response.status_code == 200:  # bucket is public!
                        if not ARGS.only_interesting or any(
                                keyword in bucket_response.text for keyword in KEYWORDS):
                            self._report(bucket_url, new_bucket_url)
                            FOUND_COUNT += 1
            except requests.exceptions.RequestException:
                # Network errors are expected at this request volume; the
                # original bare `except:` also swallowed KeyboardInterrupt.
                pass
            finally:
                # Always mark the task done so BUCKET_QUEUE.join() cannot
                # deadlock if an unexpected error escapes above.
                self.q.task_done()

    def _report(self, bucket_url, new_bucket_url):
        """Print a discovered public bucket, adding owner info when AWS
        credentials are configured and the ACL is readable."""
        bucket_owner = None
        if CONFIG['aws_access_key'] and CONFIG['aws_secret']:
            try:
                result = S3_CLIENT.get_bucket_acl(
                    Bucket=bucket_url.replace(".s3.amazonaws.com", ""))
                bucket_owner = result['Owner']['DisplayName']
            except Exception:
                pass  # ACL lookup is best-effort; bucket may deny GetBucketAcl
        print("%s is public%s" % (new_bucket_url,
                                  (", owned by " + bucket_owner) if bucket_owner is not None else ""))
def listen(message, context):
    """certstream callback: derive candidate S3 bucket names from each
    certificate's listed domains and enqueue the ones not seen before."""
    if message["message_type"] == "heartbeat":
        return
    if message["message_type"] != "certificate_update":
        return

    if ARGS.skip_lets_encrypt and "Let's Encrypt" in message["data"]["chain"][0]["subject"]["aggregated"]:
        return

    for domain in set(message["data"]["leaf_cert"]["all_domains"]):
        # cut the crap: skip wildcard, CDN, punycode and heavily-hyphenated noise
        is_noise = (domain.startswith("*.")
                    or "cloudflaressl" in domain
                    or "xn--" in domain
                    or domain.count("-") >= 4
                    or domain.count(".") >= 4)
        if is_noise:
            continue

        parsed = tldextract.extract(domain)
        for permutation in get_permutations(parsed):
            candidate = BUCKET_HOST % permutation
            if candidate in CHECKED_BUCKETS:
                continue
            CHECKED_BUCKETS.append(candidate)
            BUCKET_QUEUE.put(candidate)
            if len(CHECKED_BUCKETS) % 100 == 0:
                print("%s buckets checked. %s buckets found" % (len(CHECKED_BUCKETS), FOUND_COUNT))
def get_permutations(parsed_domain):
    """Yield candidate S3 bucket name permutations for a parsed domain.

    Args:
        parsed_domain: a tldextract-style result exposing ``.domain`` and
            ``.subdomain`` attributes.

    Returns:
        An iterator of unique, non-empty permutations in a stable order:
        the bare domain first, then common prefix/suffix variants.

    Fix: the original could yield duplicates (e.g. a "www" subdomain
    collides with the fixed "www-<domain>" entry), causing the same bucket
    to be queued and probed twice.
    """
    domain = parsed_domain.domain
    perms = [
        domain,
        "www-%s" % domain,
        "%s-www" % domain,
        # subdomain-domain combo only when a subdomain is present
        "%s-%s" % (parsed_domain.subdomain, domain) if parsed_domain.subdomain else "",
        "%s-backup" % domain,
        "%s-dev" % domain,
        "%s-uat" % domain,
    ]
    # Drop empties and duplicates while preserving the original order.
    seen = set()
    for perm in perms:
        if perm and perm not in seen:
            seen.add(perm)
            yield perm
def main():
    """Parse CLI arguments, spawn the worker pool and stream certstream events."""
    parser = argparse.ArgumentParser(
        description="Find interesting Amazon S3 Buckets by watching certificate transparency logs.",
        usage="python bucket-stream.py",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--only-interesting", action="store_true", dest="only_interesting", default=False,
                        help="Only log 'interesting' buckets whose contents match anything within keywords.txt")
    parser.add_argument("--skip-lets-encrypt", action="store_true", dest="skip_lets_encrypt", default=False,
                        help="Skip certs (and thus listed domains) issued by Let's Encrypt CA")
    parser.add_argument("-t", "--threads", metavar="", type=int, dest="threads", default=20,
                        help="Number of threads to spawn. More threads = more power.")
    parser.parse_args(namespace=ARGS)
    logging.disable(logging.WARNING)

    # BUG FIX: range(1, n) started only n-1 workers; range(n) starts the
    # requested count. Workers are daemonised so the process can actually
    # exit once the certstream listener returns, instead of hanging on
    # non-daemon threads stuck in their infinite run() loops.
    for _ in range(ARGS.threads):
        worker = BucketWorker(BUCKET_QUEUE)
        worker.daemon = True
        worker.start()

    print("Waiting for certstream events - this could take a few minutes to queue up...")
    certstream.listen_for_events(listen)  # blocking
    print("Quitting - waiting for threads to finish up...")  # typo fix: "Qutting"
    BUCKET_QUEUE.join()


if __name__ == "__main__":
    main()