diff --git a/core/relays.ts b/core/relays.ts index dbd485f..a8daa1c 100644 --- a/core/relays.ts +++ b/core/relays.ts @@ -1,9 +1,11 @@ import type { Stringified } from "./types.ts"; import type { ClientToRelayMessage, + EventId, EventKind, NostrEvent, RelayToClientMessage, + RelayToClientMessageType, RelayUrl, SubscriptionFilter, SubscriptionId, @@ -97,6 +99,13 @@ export class Relay extends NostrNode< ); } + dispatch( + type: T, + data: RelayEventTypeRecord[T], + ): void { + this.dispatchEvent(new RelayEvent(type, data)); + } + subscribe( filter: SubscriptionFilter | SubscriptionFilter[], options: Partial = {}, @@ -108,19 +117,21 @@ export class Relay extends NostrNode< filters: [filter].flat(), options, }; + const resubscribe = () => this.dispatch("resubscribe", { ...context }); return new ReadableStream>({ start: (controller) => { - this.dispatchEvent( - new RelayEvent("subscribe", { ...context, controller }), + this.addEventListener( + context.id, + () => this.ws.removeEventListener("close", resubscribe), ); + this.dispatch("subscribe", { ...context, controller }); }, - pull: (controller) => { - this.dispatchEvent(new RelayEvent("pull", { ...context, controller })); + pull: () => { + this.ws.addEventListener("close", resubscribe, { once: true }); + this.ws.ready(); }, cancel: (reason) => { - this.dispatchEvent( - new RelayEvent("unsubscribe", { ...context, reason }), - ); + this.dispatch("unsubscribe", { ...context, reason }); }, }, new CountQueuingStrategy({ highWaterMark: options.nbuffer })); } @@ -133,9 +144,7 @@ export class Relay extends NostrNode< */ publish(event: NostrEvent): Promise { return new Promise((resolve, reject) => { - this.dispatchEvent( - new RelayEvent("publish", { event, resolve, reject }), - ); + this.dispatch("publish", { event, resolve, reject }); this.ws.addEventListener( "close", () => reject(new ConnectionClosed()), @@ -183,12 +192,25 @@ export interface PublicationContext { reject: (reason: unknown) => void; } +type SubscriptionMessage = { + [T in RelayToClientMessageType]: RelayToClientMessage[1] extends + SubscriptionId ? RelayToClientMessage : never; +}[RelayToClientMessageType]; + +type PublicationMessage = { + [T in RelayToClientMessageType]: RelayToClientMessage[1] extends EventId + ? RelayToClientMessage + : never; +}[RelayToClientMessageType]; + export interface RelayEventTypeRecord { message: RelayToClientMessage; subscribe: SubscriptionContextWithController; - pull: SubscriptionContextWithController; + resubscribe: SubscriptionContext; unsubscribe: SubscriptionContextWithReason; publish: PublicationContext; + [id: SubscriptionId]: SubscriptionMessage; + [id: EventId]: PublicationMessage; } export type RelayEventType = keyof RelayEventTypeRecord; diff --git a/core/websockets.ts b/core/websockets.ts index b2bf283..eeaa015 100644 --- a/core/websockets.ts +++ b/core/websockets.ts @@ -23,6 +23,7 @@ type EventListenerOptionsMap = Map< */ export class LazyWebSocket implements WebSocketLike { #ws?: WebSocket; + readonly #ac = new AbortController(); readonly #createWebSocket: () => WebSocket; readonly #eventListenerMap = new Map< WebSocketEventType, @@ -86,6 +87,7 @@ export class LazyWebSocket implements WebSocketLike { if (!this.#ws) { return; } + this.#ac.abort(); switch (this.#ws.readyState) { case WebSocket.CONNECTING: await this.#once("open"); @@ -111,6 +113,8 @@ export class LazyWebSocket implements WebSocketLike { listener: EventListenerOrEventListenerObject, options: boolean | AddEventListenerOptions = {}, ) => { + options = typeof options === "boolean" ? { capture: options } : options; + options = { signal: this.#ac.signal, ...options }; this.#ws?.addEventListener(type, listener, options); const map = this.#eventListenerMap.get(type); if (map) { diff --git a/lib/testing.ts b/lib/testing.ts index c51c760..1eb0437 100644 --- a/lib/testing.ts +++ b/lib/testing.ts @@ -1,16 +1,24 @@ type MessageEventData = string | ArrayBufferLike | Blob | ArrayBufferView; export class MockWebSocket extends EventTarget implements WebSocket { + /** + * A list of all instances of MockWebSocket. + * An instance is removed from this list when it is closed. + */ static get instances(): MockWebSocket[] { - return this.#instances; + return Array.from(this.#instances); + } + static #instances = new Set(); + + static get first(): MockWebSocket | undefined { + return this.instances[0]; } - static #instances: MockWebSocket[] = []; constructor(url?: string | URL, protocols?: string | string[]) { super(); this.url = url?.toString() ?? ""; this.protocol = protocols ? [...protocols].flat()[0] : ""; - MockWebSocket.#instances.push(this); + MockWebSocket.#instances.add(this); // Simulate async behavior of WebSocket as much as possible. queueMicrotask(() => { this.#readyState = 1; @@ -50,9 +58,11 @@ export class MockWebSocket extends EventTarget implements WebSocket { if (this.#remote) { this.#remote.#readyState = 3; this.#remote.dispatchEvent(new CloseEvent("close", { code, reason })); + MockWebSocket.#instances.delete(this.#remote); } this.#readyState = 3; this.dispatchEvent(new CloseEvent("close", { code, reason })); + MockWebSocket.#instances.delete(this); }); } diff --git a/nips/01/relays.ts b/nips/01/relays.ts index 300c37d..86d1203 100644 --- a/nips/01/relays.ts +++ b/nips/01/relays.ts @@ -1,35 +1,5 @@ -import type { - EventId, - RelayToClientMessage, - RelayToClientMessageType, - SubscriptionId, -} from "../../core/protocol.d.ts"; import { EventRejected, RelayEvent, RelayModule } from "../../core/relays.ts"; -type SubscriptionMessage = { - [T in RelayToClientMessageType]: RelayToClientMessage[1] extends - SubscriptionId ? RelayToClientMessage : never; -}[RelayToClientMessageType]; - -type PublicationMessage = { - [T in RelayToClientMessageType]: RelayToClientMessage[1] extends EventId - ? RelayToClientMessage - : never; -}[RelayToClientMessageType]; - -type ExtentionalEventTypeRecord = - & { - [id in SubscriptionId]: SubscriptionMessage; - } - & { - [id in EventId]: PublicationMessage; - }; - -declare module "../../core/relays.ts" { - // deno-lint-ignore no-empty-interface - interface RelayEventTypeRecord extends ExtentionalEventTypeRecord {} -} - export class SubscriptionClosed extends Error {} const install: RelayModule["default"] = (relay) => { @@ -65,11 +35,14 @@ const install: RelayModule["default"] = (relay) => { } } }); - return relay.send(["REQ", id, ...filters]); + relay.send(["REQ", id, ...filters]); + }); + relay.addEventListener("resubscribe", ({ data: { id, filters } }) => { + relay.send(["REQ", id, ...filters]); }); relay.addEventListener("unsubscribe", ({ data: { id } }) => { if (relay.status === WebSocket.OPEN) { - return relay.send(["CLOSE", id]); + relay.send(["CLOSE", id]); } }); relay.addEventListener("publish", (ev) => { diff --git a/nips/01/relays_test.ts b/nips/01/relays_test.ts index fe22960..57d0c0a 100644 --- a/nips/01/relays_test.ts +++ b/nips/01/relays_test.ts @@ -20,7 +20,11 @@ import { } from "../../core/relays.ts?nips=1"; import { SubscriptionClosed } from "../../nips/01/relays.ts"; -describe("NIP-01/Relay", () => { +function getRemoteSocket() { + return MockWebSocket.instances[0].remote; +} + +describe("Relay (NIP-01)", () => { const url = "wss://localhost:8080"; let relay: Relay; let sub_0: ReadableStream>; @@ -30,6 +34,7 @@ describe("NIP-01/Relay", () => { globalThis.WebSocket = MockWebSocket; relay = new Relay(url); }); + afterAll(() => { if (relay.status === WebSocket.OPEN) { return relay.close(); @@ -39,33 +44,33 @@ describe("NIP-01/Relay", () => { it("should have loaded NIP-01 module", () => { assertEquals(relay.config.modules.length, 1); }); + it("should create a subscription", () => { sub_1 = relay.subscribe({ kinds: [1] }, { id: "test-1" }); assertInstanceOf(sub_1, ReadableStream); }); + it("should receive text notes", async () => { const reader = sub_1.getReader(); const read = reader.read(); - MockWebSocket.instances[0].remote.send( - JSON.stringify(["EVENT", "test-1", { kind: 1 }]), - ); + getRemoteSocket().send(JSON.stringify(["EVENT", "test-1", { kind: 1 }])); const { value, done } = await read; assert(!done); assertEquals(value.kind, 1); reader.releaseLock(); }); + it("should be able to open multiple subscriptions", () => { - sub_0 = relay.subscribe({ kinds: [0], limit: 1 }, { - id: "test-0", - }); + sub_0 = relay.subscribe({ kinds: [0] }, { id: "test-0" }); assert(sub_0 instanceof ReadableStream); }); + it("should recieve metas and notes simultaneously", async () => { const reader_0 = sub_0.getReader(); const reader_1 = sub_1.getReader(); - const ws = MockWebSocket.instances[0]; - ws.remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }])); - ws.remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }])); + const remote = getRemoteSocket(); + remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }])); + remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }])); const [{ value: value_0 }, { value: value_1 }] = await Promise.all([ reader_0.read(), reader_1.read(), @@ -77,18 +82,54 @@ describe("NIP-01/Relay", () => { reader_0.releaseLock(); reader_1.releaseLock(); }); + + it("should close a subscription with an error when receiving a CLOSED message", async () => { + getRemoteSocket().send(JSON.stringify( + [ + "CLOSED", + "test-1" as SubscriptionId, + "error: test", + ] satisfies RelayToClientMessage<"CLOSED">, + )); + const reader = sub_1.getReader(); + try { + await reader.read(); + } catch (e) { + assertInstanceOf(e, SubscriptionClosed); + assertEquals(e.message, "error: test"); + } finally { + reader.releaseLock(); + } + }); + + it("should reconnect if connection is closed while waiting for an event", async () => { + const reader = sub_0.getReader(); + const read = reader.read(); + getRemoteSocket().close(); + const reconnected = new Promise((resolve) => { + relay.ws.addEventListener("open", () => resolve(true)); + }); + assert(await reconnected); + // We must use a new instance of MockWebSocket. + getRemoteSocket().send(JSON.stringify(["EVENT", "test-0", { kind: 0 }])); + const { value, done } = await read; + assert(!done); + assertEquals(value.kind, 0); + reader.releaseLock(); + }); + it("should publish an event and recieve an accepting OK message", async () => { const eid = "test-true" as EventId; - const ws = MockWebSocket.instances[0]; + const remote = getRemoteSocket(); const arrived = new Promise((resolve) => { - ws.remote.addEventListener( + remote.addEventListener( "message", (ev: MessageEvent) => { // deno-fmt-ignore const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">; if (event.id === eid) { assertEquals(event.kind, 1); - ws.remote.send( + remote.send( JSON.stringify( ["OK", eid, true, ""] satisfies RelayToClientMessage<"OK">, ), @@ -102,13 +143,14 @@ describe("NIP-01/Relay", () => { await relay.publish({ id: eid, kind: 1 } as any); assert(await arrived); }); + it("should receieve a rejecting OK message and throw EventRejected", async () => { const eid = "test-false" as EventId; // deno-fmt-ignore const msg = ["OK", eid, false, "error: test"] satisfies RelayToClientMessage<"OK"> - const ws = MockWebSocket.instances[0]; + const remote = getRemoteSocket(); const arrived = new Promise((resolve) => { - ws.remote.addEventListener( + remote.addEventListener( "message", (ev: MessageEvent) => { // deno-fmt-ignore @@ -116,7 +158,7 @@ describe("NIP-01/Relay", () => { if (event.id === eid) { assertEquals(event.kind, 1); resolve(true); - ws.remote.send(JSON.stringify(msg)); + remote.send(JSON.stringify(msg)); } }, ); @@ -132,26 +174,16 @@ describe("NIP-01/Relay", () => { } await arrived; }); + it("should throw ConnectionClosed when connection is closed before recieving an OK message", async () => { const event = { id: "test-close" as EventId, kind: 1 }; // deno-lint-ignore no-explicit-any const published = relay.publish(event as any).catch((e) => e); - MockWebSocket.instances[0].remote.close(); - assertInstanceOf(await published, ConnectionClosed); - }); - it("should close a subscription with an error when receiving a CLOSED message", async () => { - MockWebSocket.instances[0].remote.send(JSON.stringify( - [ - "CLOSED", - "test-1" as SubscriptionId, - "error: test", - ] satisfies RelayToClientMessage<"CLOSED">, - )); + getRemoteSocket().close(); try { - await sub_1.getReader().read(); + await published; } catch (e) { - assertInstanceOf(e, SubscriptionClosed); - assertEquals(e.message, "error: test"); + assertInstanceOf(e, ConnectionClosed); } }); });