12 | 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 | 13 | # See the License for the specific language governing permissions and
14 | 14 | # limitations under the License.
15 | | -''' Worker main loop. Pulls jobs from the DB and runs them in the sandbox
| 15 | +''' Worker main loop.
16 | 16 |
17 | | -It also handles timeouts and graceful container termination.
| 17 | +Runs a GitHub Actions ephemeral runner in a nested sandboxed docker container.
| 18 | +It also handles graceful container termination.
18 | 19 | '''
19 | 20 |
20 | 21 | import logging
26 | 27 | import time
27 | 28 | import traceback
28 | 29 |
29 | | -from config import DB, JOB_TIMEOUT_SEC
30 | | -from common_utils import req, utc_now_iso, init_logging
31 | | -from common_utils import ConcurrentModificationError, SCOPES
| 30 | +from config import DB, SANDBOX_IMG, GITHUB_REPO, SANDBOX_SVC_ACCOUNT
32 | 31 |
33 | | -CUR_DIR = os.path.dirname(__file__)
34 | | -SCOPES.append('https://www.googleapis.com/auth/firebase.database')
35 | | -SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
36 | | -WORKER_NAME = '%s-%s' % (os.getenv(
37 | | -    'WORKER_HOST', 'local').split('-')[-1], socket.gethostname())
38 | | -sigterm = threading.Event()
39 | | -
40 | | -
41 | | -def try_acquire_job(job_id):
42 | | -  ''' Transactionally acquire the given job.
43 | | -
44 | | -  Returns the job JSON object if it managed to acquire and put it into the
45 | | -  STARTED state, None if another worker got there first.
46 | | -  '''
47 | | -  logging.debug('Trying to acquire job %s', job_id)
| 32 | +from get_github_token import get_github_token
48 | 33 |
49 | | -  uri = '%s/jobs/%s.json' % (DB, job_id)
50 | | -  job, etag = req('GET', uri, req_etag=True)
51 | | -  if job['status'] != 'QUEUED':
52 | | -    return None  # Somebody else took it or the job is CANCELLED/INTERRUPTED
53 | | -  try:
54 | | -    job['status'] = 'STARTED'
55 | | -    job['time_started'] = utc_now_iso()
56 | | -    job['worker'] = WORKER_NAME
57 | | -    req('PUT', uri, body=job, etag=etag)
58 | | -    return job
59 | | -  except ConcurrentModificationError:
60 | | -    return None
| 34 | +CUR_DIR = os.path.dirname(__file__)
61 | 35 |
| 36 | +# The container name will be deadb33f-sandbox-N.
| 37 | +SANDBOX_NAME = socket.gethostname().replace('-worker-', '-sandbox-')
62 | 38 |
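With a hypothetical host name, the worker-to-sandbox renaming above works like this:

    # 'deadb33f-worker-3' is a made-up hostname, used only for illustration.
    SANDBOX_NAME = 'deadb33f-worker-3'.replace('-worker-', '-sandbox-')
    assert SANDBOX_NAME == 'deadb33f-sandbox-3'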
63 | | -def make_worker_obj(status, job_id=None):
64 | | -  return {
65 | | -      'job_id': job_id,
66 | | -      'status': status,
67 | | -      'last_update': utc_now_iso(),
68 | | -      'host': os.getenv('WORKER_HOST', '')
69 | | -  }
| 39 | +sigterm = threading.Event()
70 | 40 |
71 | 41 |
72 | 42 | def worker_loop():
73 | | -  ''' Pulls a job from the queue and runs it invoking run_job.py '''
74 | | -  uri = '%s/jobs_queued.json?orderBy="$key"&limitToLast=100' % DB
75 | | -  jobs = req('GET', uri)
76 | | -  if not jobs:
77 | | -    return
78 | | -
79 | | -  # Work out the worker number from the hostname. We try to distribute the load
80 | | -  # (via the time.sleep below) so that we fill first all the worker-1 of each
81 | | -  # vm, then worker-2 and so on. This is designed so that if there is only one
82 | | -  # CL (hence N jobs) in the queue, each VM gets only one job, maximizing the
83 | | -  # cpu efficiency of each VM.
84 | | -  try:
85 | | -    worker_num = int(socket.gethostname().split('-')[-1])
86 | | -  except ValueError:
87 | | -    worker_num = 1
88 | | -
89 | | -  # Transactionally acquire a job. Deal with races (two workers trying to
90 | | -  # acquire the same job).
91 | | -  job = None
92 | | -  job_id = None
93 | | -  for job_id in sorted(jobs.keys(), reverse=True):
94 | | -    job = try_acquire_job(job_id)
95 | | -    if job is not None:
96 | | -      break
97 | | -    time.sleep(worker_num)
98 | | -  if job is None:
99 | | -    logging.error('Failed to acquire a job')
100 | | -    return
101 | | -
102 | | -  logging.info('Starting job %s', job_id)
103 | | -
104 | | -  # Update the db, move the job to the running queue.
105 | | -  patch_obj = {
106 | | -      'jobs_queued/' + job_id: {},  # = DELETE
107 | | -      'jobs_running/' + job_id: {
108 | | -          'worker': WORKER_NAME
109 | | -      },
110 | | -      'workers/' + WORKER_NAME: make_worker_obj('RUNNING', job_id=job_id)
111 | | -  }
112 | | -  req('PATCH', '%s.json' % DB, body=patch_obj)
113 | | -
114 | | -  cmd = [os.path.join(CUR_DIR, 'run_job.py'), job_id]
115 | | -
116 | | -  # Propagate the worker's PERFETTO_ vars and merge with the job-specific vars.
117 | | -  env = dict(os.environ, **{k: str(v) for (k, v) in job['env'].items()})
118 | | -  job_runner = subprocess.Popen(cmd, env=env)
119 | | -
120 | | -  # Run the job in a python subprocess, to isolate the main loop from logs
121 | | -  # uploader failures.
122 | | -  res = None
123 | | -  cancelled = False
124 | | -  timed_out = False
125 | | -  time_started = time.time()
126 | | -  time_last_db_poll = time_started
127 | | -  polled_status = 'STARTED'
128 | | -  while res is None:
129 | | -    time.sleep(0.25)
130 | | -    res = job_runner.poll()
131 | | -    now = time.time()
132 | | -    if now - time_last_db_poll > 10:  # Throttle DB polling.
133 | | -      polled_status = req('GET', '%s/jobs/%s/status.json' % (DB, job_id))
134 | | -      time_last_db_poll = now
135 | | -    if now - time_started > JOB_TIMEOUT_SEC:
136 | | -      logging.info('Job %s timed out, terminating', job_id)
137 | | -      timed_out = True
138 | | -      job_runner.terminate()
139 | | -    if (sigterm.is_set() or polled_status != 'STARTED') and not cancelled:
140 | | -      logging.info('Job %s cancelled, terminating', job_id)
141 | | -      cancelled = True
142 | | -      job_runner.terminate()
143 | | -
144 | | -  status = ('INTERRUPTED' if sigterm.is_set() else 'CANCELLED' if cancelled else
145 | | -            'TIMED_OUT' if timed_out else 'COMPLETED' if res == 0 else 'FAILED')
146 | | -  logging.info('Job %s %s with code %s', job_id, status, res)
147 | | -
148 | | -  # Update the DB, unless the job has been cancelled. The "is not None"
149 | | -  # condition deals with a very niche case, that is, avoid creating a partial
150 | | -  # job entry after doing a full clear of the DB (which is super rare, happens
151 | | -  # only when re-deploying the CI).
152 | | -  if polled_status is not None:
153 | | -    patch = {
154 | | -        'jobs/%s/status' % job_id: status,
155 | | -        'jobs/%s/exit_code' % job_id: {} if res is None else res,
156 | | -        'jobs/%s/time_ended' % job_id: utc_now_iso(),
157 | | -        'jobs_running/%s' % job_id: {},  # = DELETE
158 | | -    }
159 | | -    req('PATCH', '%s.json' % (DB), body=patch)
| 43 | +  # Remove a stale sandbox container from a previous job, if any.
| 44 | +  subprocess.call(['sudo', 'docker', 'rm', '-f', SANDBOX_NAME])
| 45 | +
| 46 | +  # Impersonate the sandbox service account. This creates a temporary,
| 47 | +  # downgraded credential that we pass to the sandbox. The sandbox service
| 48 | +  # account is allowed only storage object creation for untrusted CI artifacts.
| 49 | +  # sandbox_svc_token = subprocess.check_output([
| 50 | +  #     'gcloud', 'auth', 'application-default',
| 51 | +  #     'print-access-token',
| 52 | +  #     '--impersonate-service-account=%s' % SANDBOX_SVC_ACCOUNT,
| 53 | +  # ])
| 54 | +
| 55 | +  # Run the nested docker container that will execute the ephemeral GitHub
| 56 | +  # Actions runner in the sandbox image.
| 57 | +  cmd = [
| 58 | +      'sudo', 'docker', 'run', '--rm', '--name', SANDBOX_NAME, '--hostname',
| 59 | +      SANDBOX_NAME, '--cap-add', 'SYS_PTRACE', '--tmpfs', '/tmp:exec'
| 60 | +  ]
| 61 | +
| 62 | +  # Obtain the (short-lived) token to register the GitHub Actions runner and
| 63 | +  # pass it to the sandbox.
| 64 | +  github_token = get_github_token()
| 65 | +  cmd += ['--env', 'GITHUB_TOKEN=%s' % github_token]
| 66 | +  cmd += ['--env', 'GITHUB_REPO=%s' % GITHUB_REPO]
| 67 | +  # cmd += ['--env', 'SANDBOX_SVC_TOKEN=%s' % sandbox_svc_token]
| 68 | +
| 69 | +  # Propagate PERFETTO_ environment variables to the sandbox.
| 70 | +  for kv in [kv for kv in os.environ.items() if kv[0].startswith('PERFETTO_')]:
| 71 | +    cmd += ['--env', '%s=%s' % kv]
| 72 | +
| 73 | +  # We use the tmpfs mount created by gce-startup-script.sh, if present. The
| 74 | +  # problem is that Docker doesn't allow both overriding the tmpfs size and
| 75 | +  # preventing the "-o noexec". In turn, the default tmpfs size depends on the
| 76 | +  # host's physical memory size.
| 77 | +  if os.getenv('SANDBOX_TMP'):
| 78 | +    cmd += ['-v', '%s:/ci/ramdisk' % os.getenv('SANDBOX_TMP')]
| 79 | +  else:
| 80 | +    cmd += ['--tmpfs', '/ci/ramdisk:exec']
| 81 | +
| 82 | +  # Rationale for the conditional branches below: when running in the real GCE
| 83 | +  # environment, gce-startup-script.sh mounts these directories in the right
| 84 | +  # locations, so that they are shared between all workers.
| 85 | +  # When running the worker container outside of GCE (i.e. for local testing) we
| 86 | +  # leave these empty. The VOLUME directive in the Dockerfile will cause docker
| 87 | +  # to automatically mount a scratch volume for those.
| 88 | +  # This is so that the CI containers can be tested without having to do the
| 89 | +  # work that gce-startup-script.sh does.
| 90 | +  if os.getenv('SHARED_WORKER_CACHE'):
| 91 | +    cmd += ['--volume=%s:/ci/cache' % os.getenv('SHARED_WORKER_CACHE')]
| 92 | +
| 93 | +  artifacts_dir = None
| 94 | +  if os.getenv('ARTIFACTS_DIR'):
| 95 | +    artifacts_dir = os.path.join(os.getenv('ARTIFACTS_DIR'), SANDBOX_NAME)
| 96 | +    subprocess.call(['sudo', 'rm', '-rf', artifacts_dir])
| 97 | +    os.mkdir(artifacts_dir)
| 98 | +    cmd += ['--volume=%s:/ci/artifacts' % artifacts_dir]
| 99 | +
| 100 | +  cmd += os.getenv('SANDBOX_NETWORK_ARGS', '').split()
| 101 | +  cmd += [SANDBOX_IMG]
| 102 | +
| 103 | +  # This spawns the sandbox that runs one ephemeral GitHub Actions job and
| 104 | +  # terminates when done.
| 105 | +  subprocess.call(cmd)
| 106 | +
| 107 | +  if artifacts_dir:
| 108 | +    artifacts_uploader = os.path.join(CUR_DIR, 'artifacts_uploader.py')
| 109 | +    cmd = ['setsid', artifacts_uploader, '--dir=' + artifacts_dir, '--rm']
| 110 | +    subprocess.call(cmd)
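With none of the optional environment variables set, the list passed to subprocess.call(cmd) above ends up roughly like this (the token value is a placeholder):

    cmd = [
        'sudo', 'docker', 'run', '--rm', '--name', SANDBOX_NAME, '--hostname',
        SANDBOX_NAME, '--cap-add', 'SYS_PTRACE', '--tmpfs', '/tmp:exec',
        '--env', 'GITHUB_TOKEN=<short-lived registration token>',
        '--env', 'GITHUB_REPO=%s' % GITHUB_REPO,
        '--tmpfs', '/ci/ramdisk:exec',
        SANDBOX_IMG,
    ]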
160 | 111 |
161 | 112 |
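get_github_token is imported above but its module is not shown in this diff. For an ephemeral runner, GITHUB_TOKEN is presumably a short-lived registration token from GitHub's registration-token endpoint; a minimal sketch, assuming a privileged credential is available in a hypothetical GITHUB_PAT variable:

    import json
    import os
    import urllib.request

    def get_github_token_sketch(repo):
      # POST /repos/{owner}/{repo}/actions/runners/registration-token
      url = 'https://api.github.com/repos/%s/actions/runners/registration-token' % repo
      request = urllib.request.Request(url, method='POST')
      request.add_header('Authorization', 'Bearer %s' % os.environ['GITHUB_PAT'])
      request.add_header('Accept', 'application/vnd.github+json')
      with urllib.request.urlopen(request) as response:
        return json.load(response)['token']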
162 | 113 | def sig_handler(_, __):
163 | 114 |   logging.warning('Interrupted by signal, exiting worker')
| 115 | +  subprocess.call(['sudo', 'docker', 'kill', SANDBOX_NAME])
164 | 116 |   sigterm.set()
165 | 117 |
166 | 118 |
167 | 119 | def main():
168 | | -  init_logging()
| 120 | +  logging.basicConfig(
| 121 | +      format='%(levelname)-8s %(asctime)s %(message)s',
| 122 | +      level=logging.DEBUG if os.getenv('VERBOSE') else logging.INFO,
| 123 | +      datefmt=r'%Y-%m-%d %H:%M:%S')
169 | 124 |   logging.info('Worker started')
170 | 125 |   signal.signal(signal.SIGTERM, sig_handler)
171 | 126 |   signal.signal(signal.SIGINT, sig_handler)
172 | 127 |
173 | 128 |   while not sigterm.is_set():
174 | | -    logging.debug('Starting poll cycle')
175 | 129 |     try:
176 | 130 |       worker_loop()
177 | | -      req('PUT',
178 | | -          '%s/workers/%s.json' % (DB, WORKER_NAME),
179 | | -          body=make_worker_obj('IDLE'))
180 | 131 |     except:
181 | 132 |       logging.error('Exception in worker loop:\n%s', traceback.format_exc())
182 | 133 |     if sigterm.is_set():
183 | 134 |       break
184 | | -
185 | | -    # Synchronize sleeping with the wall clock. This is so all VMs wake up at
186 | | -    # the same time. See comment on distributing load above in this file.
187 | | -    poll_time_sec = 5
188 | | -    time.sleep(poll_time_sec - (time.time() % poll_time_sec))
189 | | -
190 | | -  # The use case here is the VM being terminated by the GCE infrastructure.
191 | | -  # We mark the worker as terminated and the job as cancelled so we don't wait
192 | | -  # forever for it.
193 | | -  logging.warning('Exiting the worker loop, got signal: %s', sigterm.is_set())
194 | | -  req('PUT',
195 | | -      '%s/workers/%s.json' % (DB, WORKER_NAME),
196 | | -      body=make_worker_obj('TERMINATED'))
| 135 | +    time.sleep(1)
197 | 136 |
198 | 137 |
199 | 138 | if __name__ == '__main__':
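A possible way to launch the worker container outside GCE for local testing, per the comments above. This is only a sketch: the image name is a placeholder, and bind-mounting the host Docker socket is an assumption about how the nested 'sudo docker run' reaches a daemon, not something taken from this change:

    import subprocess

    subprocess.call([
        'docker', 'run', '--rm',
        '-v', '/var/run/docker.sock:/var/run/docker.sock',  # assumption, see note above
        '--env', 'VERBOSE=1',  # read by logging.basicConfig in main()
        'ci-worker-image',  # placeholder image name
    ])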