-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathPendingTasksRunner.swift
169 lines (131 loc) · 5.91 KB
/
PendingTasksRunner.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import Foundation
import Semaphore
public extension Wendy {
    /// The task recorded as currently running in the task runner's shared data store, or `nil` when none is recorded.
    ///
    /// Reads a snapshot of `PendingTasksRunner.DataStore.shared`, so the value reflects the store at the moment of the call.
    var currentlyRunningTask: PendingTask? {
        let snapshot = PendingTasksRunner.DataStore.shared.getDataSnapshot()
        return snapshot.currentlyRunningTask
    }
}
/*
Requirements of the task runner:
1. Calling `Wendy.runTask()` returns instantly. It does *not* block the calling thread; it behaves like scheduling a job to run in the future.
2. When X threads call the task runner's `runTask()` function at the same time, only one of them runs at a time (calls to `runTask()` are serialized).
3. The task runner's `runTask()` does not return a result until the task has succeeded or failed, even though `PendingTask.runTask()` is async. In other words, the caller waits for the outcome.
*/
/**
 Runs pending tasks, either one at a time by id (`runTask(taskId:)`) or all of them in order (`runAllTasks(filter:)`).

 * Single-task runs are serialized with an `AsyncSemaphore` holding one permit.
 * Only one "run all tasks" pass may be active at a time; this is enforced by the `RunAllTasksLock` actor.
 * The task that is executing is recorded in the data store as `currentlyRunningTask` for the duration of its run.
 */
// sourcery: InjectRegister = "PendingTasksRunner"
public final class PendingTasksRunner: Sendable, Singleton {
    public static let shared = PendingTasksRunner()

    private var pendingTasksManager: PendingTasksManager { inject.pendingTasksManager }
    private var taskRunner: WendyTaskRunner? { inject.taskRunner }

    private let runAllTasksLock = RunAllTasksLock()
    // One permit: at most one `runTask(taskId:)` call executes its body at a time.
    private let runTaskSemaphore = AsyncSemaphore(value: 1)

    /// Intentionally empty: this type has nothing of its own to reset.
    public func reset() {}

    /**
     Runs 1 task and returns the result.

     Guarantees:
     * Will only run 1 task at a time. Your request may need to wait in order for it to begin running. The function call will return after the wait period + time it took to run the task.
     * Runs the request in a background thread.

     - Parameter taskId: Id of the pending task to run.
     - Returns: `.successful` when the task ran without throwing (the task is then deleted),
       `.failure(error:)` when the task threw (the task is kept), or `.cancelled` when no task runner
       is available, the current `Task` is cancelled, the task cannot be found, or the task is no longer
       valid (in which case it is deleted).
     */
    func runTask(taskId: Double) async -> TaskRunResult {
        guard let taskRunner else {
            return .cancelled
        }

        await runTaskSemaphore.wait()
        // Release the permit on every exit path below, so an early return can never leave the runner stuck.
        defer { runTaskSemaphore.signal() }

        // Allow the runner to be cancelled between executing tasks.
        if Task.isCancelled {
            return .cancelled
        }

        guard let taskToRun = pendingTasksManager.getTaskByTaskId(taskId) else {
            return .cancelled
        }

        guard PendingTasksUtil.isTaskValid(taskId: taskId) else {
            LogUtil.d("Task: \(taskToRun.describe()) is cancelled. Deleting the task.")
            pendingTasksManager.delete(taskId: taskId)
            LogUtil.logTaskComplete(taskToRun, successful: true, cancelled: true)
            return .cancelled
        }

        setCurrentlyRunningTask(taskToRun)

        LogUtil.logTaskRunning(taskToRun)
        LogUtil.d("Running task: \(taskToRun.describe())")

        do {
            try await taskRunner.runTask(tag: taskToRun.tag, data: taskToRun.data)
        } catch {
            setCurrentlyRunningTask(nil)

            LogUtil.d("Task: \(taskToRun.describe()) failed but will reschedule it. Skipping it.")
            LogUtil.logTaskComplete(taskToRun, successful: false, cancelled: false)

            return .failure(error: error)
        }

        setCurrentlyRunningTask(nil)

        LogUtil.d("Task: \(taskToRun.describe()) ran successful.")
        LogUtil.d("Deleting task: \(taskToRun.describe()).")
        pendingTasksManager.delete(taskId: taskId)
        LogUtil.logTaskComplete(taskToRun, successful: true, cancelled: false)

        return .successful
    }

