Skip to content

Commit f8dc248

Browse files
authored
refactor!: rename RelayPool to RelayGroup and modify the API (#19)
1 parent 90c7e67 commit f8dc248

11 files changed

+180
-72
lines changed

core/nodes.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { NostrMessage } from "./protocol.d.ts";
22
import type { Logger } from "./types.ts";
3-
import { WebSocketLike, WebSocketReadyState } from "./websockets.ts";
3+
import { WebSocketLike } from "./websockets.ts";
44
import { NonExclusiveWritableStream } from "./streams.ts";
55

66
export interface NostrNodeConfig<
@@ -39,7 +39,7 @@ export class NostrNode<
3939
this.config.modules.forEach((m) => this.addModule(m));
4040
}
4141

42-
get status(): WebSocketReadyState {
42+
get status(): WebSocket["readyState"] {
4343
return this.ws.readyState;
4444
}
4545

core/relays.ts

+18-10
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@ export class ConnectionClosed extends Error {}
3737
// Interfaces
3838
// ----------------------
3939

40-
export interface RelayLike
41-
extends NonExclusiveWritableStream<ClientToRelayMessage> {
42-
subscribe: Relay["subscribe"];
43-
publish: Relay["publish"];
44-
}
45-
4640
export interface RelayConfig
4741
extends NostrNodeConfig<RelayFunctionParameterTypeRecord> {
4842
url: RelayUrl;
@@ -131,10 +125,6 @@ export class Relay extends NostrNode<
131125
);
132126
this.callFunction("startSubscription", { controller, ...context });
133127
},
134-
pull: async () => {
135-
await this.ws.ready();
136-
// TODO: backpressure
137-
},
138128
cancel: (reason) => {
139129
return this.callFunction("closeSubscription", { reason, ...context });
140130
},
@@ -170,6 +160,24 @@ export class Relay extends NostrNode<
170160
}
171161
}
172162

163+
// ----------------------
164+
// RelayLikes
165+
// ----------------------
166+
167+
export interface RelayLike
168+
extends NonExclusiveWritableStream<ClientToRelayMessage> {
169+
readonly config: RelayLikeConfig;
170+
subscribe: Relay["subscribe"];
171+
publish: Relay["publish"];
172+
}
173+
174+
export type RelayLikeConfig = Omit<
175+
RelayConfig,
176+
"url" | keyof NostrNodeConfig<RelayFunctionParameterTypeRecord>
177+
>;
178+
179+
export type RelayLikeOptions = Partial<RelayLikeConfig>;
180+
173181
// ----------------------
174182
// Events
175183
// ----------------------

core/websockets.ts

+15-30
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
import { Notify } from "./x/async.ts";
2-
31
export type WebSocketEventType = keyof WebSocketEventMap;
42

