2
2
import makeStore from '@agoric/store' ;
3
3
import rawHarden from '@agoric/harden' ;
4
4
import { E as defaultE } from '@agoric/eventual-send' ;
5
+ import { producePromise } from '@agoric/produce-promise' ;
5
6
import { toBytes } from './bytes' ;
6
7
7
8
const harden = /** @type {<T>(x: T) => T } */ ( rawHarden ) ;
@@ -13,6 +14,11 @@ export const ENDPOINT_SEPARATOR = '/';
13
14
* @typedef {import('@agoric/store').Store<T,U> } Store
14
15
*/
15
16
17
+ /**
18
+ * @template T,U
19
+ * @typedef {import('@agoric/produce-promise').PromiseRecord<T, U> } PromiseRecord
20
+ */
21
+
16
22
/**
17
23
* @typedef {import('./bytes').Bytes } Bytes
18
24
* @typedef {import('./bytes').Data } Data
@@ -108,6 +114,10 @@ export const makeConnection = (
108
114
E = defaultE ,
109
115
) => {
110
116
let closed ;
117
+ /**
118
+ * @type {Set<PromiseRecord<Bytes,any>> }
119
+ */
120
+ const pendingAcks = new Set ( ) ;
111
121
/**
112
122
* @type {Connection }
113
123
*/
@@ -124,6 +134,10 @@ export const makeConnection = (
124
134
}
125
135
current . delete ( connection ) ;
126
136
closed = Error ( 'Connection closed' ) ;
137
+ for ( const ackDeferred of [ ...pendingAcks . values ( ) ] ) {
138
+ pendingAcks . delete ( ackDeferred ) ;
139
+ ackDeferred . reject ( closed ) ;
140
+ }
127
141
await E ( handler )
128
142
. onClose ( connection , undefined , handler )
129
143
. catch ( rethrowUnlessMissing ) ;
@@ -134,10 +148,22 @@ export const makeConnection = (
134
148
throw closed ;
135
149
}
136
150
const bytes = toBytes ( data ) ;
137
- const ack = await E ( handler )
151
+ const ackDeferred = producePromise ( ) ;
152
+ pendingAcks . add ( ackDeferred ) ;
153
+ E ( handler )
138
154
. onReceive ( connection , bytes , handler )
139
- . catch ( err => rethrowUnlessMissing ( err ) || '' ) ;
140
- return toBytes ( ack ) ;
155
+ . catch ( err => rethrowUnlessMissing ( err ) || '' )
156
+ . then (
157
+ ack => {
158
+ pendingAcks . delete ( ackDeferred ) ;
159
+ ackDeferred . resolve ( toBytes ( ack ) ) ;
160
+ } ,
161
+ err => {
162
+ pendingAcks . delete ( ackDeferred ) ;
163
+ ackDeferred . reject ( err ) ;
164
+ } ,
165
+ ) ;
166
+ return ackDeferred . promise ;
141
167
} ,
142
168
} ) ;
143
169
0 commit comments