-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathclient.py
67 lines (56 loc) · 2.99 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from typing import Optional
from ocrd_utils import config, getLogger, LOG_FORMAT
from .client_utils import (
get_ps_deployed_processors,
get_ps_deployed_processor_ocrd_tool,
get_ps_processing_job_log,
get_ps_processing_job_status,
get_ps_workflow_job_status,
poll_job_status_till_timeout_fail_or_success,
poll_wf_status_till_timeout_fail_or_success,
post_ps_processing_request,
post_ps_workflow_request,
verify_server_protocol
)
class Client:
    """Convenience wrapper binding the ``client_utils`` helpers to one server address.

    Every public method forwards to the matching ``client_utils`` function,
    passing the address stored in ``server_addr_processing``; the polling
    methods additionally pass the polling settings captured at construction.
    """

    def __init__(
        self,
        server_addr_processing: Optional[str],
        timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT,
        wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP,
        print_output: bool = config.OCRD_NETWORK_CLIENT_POLLING_PRINT
    ):
        """Store the server address and the polling settings.

        Args:
            server_addr_processing: Address of the processing server. A falsy
                value (``None`` or ``""``) is replaced by
                ``config.OCRD_NETWORK_SERVER_ADDR_PROCESSING``.
            timeout: Polling timeout; presumably in the same time unit as
                ``wait`` (TODO confirm against ``client_utils``).
            wait: Value forwarded as ``wait`` to the polling helpers. Must be
                non-zero, because the number of tries is ``timeout / wait``.
            print_output: Forwarded as ``print_output`` to the polling helpers.
        """
        self.log = getLogger(f"ocrd_network.client")
        # Fall back to the configured address when the caller supplied none.
        self.server_addr_processing = (
            server_addr_processing or config.OCRD_NETWORK_SERVER_ADDR_PROCESSING
        )
        # The address is handed to the helper before any polling state is set.
        verify_server_protocol(self.server_addr_processing)
        self.polling_timeout, self.polling_wait = timeout, wait
        # Number of polling attempts: the quotient truncated to an int.
        self.polling_tries = int(timeout / wait)
        self.polling_print_output = print_output

    def _polling_options(self, job_id: str) -> dict:
        """Build the keyword arguments shared by both polling helpers."""
        return {
            "ps_server_host": self.server_addr_processing,
            "job_id": job_id,
            "tries": self.polling_tries,
            "wait": self.polling_wait,
            "print_output": self.polling_print_output,
        }

    def check_deployed_processors(self):
        """Return whatever ``get_ps_deployed_processors`` yields for this server."""
        host = self.server_addr_processing
        return get_ps_deployed_processors(ps_server_host=host)

    def check_deployed_processor_ocrd_tool(self, processor_name: str):
        """Return the ``get_ps_deployed_processor_ocrd_tool`` result for ``processor_name``."""
        host = self.server_addr_processing
        return get_ps_deployed_processor_ocrd_tool(ps_server_host=host, processor_name=processor_name)

    def check_job_log(self, job_id: str):
        """Return the ``get_ps_processing_job_log`` result for the processing job ``job_id``."""
        host = self.server_addr_processing
        return get_ps_processing_job_log(host, processing_job_id=job_id)

    def check_job_status(self, job_id: str):
        """Return the ``get_ps_processing_job_status`` result for the processing job ``job_id``."""
        host = self.server_addr_processing
        return get_ps_processing_job_status(host, processing_job_id=job_id)

    def check_workflow_status(self, workflow_job_id: str):
        """Return the ``get_ps_workflow_job_status`` result for ``workflow_job_id``."""
        host = self.server_addr_processing
        return get_ps_workflow_job_status(host, workflow_job_id=workflow_job_id)

    def poll_job_status(self, job_id: str) -> str:
        """Poll the processing job ``job_id`` using the stored tries/wait/print settings."""
        options = self._polling_options(job_id)
        return poll_job_status_till_timeout_fail_or_success(**options)

    def poll_workflow_status(self, job_id: str) -> str:
        """Poll the workflow job ``job_id`` using the stored tries/wait/print settings."""
        options = self._polling_options(job_id)
        return poll_wf_status_till_timeout_fail_or_success(**options)

    def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
        """Submit ``req_params`` as the job input for ``processor_name``.

        Returns the value produced by ``post_ps_processing_request``.
        """
        host = self.server_addr_processing
        return post_ps_processing_request(ps_server_host=host, processor=processor_name, job_input=req_params)

    def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
        """Submit a workflow job given the workflow path and the METS path.

        Returns the value produced by ``post_ps_workflow_request``.
        """
        host = self.server_addr_processing
        return post_ps_workflow_request(ps_server_host=host, path_to_wf=path_to_wf, path_to_mets=path_to_mets)