generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathuse-timeline.ts
91 lines (80 loc) · 3.59 KB
/
use-timeline.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import { Code, ConnectError } from '@connectrpc/connect'
import { type InfiniteData, useInfiniteQuery, useQueryClient } from '@tanstack/react-query'
import { ConsoleService } from '../../../protos/xyz/block/ftl/console/v1/console_connect'
import type { Event } from '../../../protos/xyz/block/ftl/timeline/v1/event_pb'
import { type TimelineQuery_Filter, TimelineQuery_Order } from '../../../protos/xyz/block/ftl/timeline/v1/timeline_pb'
import { useClient } from '../../../shared/hooks/use-client'
import { useVisibility } from '../../../shared/hooks/use-visibility'
const timelineKey = 'timeline'
const maxTimelineEntries = 1000
export const useTimeline = (isStreaming: boolean, filters: TimelineQuery_Filter[], updateIntervalMs = 1000, enabled = true) => {
const client = useClient(ConsoleService)
const queryClient = useQueryClient()
const isVisible = useVisibility()
const order = TimelineQuery_Order.DESC
const limit = isStreaming ? 200 : 1000
const queryKey = [timelineKey, isStreaming, filters, order, limit]
const fetchTimeline = async ({ signal }: { signal: AbortSignal }) => {
try {
console.debug('fetching timeline')
const response = await client.getTimeline({ query: { filters, limit, order } }, { signal })
return response.events
} catch (error) {
if (error instanceof ConnectError) {
if (error.code === Code.Canceled) {
return []
}
}
throw error
}
}
const streamTimeline = async ({ signal }: { signal: AbortSignal }) => {
try {
console.debug('streaming timeline')
console.debug('timeline-filters:', filters)
// Initialize with empty pages instead of clearing cache
queryClient.setQueryData(queryKey, { pages: [], pageParams: [] })
for await (const response of client.streamTimeline(
{ updateInterval: { seconds: BigInt(0), nanos: updateIntervalMs * 1000 }, query: { limit, filters, order } },
{ signal },
)) {
console.debug('timeline-response:', response)
if (response.events) {
queryClient.setQueryData<InfiniteData<Event[]>>(queryKey, (old = { pages: [], pageParams: [] }) => {
const newEvents = response.events
const existingEvents = old.pages[0] || []
const uniqueNewEvents = newEvents.filter((newEvent) => !existingEvents.some((existingEvent) => existingEvent.id === newEvent.id))
// Combine and sort all events by timestamp
const allEvents = [...uniqueNewEvents, ...existingEvents]
.sort((a, b) => {
const aTime = a.timestamp
const bTime = b.timestamp
if (!aTime || !bTime) return 0
return Number(bTime.seconds - aTime.seconds) || Number(bTime.nanos - aTime.nanos)
})
.slice(0, maxTimelineEntries)
return {
pages: [allEvents, ...old.pages.slice(1)],
pageParams: old.pageParams,
}
})
}
}
} catch (error) {
if (error instanceof ConnectError) {
if (error.code !== Code.Canceled) {
console.error('Console service - streamEvents - Connect error:', error)
}
} else {
console.error('Console service - streamEvents:', error)
}
}
}
return useInfiniteQuery({
queryKey: queryKey,
queryFn: async ({ signal }) => (isStreaming ? streamTimeline({ signal }) : await fetchTimeline({ signal })),
enabled: enabled && isVisible,
getNextPageParam: () => null, // Disable pagination for streaming
initialPageParam: null, // Disable pagination for streaming
})
}