Skip to content

Commit 790f39b

Browse files
committed
Add endpoint cache
1 parent 54f0a34 commit 790f39b

File tree

1 file changed

+67
-20
lines changed

1 file changed

+67
-20
lines changed

cpp/src/arrow/filesystem/s3fs.cc

+67-20
Original file line numberDiff line numberDiff line change
@@ -914,21 +914,21 @@ Result<std::shared_ptr<S3ClientHolder>> GetClientHolder(
914914
// S3 client factory: build S3Client from S3Options
915915

916916
struct EndpointConfigKey {
917-
explicit EndpointConfigKey(const S3Options& options)
918-
: region(options.region),
919-
scheme(options.scheme),
920-
endpoint_override(options.endpoint_override),
921-
force_virtual_addressing(options.force_virtual_addressing) {}
922-
923-
std::string region;
924-
std::string scheme;
925-
std::string endpoint_override;
926-
bool force_virtual_addressing;
927-
928-
bool operator==(const EndpointConfigKey& other) {
917+
explicit EndpointConfigKey(const Aws::S3::S3ClientConfiguration& config)
918+
: region(config.region),
919+
scheme(config.scheme),
920+
endpoint_override(config.endpointOverride),
921+
use_virtual_addressing(config.useVirtualAddressing) {}
922+
923+
Aws::String region;
924+
Aws::Http::Scheme scheme;
925+
Aws::String endpoint_override;
926+
bool use_virtual_addressing;
927+
928+
bool operator==(const EndpointConfigKey& other) const noexcept {
929929
return region == other.region && scheme == other.scheme &&
930930
endpoint_override == other.endpoint_override &&
931-
force_virtual_addressing == other.force_virtual_addressing;
931+
use_virtual_addressing == other.use_virtual_addressing;
932932
}
933933
};
934934

@@ -938,14 +938,59 @@ struct EndpointConfigKey {
938938
template <>
939939
struct std::hash<arrow::fs::EndpointConfigKey> {
940940
std::size_t operator()(const arrow::fs::EndpointConfigKey& key) const noexcept {
941-
auto h = std::hash<std::string>{};
942-
return h(key.region) ^ h(key.scheme) ^ h(key.endpoint_override);
941+
auto h = std::hash<Aws::String>{};
942+
return h(key.region) ^ h(key.endpoint_override);
943943
}
944944
};
945945

946946
namespace arrow::fs {
947947
namespace {
948948

949+
class EndpointProviderBuilder {
950+
public:
951+
struct LockedEndpointProvider {
952+
std::unique_lock<std::mutex> lock;
953+
std::shared_ptr<Aws::S3::S3EndpointProvider> endpoint_provider;
954+
};
955+
956+
LockedEndpointProvider Lookup(
957+
const Aws::S3::S3ClientConfiguration& config) {
958+
auto key = EndpointConfigKey(config);
959+
CacheValue* value;
960+
{
961+
std::unique_lock lock(cache_mutex_);
962+
value = &cache_[std::move(key)];
963+
}
964+
std::unique_lock lock(value->mutex);
965+
if (!value->endpoint_provider) {
966+
value->endpoint_provider = std::make_shared<Aws::S3::S3EndpointProvider>();
967+
value->endpoint_provider->InitBuiltInParameters(config);
968+
}
969+
return {std::move(lock), value->endpoint_provider};
970+
}
971+
972+
void Reset() {
973+
std::unique_lock lock(cache_mutex_);
974+
cache_.clear();
975+
}
976+
977+
static EndpointProviderBuilder* Instance() {
978+
static EndpointProviderBuilder instance;
979+
return &instance;
980+
}
981+
982+
protected:
983+
EndpointProviderBuilder() = default;
984+
985+
struct CacheValue {
986+
std::mutex mutex;
987+
std::shared_ptr<Aws::S3::S3EndpointProvider> endpoint_provider;
988+
};
989+
990+
std::mutex cache_mutex_;
991+
std::unordered_map<EndpointConfigKey, CacheValue> cache_;
992+
};
993+
949994
class ClientBuilder {
950995
public:
951996
explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
@@ -1022,12 +1067,13 @@ class ClientBuilder {
10221067
client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25);
10231068
}
10241069

1025-
auto endpoint_provider = std::make_shared<Aws::S3::S3EndpointProvider>();
1026-
endpoint_provider->InitBuiltInParameters(client_config_);
1027-
1070+
// The EndPointProvider is locked because S3Client updates EndPointProvider
1071+
// configuration in a non-thread-safe way, even though the updates are
1072+
// idempotent.
1073+
auto locked = EndpointProviderBuilder::Instance()->Lookup(client_config_);
10281074
auto client =
1029-
std::make_shared<S3Client>(credentials_provider_, nullptr, client_config_);
1030-
client->accessEndpointProvider() = endpoint_provider;
1075+
std::make_shared<S3Client>(credentials_provider_, locked.endpoint_provider,
1076+
client_config_);
10311077

10321078
client->s3_retry_strategy_ = options_.retry_strategy;
10331079
return GetClientHolder(std::move(client));
@@ -2962,6 +3008,7 @@ struct AwsInstance {
29623008
"This could lead to a segmentation fault at exit";
29633009
}
29643010
GetClientFinalizer()->Finalize();
3011+
EndpointProviderBuilder::Instance()->Reset();
29653012
Aws::ShutdownAPI(aws_options_);
29663013
}
29673014
}

0 commit comments

Comments
 (0)