server_utils.py
import os
import re
import signal
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Union
from time import time
from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from logging import Logger
from requests import get as requests_get
import sys
from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator
from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)

def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces via the API, but the Processing Server needs
        # the workspace to be in the database. The workspace is created here if the path
        # is available locally but does not yet exist in the database, i.e. it has not
        # been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"METS file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)

async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: str = None,
    workspace_mets_path: str = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' or mets path '{workspace_mets_path}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)

def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as the page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)

def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()

async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    # currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)
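    # Note (assumption, not part of the imported helper's contract): if the helper scales
    # linearly per page, e.g. amount_pages * timeout_per_page plus a constant overhead,
    # the defaults above would allow roughly 200 * 20.0 = 4000 seconds for this request.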
    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output

async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)

async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets
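
# For orientation: the workflow content handled below uses the `ocrd process` task syntax,
# one task per line. An illustrative two-step example (processor names and parameters are
# examples, not taken from this file) that parse_workflow_tasks would accept:
#   cis-ocropy-binarize -I OCR-D-IMG -O OCR-D-BIN
#   tesserocr-recognize -I OCR-D-BIN -O OCR-D-OCR -P segmentation_level region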
def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed to parse processing tasks from the workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)

def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
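    # Hedged example (field names taken from their usages in this module; the exact
    # PYJobInput schema may differ): a valid input sets exactly one workspace reference,
    # e.g. PYJobInput(processor_name="ocrd-dummy", path_to_mets="/data/ws/mets.xml", parameters={})
    # or PYJobInput(processor_name="ocrd-dummy", workspace_id="ws-42", parameters={}),
    # but never both or neither.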
    if not ocrd_tool:
        message = f"Failed to load the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")

def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check that the workflow is not empty and can be parsed into a list of ProcessorTask objects
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid: empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)

def kill_mets_server_zombies(minutes_ago: int = 60) -> List[int]:
    """ Send SIGTERM to METS Server processes whose /proc entry is older than `minutes_ago`
    minutes and whose command line matches an `ocrd workspace -U ... server start` call.
    Return the list of PIDs that were signalled.
    """
    now = time()
    cmdline_pat = r'.*ocrd workspace -U.*server start $'
    ret = []
    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        cmdline = cmdline_file.read_text().replace('\x00', ' ')
        if re.match(cmdline_pat, cmdline):
            pid = int(procdir.name)
            ret.append(pid)
            print(
                f'METS Server with PID {pid} was created {ctime_ago} minutes ago, '
                f'more than {minutes_ago}, so killing (cmdline="{cmdline}")',
                file=sys.stderr
            )
            os.kill(pid, signal.SIGTERM)
    return ret
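
# Minimal usage sketch (hypothetical caller, not part of this module): a deployment could
# reap stale METS Servers periodically, e.g.
#   killed = kill_mets_server_zombies(minutes_ago=120)
#   logger.info(f"Terminated stale METS Server processes: {killed}")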