25
25
26
26
from collections .abc import Iterable
27
27
from pathlib import Path
28
+ from typing import Any
28
29
29
30
import sqlalchemy
30
31
35
36
from .tools import job_ids_from_dep_str , parse_array_indexes
36
37
37
38
39
+ def parse_scontrol_output (output : str ) -> dict [str , Any ]:
40
+ """Parse scontrol output and return a dict similar to `sacct --json`."""
41
+ result : dict [str , Any ] = dict ()
42
+ for key_value in output .strip ().split ():
43
+ if "=" not in key_value :
44
+ continue
45
+ key , value = key_value .split ("=" , 1 )
46
+ result [key ] = value
47
+ # make results similar to sacct --json
48
+ result ["state" ] = {"current" : [result ["JobState" ]], "reason" : result ["Reason" ]}
49
+ result ["derived_exit_code" ] = {
50
+ "return_code" : {"number" : result ["ExitCode" ].split (":" )[0 ]}
51
+ }
52
+ result ["nodes" ] = result ["NodeList" ]
53
+ if result ["nodes" ] == "(null)" :
54
+ result ["nodes" ] = "None assigned"
55
+ return result
56
+
57
+
58
+ def job_status_from_scontrol (job_id : int ) -> dict :
59
+ """Retrieve the status of a job using scontrol."""
60
+ try :
61
+ # we don't use --json because it is not supported by older versions of scontrol
62
+ output = subprocess .check_output (
63
+ ["scontrol" , "show" , "job" , str (job_id )], text = True
64
+ )
65
+ except subprocess .CalledProcessError :
66
+ return dict ()
67
+ return parse_scontrol_output (output )
68
+
69
+
38
70
def update_job_statuses (grid_ids : Iterable [int ]) -> dict [int , dict ]:
39
71
"""Retrieve the status of the jobs in the database."""
40
72
status = dict ()
@@ -44,6 +76,10 @@ def update_job_statuses(grid_ids: Iterable[int]) -> dict[int, dict]:
44
76
text = True ,
45
77
)
46
78
except subprocess .CalledProcessError :
79
+ for job_id in grid_ids :
80
+ job_status = job_status_from_scontrol (job_id )
81
+ if job_status :
82
+ status [job_id ] = job_status
47
83
return status
48
84
for job in json .loads (output )["jobs" ]:
49
85
status [job ["job_id" ]] = job
0 commit comments