Skip to content

Commit c23f32d

Browse files
Luis Garciayeralin
Luis Garcia
authored and committed
Add retry logic in case of websocket connection closing. (Qiskit#324)
* Add retry logic in case of websocket connection closing. * Change from allowing multiple retries to only allowing one retry. * Updated doc string. * Add test and mock websocket server handler for a successful retry. * Remove currently unused global vars. * Remove unused global import, add decorator * Update doc string. * Add test and mock websocket server handler for a failed retry. * Add test and mock websocket server handler for job not found. * Update doc strings for test cases. * Update docstrings and documentation. * moved helper decorator to test/decorators.py * Fixed lint and style * Added logger warning for retry when websocket is closed. * Refactored handle_token_retry_success * Removed decorator to keep track of retry * Wrong function was called, fix. * Updated logger.warning message. * Improve readability and consistency.
1 parent 3b7ebe1 commit c23f32d

File tree

3 files changed

+77
-3
lines changed

3 files changed

+77
-3
lines changed

qiskit/providers/ibmq/api_v2/clients/websocket.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ def get_job_status(self, job_id, timeout=None):
135135
136136
Reads status messages from the API, which are issued at regular
137137
intervals (20 seconds). When a final state is reached, the server
138-
closes the socket.
138+
closes the socket. If the websocket connection is closed without
139+
a reason, there is an attempt to retry one time.
139140
140141
Args:
141142
job_id (str): id of the job.
@@ -154,6 +155,7 @@ def get_job_status(self, job_id, timeout=None):
154155

155156
original_timeout = timeout
156157
start_time = time.time()
158+
attempt_retry = True # By default, attempt to retry if the websocket connection closes.
157159
last_status = None
158160

159161
try:
@@ -198,7 +200,17 @@ def get_job_status(self, job_id, timeout=None):
198200
elif ex.code == 4002:
199201
break
200202
elif ex.code == 4003:
203+
attempt_retry = False # No point in retrying.
201204
message = 'Job id not found'
205+
206+
if attempt_retry:
207+
logger.warning('Connection with the websocket closed '
208+
'unexpectedly: %s(status_code=%s). '
209+
'Retrying get_job_status.', message, ex.code)
210+
attempt_retry = False # Disallow further retries.
211+
websocket = yield from self._connect(url)
212+
continue
213+
202214
raise WebsocketError('Connection with websocket closed '
203215
'unexpectedly: {}'.format(message)) from ex
204216
finally:

test/ibmq/websocket/test_websocket.py

+29-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929

3030
from .websocket_server import (
3131
TOKEN_JOB_COMPLETED, TOKEN_JOB_TRANSITION, TOKEN_WRONG_FORMAT,
32-
TOKEN_TIMEOUT, websocket_handler)
32+
TOKEN_TIMEOUT, TOKEN_WEBSOCKET_RETRY_SUCCESS,
33+
TOKEN_WEBSOCKET_RETRY_FAILURE, TOKEN_WEBSOCKET_JOB_NOT_FOUND,
34+
websocket_handler)
3335

3436
TEST_IP_ADDRESS = '127.0.0.1'
3537
INVALID_PORT = 9876
@@ -111,3 +113,29 @@ def test_invalid_response(self):
111113
with self.assertRaises(WebsocketIBMQProtocolError):
112114
_ = asyncio.get_event_loop().run_until_complete(
113115
client.get_job_status('job_id'))
116+
117+
def test_websocket_retry_success(self):
    """Test that a job status is still returned after one retry.

    The client authenticates with ``TOKEN_WEBSOCKET_RETRY_SUCCESS`` and
    the returned status is expected to be a dict reporting ``COMPLETED``.
    """
    url = 'ws://{}:{}'.format(TEST_IP_ADDRESS, VALID_PORT)
    client = WebsocketClient(url, TOKEN_WEBSOCKET_RETRY_SUCCESS)
    loop = asyncio.get_event_loop()

    response = loop.run_until_complete(client.get_job_status('job_id'))

    self.assertIsInstance(response, dict)
    self.assertIn('status', response)
    self.assertEqual(response['status'], 'COMPLETED')
126+
127+
def test_websocket_retry_failure(self):
    """Test that a ``WebsocketError`` is raised once the retry also fails.

    The client authenticates with ``TOKEN_WEBSOCKET_RETRY_FAILURE``, for
    which the mock server closes the socket on every connection.
    """
    url = 'ws://{}:{}'.format(TEST_IP_ADDRESS, VALID_PORT)
    client = WebsocketClient(url, TOKEN_WEBSOCKET_RETRY_FAILURE)

    with self.assertRaises(WebsocketError):
        loop = asyncio.get_event_loop()
        _ = loop.run_until_complete(client.get_job_status('job_id'))
