From f44e28b13328f8060f921a9686ebd47aef49cb1e Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 15:32:47 +0200 Subject: [PATCH 01/21] introduce: OCRD_NETWORK_CLIENT_POLLING_PRINT --- src/ocrd_network/client.py | 10 +++++++--- src/ocrd_network/client_utils.py | 14 +++++++++----- src/ocrd_utils/config.py | 7 ++++++- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 8ec8e541ea..c45aa3ecf3 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -19,7 +19,8 @@ def __init__( self, server_addr_processing: Optional[str], timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT, - wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP + wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP, + print_output: bool = config.OCRD_NETWORK_CLIENT_POLLING_PRINT ): self.log = getLogger(f"ocrd_network.client") if not server_addr_processing: @@ -29,6 +30,7 @@ def __init__( self.polling_timeout = timeout self.polling_wait = wait self.polling_tries = int(timeout / wait) + self.polling_print_output = print_output def check_deployed_processors(self): return get_ps_deployed_processors(ps_server_host=self.server_addr_processing) @@ -48,11 +50,13 @@ def check_workflow_status(self, workflow_job_id: str): def poll_job_status(self, job_id: str) -> str: return poll_job_status_till_timeout_fail_or_success( - ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) + ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, + print_output=self.polling_print_output) def poll_workflow_status(self, job_id: str) -> str: return poll_wf_status_till_timeout_fail_or_success( - ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) + ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, + print_output=self.polling_print_output) def send_processing_job_request(self, processor_name: str, req_params: dict) -> str: return post_ps_processing_request( diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 9b924c16a4..3ebe8d3b87 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -3,7 +3,7 @@ from .constants import JobState, NETWORK_PROTOCOLS -def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int): +def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_output: bool): if job_type not in ["workflow", "processor"]: raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'") job_state = JobState.unset @@ -13,18 +13,22 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries job_state = get_ps_processing_job_status(ps_server_host, job_id) if job_type == "workflow": job_state = get_ps_workflow_job_status(ps_server_host, job_id) + if print_output: + print(f"State of the {job_type} job {job_id}: {job_state}") if job_state == JobState.success or job_state == JobState.failed: break tries -= 1 return job_state -def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState: - return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait) +def poll_job_status_till_timeout_fail_or_success( + ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_output) -def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState: - return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait) +def poll_wf_status_till_timeout_fail_or_success( + ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_output) def get_ps_deployed_processors(ps_server_host: str): diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index 4182456435..ab058c7830 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -160,13 +160,18 @@ def _ocrd_download_timeout_parser(val): config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP", description="How many seconds to sleep before trying again.", parser=int, - default=(True, 30)) + default=(True, 10)) config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", description="Timeout for a blocking ocrd network client (in seconds).", parser=int, default=(True, 3600)) +config.add("OCRD_NETWORK_CLIENT_POLLING_PRINT", + description="Timeout for a blocking ocrd network client (in seconds).", + parser=bool, + default=(True, False)) + config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW", description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).", default=(True, '')) From 7177eb147f6234417e20dbeeba7c0f707375cd02 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 15:35:50 +0200 Subject: [PATCH 02/21] fix: config value description --- src/ocrd_utils/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index ab058c7830..03d654bc74 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -168,7 +168,7 @@ def _ocrd_download_timeout_parser(val): default=(True, 3600)) config.add("OCRD_NETWORK_CLIENT_POLLING_PRINT", - description="Timeout for a blocking ocrd network client (in seconds).", + description="Whether the blocking client commands should print status output each iteration.", parser=bool, default=(True, False)) From df8e8eede7548f74f195b884559a73b600de2f4a Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 15:41:53 +0200 Subject: [PATCH 03/21] add default value param to preserver backwards compatibility --- src/ocrd_network/client_utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 3ebe8d3b87..d3534b4b3f 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -3,7 +3,8 @@ from .constants import JobState, NETWORK_PROTOCOLS -def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_output: bool): +def _poll_endpoint_status( + ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_output: bool = False): if job_type not in ["workflow", "processor"]: raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'") job_state = JobState.unset @@ -22,12 +23,12 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries def poll_job_status_till_timeout_fail_or_success( - ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool) -> JobState: + ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool = False) -> JobState: return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_output) def poll_wf_status_till_timeout_fail_or_success( - ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool) -> JobState: + ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool = False) -> JobState: return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_output) From b183cfcb007d627399b3a18e527c8a3ed298010d Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 15:56:25 +0200 Subject: [PATCH 04/21] make -b/--block as flags --- src/ocrd_network/cli/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 9c7f15c88f..39ef62c5fe 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -104,7 +104,7 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): @click.option('--result-queue-name') @click.option('--callback-url') @click.option('--agent-type', default='worker') -@click.option('-b', '--block', default=False, +@click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') def send_processing_job_request( address: Optional[str], @@ -176,7 +176,7 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str): 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) @click.option('-w', '--path-to-workflow', required=True) -@click.option('-b', '--block', default=False, +@click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') def send_workflow_job_request( address: Optional[str], From 342ef3a78f3620ff3e63200b2a9bc4c11639c581 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 16:00:12 +0200 Subject: [PATCH 05/21] implement feedback --- src/ocrd_network/cli/client.py | 8 ++++++-- src/ocrd_network/client.py | 12 +++++------- src/ocrd_network/client_utils.py | 12 ++++++------ src/ocrd_utils/config.py | 5 ----- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 39ef62c5fe..5dd7fd0f78 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -106,6 +106,8 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): @click.option('--agent-type', default='worker') @click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') +@click.option('-p', '--print-state', default=False, is_flag=True, + help='If set, the client will print job states by each iteration.') def send_processing_job_request( address: Optional[str], processor_name: str, @@ -146,7 +148,7 @@ def send_processing_job_request( assert processing_job_id print(f"Processing job id: {processing_job_id}") if block: - client.poll_job_status(job_id=processing_job_id) + client.poll_job_status(job_id=processing_job_id, print_state=print_state) @client_cli.group('workflow') @@ -178,6 +180,8 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str): @click.option('-w', '--path-to-workflow', required=True) @click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') +@click.option('-p', '--print-state', default=False, is_flag=True, + help='If set, the client will print job states by each iteration.') def send_workflow_job_request( address: Optional[str], path_to_mets: str, @@ -192,7 +196,7 @@ def send_workflow_job_request( assert workflow_job_id print(f"Workflow job id: {workflow_job_id}") if block: - client.poll_workflow_status(job_id=workflow_job_id) + client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state) @client_cli.group('workspace') diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index c45aa3ecf3..5a6831bea7 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -19,8 +19,7 @@ def __init__( self, server_addr_processing: Optional[str], timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT, - wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP, - print_output: bool = config.OCRD_NETWORK_CLIENT_POLLING_PRINT + wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP ): self.log = getLogger(f"ocrd_network.client") if not server_addr_processing: @@ -30,7 +29,6 @@ def __init__( self.polling_timeout = timeout self.polling_wait = wait self.polling_tries = int(timeout / wait) - self.polling_print_output = print_output def check_deployed_processors(self): return get_ps_deployed_processors(ps_server_host=self.server_addr_processing) @@ -48,15 +46,15 @@ def check_job_status(self, job_id: str): def check_workflow_status(self, workflow_job_id: str): return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id) - def poll_job_status(self, job_id: str) -> str: + def poll_job_status(self, job_id: str, print_state: bool) -> str: return poll_job_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, - print_output=self.polling_print_output) + print_state=print_state) - def poll_workflow_status(self, job_id: str) -> str: + def poll_workflow_status(self, job_id: str, print_state: bool) -> str: return poll_wf_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, - print_output=self.polling_print_output) + print_state=print_state) def send_processing_job_request(self, processor_name: str, req_params: dict) -> str: return post_ps_processing_request( diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index d3534b4b3f..87649d5ad4 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -4,7 +4,7 @@ def _poll_endpoint_status( - ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_output: bool = False): + ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False): if job_type not in ["workflow", "processor"]: raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'") job_state = JobState.unset @@ -14,7 +14,7 @@ def _poll_endpoint_status( job_state = get_ps_processing_job_status(ps_server_host, job_id) if job_type == "workflow": job_state = get_ps_workflow_job_status(ps_server_host, job_id) - if print_output: + if print_state: print(f"State of the {job_type} job {job_id}: {job_state}") if job_state == JobState.success or job_state == JobState.failed: break @@ -23,13 +23,13 @@ def _poll_endpoint_status( def poll_job_status_till_timeout_fail_or_success( - ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool = False) -> JobState: - return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_output) + ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_state) def poll_wf_status_till_timeout_fail_or_success( - ps_server_host: str, job_id: str, tries: int, wait: int, print_output: bool = False) -> JobState: - return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_output) + ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_state) def get_ps_deployed_processors(ps_server_host: str): diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index 03d654bc74..d2cc4efce1 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -167,11 +167,6 @@ def _ocrd_download_timeout_parser(val): parser=int, default=(True, 3600)) -config.add("OCRD_NETWORK_CLIENT_POLLING_PRINT", - description="Whether the blocking client commands should print status output each iteration.", - parser=bool, - default=(True, False)) - config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW", description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).", default=(True, '')) From 0e80a7cf84a5db1073ea5ba1363819ed40d16020 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 16:02:30 +0200 Subject: [PATCH 06/21] fix: missed params --- src/ocrd_network/cli/client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 5dd7fd0f78..fd28552866 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -122,7 +122,8 @@ def send_processing_job_request( # TODO: This is temporally available to toggle # between the ProcessingWorker/ProcessorServer agent_type: Optional[str], - block: Optional[bool] + block: Optional[bool], + print_state: Optional[bool] ): """ Submit a processing job to the processing server. @@ -186,7 +187,8 @@ def send_workflow_job_request( address: Optional[str], path_to_mets: str, path_to_workflow: str, - block: Optional[bool] + block: Optional[bool], + print_state: Optional[bool] ): """ Submit a workflow job to the processing server. From d7df20049fe3175e001a1feb60ec42b17ee3a2f0 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 1 Oct 2024 16:08:57 +0200 Subject: [PATCH 07/21] fix: integration client tests --- src/ocrd_network/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 5a6831bea7..c4315ded4d 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -46,12 +46,12 @@ def check_job_status(self, job_id: str): def check_workflow_status(self, workflow_job_id: str): return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id) - def poll_job_status(self, job_id: str, print_state: bool) -> str: + def poll_job_status(self, job_id: str, print_state: bool = False) -> str: return poll_job_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, print_state=print_state) - def poll_workflow_status(self, job_id: str, print_state: bool) -> str: + def poll_workflow_status(self, job_id: str, print_state: bool = False) -> str: return poll_wf_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, print_state=print_state) From 0bfef64ec694e6695f1c95a5fab343c268b25ec0 Mon Sep 17 00:00:00 2001 From: kba Date: Tue, 1 Oct 2024 16:25:43 +0200 Subject: [PATCH 08/21] post_ps_workflow_request: pagewise configurable --- src/ocrd_network/cli/client.py | 20 +++++++++++++++++--- src/ocrd_network/client_utils.py | 26 +++++++++++++++++--------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 9c7f15c88f..a57cb88b82 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -2,6 +2,7 @@ from json import dumps from typing import List, Optional, Tuple from ocrd.decorators.parameter_option import parameter_option, parameter_override_option +from ocrd_network.constants import JobState from ocrd_utils import DEFAULT_METS_BASENAME from ocrd_utils.introspect import set_json_key_value_overrides from ocrd_utils.str import parse_json_string_or_file @@ -176,23 +177,36 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str): 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) @click.option('-w', '--path-to-workflow', required=True) -@click.option('-b', '--block', default=False, +@click.option('-p/-P', '--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs") +@click.option('-b', '--block', is_flag=True, default=False, help='If set, the client will block till job timeout, fail or success.') def send_workflow_job_request( address: Optional[str], path_to_mets: str, path_to_workflow: str, + page_wise : bool, block: Optional[bool] ): """ Submit a workflow job to the processing server. """ client = Client(server_addr_processing=address) - workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets) + workflow_job_id = client.send_workflow_job_request( + path_to_wf=path_to_workflow, + path_to_mets=path_to_mets, + page_wise=page_wise, + ) assert workflow_job_id print(f"Workflow job id: {workflow_job_id}") if block: - client.poll_workflow_status(job_id=workflow_job_id) + print(f"Polling state of workflow job {workflow_job_id}") + state = client.poll_workflow_status(job_id=workflow_job_id) + if state != JobState.success: + print(f"Workflow failed with {state}") + exit(1) + else: + print(f"Workflow succeeded") + exit(0) @client_cli.group('workspace') diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 9b924c16a4..24f3da105c 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -1,9 +1,10 @@ +import json from requests import get as request_get, post as request_post from time import sleep from .constants import JobState, NETWORK_PROTOCOLS -def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int): +def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int) -> JobState: if job_type not in ["workflow", "processor"]: raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'") job_state = JobState.unset @@ -47,22 +48,21 @@ def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str): return response -def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: +def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> JobState: request_url = f"{ps_server_host}/processor/job/{processing_job_id}" response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" job_state = response.json()["state"] assert job_state - return job_state - + return getattr(JobState, job_state.lower()) -def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str: +def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> JobState: request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}" response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" job_state = response.json()["state"] assert job_state - return job_state + return getattr(JobState, job_state.lower()) def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str: @@ -79,8 +79,13 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d # TODO: Can be extended to include other parameters such as page_wise -def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str: - request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True" +def post_ps_workflow_request( + ps_server_host: str, + path_to_wf: str, + path_to_mets: str, + page_wise : bool, +) -> str: + request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}" response = request_post( url=request_url, headers={"accept": "application/json; charset=utf-8"}, @@ -88,8 +93,11 @@ def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: ) # print(response.json()) # print(response.__dict__) + json_resp_raw = response.text + # print(f'post_ps_workflow_request >> {response.status_code}') + # print(f'post_ps_workflow_request >> {json_resp_raw}') assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" - wf_job_id = response.json()["job_id"] + wf_job_id = json.loads(json_resp_raw)["job_id"] assert wf_job_id return wf_job_id From 611b6b566e565873648c4a112adbb6d8bedc155d Mon Sep 17 00:00:00 2001 From: kba Date: Tue, 1 Oct 2024 18:01:30 +0200 Subject: [PATCH 09/21] deployer: Remove any pre-existing socket file before starting the server (again) --- src/ocrd_network/runtime_data/deployer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index b956904d07..7b064961c5 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -146,6 +146,11 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path: if is_mets_server_running(mets_server_url=str(mets_server_url)): self.log.debug(f"The UDS mets server for {ws_dir_path} is already started: {mets_server_url}") return mets_server_url + elif Path(mets_server_url).is_socket(): + self.log.warning( + f"The UDS mets server for {ws_dir_path} is not running but the socket file exists: {mets_server_url}." + "Removing to avoid any weird behavior before starting the server.") + Path(mets_server_url).unlink() self.log.info(f"Starting UDS mets server: {mets_server_url}") pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file) self.mets_servers[mets_server_url] = pid From 9a71d048dd8ddc1dceba3fa24d34af719690eaf5 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 2 Oct 2024 10:11:07 +0200 Subject: [PATCH 10/21] remove UDS socket files --- src/ocrd/mets_server.py | 2 +- src/ocrd_network/runtime_data/deployer.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index c85368e305..a8f766289c 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -434,7 +434,7 @@ def kill_process(mets_server_pid: int): def shutdown(self): if self.is_uds: if Path(self.url).exists(): - self.log.debug(f'UDS socket {self.url} still exists, removing it') + self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}") Path(self.url).unlink() # os._exit because uvicorn catches SystemExit raised by sys.exit _exit(0) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 7b064961c5..90f7c6d5c7 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -165,6 +165,9 @@ def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False raise Exception(message) mets_server_pid = self.mets_servers[Path(mets_server_url)] OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid) + if Path(mets_server_url).exists(): + self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}") + Path(mets_server_url).unlink() return # TODO: Reconsider this again # Not having this sleep here causes connection errors From 854403de6ea880c31b82463bba3850c07565327d Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 2 Oct 2024 10:38:07 +0200 Subject: [PATCH 11/21] remove shortcuts for page-wise --- src/ocrd_network/cli/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 6733f893aa..450cce43fb 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -180,7 +180,7 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str): 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) @click.option('-w', '--path-to-workflow', required=True) -@click.option('-p/-P', '--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs") +@click.option('--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs") @click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') @click.option('-p', '--print-state', default=False, is_flag=True, From 4d01e66229bcd63872f4fd93699aa0084792c02c Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 2 Oct 2024 10:40:19 +0200 Subject: [PATCH 12/21] fix: pass page-wise argument to relevant methods --- src/ocrd_network/cli/client.py | 2 +- src/ocrd_network/client.py | 5 +++-- src/ocrd_network/client_utils.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 450cce43fb..350cf64b90 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -189,7 +189,7 @@ def send_workflow_job_request( address: Optional[str], path_to_mets: str, path_to_workflow: str, - page_wise : bool, + page_wise: bool, block: bool, print_state: bool ): diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index c4315ded4d..1521997942 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -60,6 +60,7 @@ def send_processing_job_request(self, processor_name: str, req_params: dict) -> return post_ps_processing_request( ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params) - def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str): + def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool): return post_ps_workflow_request( - ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets) + ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets, + page_wise=page_wise) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index b23442e502..456398ecf8 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -87,7 +87,7 @@ def post_ps_workflow_request( ps_server_host: str, path_to_wf: str, path_to_mets: str, - page_wise : bool, + page_wise: bool, ) -> str: request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}" response = request_post( From 97427e07326bddc0ff83e4d1ed5eba4cb6631829 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 2 Oct 2024 10:42:00 +0200 Subject: [PATCH 13/21] Update src/ocrd_network/client_utils.py Co-authored-by: Konstantin Baierer --- src/ocrd_network/client_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 456398ecf8..51db2681a6 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -82,7 +82,6 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d return processing_job_id -# TODO: Can be extended to include other parameters such as page_wise def post_ps_workflow_request( ps_server_host: str, path_to_wf: str, From 745484588ab9c77481397a9daaabee086f7790ee Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 2 Oct 2024 14:07:19 +0200 Subject: [PATCH 14/21] add endpoint DELETE /workflow/kill-mets-server-zombies to kill -SIGTERM METS servers with ctime > 60mins ago --- src/ocrd/mets_server.py | 5 ++-- src/ocrd_network/processing_server.py | 12 ++++++++++ src/ocrd_network/server_utils.py | 33 +++++++++++++++++++++++---- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index c85368e305..c46a99a2d8 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -1,8 +1,10 @@ """ # METS server functionality """ +import os import re from os import _exit, chmod +import signal from typing import Dict, Optional, Union, List, Tuple from time import sleep from pathlib import Path @@ -428,8 +430,7 @@ def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int @staticmethod def kill_process(mets_server_pid: int): - subprocess_run(args=["kill", "-s", "SIGINT", f"{mets_server_pid}"], shell=False, universal_newlines=True) - return + return os.kill(mets_server_pid, signal.SIGTERM) def shutdown(self): if self.is_uds: diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 34c22e5cf6..29061c5645 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -48,6 +48,7 @@ get_workflow_content, get_from_database_workspace, get_from_database_workflow_job, + kill_mets_server_zombies, parse_workflow_tasks, raise_http_exception, request_processor_server_tool_json, @@ -314,6 +315,14 @@ def add_api_routes_workflow(self): status_code=status.HTTP_200_OK, summary="Get information about a workflow run" ) + workflow_router.add_api_route( + path="/workflow/kill-mets-server-zombies", + endpoint=self.kill_mets_server_zombies, + methods=["DELETE"], + tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING], + status_code=status.HTTP_200_OK, + summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago." + ) self.include_router(workflow_router) async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict: @@ -817,6 +826,9 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: response = self._produce_workflow_status_response(processing_jobs=jobs) return response + async def kill_mets_server_zombies(self) -> None: + kill_mets_server_zombies(minutes_ago=60) + async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]: """ Simplified version of the `get_workflow_info` that returns a single state for the entire workflow. diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 9d8628170c..1897f3a62e 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -1,12 +1,18 @@ +import os +import re +import signal +from pathlib import Path +from json import dumps, loads +from urllib.parse import urljoin +from typing import Dict, List, Union +from time import time + from fastapi import HTTPException, status, UploadFile from fastapi.responses import FileResponse from httpx import AsyncClient, Timeout -from json import dumps, loads from logging import Logger -from pathlib import Path from requests import get as requests_get -from typing import Dict, List, Union -from urllib.parse import urljoin +from requests_unixsocket import sys from ocrd.resolver import Resolver from ocrd.task_sequence import ProcessorTask @@ -241,3 +247,22 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s if group not in available_groups: message = f"Input file group '{group}' of the first processor not found: {input_file_grps}" raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message) + + +def kill_mets_server_zombies(minutes_ago=60): + now = time() + cmdline_pat = r'.*ocrd workspace -U.*server start $' + for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime): + if not procdir.is_dir(): + continue + cmdline_file = procdir.joinpath('cmdline') + if not cmdline_file.is_file(): + continue + ctime_ago = int((now - procdir.stat().st_ctime) / 60) + if ctime_ago < minutes_ago: + continue + cmdline = cmdline_file.read_text().replace('\x00', ' ') + if re.match(cmdline_pat, cmdline): + pid = procdir.name + print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr) + os.kill(int(pid), signal.SIGTERM) From 0506e9d5f5edca7e7f6198ad93c0ac4a04f0061d Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 2 Oct 2024 14:28:45 +0200 Subject: [PATCH 15/21] move mets-zombie killer to / and return list of killed PIDs --- src/ocrd_network/processing_server.py | 21 +++++++++++---------- src/ocrd_network/server_utils.py | 5 ++++- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 29061c5645..04305a6fbb 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -201,6 +201,14 @@ def add_api_routes_others(self): tags=[ServerApiTags.WORKSPACE], summary="Forward a TCP request to UDS mets server" ) + others_router.add_api_route( + path="/kill-mets-server-zombies", + endpoint=self.kill_mets_server_zombies, + methods=["DELETE"], + tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING], + status_code=status.HTTP_200_OK, + summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago." + ) self.include_router(others_router) def add_api_routes_processing(self): @@ -315,14 +323,6 @@ def add_api_routes_workflow(self): status_code=status.HTTP_200_OK, summary="Get information about a workflow run" ) - workflow_router.add_api_route( - path="/workflow/kill-mets-server-zombies", - endpoint=self.kill_mets_server_zombies, - methods=["DELETE"], - tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING], - status_code=status.HTTP_200_OK, - summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago." - ) self.include_router(workflow_router) async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict: @@ -826,8 +826,9 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: response = self._produce_workflow_status_response(processing_jobs=jobs) return response - async def kill_mets_server_zombies(self) -> None: - kill_mets_server_zombies(minutes_ago=60) + async def kill_mets_server_zombies(self) -> List[int]: + pids_killed = kill_mets_server_zombies(minutes_ago=60) + return pids_killed async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]: """ diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 1897f3a62e..b143e344af 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -249,9 +249,10 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message) -def kill_mets_server_zombies(minutes_ago=60): +def kill_mets_server_zombies(minutes_ago=60) -> list[int]: now = time() cmdline_pat = r'.*ocrd workspace -U.*server start $' + ret = [] for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime): if not procdir.is_dir(): continue @@ -264,5 +265,7 @@ def kill_mets_server_zombies(minutes_ago=60): cmdline = cmdline_file.read_text().replace('\x00', ' ') if re.match(cmdline_pat, cmdline): pid = procdir.name + ret.append(pid) print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr) os.kill(int(pid), signal.SIGTERM) + return ret From ad81356d32178c53814ff1293f35d3dd7827b793 Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 2 Oct 2024 14:31:56 +0200 Subject: [PATCH 16/21] /kill_mets_server_zombies use underscores not slashes --- src/ocrd_network/processing_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 04305a6fbb..505e106ba2 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -202,7 +202,7 @@ def add_api_routes_others(self): summary="Forward a TCP request to UDS mets server" ) others_router.add_api_route( - path="/kill-mets-server-zombies", + path="/kill_mets_server_zombies", endpoint=self.kill_mets_server_zombies, methods=["DELETE"], tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING], From 4862d72fe6f7149ff4ce97d56ac870837bafddc5 Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 2 Oct 2024 14:41:32 +0200 Subject: [PATCH 17/21] use 3.8 compatible typing --- src/ocrd_network/server_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index b143e344af..773668f5b7 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -249,7 +249,7 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message) -def kill_mets_server_zombies(minutes_ago=60) -> list[int]: +def kill_mets_server_zombies(minutes_ago=60) -> List[int]: now = time() cmdline_pat = r'.*ocrd workspace -U.*server start $' ret = [] From 4f6775f358fdf0c7d3164d30e01ecb63106b4a6a Mon Sep 17 00:00:00 2001 From: Konstantin Baierer Date: Wed, 2 Oct 2024 15:13:38 +0200 Subject: [PATCH 18/21] OcrdMetsServer.kill_process: try the easy way (SIGINT) then the hard way (SIGKILL) --- src/ocrd/mets_server.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index b6a8f140ba..4b4ffa728f 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -430,7 +430,12 @@ def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int @staticmethod def kill_process(mets_server_pid: int): - return os.kill(mets_server_pid, signal.SIGTERM) + os.kill(mets_server_pid, signal.SIGINT) + sleep(3) + try: + os.kill(mets_server_pid, signal.SIGKILL) + except ProcessLookupError as e: + pass def shutdown(self): if self.is_uds: From 3882e7abf397650ece1e36798232cb148922a43d Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 2 Oct 2024 15:17:46 +0200 Subject: [PATCH 19/21] fix: add default to page_wise param --- src/ocrd_network/client.py | 2 +- src/ocrd_network/client_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 1521997942..bb7cf4dbf2 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -60,7 +60,7 @@ def send_processing_job_request(self, processor_name: str, req_params: dict) -> return post_ps_processing_request( ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params) - def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool): + def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool = False): return post_ps_workflow_request( ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets, page_wise=page_wise) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 51db2681a6..4eaf4ea95b 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -86,7 +86,7 @@ def post_ps_workflow_request( ps_server_host: str, path_to_wf: str, path_to_mets: str, - page_wise: bool, + page_wise: bool = False, ) -> str: request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}" response = request_post( From d39c3d716917239f2db25550f0be3f5c48ae2768 Mon Sep 17 00:00:00 2001 From: kba Date: Thu, 10 Oct 2024 12:15:43 +0200 Subject: [PATCH 20/21] kill_mets_server_zombies: actually return List[int] --- src/ocrd_network/server_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 773668f5b7..2560dbbb03 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -264,8 +264,8 @@ def kill_mets_server_zombies(minutes_ago=60) -> List[int]: continue cmdline = cmdline_file.read_text().replace('\x00', ' ') if re.match(cmdline_pat, cmdline): - pid = procdir.name + pid = int(procdir.name) ret.append(pid) print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr) - os.kill(int(pid), signal.SIGTERM) + os.kill(pid, signal.SIGTERM) return ret From 7512bd68f1b2e06ad8a62603c222b10624988a7f Mon Sep 17 00:00:00 2001 From: kba Date: Thu, 10 Oct 2024 12:16:21 +0200 Subject: [PATCH 21/21] kill_mets_server_zombies: allow dry_run to test --- src/ocrd_network/processing_server.py | 6 +++--- src/ocrd_network/server_utils.py | 14 +++++++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 505e106ba2..336d04f0d9 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -1,7 +1,7 @@ from datetime import datetime from os import getpid from pathlib import Path -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from uvicorn import run as uvicorn_run from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile @@ -826,8 +826,8 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: response = self._produce_workflow_status_response(processing_jobs=jobs) return response - async def kill_mets_server_zombies(self) -> List[int]: - pids_killed = kill_mets_server_zombies(minutes_ago=60) + async def kill_mets_server_zombies(self, minutes_ago : Optional[int] = None, dry_run : Optional[bool] = None) -> List[int]: + pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run) return pids_killed async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]: diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 2560dbbb03..6e485f261f 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -4,7 +4,7 @@ from pathlib import Path from json import dumps, loads from urllib.parse import urljoin -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from time import time from fastapi import HTTPException, status, UploadFile @@ -249,7 +249,12 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message) -def kill_mets_server_zombies(minutes_ago=60) -> List[int]: +def kill_mets_server_zombies(minutes_ago : Optional[int], dry_run : Optional[bool]) -> List[int]: + if minutes_ago == None: + minutes_ago = 90 + if dry_run == None: + dry_run = False + now = time() cmdline_pat = r'.*ocrd workspace -U.*server start $' ret = [] @@ -267,5 +272,8 @@ def kill_mets_server_zombies(minutes_ago=60) -> List[int]: pid = int(procdir.name) ret.append(pid) print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr) - os.kill(pid, signal.SIGTERM) + if dry_run: + print(f'[dry_run is active] kill {pid}') + else: + os.kill(pid, signal.SIGTERM) return ret