Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timestamped input buffering - prevent stalling and improve timing #77062

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 247 additions & 46 deletions core/input/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,190 @@ void (*Input::warp_mouse_func)(const Vector2 &p_position) = nullptr;
Input::CursorShape (*Input::get_current_cursor_shape_func)() = nullptr;
void (*Input::set_custom_mouse_cursor_func)(const Ref<Resource> &, Input::CursorShape, const Vector2 &) = nullptr;

// Accumulating immediately rather than deferred at flush
// is slightly more efficient (because of less allocations / transfer to the main buffer etc)
// but is less accurate timing wise, so should only be used in frame buffering mode.
void InputEventBuffer::accumulate_or_push_event(const Ref<InputEvent> &p_event, uint64_t p_timestamp) {
// Events can come in any time, including when we are preparing to read the incoming queue,
// so we must lock to prevent race condition.
MutexLock lock(data.incoming_mutex);

LocalVector<Event> &incoming = data.incoming[data.incoming_write];

// First, attempt to accumulate.
if (incoming.size()) {
Event &prev = incoming[incoming.size() - 1];
if (prev.event->accumulate(p_event)) {
return;
}
}

// Accumulate failed, fall back to push.
incoming.resize(incoming.size() + 1);
Event &e = incoming[incoming.size() - 1];
e.event = p_event;
e.timestamp = p_timestamp;
Comment on lines +102 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just call push_event here to reuse code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicating here avoids a redundant extra lock, especially as this will not be a rare path.

Alternatively we could place the push in a separate function from the locks, but that might be overkill for the few lines of code here (Godot codebase tends towards duplication rather more than I would personally, but it's no biggie).

}

void InputEventBuffer::push_event(const Ref<InputEvent> &p_event, uint64_t p_timestamp) {
// Events can come in any time, including when we are preparing to read the incoming queue,
// so we must lock to prevent race condition.
MutexLock lock(data.incoming_mutex);

LocalVector<Event> &incoming = data.incoming[data.incoming_write];
incoming.resize(incoming.size() + 1);
Event &e = incoming[incoming.size() - 1];
e.event = p_event;
e.timestamp = p_timestamp;
}

void InputEventBuffer::_try_accumulate(uint64_t p_timestamp) {
// Try and accumulate events after the current front
// until we fail or pass the current timestamp.
List<Event>::Element *front = data.buffer.front();
Event &front_event = front->get();

while (List<Event>::Element *next = data.buffer.front()->next()) {
const Event &next_event = next->get();
if (next_event.timestamp > p_timestamp) {
// Don't want to accumulate events that are on the next tick..
// want to keep some resolution to the events.
break;
}
if (front_event.event->accumulate(next_event.event)) {
// Remove the accumulated event from the buffer.
data.buffer.swap(front, next);
data.buffer.pop_front();
// Check this does not invalidate front and front_event.
DEV_ASSERT(front == data.buffer.front());
DEV_ASSERT(&front_event == &front->get());
} else {
break;
}
}
}

void InputEventBuffer::flush_events(uint64_t p_current_timestamp, Input &r_input_handler, bool p_accumulate) {
// Flushing function is not re-entrant.
// This is unlikely to be called multithread, but this check should be cheap.
MutexLock lock(data.buffer_mutex);

// Only allow one flush at a time.
// This *does* occur notably on starting android debugging
// from the editor, where Main::iteration is called recursively.
if (data.flushing) {
return;
}
data.flushing = true;

data.incoming_mutex.lock();
SWAP(data.incoming_write, data.incoming_read);
data.incoming_mutex.unlock();

LocalVector<Event> &incoming = data.incoming[data.incoming_read];

for (uint32_t n = 0; n < incoming.size(); n++) {
// Copy to main buffer.
data.buffer.push_back(incoming[n]);
}

// Prepare for more input next time, prevent leak.
incoming.clear();

// Now we can read through the input buffer, up to the current time, and process.
while (data.buffer.front()) {
const Event &e = data.buffer.front()->get();

// Timestamp within range?
if (e.timestamp > p_current_timestamp) {
// We are up to date, process no more input on this tick / frame.
break;
}

if (p_accumulate) {
_try_accumulate(p_current_timestamp);
}

r_input_handler._parse_input_event_impl(e.event, false, false);

// Event processed, remove from buffer.
data.buffer.pop_front();
}

data.flushing = false;
}

Input *Input::get_singleton() {
return singleton;
}

void Input::flush_buffered_events_post_frame() {
if (data.use_legacy_flushing) {
// Matches old logic - if buffering, but not agile.
if (data.buffering_mode == BUFFERING_MODE_FRAME) {
_flush_buffered_events_ex(UINT64_MAX);
}
}
}

void Input::flush_buffered_events() {
_flush_buffered_events_ex(UINT64_MAX);
}

void Input::set_use_accumulated_input(bool p_enable) {
data.use_accumulated_input = p_enable;
_update_buffering_mode();
}

void Input::set_use_input_buffering(bool p_enable) {
data.use_buffering = p_enable;
_update_buffering_mode();
}

bool Input::is_using_input_buffering() const {
return data.use_buffering;
}

void Input::set_use_agile_flushing(bool p_enable) {
data.use_agile = p_enable;
_update_buffering_mode();
}

bool Input::is_agile_flushing() const {
return data.buffering_mode == BUFFERING_MODE_AGILE;
}

void Input::set_use_legacy_flushing(bool p_enable) {
data.use_legacy_flushing = p_enable;
}

void Input::set_has_input_thread(bool p_has_thread) {
data.has_input_thread = p_has_thread;
_update_buffering_mode();
}

void Input::_update_buffering_mode() {
// Logic here may appear confusing but it is historical,
// and to prevent compat breaking.
// Accumulated input was added in such a way as to override
// the use_buffering setting.
// i.e. Buffering is used if use_buffering is false, but accumulated is true,
// as accumulating needs the previous event in order to work.

// Additionally for agile input, it currently only makes sense to activate when
// the platform has a separate input thread, otherwise the extra processing
// on flush on each tick just wastes CPU.
if (data.use_accumulated_input || data.use_buffering) {
if (data.use_agile && data.has_input_thread) {
data.buffering_mode = BUFFERING_MODE_AGILE;
} else {
data.buffering_mode = BUFFERING_MODE_FRAME;
}
} else {
data.buffering_mode = BUFFERING_MODE_NONE;
}
}

void Input::set_mouse_mode(MouseMode p_mode) {
ERR_FAIL_INDEX((int)p_mode, 5);
set_mouse_mode_func(p_mode);
Expand Down Expand Up @@ -140,6 +320,7 @@ void Input::_bind_methods() {
ClassDB::bind_method(D_METHOD("parse_input_event", "event"), &Input::parse_input_event);
ClassDB::bind_method(D_METHOD("set_use_accumulated_input", "enable"), &Input::set_use_accumulated_input);
ClassDB::bind_method(D_METHOD("is_using_accumulated_input"), &Input::is_using_accumulated_input);

ClassDB::bind_method(D_METHOD("flush_buffered_events"), &Input::flush_buffered_events);

ADD_PROPERTY(PropertyInfo(Variant::INT, "mouse_mode"), "set_mouse_mode", "get_mouse_mode");
Expand Down Expand Up @@ -489,7 +670,7 @@ Vector3 Input::get_gyroscope() const {
return gyroscope;
}

void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_emulated) {
void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_emulated, bool p_unlock) {
// This function does the final delivery of the input event to user land.
// Regardless where the event came from originally, this has to happen on the main thread.
DEV_ASSERT(Thread::get_caller_id() == Thread::get_main_id());
Expand Down Expand Up @@ -546,9 +727,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
touch_event->set_position(mb->get_position());
touch_event->set_double_tap(mb->is_double_click());
touch_event->set_device(InputEvent::DEVICE_ID_EMULATION);
_THREAD_SAFE_UNLOCK_
event_dispatch_function(touch_event);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(touch_event);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(touch_event);
}
}
}

