Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Windows workers #17555

Merged
merged 11 commits into from
Aug 6, 2021
4 changes: 4 additions & 0 deletions docs/root/intro/arch_overview/intro/threading_model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ to have Envoy forcibly balance connections between worker threads. To support th
Envoy allows for different types of :ref:`connection balancing
<envoy_v3_api_field_config.listener.v3.Listener.connection_balance_config>` to be configured on each :ref:`listener
<arch_overview_listeners>`.

On Windows the kernel is not able to balance the connections properly with the async IO model that Envoy is using.
Until this is fixed by the platfrom, Envoy will enforce listener connection balancing on Windows. This allows us to
balance connections between different worker threads. This behavior comes with a performance penalty.
2 changes: 1 addition & 1 deletion docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Bug Fixes
* access log: fix `%UPSTREAM_CLUSTER%` when used in http upstream access logs. Previously, it was always logging as an unset value.
* aws request signer: fix the AWS Request Signer extension to correctly normalize the path and query string to be signed according to AWS' guidelines, so that the hash on the server side matches. See `AWS SigV4 documentaion <https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>`_.
* cluster: delete pools when they're idle to fix unbounded memory use when using PROXY protocol upstream with tcp_proxy. This behavior can be temporarily reverted by setting the ``envoy.reloadable_features.conn_pool_delete_when_idle`` runtime guard to false.
* listener: fixed an issue on Windows where connections are not handled by all worker threads.
* xray: fix the AWS X-Ray tracer bug where span's error, fault and throttle information was not reported properly as per the `AWS X-Ray documentation <https://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html>`_. Before this fix, server error was reported under 'annotations' section of the segment data.

Removed Config or Runtime
Expand Down Expand Up @@ -70,4 +71,3 @@ Deprecated
* listener: :ref:`reuse_port <envoy_v3_api_field_config.listener.v3.Listener.reuse_port>` has been
deprecated in favor of :ref:`enable_reuse_port <envoy_v3_api_field_config.listener.v3.Listener.enable_reuse_port>`.
At the same time, the default has been changed from false to true. See above for more information.

40 changes: 31 additions & 9 deletions source/server/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,33 @@ void ListenSocketFactoryImpl::doFinalPreWorkerInit() {
return;
}

for (auto& socket : sockets_) {
const auto rc = socket->ioHandle().listen(tcp_backlog_size_);
#ifndef WIN32
ASSERT(!sockets_.empty());
auto listen_and_apply_options = [](Envoy::Network::SocketSharedPtr socket, int tcp_backlog_size) {
const auto rc = socket->ioHandle().listen(tcp_backlog_size);
if (rc.return_value_ != 0) {
throw EnvoyException(fmt::format("cannot listen() errno={}", rc.errno_));
}
#else
// TODO(davinci26): listen() error handling and generally listening on multiple workers
// is broken right now. This needs follow up to do something better on Windows.
UNREFERENCED_PARAMETER(rc);
#endif

if (!Network::Socket::applyOptions(socket->options(), *socket,
envoy::config::core::v3::SocketOption::STATE_LISTENING)) {
throw Network::SocketOptionException(
fmt::format("cannot set post-listen socket option on socket: {}",
socket->addressProvider().localAddress()->asString()));
}
};
// On all platforms we should listen on the first socket.
auto iterator = sockets_.begin();
listen_and_apply_options(*iterator, tcp_backlog_size_);
++iterator;
#ifndef WIN32
// With this implementation on Windows we only accept
// connections on Worker 1 and then we use the `ExactConnectionBalancer`
// to balance these connections to all workers.
// TODO(davinci26): We should update the behavior when socket duplication
// does not cause accepts to hang in the OS.
for (; iterator != sockets_.end(); ++iterator) {
listen_and_apply_options(*iterator, tcp_backlog_size_);
}
#endif
}