53
export interface WebSocketLike {
64
readonly url: string;
7-
readonly readyState: WebSocketReadyState;
5+
readonly readyState: WebSocket["readyState"];
86
send(
97
data: string | ArrayBufferLike | Blob | ArrayBufferView,
108
): void | Promise<void>;
@@ -25,14 +23,11 @@ type EventListenerOptionsMap = Map<
2523
*/
2624
export class LazyWebSocket implements WebSocketLike {
2725
#ws?: WebSocket;
28-
#createWebSocket: () => WebSocket;
29-
26+
readonly #createWebSocket: () => WebSocket;
3027
readonly #eventListenerMap = new Map<
3128
WebSocketEventType,
3229
EventListenerOptionsMap
3330
>();
34-
readonly #notifier = new Notify();
35-
3631
readonly url: string;
3732

3833
constructor(
@@ -41,11 +36,6 @@ export class LazyWebSocket implements WebSocketLike {
4136
) {
4237
this.#createWebSocket = () => {
4338
const ws = new WebSocket(url, protocols);
44-
for (const type of ["close", "open"] as const) {
45-
ws.addEventListener(type, () => {
46-
this.#notifier.notifyAll();
47-
});
48-
}
4939
this.#eventListenerMap.forEach((map, type) => {
5040
map.forEach((options, listener) => {
5141
ws.addEventListener(type, listener, options);
@@ -60,21 +50,26 @@ export class LazyWebSocket implements WebSocketLike {
6050
return this.#ws ??= this.#createWebSocket();
6151
}
6252

53+
#once(type: WebSocketEventType): Promise<void> {
54+
return new Promise<void>((resolve) => {
55+
this.#created().addEventListener(type, () => resolve(), { once: true });
56+
});
57+
}
58+
6359
async #ready(): Promise<WebSocket> {
6460
this.#ws = this.#created();
6561
switch (this.#ws.readyState) {
6662
case WebSocket.CONNECTING:
67-
await this.#notifier.notified();
68-
/* falls through */
69-
case WebSocket.OPEN:
7063
break;
64+
case WebSocket.OPEN:
65+
return this.#ws;
7166
case WebSocket.CLOSING:
72-
await this.#notifier.notified();
67+
await this.#once("close");
7368
/* falls through */
7469
case WebSocket.CLOSED:
7570
this.#ws = this.#createWebSocket();
76-
await this.#notifier.notified();
7771
}
72+
await this.#once("open");
7873
return this.#ws;
7974
}
8075

@@ -93,21 +88,21 @@ export class LazyWebSocket implements WebSocketLike {
9388
}
9489
switch (this.#ws.readyState) {
9590
case WebSocket.CONNECTING:
96-
await this.#notifier.notified();
91+
await this.#once("open");
9792
/* falls through */
9893
case WebSocket.OPEN:
9994
this.#ws.close(code, reason);
10095
/* falls through */
10196
case WebSocket.CLOSING:
102-
await this.#notifier.notified();
97+
await this.#once("close");
10398
/* falls through */
10499
case WebSocket.CLOSED:
105100
break;
106101
}
107102
this.#ws = undefined;
108103
}
109104

110-
get readyState(): WebSocketReadyState {
105+
get readyState(): WebSocket["readyState"] {
111106
return this.#ws ? this.#ws.readyState : WebSocket.CLOSED;
112107
}
113108

@@ -138,13 +133,3 @@ export class LazyWebSocket implements WebSocketLike {
138133
return this.#ws?.dispatchEvent(event) ?? false;
139134
};
140135
}
141-
142-
/**
143-
* The ready state of a WebSocket.
144-
*/
145-
export enum WebSocketReadyState {
146-
CONNECTING = 0,
147-
OPEN = 1,
148-
CLOSING = 2,
149-
CLOSED = 3,
150-
}

core/x/async.ts

-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
export { Notify } from "https://deno.land/x/async@v2.0.2/notify.ts";
21
export { Lock } from "https://deno.land/x/async@v2.0.2/lock.ts";

deno.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/pools.ts lib/relays.ts

+16-15
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ import type {
22
ClientToRelayMessage,
33
EventKind,
44
NostrEvent,
5-
RelayUrl,
65
SubscriptionFilter,
76
} from "../core/protocol.d.ts";
87
import {
9-
Relay,
10-
RelayInit,
118
RelayLike,
9+
RelayLikeConfig,
10+
RelayLikeOptions,
1211
SubscriptionOptions,
1312
} from "../core/relays.ts";
1413
import { NonExclusiveWritableStream } from "../core/streams.ts";
@@ -17,26 +16,28 @@ import { Distinctor, merge } from "../lib/streams.ts";
1716
/**
1817
* A pool of relays that can be used as a single relay.
1918
*/
20-
export class RelayPool extends NonExclusiveWritableStream<ClientToRelayMessage>
19+
export class RelayGroup extends NonExclusiveWritableStream<ClientToRelayMessage>
2120
implements RelayLike {
22-
readonly relays: Relay[];
23-
24-
#relays_read: Relay[];
25-
26-
constructor(...init: (RelayUrl | RelayInit)[]) {
27-
const relays = init.map((i) => new Relay(i));
21+
readonly config: Readonly<RelayLikeConfig>;
22+
#relays_read: RelayLike[];
23+
#relays_write: RelayLike[];
2824

25+
constructor(readonly relays: RelayLike[], options?: RelayLikeOptions) {
2926
const writers = relays.filter((r) => r.config.write)
3027
.map((r) => r.getWriter());
31-
3228
super({
3329
async write(msg) {
3430
await Promise.all(writers.map((r) => r.write(msg)));
3531
},
36-
}, { highWaterMark: Math.max(...relays.map((r) => r.config.nbuffer)) });
37-
38-
this.relays = relays;
32+
});
33+
this.config = {
34+
name: relays.map((r) => r.config.name).join(", "),
35+
read: true,
36+
write: true,
37+
...options,
38+
};
3939
this.#relays_read = this.relays.filter((r) => r.config.read);
40+
this.#relays_write = this.relays.filter((r) => r.config.write);
4041
}
4142

4243
// ----------------------
@@ -54,7 +55,7 @@ export class RelayPool extends NonExclusiveWritableStream<ClientToRelayMessage>
5455
async publish<K extends EventKind>(
5556
msg: NostrEvent<K>,
5657
) {
57-
await Promise.all(this.relays.map((r) => r.publish(msg)));
58+
await Promise.all(this.#relays_write.map((r) => r.publish(msg)));
5859
}
5960

6061
// ----------------------

lib/relays_test.ts

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { afterAll, beforeAll, describe, it } from "../lib/std/testing.ts";
2+
import { assertEquals, assertInstanceOf } from "../lib/std/assert.ts";
3+
import { NostrEvent } from "../core/protocol.d.ts";
4+
import { Relay } from "../core/relays.ts?nips=1";
5+
import { RelayGroup } from "../lib/relays.ts";
6+
import { MockWebSocket } from "../lib/testing.ts";
7+
8+
describe("RelayGroup", () => {
9+
let relays: Relay[];
10+
let group: RelayGroup;
11+
let sub: ReadableStream<NostrEvent>;
12+
13+
// ----------------------
14+
// Setup
15+
// ----------------------
16+
beforeAll(() => {
17+
globalThis.WebSocket = MockWebSocket;
18+
relays = [
19+
new Relay("ws://localhost:80", {
20+
name: "relay-1",
21+
read: true,
22+
write: true,
23+
}),
24+
new Relay("ws://localhost:81", {
25+
name: "relay-2",
26+
read: true,
27+
write: false,
28+
}),
29+
new Relay("ws://localhost:82", {
30+
name: "relay-3",
31+
read: false,
32+
write: true,
33+
}),
34+
];
35+
});
36+
afterAll(() => {
37+
group.close();
38+
});
39+
40+
// ----------------------
41+
// Constructor
42+
// ----------------------
43+
it("should create a group of relays", () => {
44+
group = new RelayGroup(relays);
45+
assertInstanceOf(group, RelayGroup);
46+
});
47+
it("should not have a url", () => {
48+
// @ts-expect-error RelayGroup does not have a url
49+
assertEquals(group.url, undefined);
50+
});
51+
it("should have a default name", () => {
52+
assertEquals(group.config.name, "relay-1, relay-2, relay-3");
53+
});
54+
it("should have a custom name if provided", () => {
55+
const group = new RelayGroup(relays, { name: "custom" });
56+
assertEquals(group.config.name, "custom");
57+
});
58+
it("should have default read and write config", () => {
59+
assertEquals(group.config.read, true);
60+
assertEquals(group.config.write, true);
61+
});
62+
it("should have custom read and write config if provided", () => {
63+
const group = new RelayGroup(relays, { read: false, write: false });
64+
assertEquals(group.config.read, false);
65+
assertEquals(group.config.write, false);
66+
});
67+
68+
// ----------------------
69+
// Subscription
70+
// ----------------------
71+
it("should create a subscription", () => {
72+
sub = group.subscribe({ kinds: [1] }, { id: "test-group" });
73+
assertInstanceOf(sub, ReadableStream);
74+
});
75+
});

lib/streams.ts

+26-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
export { mergeReadableStreams as merge } from "./std/streams.ts";
2-
31
/**
42
* TransformStream which filters out duplicate values from a stream.
53
*/
64
export class Distinctor<R = unknown, T = unknown>
75
extends TransformStream<R, R> {
86
#seen: Set<T>;
9-
107
constructor(protected readonly fn: (value: R) => T) {
118
super({
129
transform: (value, controller) => {
@@ -35,3 +32,29 @@ export class Transformer<R = unknown, W = unknown>
3532
});
3633
}
3734
}
35+
36+
export function merge<T>(
37+
...streams: ReadableStream<T>[]
38+
) {
39+
const readers = streams.map((r) => r.getReader());
40+
return new ReadableStream<T>({
41+
async pull(controller) {
42+
await Promise.any(readers.map(async (r) => {
43+
const { value, done } = await r.read();
44+
if (done) {
45+
readers.splice(readers.indexOf(r), 1);
46+
r.releaseLock();
47+
}
48+
if (value) {
49+
controller.enqueue(value);
50+
}
51+
})).catch((e) => {
52+
if (e instanceof AggregateError) {
53+
controller.close();
54+
} else {
55+
throw e;
56+
}
57+
});
58+
},
59+
});
60+
}

0 commit comments

Comments
 (0)