Expand All @@ -574,9 +759,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
drag_event->set_velocity(get_last_mouse_velocity());
drag_event->set_device(InputEvent::DEVICE_ID_EMULATION);

_THREAD_SAFE_UNLOCK_
event_dispatch_function(drag_event);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(drag_event);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(drag_event);
}
}
}

Expand Down Expand Up @@ -626,7 +815,7 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
}
button_event->set_button_mask(ev_bm);

_parse_input_event_impl(button_event, true);
_parse_input_event_impl(button_event, true, p_unlock);
}
}
}
Expand All @@ -652,7 +841,7 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
motion_event->set_velocity(sd->get_velocity());
motion_event->set_button_mask(mouse_button_mask);

_parse_input_event_impl(motion_event, true);
_parse_input_event_impl(motion_event, true, p_unlock);
}
}

Expand All @@ -678,9 +867,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em

if (ge.is_valid()) {
if (event_dispatch_function) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(ge);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(ge);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(ge);
}
}
}

Expand All @@ -703,9 +896,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
}

if (event_dispatch_function) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(p_event);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(p_event);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(p_event);
}
}
}

Expand Down Expand Up @@ -865,7 +1062,7 @@ void Input::ensure_touch_mouse_raised() {
ev_bm.clear_flag(MouseButtonMask::LEFT);
button_event->set_button_mask(ev_bm);

_parse_input_event_impl(button_event, true);
_parse_input_event_impl(button_event, true, true);
}
}

