4
4
rethrowUnlessMissing ,
5
5
dataToBase64 ,
6
6
base64ToBytes ,
7
- toBytes ,
8
7
getPrefixes ,
9
8
} from '@agoric/swingset-vat/src/vats/network' ;
10
9
import makeStore from '@agoric/store' ;
@@ -143,6 +142,31 @@ export function makeIBCProtocolHandler(
143
142
*/
144
143
const channelKeyToSeqAck = makeStore ( 'CHANNEL:PORT' ) ;
145
144
145
+ /**
146
+ * Send a packet out via the IBC device.
147
+ * @param {IBCPacket } packet
148
+ * @param {Store<number, PromiseRecord<Bytes, any>> } seqToAck
149
+ */
150
+ async function ibcSendPacket ( packet , seqToAck ) {
151
+ // Make a kernel call to do the send.
152
+ const fullPacket = await callIBCDevice ( 'sendPacket' , {
153
+ packet,
154
+ relativeTimeout : DEFAULT_PACKET_TIMEOUT ,
155
+ } ) ;
156
+
157
+ // Extract the actual sequence number from the return.
158
+ const { sequence } = fullPacket ;
159
+
160
+ /**
161
+ * @type {PromiseRecord<Bytes, any> }
162
+ */
163
+ const ackDeferred = producePromise ( ) ;
164
+
165
+ // Register the ack resolver/rejector with this sequence number.
166
+ seqToAck . init ( sequence , ackDeferred ) ;
167
+ return ackDeferred . promise ;
168
+ }
169
+
146
170
/**
147
171
* @param {string } channelID
148
172
* @param {string } portID
@@ -161,60 +185,39 @@ export function makeIBCProtocolHandler(
161
185
const channelKey = `${ channelID } :${ portID } ` ;
162
186
const seqToAck = makeStore ( 'SEQUENCE' ) ;
163
187
channelKeyToSeqAck . init ( channelKey , seqToAck ) ;
188
+
164
189
/**
165
190
* @param {Connection } _conn
166
191
* @param {Bytes } packetBytes
167
192
* @param {ConnectionHandler } _handler
168
193
* @returns {Promise<Bytes> } Acknowledgement data
169
194
*/
170
195
let onReceive = async ( _conn , packetBytes , _handler ) => {
196
+ // console.error(`Remote IBC Handler ${portID} ${channelID}`);
171
197
const packet = {
172
198
source_port : portID ,
173
199
source_channel : channelID ,
174
200
destination_port : rPortID ,
175
201
destination_channel : rChannelID ,
176
202
data : dataToBase64 ( packetBytes ) ,
177
203
} ;
178
- const fullPacket = await callIBCDevice ( 'sendPacket' , {
179
- packet,
180
- relativeTimeout : DEFAULT_PACKET_TIMEOUT ,
181
- } ) ;
182
- const { sequence } = fullPacket ;
183
- /**
184
- * @type {PromiseRecord<Bytes, any> }
185
- */
186
- const ackDeferred = producePromise ( ) ;
187
- seqToAck . init ( sequence , ackDeferred ) ;
188
- return ackDeferred . promise ;
204
+ return ibcSendPacket ( packet , seqToAck ) ;
189
205
} ;
190
206
191
- if ( ordered ) {
207
+ // FIXME: We may want a queue sometime to sequence
208
+ // our packets, but it doesn't currently work (received
209
+ // packets don't arrive).
210
+ if ( false && ordered ) {
192
211
// Set up a queue on the receiver.
193
212
const withChannelReceiveQueue = makeWithQueue ( ) ;
194
213
onReceive = withChannelReceiveQueue ( onReceive ) ;
195
214
}
196
215
197
216
return harden ( {
198
- async onOpen ( conn , handler ) {
217
+ async onOpen ( conn , _handler ) {
199
218
console . info ( 'onOpen Remote IBC Connection' , channelID , portID ) ;
200
- /**
201
- * @param {Data } data
202
- * @returns {Promise<Bytes> } acknowledgement
203
- */
204
- let sender = data =>
205
- /** @type {Promise<Bytes> } */
206
- ( E ( handler )
207
- . onReceive ( conn , toBytes ( data ) , handler )
208
- . catch ( rethrowUnlessMissing ) ) ;
209
- if ( ordered ) {
210
- // Set up a queue on the sender.
211
- const withChannelSendQueue = makeWithQueue ( ) ;
212
- sender = withChannelSendQueue ( sender ) ;
213
- }
214
- const boundSender = sender ;
215
- sender = data => {
216
- return boundSender ( data ) ;
217
- } ;
219
+ const connP = /** @type {Promise<Connection, any> } */ ( E . when ( conn ) ) ;
220
+ channelKeyToConnP . init ( channelKey , connP ) ;
218
221
} ,
219
222
onReceive,
220
223
async onClose ( _conn , _reason , _handler ) {
@@ -623,11 +626,7 @@ paths:
623
626
624
627
// Actually connect.
625
628
// eslint-disable-next-line prettier/prettier
626
- const pr = E ( protocolImpl ) . inbound ( listenSearch , localAddr , remoteAddr , rchandler ) ;
627
- const connP = /** @type {Promise<Connection> } */ ( pr ) ;
628
-
629
- /* Stash it for later use. */
630
- channelKeyToConnP . init ( channelKey , connP ) ;
629
+ E ( protocolImpl ) . inbound ( listenSearch , localAddr , remoteAddr , rchandler ) ;
631
630
break ;
632
631
}
633
632
@@ -640,6 +639,7 @@ paths:
640
639
} = packet ;
641
640
const channelKey = `${ channelID } :${ portID } ` ;
642
641
642
+ console . log ( `Received with:` , channelKey , channelKeyToConnP . keys ( ) ) ;
643
643
const connP = channelKeyToConnP . get ( channelKey ) ;
644
644
const data = base64ToBytes ( data64 ) ;
645
645
@@ -709,20 +709,11 @@ paths:
709
709
}
710
710
711
711
const { source_port : portID , source_channel : channelID } = packet ;
712
-
713
- const fullPacket = await callIBCDevice ( 'sendPacket' , { packet } ) ;
714
-
715
- const { sequence } = fullPacket ;
716
- /**
717
- * @type {PromiseRecord<Bytes, any> }
718
- */
719
- const ackDeferred = producePromise ( ) ;
720
712
const channelKey = `${ channelID } :${ portID } ` ;
721
713
const seqToAck = channelKeyToSeqAck . get ( channelKey ) ;
722
- seqToAck . init ( sequence , ackDeferred ) ;
723
- ackDeferred . promise . then (
724
- ack => console . info ( 'Manual packet' , fullPacket , 'acked:' , ack ) ,
725
- e => console . warn ( 'Manual packet' , fullPacket , 'timed out:' , e ) ,
714
+ ibcSendPacket ( packet , seqToAck ) . then (
715
+ ack => console . info ( 'Manual packet' , packet , 'acked:' , ack ) ,
716
+ e => console . warn ( 'Manual packet' , packet , 'failed:' , e ) ,
726
717
) ;
727
718
break ;
728
719
}
0 commit comments