134+
135+
def test_websocket_job_not_found(self):
    """Test that a ``WebsocketError`` is raised for a nonexistent job id.

    The client authenticates with ``TOKEN_WEBSOCKET_JOB_NOT_FOUND``, for
    which the mock server closes the socket with code 4003.
    """
    url = 'ws://{}:{}'.format(TEST_IP_ADDRESS, VALID_PORT)
    client = WebsocketClient(url, TOKEN_WEBSOCKET_JOB_NOT_FOUND)

    with self.assertRaises(WebsocketError):
        loop = asyncio.get_event_loop()
        _ = loop.run_until_complete(client.get_job_status('job_id'))

test/ibmq/websocket/websocket_server.py

+35-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
TOKEN_JOB_TRANSITION = 'token_job_transition'
2525
TOKEN_TIMEOUT = 'token_timeout'
2626
TOKEN_WRONG_FORMAT = 'token_wrong_format'
27+
TOKEN_WEBSOCKET_RETRY_SUCCESS = 'token_websocket_retry_success'
28+
TOKEN_WEBSOCKET_RETRY_FAILURE = 'token_websocket_retry_failure'
29+
TOKEN_WEBSOCKET_JOB_NOT_FOUND = 'token_websocket_job_not_found'
2730

2831

2932
@asyncio.coroutine
@@ -39,7 +42,10 @@ def websocket_handler(websocket, path):
3942
if token in (TOKEN_JOB_COMPLETED,
4043
TOKEN_JOB_TRANSITION,
4144
TOKEN_TIMEOUT,
42-
TOKEN_WRONG_FORMAT):
45+
TOKEN_WRONG_FORMAT,
46+
TOKEN_WEBSOCKET_RETRY_SUCCESS,
47+
TOKEN_WEBSOCKET_RETRY_FAILURE,
48+
TOKEN_WEBSOCKET_JOB_NOT_FOUND):
4349
msg_out = json.dumps({'type': 'authenticated'})
4450
yield from websocket.send(msg_out.encode('utf8'))
4551
else:
@@ -55,6 +61,12 @@ def websocket_handler(websocket, path):
5561
yield from handle_token_timeout(websocket)
5662
elif token == TOKEN_WRONG_FORMAT:
5763
yield from handle_token_wrong_format(websocket)
64+
elif token == TOKEN_WEBSOCKET_RETRY_SUCCESS:
65+
yield from handle_token_retry_success(websocket)
66+
elif token == TOKEN_WEBSOCKET_RETRY_FAILURE:
67+
yield from handle_token_retry_failure(websocket)
68+
elif token == TOKEN_WEBSOCKET_JOB_NOT_FOUND:
69+
yield from handle_token_job_not_found(websocket)
5870

5971

6072
@asyncio.coroutine
@@ -94,3 +106,25 @@ def handle_token_wrong_format(websocket):
94106
"""Return a status in an invalid format."""
95107
yield from websocket.send('INVALID'.encode('utf8'))
96108
yield from websocket.close()
109+
110+
111+
@asyncio.coroutine
def handle_token_retry_success(websocket):
    """Close the socket on the first connection and succeed on the retry.

    A ``retry_attempt`` flag stored on the function object records whether
    the forced close has already happened. The flag is cleared as soon as
    the retry connection is served, so the close-then-succeed sequence
    repeats for every client that uses this token instead of only for the
    first one seen by the server process.

    Args:
        websocket: server-side connection for the current client.
    """
    if getattr(handle_token_retry_success, 'retry_attempt', False):
        # Retry connection: reset the flag before answering so a later
        # client starts again from the "close first" state, even if
        # sending the status fails.
        del handle_token_retry_success.retry_attempt
        yield from handle_token_job_completed(websocket)
    else:
        # First connection: remember it and drop the socket to force the
        # client into its single retry.
        handle_token_retry_success.retry_attempt = True
        yield from handle_token_retry_failure(websocket)
119+
120+
121+
@asyncio.coroutine
def handle_token_retry_failure(websocket):
    """Close the socket immediately, without sending any status.

    The close is issued with no explicit code. Because this handler runs
    for each connection made with the token, both the client's first
    attempt and its retry end the same way.
    """
    yield from websocket.close()
125+
126+
127+
@asyncio.coroutine
def handle_token_job_not_found(websocket):
    """Close the socket with code 4003, signalling that the job id was not found."""
    yield from websocket.close(code=4003)

0 commit comments

Comments
 (0)