Expand Down Expand Up @@ -941,47 +1138,48 @@ void Input::parse_input_event(const Ref<InputEvent> &p_event) {
}
#endif

if (use_accumulated_input) {
if (buffered_events.is_empty() || !buffered_events.back()->get()->accumulate(p_event)) {
buffered_events.push_back(p_event);
}
} else if (use_input_buffering) {
buffered_events.push_back(p_event);
if (data.buffering_mode == Input::BUFFERING_MODE_NONE) {
_parse_input_event_impl(p_event, false, true);
} else {
_parse_input_event_impl(p_event, false);
// We can accumulate immediately on input if in frame mode,
// but if in agile / logical mode accumulation is deferred until flushing,
// so we can just push directly.
if ((data.buffering_mode == Input::BUFFERING_MODE_FRAME) && data.use_accumulated_input) {
_event_buffer.accumulate_or_push_event(p_event, OS::get_singleton()->get_ticks_usec());
} else {
_event_buffer.push_event(p_event, OS::get_singleton()->get_ticks_usec());
}
}
}

void Input::flush_buffered_events() {
_THREAD_SAFE_METHOD_

while (buffered_events.front()) {
// The final delivery of the input event involves releasing the lock.
// While the lock is released, another thread may lock it and add new events to the back.
// Therefore, we get each event and pop it while we still have the lock,
// to ensure the list is in a consistent state.
List<Ref<InputEvent>>::Element *E = buffered_events.front();
Ref<InputEvent> e = E->get();
buffered_events.pop_front();

_parse_input_event_impl(e, false);
}
void Input::_flush_buffered_events_ex(uint64_t p_up_to_timestamp) {
_event_buffer.flush_events(p_up_to_timestamp, *this, data.use_accumulated_input);
}

bool Input::is_using_input_buffering() {
return use_input_buffering;
}
void Input::flush_buffered_events_iteration() {
// legacy did not flush here.
if (data.use_legacy_flushing) {
return;
}

void Input::set_use_input_buffering(bool p_enable) {
use_input_buffering = p_enable;
if (data.buffering_mode == BUFFERING_MODE_FRAME) {
_flush_buffered_events_ex(UINT64_MAX);
}
}

void Input::set_use_accumulated_input(bool p_enable) {
use_accumulated_input = p_enable;
void Input::flush_buffered_events_tick(uint64_t p_tick_timestamp) {
if (data.buffering_mode == BUFFERING_MODE_AGILE) {
_flush_buffered_events_ex(p_tick_timestamp);
}
}

bool Input::is_using_accumulated_input() {
return use_accumulated_input;
void Input::flush_buffered_events_frame() {
// If we are in legacy mode, if not NONE or FRAME,
// then it will be AGILE, in which case legacy had a flush
// here, so the new logic works as before.
if (data.buffering_mode == BUFFERING_MODE_AGILE) {
_flush_buffered_events_ex(UINT64_MAX);
}
}

void Input::release_pressed_events() {
Expand Down Expand Up @@ -1531,6 +1729,9 @@ Input::Input() {
parse_mapping(entries[i]);
}
}

// Make sure buffering mode is appropriate for the state.
_update_buffering_mode();
}

Input::~Input() {
Expand Down
Loading