create-s3-objects.py
#!/usr/bin/env python3
"""Read sanctioned addresses from TXT files and create empty S3 objects."""

import argparse
import concurrent.futures
import glob
import logging
import os
import threading
from io import BytesIO
import base64

import boto3
from botocore.exceptions import ClientError

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Thread-local storage for boto3 clients
thread_local = threading.local()
boto3_client_lock = threading.Lock()

# Constants
NUM_WORKERS = 10
CHUNK_SIZE = 100
OBJECT_PREFIX = ''
def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description='Read sanctioned addresses from TXT files and create empty S3 objects.'
    )
    parser.add_argument(
        '-d', '--directory',
        type=str,
        default='./data',
        help='Directory containing sanctioned_addresses_*.txt files (default: ./data)'
    )
    parser.add_argument(
        '-b', '--bucket',
        type=str,
        required=True,
        help='S3 bucket name where objects will be created'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Perform a dry run without creating S3 objects'
    )
    return parser.parse_args()

def read_sanctioned_addresses(directory):
    """
    Read all sanctioned_addresses_*.txt files in the given directory
    and return a set of unique addresses.
    """
    unique_addresses = set()
    file_pattern = os.path.join(directory, 'sanctioned_addresses_*.txt')
    files = glob.glob(file_pattern)

    if not files:
        logger.warning(f"No sanctioned_addresses_*.txt files found in {directory}")
        return unique_addresses

    logger.info(f"Found {len(files)} sanctioned address files")

    for file_path in files:
        try:
            with open(file_path) as f:
                addresses = [line.strip() for line in f if line.strip()]
                logger.info(f"Read {len(addresses)} addresses from {os.path.basename(file_path)}")
                unique_addresses.update(addresses)
        except Exception as e:
            logger.error(f"Error reading {file_path}: {e}")

    logger.info(f"Total unique addresses: {len(unique_addresses)}")
    return unique_addresses

def get_s3_client(session):
    """Get a thread-local S3 client."""
    if not hasattr(thread_local, 's3_client'):
        # Guard client creation on the shared session with a lock;
        # each worker thread then reuses its own client instance.
        with boto3_client_lock:
            thread_local.s3_client = session.client('s3')
    return thread_local.s3_client

def create_s3_object(address, bucket, prefix, dry_run, session):
    """Create a single empty S3 object for the given address."""
    # Encode the address using urlsafe_b64encode and remove trailing equals signs
    encoded_address = base64.urlsafe_b64encode(address.encode()).decode().rstrip('=')
    object_key = f"{prefix}{encoded_address}"

    if dry_run:
        logger.info(f"DRY RUN: Would create S3 object s3://{bucket}/{object_key} ({address})")
        return True, None

    try:
        # Get thread-local S3 client
        s3_client = get_s3_client(session)

        # Create an empty in-memory file-like object
        empty_file = BytesIO(b'')

        # Use upload_fileobj with default retry behavior
        s3_client.upload_fileobj(
            Fileobj=empty_file,
            Bucket=bucket,
            Key=object_key
        )
        return True, None
    except ClientError as e:
        return False, f"Error creating S3 object for {address}: {e}"

def process_address_chunk(addresses_chunk, bucket, prefix, dry_run, session):
    """Process a chunk of addresses."""
    results = {
        'success': 0,
        'errors': 0
    }

    for address in addresses_chunk:
        success, error = create_s3_object(address, bucket, prefix, dry_run, session)
        if success:
            results['success'] += 1
        else:
            results['errors'] += 1
            logger.error(error)

    return results

def create_s3_objects(addresses, bucket, prefix, dry_run, session, workers=NUM_WORKERS, chunk_size=CHUNK_SIZE):
    """Create empty S3 objects for each address using a thread pool."""
    addresses_list = list(addresses)
    total_addresses = len(addresses_list)
    logger.info(f"Starting to create {total_addresses} S3 objects using {workers} worker threads")

    # Create chunks of addresses to process
    address_chunks = [addresses_list[i:i + chunk_size] for i in range(0, total_addresses, chunk_size)]

    created_count = 0
    error_count = 0

    # Use ThreadPoolExecutor to process chunks in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # Submit all chunks to the executor
        future_to_chunk = {
            executor.submit(
                process_address_chunk, chunk, bucket, prefix, dry_run, session
            ): i for i, chunk in enumerate(address_chunks)
        }

        # Process results as they complete
        for future in concurrent.futures.as_completed(future_to_chunk):
            chunk_index = future_to_chunk[future]
            try:
                results = future.result()
                created_count += results['success']
                error_count += results['errors']
                logger.info(f"Completed chunk {chunk_index+1}/{len(address_chunks)}, "
                            f"total progress: {created_count + error_count}/{total_addresses} "
                            f"({(created_count + error_count) / total_addresses * 100:.1f}%)")
            except Exception as e:
                logger.error(f"Exception processing chunk {chunk_index}: {e}")

    logger.info(f"Successfully created {created_count} S3 objects")
    if error_count > 0:
        logger.warning(f"Failed to create {error_count} S3 objects")

def main():
    args = parse_arguments()
    session = boto3.Session()

    # Read sanctioned addresses
    addresses = read_sanctioned_addresses(args.directory)
    if not addresses:
        logger.error("No addresses found. Exiting.")
        return

    # Create S3 objects
    create_s3_objects(
        addresses=addresses,
        bucket=args.bucket,
        prefix=OBJECT_PREFIX,
        dry_run=args.dry_run,
        session=session,
        workers=NUM_WORKERS,
        chunk_size=CHUNK_SIZE
    )


if __name__ == "__main__":
    main()
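
# ---------------------------------------------------------------------------
# Example invocations (a minimal sketch based on the argparse flags above;
# the bucket name and directory are placeholders, not real resources):
#
#   # Preview which objects would be created without touching S3
#   python3 create-s3-objects.py --bucket example-sanctions-bucket --dry-run
#
#   # Read address files from a custom directory and create the objects
#   python3 create-s3-objects.py -d ./data -b example-sanctions-bucket
#
# AWS credentials are resolved by boto3's default chain (environment
# variables, shared config, or an attached IAM role).
# ---------------------------------------------------------------------------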