Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core/relays): implement reconnection logic #29

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions core/relays.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Stringified } from "./types.ts";
import type {
ClientToRelayMessage,
EventId,
EventKind,
NostrEvent,
RelayToClientMessage,
RelayToClientMessageType,
RelayUrl,
SubscriptionFilter,
SubscriptionId,
@@ -97,6 +99,13 @@
);
}

dispatch<T extends RelayEventType>(
type: T,
data: RelayEventTypeRecord[T],
): void {
this.dispatchEvent(new RelayEvent(type, data));
}

subscribe<K extends EventKind>(
filter: SubscriptionFilter<K> | SubscriptionFilter<K>[],
options: Partial<SubscriptionOptions> = {},
@@ -108,19 +117,21 @@
filters: [filter].flat(),
options,
};
const resubscribe = () => this.dispatch("resubscribe", { ...context });
return new ReadableStream<NostrEvent<K>>({
start: (controller) => {
this.dispatchEvent(
new RelayEvent("subscribe", { ...context, controller }),
this.addEventListener(
context.id,
() => this.ws.removeEventListener("close", resubscribe),
);
this.dispatch("subscribe", { ...context, controller });
},
pull: (controller) => {
this.dispatchEvent(new RelayEvent("pull", { ...context, controller }));
pull: () => {
this.ws.addEventListener("close", resubscribe, { once: true });
this.ws.ready();
},
cancel: (reason) => {
this.dispatchEvent(
new RelayEvent("unsubscribe", { ...context, reason }),
);
this.dispatch("unsubscribe", { ...context, reason });

Check warning on line 134 in core/relays.ts

Codecov / codecov/patch

core/relays.ts#L134

Added line #L134 was not covered by tests
},
}, new CountQueuingStrategy({ highWaterMark: options.nbuffer }));
}
@@ -133,9 +144,7 @@
*/
publish<K extends EventKind>(event: NostrEvent<K>): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.dispatchEvent(
new RelayEvent("publish", { event, resolve, reject }),
);
this.dispatch("publish", { event, resolve, reject });
this.ws.addEventListener(
"close",
() => reject(new ConnectionClosed()),
@@ -183,12 +192,25 @@
reject: (reason: unknown) => void;
}

type SubscriptionMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends
SubscriptionId ? RelayToClientMessage<T> : never;
}[RelayToClientMessageType];

type PublicationMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends EventId
? RelayToClientMessage<T>
: never;
}[RelayToClientMessageType];

export interface RelayEventTypeRecord {
message: RelayToClientMessage;
subscribe: SubscriptionContextWithController;
pull: SubscriptionContextWithController;
resubscribe: SubscriptionContext;
unsubscribe: SubscriptionContextWithReason;
publish: PublicationContext;
[id: SubscriptionId]: SubscriptionMessage;
[id: EventId]: PublicationMessage;
}