ListenerFactoryContextBaseImpl::ListenerFactoryContextBaseImpl(
Expand Down Expand Up @@ -545,6 +553,19 @@ void ListenerImpl::buildFilterChains() {
void ListenerImpl::buildSocketOptions() {
// TCP specific setup.
if (connection_balancer_ == nullptr) {
#ifdef WIN32
// On Windows we use the exact connection balancer to dispatch connections
// from worker 1 to all workers. This is a perf hit but it is the only way
// to make all the workers do work.
// TODO(davinci26): We can be faster here if we create a balancer implementation
// that dispatches the connection to a random thread.
ENVOY_LOG(warn,
"ExactBalance was forced enabled for TCP listener '{}' because "
"Envoy is running on Windows."
"ExactBalance is used to load balance connections between workers on Windows.",
config_.name());
connection_balancer_ = std::make_shared<Network::ExactConnectionBalancerImpl>();
#else
// Not in place listener update.
if (config_.has_connection_balance_config()) {
// Currently exact balance is the only supported type and there are no options.
Expand All @@ -553,6 +574,7 @@ void ListenerImpl::buildSocketOptions() {
} else {
connection_balancer_ = std::make_shared<Network::NopConnectionBalancerImpl>();
}
#endif
}

if (config_.has_tcp_fast_open_queue_length()) {
Expand Down
47 changes: 47 additions & 0 deletions test/integration/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,53 @@ TEST_P(IntegrationTest, PerWorkerStatsAndBalancing) {
check_listener_stats(0, 1);
}

// Make sure we all workers picking up connections
TEST_P(IntegrationTest, AllWorkersAreHandlingLoad) {
concurrency_ = 2;
initialize();

std::string worker0_stat_name, worker1_stat_name;
if (GetParam() == Network::Address::IpVersion::v4) {
worker0_stat_name = "listener.127.0.0.1_0.worker_0.downstream_cx_total";
worker1_stat_name = "listener.127.0.0.1_0.worker_1.downstream_cx_total";
} else {
worker0_stat_name = "listener.[__1]_0.worker_0.downstream_cx_total";
worker1_stat_name = "listener.[__1]_0.worker_1.downstream_cx_total";
}

test_server_->waitForCounterEq(worker0_stat_name, 0);
test_server_->waitForCounterEq(worker1_stat_name, 0);

// We set the counters for the two workers to see how many connections each handles.
uint64_t w0_ctr = 0;
uint64_t w1_ctr = 0;
constexpr int loops = 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be considered about flakes here. Can you make sure to run this test 1000 times? I think with REUSE_PORT on linux and exact balance on Windows this is probably OK, but it might flake on OSX. I'm not sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a mac to test it. I have put this in a loop on Windows to verify the fix and it did not flake.

for (int i = 0; i < loops; i++) {
constexpr int requests_per_loop = 4;
std::array<IntegrationCodecClientPtr, requests_per_loop> connections;
for (int j = 0; j < requests_per_loop; j++) {
connections[j] = makeHttpConnection(lookupPort("http"));
}

auto worker0_ctr = test_server_->counter(worker0_stat_name);
auto worker1_ctr = test_server_->counter(worker1_stat_name);
auto target = w0_ctr + w1_ctr + requests_per_loop;
while (test_server_->counter(worker0_stat_name)->value() +
test_server_->counter(worker1_stat_name)->value() <
target) {
timeSystem().advanceTimeWait(std::chrono::milliseconds(10));
}
w0_ctr = test_server_->counter(worker0_stat_name)->value();
w1_ctr = test_server_->counter(worker1_stat_name)->value();
for (int j = 0; j < requests_per_loop; j++) {
connections[j]->close();
}
}

EXPECT_TRUE(w0_ctr > 1);
EXPECT_TRUE(w1_ctr > 1);
}

TEST_P(IntegrationTest, RouterDirectResponseWithBody) {
const std::string body = "Response body";
const std::string file_path = TestEnvironment::writeStringToFileForTest("test_envoy", body);
Expand Down
4 changes: 0 additions & 4 deletions test/server/listener_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1837,9 +1837,6 @@ TEST_F(ListenerManagerImplTest, NotSupportedDatagramUds) {
"socket type SocketType::Datagram not supported for pipes");
}

// TODO(davinci26): See ListenSocketFactoryImpl::doFinalPreWorkerInit() for why this test is
// not run on Windows.
#ifndef WIN32
TEST_F(ListenerManagerImplTest, CantListen) {
InSequence s;

Expand Down Expand Up @@ -1870,7 +1867,6 @@ name: foo
1UL,
server_.stats_store_.counterFromString("listener_manager.listener_create_failure").value());
}
#endif

TEST_F(ListenerManagerImplTest, CantBindSocket) {
time_system_.setSystemTime(std::chrono::milliseconds(1001001001001));
Expand Down