Skip to content

Commit 947df5e

Browse files
authored
feat(core/relays): implement reconnection logic (#29)
1 parent 5b27059 commit 947df5e

File tree

5 files changed

+117
-76
lines changed

5 files changed

+117
-76
lines changed

core/relays.ts

+33-11
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import type { Stringified } from "./types.ts";
22
import type {
33
ClientToRelayMessage,
4+
EventId,
45
EventKind,
56
NostrEvent,
67
RelayToClientMessage,
8+
RelayToClientMessageType,
79
RelayUrl,
810
SubscriptionFilter,
911
SubscriptionId,
@@ -97,6 +99,13 @@ export class Relay extends NostrNode<
9799
);
98100
}
99101

102+
dispatch<T extends RelayEventType>(
103+
type: T,
104+
data: RelayEventTypeRecord[T],
105+
): void {
106+
this.dispatchEvent(new RelayEvent(type, data));
107+
}
108+
100109
subscribe<K extends EventKind>(
101110
filter: SubscriptionFilter<K> | SubscriptionFilter<K>[],
102111
options: Partial<SubscriptionOptions> = {},
@@ -108,19 +117,21 @@ export class Relay extends NostrNode<
108117
filters: [filter].flat(),
109118
options,
110119
};
120+
const resubscribe = () => this.dispatch("resubscribe", { ...context });
111121
return new ReadableStream<NostrEvent<K>>({
112122
start: (controller) => {
113-
this.dispatchEvent(
114-
new RelayEvent("subscribe", { ...context, controller }),
123+
this.addEventListener(
124+
context.id,
125+
() => this.ws.removeEventListener("close", resubscribe),
115126
);
127+
this.dispatch("subscribe", { ...context, controller });
116128
},
117-
pull: (controller) => {
118-
this.dispatchEvent(new RelayEvent("pull", { ...context, controller }));
129+
pull: () => {
130+
this.ws.addEventListener("close", resubscribe, { once: true });
131+
this.ws.ready();
119132
},
120133
cancel: (reason) => {
121-
this.dispatchEvent(
122-
new RelayEvent("unsubscribe", { ...context, reason }),
123-
);
134+
this.dispatch("unsubscribe", { ...context, reason });
124135
},
125136
}, new CountQueuingStrategy({ highWaterMark: options.nbuffer }));
126137
}
@@ -133,9 +144,7 @@ export class Relay extends NostrNode<
133144
*/
134145
publish<K extends EventKind>(event: NostrEvent<K>): Promise<void> {
135146
return new Promise<void>((resolve, reject) => {
136-
this.dispatchEvent(
137-
new RelayEvent("publish", { event, resolve, reject }),
138-
);
147+
this.dispatch("publish", { event, resolve, reject });
139148
this.ws.addEventListener(
140149
"close",
141150
() => reject(new ConnectionClosed()),
@@ -183,12 +192,25 @@ export interface PublicationContext {
183192
reject: (reason: unknown) => void;
184193
}
185194

195+
type SubscriptionMessage = {
196+
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends
197+
SubscriptionId ? RelayToClientMessage<T> : never;
198+
}[RelayToClientMessageType];
199+
200+
type PublicationMessage = {
201+
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends EventId
202+
? RelayToClientMessage<T>
203+
: never;
204+
}[RelayToClientMessageType];
205+
186206
export interface RelayEventTypeRecord {
187207
message: RelayToClientMessage;
188208
subscribe: SubscriptionContextWithController;
189-
pull: SubscriptionContextWithController;
209+
resubscribe: SubscriptionContext;
190210
unsubscribe: SubscriptionContextWithReason;
191211
publish: PublicationContext;
212+
[id: SubscriptionId]: SubscriptionMessage;
213+
[id: EventId]: PublicationMessage;
192214
}
193215

194216
export type RelayEventType = keyof RelayEventTypeRecord;

core/websockets.ts

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type EventListenerOptionsMap = Map<
2323
*/
2424
export class LazyWebSocket implements WebSocketLike {
2525
#ws?: WebSocket;
26+
readonly #ac = new AbortController();
2627
readonly #createWebSocket: () => WebSocket;
2728
readonly #eventListenerMap = new Map<
2829
WebSocketEventType,
@@ -86,6 +87,7 @@ export class LazyWebSocket implements WebSocketLike {
8687
if (!this.#ws) {
8788
return;
8889
}
90+
this.#ac.abort();
8991
switch (this.#ws.readyState) {
9092
case WebSocket.CONNECTING:
9193
await this.#once("open");
@@ -111,6 +113,8 @@ export class LazyWebSocket implements WebSocketLike {
111113
listener: EventListenerOrEventListenerObject,
112114
options: boolean | AddEventListenerOptions = {},
113115
) => {
116+
options = typeof options === "boolean" ? { capture: options } : options;
117+
options = { signal: this.#ac.signal, ...options };
114118
this.#ws?.addEventListener(type, listener, options);
115119
const map = this.#eventListenerMap.get(type);
116120
if (map) {

lib/testing.ts

+13-3
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
type MessageEventData = string | ArrayBufferLike | Blob | ArrayBufferView;
22

33
export class MockWebSocket extends EventTarget implements WebSocket {
4+
/**
5+
* A list of all instances of MockWebSocket.
6+
* An instance is removed from this list when it is closed.
7+
*/
48
static get instances(): MockWebSocket[] {
5-
return this.#instances;
9+
return Array.from(this.#instances);
10+
}
11+
static #instances = new Set<MockWebSocket>();
12+
13+
static get first(): MockWebSocket | undefined {
14+
return this.instances[0];
615
}
7-
static #instances: MockWebSocket[] = [];
816

917
constructor(url?: string | URL, protocols?: string | string[]) {
1018
super();
1119
this.url = url?.toString() ?? "";
1220
this.protocol = protocols ? [...protocols].flat()[0] : "";
13-
MockWebSocket.#instances.push(this);
21+
MockWebSocket.#instances.add(this);
1422
// Simulate async behavior of WebSocket as much as possible.
1523
queueMicrotask(() => {
1624
this.#readyState = 1;
@@ -50,9 +58,11 @@ export class MockWebSocket extends EventTarget implements WebSocket {
5058
if (this.#remote) {
5159
this.#remote.#readyState = 3;
5260
this.#remote.dispatchEvent(new CloseEvent("close", { code, reason }));
61+
MockWebSocket.#instances.delete(this.#remote);
5362
}
5463
this.#readyState = 3;
5564
this.dispatchEvent(new CloseEvent("close", { code, reason }));
65+
MockWebSocket.#instances.delete(this);
5666
});
5767
}
5868

nips/01/relays.ts

+5-32
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,5 @@
1-
import type {
2-
EventId,
3-
RelayToClientMessage,
4-
RelayToClientMessageType,
5-
SubscriptionId,
6-
} from "../../core/protocol.d.ts";
71
import { EventRejected, RelayEvent, RelayModule } from "../../core/relays.ts";
82

9-
type SubscriptionMessage = {
10-
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends
11-
SubscriptionId ? RelayToClientMessage<T> : never;
12-
}[RelayToClientMessageType];
13-
14-
type PublicationMessage = {
15-
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends EventId
16-
? RelayToClientMessage<T>
17-
: never;
18-
}[RelayToClientMessageType];
19-
20-
type ExtentionalEventTypeRecord =
21-
& {
22-
[id in SubscriptionId]: SubscriptionMessage;
23-
}
24-
& {
25-
[id in EventId]: PublicationMessage;
26-
};
27-
28-
declare module "../../core/relays.ts" {
29-
// deno-lint-ignore no-empty-interface
30-
interface RelayEventTypeRecord extends ExtentionalEventTypeRecord {}
31-
}
32-
333
export class SubscriptionClosed extends Error {}
344

355
const install: RelayModule["default"] = (relay) => {
@@ -65,11 +35,14 @@ const install: RelayModule["default"] = (relay) => {
6535
}
6636
}
6737
});
68-
return relay.send(["REQ", id, ...filters]);
38+
relay.send(["REQ", id, ...filters]);
39+
});
40+
relay.addEventListener("resubscribe", ({ data: { id, filters } }) => {
41+
relay.send(["REQ", id, ...filters]);
6942
});
7043
relay.addEventListener("unsubscribe", ({ data: { id } }) => {
7144
if (relay.status === WebSocket.OPEN) {
72-
return relay.send(["CLOSE", id]);
45+
relay.send(["CLOSE", id]);
7346
}
7447
});
7548
relay.addEventListener("publish", (ev) => {

nips/01/relays_test.ts

+62-30
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ import {
2020
} from "../../core/relays.ts?nips=1";
2121
import { SubscriptionClosed } from "../../nips/01/relays.ts";
2222

23-
describe("NIP-01/Relay", () => {
23+
function getRemoteSocket() {
24+
return MockWebSocket.instances[0].remote;
25+
}
26+
27+
describe("Relay (NIP-01)", () => {
2428
const url = "wss://localhost:8080";
2529
let relay: Relay;
2630
let sub_0: ReadableStream<NostrEvent<0>>;
@@ -30,6 +34,7 @@ describe("NIP-01/Relay", () => {
3034
globalThis.WebSocket = MockWebSocket;
3135
relay = new Relay(url);
3236
});
37+
3338
afterAll(() => {
3439
if (relay.status === WebSocket.OPEN) {
3540
return relay.close();
@@ -39,33 +44,33 @@ describe("NIP-01/Relay", () => {
3944
it("should have loaded NIP-01 module", () => {
4045
assertEquals(relay.config.modules.length, 1);
4146
});
47+
4248
it("should create a subscription", () => {
4349
sub_1 = relay.subscribe({ kinds: [1] }, { id: "test-1" });
4450
assertInstanceOf(sub_1, ReadableStream);
4551
});
52+
4653
it("should receive text notes", async () => {
4754
const reader = sub_1.getReader();
4855
const read = reader.read();
49-
MockWebSocket.instances[0].remote.send(
50-
JSON.stringify(["EVENT", "test-1", { kind: 1 }]),
51-
);
56+
getRemoteSocket().send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
5257
const { value, done } = await read;
5358
assert(!done);
5459
assertEquals(value.kind, 1);
5560
reader.releaseLock();
5661
});
62+
5763
it("should be able to open multiple subscriptions", () => {
58-
sub_0 = relay.subscribe({ kinds: [0], limit: 1 }, {
59-
id: "test-0",
60-
});
64+
sub_0 = relay.subscribe({ kinds: [0] }, { id: "test-0" });
6165
assert(sub_0 instanceof ReadableStream);
6266
});
67+
6368
it("should recieve metas and notes simultaneously", async () => {
6469
const reader_0 = sub_0.getReader();
6570
const reader_1 = sub_1.getReader();
66-
const ws = MockWebSocket.instances[0];
67-
ws.remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
68-
ws.remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
71+
const remote = getRemoteSocket();
72+
remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
73+
remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
6974
const [{ value: value_0 }, { value: value_1 }] = await Promise.all([
7075
reader_0.read(),
7176
reader_1.read(),
@@ -77,18 +82,54 @@ describe("NIP-01/Relay", () => {
7782
reader_0.releaseLock();
7883
reader_1.releaseLock();
7984
});
85+
86+
it("should close a subscription with an error when receiving a CLOSED message", async () => {
87+
getRemoteSocket().send(JSON.stringify(
88+
[
89+
"CLOSED",
90+
"test-1" as SubscriptionId,
91+
"error: test",
92+
] satisfies RelayToClientMessage<"CLOSED">,
93+
));
94+
const reader = sub_1.getReader();
95+
try {
96+
await reader.read();
97+
} catch (e) {
98+
assertInstanceOf(e, SubscriptionClosed);
99+
assertEquals(e.message, "error: test");
100+
} finally {
101+
reader.releaseLock();
102+
}
103+
});
104+
105+
it("should reconnect if connection is closed while waiting for an event", async () => {
106+
const reader = sub_0.getReader();
107+
const read = reader.read();
108+
getRemoteSocket().close();
109+
const reconnected = new Promise<true>((resolve) => {
110+
relay.ws.addEventListener("open", () => resolve(true));
111+
});
112+
assert(await reconnected);
113+
// We must use a new instance of MockWebSocket.
114+
getRemoteSocket().send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
115+
const { value, done } = await read;
116+
assert(!done);
117+
assertEquals(value.kind, 0);
118+
reader.releaseLock();
119+
});
120+
80121
it("should publish an event and recieve an accepting OK message", async () => {
81122
const eid = "test-true" as EventId;
82-
const ws = MockWebSocket.instances[0];
123+
const remote = getRemoteSocket();
83124
const arrived = new Promise<true>((resolve) => {
84-
ws.remote.addEventListener(
125+
remote.addEventListener(
85126
"message",
86127
(ev: MessageEvent<string>) => {
87128
// deno-fmt-ignore
88129
const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">;
89130
if (event.id === eid) {
90131
assertEquals(event.kind, 1);
91-
ws.remote.send(
132+
remote.send(
92133
JSON.stringify(
93134
["OK", eid, true, ""] satisfies RelayToClientMessage<"OK">,
94135
),
@@ -102,21 +143,22 @@ describe("NIP-01/Relay", () => {
102143
await relay.publish({ id: eid, kind: 1 } as any);
103144
assert(await arrived);
104145
});
146+
105147
it("should receieve a rejecting OK message and throw EventRejected", async () => {
106148
const eid = "test-false" as EventId;
107149
// deno-fmt-ignore
108150
const msg = ["OK", eid, false, "error: test"] satisfies RelayToClientMessage<"OK">
109-
const ws = MockWebSocket.instances[0];
151+
const remote = getRemoteSocket();
110152
const arrived = new Promise<true>((resolve) => {
111-
ws.remote.addEventListener(
153+
remote.addEventListener(
112154
"message",
113155
(ev: MessageEvent<string>) => {
114156
// deno-fmt-ignore
115157
const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">;
116158
if (event.id === eid) {
117159
assertEquals(event.kind, 1);
118160
resolve(true);
119-
ws.remote.send(JSON.stringify(msg));
161+
remote.send(JSON.stringify(msg));
120162
}
121163
},
122164
);
@@ -132,26 +174,16 @@ describe("NIP-01/Relay", () => {
132174
}
133175
await arrived;
134176
});
177+
135178
it("should throw ConnectionClosed when connection is closed before recieving an OK message", async () => {
136179
const event = { id: "test-close" as EventId, kind: 1 };
137180
// deno-lint-ignore no-explicit-any
138181
const published = relay.publish(event as any).catch((e) => e);
139-
MockWebSocket.instances[0].remote.close();
140-
assertInstanceOf(await published, ConnectionClosed);
141-
});
142-
it("should close a subscription with an error when receiving a CLOSED message", async () => {
143-
MockWebSocket.instances[0].remote.send(JSON.stringify(
144-
[
145-
"CLOSED",
146-
"test-1" as SubscriptionId,
147-
"error: test",
148-
] satisfies RelayToClientMessage<"CLOSED">,
149-
));
182+
getRemoteSocket().close();
150183
try {
151-
await sub_1.getReader().read();
184+
await published;
152185
} catch (e) {
153-
assertInstanceOf(e, SubscriptionClosed);
154-
assertEquals(e.message, "error: test");
186+
assertInstanceOf(e, ConnectionClosed);
155187
}
156188
});
157189
});

0 commit comments

Comments
 (0)