Skip to content

Commit f3bb380

Browse files
authored
Dealer: Rework context retrieval (#1414)
* connect: simplify `handle_command` for SpircCommand * connect: simplify `handle_player_event` * connect: `handle_player_event` update log entries * connect: set `playback_speed` according to player state * connect: reduce/group state updates by delaying them slightly * connect: load entire context at once * connect: use is_playing from connect_state * connect: move `ResolveContext` in own file * connect: handle metadata correct * connect: resolve context rework - resolved contexts independently, by that we don't block our main loop - move resolve logic into own file - polish handling for play and transfer * connect: rework aftermath * general logging and comment fixups * connect: fix incorrect stopping * connect: always handle player seek event * connect: adjust behavior - rename `handle_context` to `handle_next_context` - disconnect should only pause the playback - find_next should not exceed queue length * fix typo and `find_next` * connect: fixes for context and transfer - fix context_metadata and restriction incorrect reset - do some state updates earlier - add more logging * revert removal of state setup * `clear_next_tracks` should never clear queued items just mimics official client behavior * connect: make `playing_track` optional and handle it correctly * connect: adjust finish of context resolving * connect: set track position when shuffling * example: adjust to model change * connect: remove duplicate track * connect: provide all recently played tracks to autoplay request - removes previously added workaround * connect: apply review suggestions - use drain instead of for with pop - use for instead of loop - use or_else instead of match - use Self::Error instead of the value - free memory for metadata and restrictions * connect: impl trait for player context * connect: fix incorrect playing and paused * connect: apply options as official clients * protocol: move trait impls into impl_trait mod
1 parent c288cf7 commit f3bb380

18 files changed

+997
-727
lines changed

connect/src/context_resolver.rs

+347
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
use crate::{
2+
core::{Error, Session},
3+
protocol::{
4+
autoplay_context_request::AutoplayContextRequest, context::Context,
5+
transfer_state::TransferState,
6+
},
7+
state::{
8+
context::{ContextType, UpdateContext},
9+
ConnectState,
10+
},
11+
};
12+
use std::cmp::PartialEq;
13+
use std::{
14+
collections::{HashMap, VecDeque},
15+
fmt::{Display, Formatter},
16+
hash::Hash,
17+
time::Duration,
18+
};
19+
use thiserror::Error as ThisError;
20+
use tokio::time::Instant;
21+
22+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
23+
enum Resolve {
24+
Uri(String),
25+
Context(Context),
26+
}
27+
28+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
29+
pub(super) enum ContextAction {
30+
Append,
31+
Replace,
32+
}
33+
34+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
35+
pub(super) struct ResolveContext {
36+
resolve: Resolve,
37+
fallback: Option<String>,
38+
update: UpdateContext,
39+
action: ContextAction,
40+
}
41+
42+
impl ResolveContext {
43+
fn append_context(uri: impl Into<String>) -> Self {
44+
Self {
45+
resolve: Resolve::Uri(uri.into()),
46+
fallback: None,
47+
update: UpdateContext::Default,
48+
action: ContextAction::Append,
49+
}
50+
}
51+
52+
pub fn from_uri(
53+
uri: impl Into<String>,
54+
fallback: impl Into<String>,
55+
update: UpdateContext,
56+
action: ContextAction,
57+
) -> Self {
58+
let fallback_uri = fallback.into();
59+
Self {
60+
resolve: Resolve::Uri(uri.into()),
61+
fallback: (!fallback_uri.is_empty()).then_some(fallback_uri),
62+
update,
63+
action,
64+
}
65+
}
66+
67+
pub fn from_context(context: Context, update: UpdateContext, action: ContextAction) -> Self {
68+
Self {
69+
resolve: Resolve::Context(context),
70+
fallback: None,
71+
update,
72+
action,
73+
}
74+
}
75+
76+
/// the uri which should be used to resolve the context, might not be the context uri
77+
fn resolve_uri(&self) -> Option<&str> {
78+
// it's important to call this always, or at least for every ResolveContext
79+
// otherwise we might not even check if we need to fallback and just use the fallback uri
80+
match self.resolve {
81+
Resolve::Uri(ref uri) => ConnectState::valid_resolve_uri(uri),
82+
Resolve::Context(ref ctx) => ConnectState::get_context_uri_from_context(ctx),
83+
}
84+
.or(self.fallback.as_deref())
85+
}
86+
87+
/// the actual context uri
88+
fn context_uri(&self) -> &str {
89+
match self.resolve {
90+
Resolve::Uri(ref uri) => uri,
91+
Resolve::Context(ref ctx) => ctx.uri.as_deref().unwrap_or_default(),
92+
}
93+
}
94+
}
95+
96+
impl Display for ResolveContext {
97+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98+
write!(
99+
f,
100+
"resolve_uri: <{:?}>, context_uri: <{}>, update: <{:?}>",
101+
self.resolve_uri(),
102+
self.context_uri(),
103+
self.update,
104+
)
105+
}
106+
}
107+
108+
#[derive(Debug, ThisError)]
109+
enum ContextResolverError {
110+
#[error("no next context to resolve")]
111+
NoNext,
112+
#[error("tried appending context with {0} pages")]
113+
UnexpectedPagesSize(usize),
114+
#[error("tried resolving not allowed context: {0:?}")]
115+
NotAllowedContext(String),
116+
}
117+
118+
impl From<ContextResolverError> for Error {
119+
fn from(value: ContextResolverError) -> Self {
120+
Error::failed_precondition(value)
121+
}
122+
}
123+
124+
pub struct ContextResolver {
125+
session: Session,
126+
queue: VecDeque<ResolveContext>,
127+
unavailable_contexts: HashMap<ResolveContext, Instant>,
128+
}
129+
130+
// time after which an unavailable context is retried
131+
const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600);
132+
133+
impl ContextResolver {
134+
pub fn new(session: Session) -> Self {
135+
Self {
136+
session,
137+
queue: VecDeque::new(),
138+
unavailable_contexts: HashMap::new(),
139+
}
140+
}
141+
142+
pub fn add(&mut self, resolve: ResolveContext) {
143+
let last_try = self
144+
.unavailable_contexts
145+
.get(&resolve)
146+
.map(|i| i.duration_since(Instant::now()));
147+
148+
let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) {
149+
let _ = self.unavailable_contexts.remove(&resolve);
150+
debug!(
151+
"context was requested {}s ago, trying again to resolve the requested context",
152+
last_try.expect("checked by condition").as_secs()
153+
);
154+
None
155+
} else {
156+
last_try
157+
};
158+
159+
if last_try.is_some() {
160+
debug!("tried loading unavailable context: {resolve}");
161+
return;
162+
} else if self.queue.contains(&resolve) {
163+
debug!("update for {resolve} is already added");
164+
return;
165+
} else {
166+
trace!(
167+
"added {} to resolver queue",
168+
resolve.resolve_uri().unwrap_or(resolve.context_uri())
169+
)
170+
}
171+
172+
self.queue.push_back(resolve)
173+
}
174+
175+
pub fn add_list(&mut self, resolve: Vec<ResolveContext>) {
176+
for resolve in resolve {
177+
self.add(resolve)
178+
}
179+
}
180+
181+
pub fn remove_used_and_invalid(&mut self) {
182+
if let Some((_, _, remove)) = self.find_next() {
183+
let _ = self.queue.drain(0..remove); // remove invalid
184+
}
185+
self.queue.pop_front(); // remove used
186+
}
187+
188+
pub fn clear(&mut self) {
189+
self.queue = VecDeque::new()
190+
}
191+
192+
fn find_next(&self) -> Option<(&ResolveContext, &str, usize)> {
193+
for idx in 0..self.queue.len() {
194+
let next = self.queue.get(idx)?;
195+
match next.resolve_uri() {
196+
None => {
197+
warn!("skipped {idx} because of invalid resolve_uri: {next}");
198+
continue;
199+
}
200+
Some(uri) => return Some((next, uri, idx)),
201+
}
202+
}
203+
None
204+
}
205+
206+
pub fn has_next(&self) -> bool {
207+
self.find_next().is_some()
208+
}
209+
210+
pub async fn get_next_context(
211+
&self,
212+
recent_track_uri: impl Fn() -> Vec<String>,
213+
) -> Result<Context, Error> {
214+
let (next, resolve_uri, _) = self.find_next().ok_or(ContextResolverError::NoNext)?;
215+
216+
match next.update {
217+
UpdateContext::Default => {
218+
let mut ctx = self.session.spclient().get_context(resolve_uri).await;
219+
if let Ok(ctx) = ctx.as_mut() {
220+
ctx.uri = Some(next.context_uri().to_string());
221+
ctx.url = ctx.uri.as_ref().map(|s| format!("context://{s}"));
222+
}
223+
224+
ctx
225+
}
226+
UpdateContext::Autoplay => {
227+
if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:")
228+
{
229+
// autoplay is not supported for podcasts
230+
Err(ContextResolverError::NotAllowedContext(
231+
resolve_uri.to_string(),
232+
))?
233+
}
234+
235+
let request = AutoplayContextRequest {
236+
context_uri: Some(resolve_uri.to_string()),
237+
recent_track_uri: recent_track_uri(),
238+
..Default::default()
239+
};
240+
self.session.spclient().get_autoplay_context(&request).await
241+
}
242+
}
243+
}
244+
245+
pub fn mark_next_unavailable(&mut self) {
246+
if let Some((next, _, _)) = self.find_next() {
247+
self.unavailable_contexts
248+
.insert(next.clone(), Instant::now());
249+
}
250+
}
251+
252+
pub fn apply_next_context(
253+
&self,
254+
state: &mut ConnectState,
255+
mut context: Context,
256+
) -> Result<Option<Vec<ResolveContext>>, Error> {
257+
let (next, _, _) = self.find_next().ok_or(ContextResolverError::NoNext)?;
258+
259+
let remaining = match next.action {
260+
ContextAction::Append if context.pages.len() == 1 => state
261+
.fill_context_from_page(context.pages.remove(0))
262+
.map(|_| None),
263+
ContextAction::Replace => {
264+
let remaining = state.update_context(context, next.update);
265+
if let Resolve::Context(ref ctx) = next.resolve {
266+
state.merge_context(Some(ctx.clone()));
267+
}
268+
269+
remaining
270+
}
271+
ContextAction::Append => {
272+
warn!("unexpected page size: {context:#?}");
273+
Err(ContextResolverError::UnexpectedPagesSize(context.pages.len()).into())
274+
}
275+
}?;
276+
277+
Ok(remaining.map(|remaining| {
278+
remaining
279+
.into_iter()
280+
.map(ResolveContext::append_context)
281+
.collect::<Vec<_>>()
282+
}))
283+
}
284+
285+
pub fn try_finish(
286+
&self,
287+
state: &mut ConnectState,
288+
transfer_state: &mut Option<TransferState>,
289+
) -> bool {
290+
let (next, _, _) = match self.find_next() {
291+
None => return false,
292+
Some(next) => next,
293+
};
294+
295+
// when there is only one update type, we are the last of our kind, so we should update the state
296+
if self
297+
.queue
298+
.iter()
299+
.filter(|resolve| resolve.update == next.update)
300+
.count()
301+
!= 1
302+
{
303+
return false;
304+
}
305+
306+
match (next.update, state.active_context) {
307+
(UpdateContext::Default, ContextType::Default) | (UpdateContext::Autoplay, _) => {
308+
debug!(
309+
"last item of type <{:?}>, finishing state setup",
310+
next.update
311+
);
312+
}
313+
(UpdateContext::Default, _) => {
314+
debug!("skipped finishing default, because it isn't the active context");
315+
return false;
316+
}
317+
}
318+
319+
let active_ctx = state.get_context(state.active_context);
320+
let res = if let Some(transfer_state) = transfer_state.take() {
321+
state.finish_transfer(transfer_state)
322+
} else if state.shuffling_context() {
323+
state.shuffle()
324+
} else if matches!(active_ctx, Ok(ctx) if ctx.index.track == 0) {
325+
// has context, and context is not touched
326+
// when the index is not zero, the next index was already evaluated elsewhere
327+
let ctx = active_ctx.expect("checked by precondition");
328+
let idx = ConnectState::find_index_in_context(ctx, |t| {
329+
state.current_track(|c| t.uri == c.uri)
330+
})
331+
.ok();
332+
333+
state.reset_playback_to_position(idx)
334+
} else {
335+
state.fill_up_next_tracks()
336+
};
337+
338+
if let Err(why) = res {
339+
error!("setup of state failed: {why}, last used resolve {next:#?}")
340+
}
341+
342+
state.update_restrictions();
343+
state.update_queue_revision();
344+
345+
true
346+
}
347+
}

connect/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use librespot_core as core;
55
use librespot_playback as playback;
66
use librespot_protocol as protocol;
77

8+
mod context_resolver;
89
mod model;
910
pub mod spirc;
1011
pub mod state;

0 commit comments

Comments
 (0)