export type RelayEventType = keyof RelayEventTypeRecord;
4 changes: 4 additions & 0 deletions core/websockets.ts
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ type EventListenerOptionsMap = Map<
*/
export class LazyWebSocket implements WebSocketLike {
#ws?: WebSocket;
readonly #ac = new AbortController();
readonly #createWebSocket: () => WebSocket;
readonly #eventListenerMap = new Map<
WebSocketEventType,
@@ -86,6 +87,7 @@ export class LazyWebSocket implements WebSocketLike {
if (!this.#ws) {
return;
}
this.#ac.abort();
switch (this.#ws.readyState) {
case WebSocket.CONNECTING:
await this.#once("open");
@@ -111,6 +113,8 @@ export class LazyWebSocket implements WebSocketLike {
listener: EventListenerOrEventListenerObject,
options: boolean | AddEventListenerOptions = {},
) => {
options = typeof options === "boolean" ? { capture: options } : options;
options = { signal: this.#ac.signal, ...options };
this.#ws?.addEventListener(type, listener, options);
const map = this.#eventListenerMap.get(type);
if (map) {
16 changes: 13 additions & 3 deletions lib/testing.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
type MessageEventData = string | ArrayBufferLike | Blob | ArrayBufferView;

export class MockWebSocket extends EventTarget implements WebSocket {
/**
* A list of all instances of MockWebSocket.
* An instance is removed from this list when it is closed.
*/
static get instances(): MockWebSocket[] {
return this.#instances;
return Array.from(this.#instances);
}
static #instances = new Set<MockWebSocket>();

static get first(): MockWebSocket | undefined {
return this.instances[0];

Check warning on line 14 in lib/testing.ts

Codecov / codecov/patch

lib/testing.ts#L13-L14

Added lines #L13 - L14 were not covered by tests
}
static #instances: MockWebSocket[] = [];

constructor(url?: string | URL, protocols?: string | string[]) {
super();
this.url = url?.toString() ?? "";
this.protocol = protocols ? [...protocols].flat()[0] : "";
MockWebSocket.#instances.push(this);
MockWebSocket.#instances.add(this);
// Simulate async behavior of WebSocket as much as possible.
queueMicrotask(() => {
this.#readyState = 1;
@@ -50,9 +58,11 @@
if (this.#remote) {
this.#remote.#readyState = 3;
this.#remote.dispatchEvent(new CloseEvent("close", { code, reason }));
MockWebSocket.#instances.delete(this.#remote);
}
this.#readyState = 3;
this.dispatchEvent(new CloseEvent("close", { code, reason }));
MockWebSocket.#instances.delete(this);
});
}

37 changes: 5 additions & 32 deletions nips/01/relays.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,5 @@
import type {
EventId,
RelayToClientMessage,
RelayToClientMessageType,
SubscriptionId,
} from "../../core/protocol.d.ts";
import { EventRejected, RelayEvent, RelayModule } from "../../core/relays.ts";

type SubscriptionMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends
SubscriptionId ? RelayToClientMessage<T> : never;
}[RelayToClientMessageType];

type PublicationMessage = {
[T in RelayToClientMessageType]: RelayToClientMessage<T>[1] extends EventId
? RelayToClientMessage<T>
: never;
}[RelayToClientMessageType];

type ExtentionalEventTypeRecord =
& {
[id in SubscriptionId]: SubscriptionMessage;
}
& {
[id in EventId]: PublicationMessage;
};

declare module "../../core/relays.ts" {
// deno-lint-ignore no-empty-interface
interface RelayEventTypeRecord extends ExtentionalEventTypeRecord {}
}

export class SubscriptionClosed extends Error {}

const install: RelayModule["default"] = (relay) => {
@@ -65,11 +35,14 @@
}
}
});
return relay.send(["REQ", id, ...filters]);
relay.send(["REQ", id, ...filters]);
});
relay.addEventListener("resubscribe", ({ data: { id, filters } }) => {
relay.send(["REQ", id, ...filters]);
});
relay.addEventListener("unsubscribe", ({ data: { id } }) => {
if (relay.status === WebSocket.OPEN) {
return relay.send(["CLOSE", id]);
relay.send(["CLOSE", id]);

Check warning on line 45 in nips/01/relays.ts

Codecov / codecov/patch

nips/01/relays.ts#L45

Added line #L45 was not covered by tests
}
});
relay.addEventListener("publish", (ev) => {
92 changes: 62 additions & 30 deletions nips/01/relays_test.ts
Original file line number Diff line number Diff line change
@@ -20,7 +20,11 @@ import {
} from "../../core/relays.ts?nips=1";
import { SubscriptionClosed } from "../../nips/01/relays.ts";

describe("NIP-01/Relay", () => {
function getRemoteSocket() {
return MockWebSocket.instances[0].remote;
}

describe("Relay (NIP-01)", () => {
const url = "wss://localhost:8080";
let relay: Relay;
let sub_0: ReadableStream<NostrEvent<0>>;
@@ -30,6 +34,7 @@ describe("NIP-01/Relay", () => {
globalThis.WebSocket = MockWebSocket;
relay = new Relay(url);
});

afterAll(() => {
if (relay.status === WebSocket.OPEN) {
return relay.close();
@@ -39,33 +44,33 @@ describe("NIP-01/Relay", () => {
it("should have loaded NIP-01 module", () => {
assertEquals(relay.config.modules.length, 1);
});

it("should create a subscription", () => {
sub_1 = relay.subscribe({ kinds: [1] }, { id: "test-1" });
assertInstanceOf(sub_1, ReadableStream);
});

it("should receive text notes", async () => {
const reader = sub_1.getReader();
const read = reader.read();
MockWebSocket.instances[0].remote.send(
JSON.stringify(["EVENT", "test-1", { kind: 1 }]),
);
getRemoteSocket().send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
const { value, done } = await read;
assert(!done);
assertEquals(value.kind, 1);
reader.releaseLock();
});

it("should be able to open multiple subscriptions", () => {
sub_0 = relay.subscribe({ kinds: [0], limit: 1 }, {
id: "test-0",
});
sub_0 = relay.subscribe({ kinds: [0] }, { id: "test-0" });
assert(sub_0 instanceof ReadableStream);
});

it("should recieve metas and notes simultaneously", async () => {
const reader_0 = sub_0.getReader();
const reader_1 = sub_1.getReader();
const ws = MockWebSocket.instances[0];
ws.remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
ws.remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
const remote = getRemoteSocket();
remote.send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
remote.send(JSON.stringify(["EVENT", "test-1", { kind: 1 }]));
const [{ value: value_0 }, { value: value_1 }] = await Promise.all([
reader_0.read(),
reader_1.read(),
@@ -77,18 +82,54 @@ describe("NIP-01/Relay", () => {
reader_0.releaseLock();
reader_1.releaseLock();
});

it("should close a subscription with an error when receiving a CLOSED message", async () => {
getRemoteSocket().send(JSON.stringify(
[
"CLOSED",
"test-1" as SubscriptionId,
"error: test",
] satisfies RelayToClientMessage<"CLOSED">,
));
const reader = sub_1.getReader();
try {
await reader.read();
} catch (e) {
assertInstanceOf(e, SubscriptionClosed);
assertEquals(e.message, "error: test");
} finally {
reader.releaseLock();
}
});

it("should reconnect if connection is closed while waiting for an event", async () => {
const reader = sub_0.getReader();
const read = reader.read();
getRemoteSocket().close();
const reconnected = new Promise<true>((resolve) => {
relay.ws.addEventListener("open", () => resolve(true));
});
assert(await reconnected);
// We must use a new instance of MockWebSocket.
getRemoteSocket().send(JSON.stringify(["EVENT", "test-0", { kind: 0 }]));
const { value, done } = await read;
assert(!done);
assertEquals(value.kind, 0);
reader.releaseLock();
});

it("should publish an event and recieve an accepting OK message", async () => {
const eid = "test-true" as EventId;
const ws = MockWebSocket.instances[0];
const remote = getRemoteSocket();
const arrived = new Promise<true>((resolve) => {
ws.remote.addEventListener(
remote.addEventListener(
"message",
(ev: MessageEvent<string>) => {
// deno-fmt-ignore
const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">;
if (event.id === eid) {
assertEquals(event.kind, 1);
ws.remote.send(
remote.send(
JSON.stringify(
["OK", eid, true, ""] satisfies RelayToClientMessage<"OK">,
),
@@ -102,21 +143,22 @@ describe("NIP-01/Relay", () => {
await relay.publish({ id: eid, kind: 1 } as any);
assert(await arrived);
});

it("should receieve a rejecting OK message and throw EventRejected", async () => {
const eid = "test-false" as EventId;
// deno-fmt-ignore
const msg = ["OK", eid, false, "error: test"] satisfies RelayToClientMessage<"OK">
const ws = MockWebSocket.instances[0];
const remote = getRemoteSocket();
const arrived = new Promise<true>((resolve) => {
ws.remote.addEventListener(
remote.addEventListener(
"message",
(ev: MessageEvent<string>) => {
// deno-fmt-ignore
const [, event] = JSON.parse(ev.data) as ClientToRelayMessage<"EVENT">;
if (event.id === eid) {
assertEquals(event.kind, 1);
resolve(true);
ws.remote.send(JSON.stringify(msg));
remote.send(JSON.stringify(msg));
}
},
);
@@ -132,26 +174,16 @@ describe("NIP-01/Relay", () => {
}
await arrived;
});

it("should throw ConnectionClosed when connection is closed before recieving an OK message", async () => {
const event = { id: "test-close" as EventId, kind: 1 };
// deno-lint-ignore no-explicit-any
const published = relay.publish(event as any).catch((e) => e);
MockWebSocket.instances[0].remote.close();
assertInstanceOf(await published, ConnectionClosed);
});
it("should close a subscription with an error when receiving a CLOSED message", async () => {
MockWebSocket.instances[0].remote.send(JSON.stringify(
[
"CLOSED",
"test-1" as SubscriptionId,
"error: test",
] satisfies RelayToClientMessage<"CLOSED">,
));
getRemoteSocket().close();
try {
await sub_1.getReader().read();
await published;
} catch (e) {
assertInstanceOf(e, SubscriptionClosed);
assertEquals(e.message, "error: test");
assertInstanceOf(e, ConnectionClosed);
}
});
});