    /**
     Runs all tasks, allowing a filter to skip running some tasks.

     Guarantees:
     * Runs on a background thread.
     * Runs all tasks in a serial way, in order.
     * Only 1 runner is running at a time. If a runner is already running, this request will be ignored and returned instantly.

     Once a task fails, later tasks that share its group id are skipped for the rest of this pass.

     - Parameter filter: Optional filter forwarded to the pending tasks manager when picking the next task.
     - Returns: The accumulated result of every task that was run in this pass.
     */
    func runAllTasks(filter: RunAllTasksFilter?) async -> PendingTasksRunnerResult {
        guard await runAllTasksLock.requestToRunAllTasks() else {
            return .new() // return a result that says that 0 tasks were executed. Which, is true.
        }

        var runAllTasksResult = PendingTasksRunnerResult.new()
        // Cursor handed to the manager so each lookup moves past the task handled last.
        var lastSuccessfulOrFailedTaskId: Double = 0
        var failedGroupIds: Set<String> = []

        while let nextTask = pendingTasksManager.getNextTaskToRun(lastSuccessfulOrFailedTaskId, filter: filter) {
            guard let nextTaskId = nextTask.taskId else {
                // Without an id the cursor cannot advance, so the same task would be returned forever.
                LogUtil.d("Task: \(nextTask.describe()) has no task id. Stopping this run of all tasks.")
                break
            }
            lastSuccessfulOrFailedTaskId = nextTaskId

            if let groupId = nextTask.groupId, failedGroupIds.contains(groupId) {
                LogUtil.logTaskSkipped(nextTask, reason: ReasonPendingTaskSkipped.partOfFailedGroup)
                LogUtil.d("Task: \(nextTask.describe()) belongs to a failing group of tasks. Skipping it.")
                continue
            }

            let taskRunResult = await runTask(taskId: nextTaskId)
            runAllTasksResult = runAllTasksResult.addResult(taskRunResult)

            if case .failure = taskRunResult, let groupId = nextTask.groupId {
                failedGroupIds.insert(groupId)
            }
        }

        LogUtil.d("All done running tasks.")
        LogUtil.logAllTasksComplete()

        await runAllTasksLock.unlock()

        return runAllTasksResult
    }

    /// Records `task` (or `nil`) as the currently running task in the data store.
    // NOTE(review): `dataStore` is not declared in the visible part of this file; presumably it is supplied elsewhere — confirm.
    private func setCurrentlyRunningTask(_ task: PendingTask?) {
        dataStore.updateDataBlock { data in
            data.currentlyRunningTask = task
        }
    }

    /// Actor-isolated flag that lets only one "run all tasks" pass be active at a time.
    actor RunAllTasksLock {
        private var isRunningAllTasks = false

        /// Tries to claim the lock.
        /// - Returns: `true` when the lock was free and is now held by the caller; `false` when a pass is already running.
        func requestToRunAllTasks() async -> Bool {
            guard !isRunningAllTasks else {
                return false
            }

            isRunningAllTasks = true
            return true
        }

        /// Releases the lock so that a later request can claim it.
        func unlock() {
            isRunningAllTasks = false
        }
    }

    /// State kept by the runner's data store.
    public struct Data: AutoResettable {
        /// The task being executed right now, or `nil` when no task is running.
        var currentlyRunningTask: PendingTask?
    }

    /// Shared in-memory store holding the runner's `Data`.
    final class DataStore: InMemoryDataStore<Data>, Singleton {
        static let shared: DataStore = .init(data: .init())
    }
}