Skip to content

Commit bd0bebb

Browse files
indigophoxjduokou
authored andcommitted
apacheGH-34865: [C++][Java][Flight RPC] Add Session management messages (apache#34817)
### Rationale for this change Flight presently contains no formal mechanism for managing connection/query configuration options; instead, request headers and/or non-query SQL statements are often used in lieu, with unnecessary overhead and poor failure handling. A stateless (from Flight's perspective) Flight format extension is desirable to close this gap for server implementations that use/want connection state/context. ### What changes are included in this PR? "Session" set/get/close Actions and server-side helper middleware. ### Are these changes tested? Integration tests (C++ currently broken due to middleware-related framework issue) and some complex-case unit testing are included. ### Are there any user-facing changes? Non-breaking extensions to wire format and corresponding client/server Flight RPC API extensions. * Closes: apache#34865 Lead-authored-by: Paul Nienaber <paul.nienaber@dremio.com> Co-authored-by: Paul Nienaber <github@phox.ca> Co-authored-by: James Duong <jduong@dremio.com> Co-authored-by: Sutou Kouhei <kou@cozmixng.org> Signed-off-by: David Li <li.davidm96@gmail.com>
1 parent 952d87a commit bd0bebb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3527
-21
lines changed

cpp/src/arrow/flight/client.cc

+41
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,47 @@ arrow::Result<FlightClient::DoExchangeResult> FlightClient::DoExchange(
713713
return result;
714714
}
715715

716+
::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
717+
const FlightCallOptions& options, const SetSessionOptionsRequest& request) {
718+
RETURN_NOT_OK(CheckOpen());
719+
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
720+
Action action{ActionType::kSetSessionOptions.type, Buffer::FromString(body)};
721+
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
722+
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
723+
ARROW_ASSIGN_OR_RAISE(
724+
auto set_session_options_result,
725+
SetSessionOptionsResult::Deserialize(std::string_view(*result->body)));
726+
ARROW_RETURN_NOT_OK(stream->Drain());
727+
return set_session_options_result;
728+
}
729+
730+
::arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
731+
const FlightCallOptions& options, const GetSessionOptionsRequest& request) {
732+
RETURN_NOT_OK(CheckOpen());
733+
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
734+
Action action{ActionType::kGetSessionOptions.type, Buffer::FromString(body)};
735+
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
736+
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
737+
ARROW_ASSIGN_OR_RAISE(
738+
auto get_session_options_result,
739+
GetSessionOptionsResult::Deserialize(std::string_view(*result->body)));
740+
ARROW_RETURN_NOT_OK(stream->Drain());
741+
return get_session_options_result;
742+
}
743+
744+
::arrow::Result<CloseSessionResult> FlightClient::CloseSession(
745+
const FlightCallOptions& options, const CloseSessionRequest& request) {
746+
RETURN_NOT_OK(CheckOpen());
747+
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
748+
Action action{ActionType::kCloseSession.type, Buffer::FromString(body)};
749+
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
750+
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
751+
ARROW_ASSIGN_OR_RAISE(auto close_session_result,
752+
CloseSessionResult::Deserialize(std::string_view(*result->body)));
753+
ARROW_RETURN_NOT_OK(stream->Drain());
754+
return close_session_result;
755+
}
756+
716757
Status FlightClient::Close() {
717758
if (!closed_) {
718759
closed_ = true;

cpp/src/arrow/flight/client.h

+21
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,27 @@ class ARROW_FLIGHT_EXPORT FlightClient {
383383
return DoExchange({}, descriptor);
384384
}
385385

386+
/// \brief Set server session option(s) by name/value. Sessions are generally
387+
/// persisted via HTTP cookies.
388+
/// \param[in] options Per-RPC options
389+
/// \param[in] request The server session options to set
390+
::arrow::Result<SetSessionOptionsResult> SetSessionOptions(
391+
const FlightCallOptions& options, const SetSessionOptionsRequest& request);
392+
393+
/// \brief Get the current server session options. The session is generally
394+
/// accessed via an HTTP cookie.
395+
/// \param[in] options Per-RPC options
396+
/// \param[in] request The (empty) GetSessionOptions request object.
397+
::arrow::Result<GetSessionOptionsResult> GetSessionOptions(
398+
const FlightCallOptions& options, const GetSessionOptionsRequest& request);
399+
400+
/// \brief Close/invalidate the current server session. The session is generally
401+
/// accessed via an HTTP cookie.
402+
/// \param[in] options Per-RPC options
403+
/// \param[in] request The (empty) CloseSession request object.
404+
::arrow::Result<CloseSessionResult> CloseSession(const FlightCallOptions& options,
405+
const CloseSessionRequest& request);
406+
386407
/// \brief Explicitly shut down and clean up the client.
387408
///
388409
/// For backwards compatibility, this will be implicitly called by

cpp/src/arrow/flight/integration_tests/flight_integration_test.cc

+2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
7171
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
7272
}
7373

74+
TEST(FlightIntegration, SessionOptions) { ASSERT_OK(RunScenario("session_options")); }
75+
7476
TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }
7577

7678
TEST(FlightIntegration, AppMetadataFlightInfoEndpoint) {

cpp/src/arrow/flight/integration_tests/test_integration.cc

+154
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
#include "arrow/array/array_nested.h"
2929
#include "arrow/array/array_primitive.h"
3030
#include "arrow/array/builder_primitive.h"
31+
#include "arrow/flight/client_cookie_middleware.h"
3132
#include "arrow/flight/client_middleware.h"
3233
#include "arrow/flight/server_middleware.h"
3334
#include "arrow/flight/sql/client.h"
3435
#include "arrow/flight/sql/column_metadata.h"
3536
#include "arrow/flight/sql/server.h"
37+
#include "arrow/flight/sql/server_session_middleware.h"
3638
#include "arrow/flight/sql/types.h"
3739
#include "arrow/flight/test_util.h"
3840
#include "arrow/flight/types.h"
@@ -744,6 +746,155 @@ class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
744746
}
745747
};
746748

749+
/// \brief The server used for testing Session Options.
750+
///
751+
/// SetSessionOptions has a blacklisted option name and string option value,
752+
/// both "lol_invalid", which will result in errors attempting to set either.
753+
class SessionOptionsServer : public sql::FlightSqlServerBase {
754+
static inline const std::string invalid_option_name = "lol_invalid";
755+
static inline const SessionOptionValue invalid_option_value = "lol_invalid";
756+
757+
const std::string session_middleware_key;
758+
// These will never be threaded so using a plain map and no lock
759+
std::map<std::string, SessionOptionValue> session_store_;
760+
761+
public:
762+
explicit SessionOptionsServer(std::string session_middleware_key)
763+
: FlightSqlServerBase(),
764+
session_middleware_key(std::move(session_middleware_key)) {}
765+
766+
arrow::Result<SetSessionOptionsResult> SetSessionOptions(
767+
const ServerCallContext& context,
768+
const SetSessionOptionsRequest& request) override {
769+
SetSessionOptionsResult res;
770+
771+
auto* middleware = static_cast<sql::ServerSessionMiddleware*>(
772+
context.GetMiddleware(session_middleware_key));
773+
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<sql::FlightSession> session,
774+
middleware->GetSession());
775+
776+
for (const auto& [name, value] : request.session_options) {
777+
// Blacklisted value name
778+
if (name == invalid_option_name) {
779+
res.errors.emplace(name, SetSessionOptionsResult::Error{
780+
SetSessionOptionErrorValue::kInvalidName});
781+
continue;
782+
}
783+
// Blacklisted option value
784+
if (value == invalid_option_value) {
785+
res.errors.emplace(name, SetSessionOptionsResult::Error{
786+
SetSessionOptionErrorValue::kInvalidValue});
787+
continue;
788+
}
789+
if (std::holds_alternative<std::monostate>(value)) {
790+
session->EraseSessionOption(name);
791+
continue;
792+
}
793+
session->SetSessionOption(name, value);
794+
}
795+
796+
return res;
797+
}
798+
799+
arrow::Result<GetSessionOptionsResult> GetSessionOptions(
800+
const ServerCallContext& context,
801+
const GetSessionOptionsRequest& request) override {
802+
auto* middleware = static_cast<sql::ServerSessionMiddleware*>(
803+
context.GetMiddleware(session_middleware_key));
804+
if (!middleware->HasSession()) {
805+
return Status::Invalid("No existing session to get options from.");
806+
}
807+
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<sql::FlightSession> session,
808+
middleware->GetSession());
809+
810+
return GetSessionOptionsResult{session->GetSessionOptions()};
811+
}
812+
813+
arrow::Result<CloseSessionResult> CloseSession(
814+
const ServerCallContext& context, const CloseSessionRequest& request) override {
815+
// Broken (does not expire cookie) until C++ middleware handling (GH-39791) fixed:
816+
auto* middleware = static_cast<sql::ServerSessionMiddleware*>(
817+
context.GetMiddleware(session_middleware_key));
818+
ARROW_RETURN_NOT_OK(middleware->CloseSession());
819+
return CloseSessionResult{CloseSessionStatus::kClosed};
820+
}
821+
};
822+
823+
/// \brief The Session Options scenario.
824+
///
825+
/// This tests Session Options functionality as well as ServerSessionMiddleware.
826+
class SessionOptionsScenario : public Scenario {
827+
static inline const std::string server_middleware_key = "sessionmiddleware";
828+
829+
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
830+
FlightServerOptions* options) override {
831+
*server = std::make_unique<SessionOptionsServer>(server_middleware_key);
832+
833+
auto id_gen_int = std::make_shared<std::atomic_int>(1000);
834+
options->middleware.emplace_back(
835+
server_middleware_key,
836+
sql::MakeServerSessionMiddlewareFactory(
837+
[=]() -> std::string { return std::to_string((*id_gen_int)++); }));
838+
839+
return Status::OK();
840+
}
841+
842+
Status MakeClient(FlightClientOptions* options) override {
843+
options->middleware.emplace_back(GetCookieFactory());
844+
return Status::OK();
845+
}
846+
847+
Status RunClient(std::unique_ptr<FlightClient> flight_client) override {
848+
sql::FlightSqlClient client{std::move(flight_client)};
849+
850+
// Set
851+
auto req1 = SetSessionOptionsRequest{
852+
{{"foolong", 123L},
853+
{"bardouble", 456.0},
854+
{"lol_invalid", "this won't get set"},
855+
{"key_with_invalid_value", "lol_invalid"},
856+
{"big_ol_string_list", std::vector<std::string>{"a", "b", "sea", "dee", " ",
857+
" ", "geee", "(づ。◕‿‿◕。)づ"}}}};
858+
ARROW_ASSIGN_OR_RAISE(auto res1, client.SetSessionOptions({}, req1));
859+
// Some errors
860+
if (res1.errors !=
861+
std::map<std::string, SetSessionOptionsResult::Error>{
862+
{"lol_invalid",
863+
SetSessionOptionsResult::Error{SetSessionOptionErrorValue::kInvalidName}},
864+
{"key_with_invalid_value", SetSessionOptionsResult::Error{
865+
SetSessionOptionErrorValue::kInvalidValue}}}) {
866+
return Status::Invalid("res1 incorrect: " + res1.ToString());
867+
}
868+
// Some set, some omitted due to above errors
869+
ARROW_ASSIGN_OR_RAISE(auto res2, client.GetSessionOptions({}, {}));
870+
if (res2.session_options !=
871+
std::map<std::string, SessionOptionValue>{
872+
{"foolong", 123L},
873+
{"bardouble", 456.0},
874+
{"big_ol_string_list",
875+
std::vector<std::string>{"a", "b", "sea", "dee", " ", " ", "geee",
876+
"(づ。◕‿‿◕。)づ"}}}) {
877+
return Status::Invalid("res2 incorrect: " + res2.ToString());
878+
}
879+
// Update
880+
ARROW_ASSIGN_OR_RAISE(
881+
auto res3,
882+
client.SetSessionOptions(
883+
{}, SetSessionOptionsRequest{
884+
{{"foolong", std::monostate{}},
885+
{"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}}));
886+
ARROW_ASSIGN_OR_RAISE(auto res4, client.GetSessionOptions({}, {}));
887+
if (res4.session_options !=
888+
std::map<std::string, SessionOptionValue>{
889+
{"bardouble", 456.0},
890+
{"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}) {
891+
return Status::Invalid("res4 incorrect: " + res4.ToString());
892+
}
893+
894+
return Status::OK();
895+
}
896+
};
897+
747898
/// \brief The server used for testing PollFlightInfo().
748899
class PollFlightInfoServer : public FlightServerBase {
749900
public:
@@ -1952,6 +2103,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
19522103
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
19532104
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
19542105
return Status::OK();
2106+
} else if (scenario_name == "session_options") {
2107+
*out = std::make_shared<SessionOptionsScenario>();
2108+
return Status::OK();
19552109
} else if (scenario_name == "poll_flight_info") {
19562110
*out = std::make_shared<PollFlightInfoScenario>();
19572111
return Status::OK();

0 commit comments

Comments
 (0)