Skip to content

Commit fd469d1

Browse files
committed
feat(client): add method
1 parent 48319ee commit fd469d1

File tree

6 files changed

+127
-59
lines changed

6 files changed

+127
-59
lines changed

client.ts

+44-35
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { PromiseCallbacks } from "./core/types.ts";
12
import type {
23
ClientToRelayMessage,
34
EoseMessage,
@@ -16,6 +17,8 @@ import { NonExclusiveWritableStream } from "./core/streams.ts";
1617
import { LazyWebSocket } from "./core/websockets.ts";
1718
import { Lock } from "./core/x/async.ts";
1819

20+
export class EventRejectionError extends Error {}
21+
1922
/**
2023
* A class that represents a remote Nostr Relay.
2124
*/
@@ -28,7 +31,8 @@ export class Relay extends NostrNode<ClientToRelayMessage> {
2831
Lock<WritableStreamDefaultWriter<EventMessage | EoseMessage>>
2932
>();
3033

31-
readonly #published = new Map<EventId, PromiseResolveCallback<OkMessage>>();
34+
readonly #publisher = this.getWriter();
35+
readonly #published = new Map<EventId, PromiseCallbacks<OkMessage>>();
3236

3337
constructor(
3438
init: RelayUrl | RelayInit,
@@ -58,21 +62,40 @@ export class Relay extends NostrNode<ClientToRelayMessage> {
5862
}
5963
if (type === "OK") {
6064
const [, eid] = msg;
61-
const resolve = this.#published.get(eid);
62-
if (!resolve) {
63-
opts?.logger?.warn?.(type, this.config.name, "Unknown event id");
65+
const callbacks = this.#published.get(eid);
66+
if (!callbacks) {
67+
opts?.logger?.warn?.(type, this.config.name, "Unknown event id", eid);
6468
return;
6569
}
66-
return resolve(msg);
70+
return callbacks.resolve(msg);
6771
}
6872
if (type === "EVENT" || type === "EOSE") {
6973
const [, sid] = msg;
70-
return this._notify(sid, msg);
74+
return this.#notify(sid, msg);
7175
}
7276
opts?.logger?.warn?.(type, this.config.name, "Unknown message type");
7377
});
7478
}
7579

80+
async #notify(
81+
sid: SubscriptionId,
82+
msg: EventMessage | EoseMessage,
83+
) {
84+
const messenger = new Lock(this.getWriter());
85+
const sub = this.#subs.get(sid);
86+
const promise = sub
87+
? sub.lock((writer) => writer.write(msg))
88+
// Subscription is already closed. TODO: should we throw an error?
89+
: messenger.lock((writer) => writer.write(["CLOSE", sid]));
90+
await promise.catch((err) => {
91+
if (err instanceof TypeError) {
92+
// Stream is already closing or closed.
93+
return;
94+
}
95+
throw err;
96+
});
97+
}
98+
7699
subscribe<K extends EventKind>(
77100
filter: SubscriptionFilter<K> | SubscriptionFilter<K>[],
78101
opts: Partial<SubscriptionOptions> = {},
@@ -140,15 +163,17 @@ export class Relay extends NostrNode<ClientToRelayMessage> {
140163
}
141164

142165
async publish(event: NostrEvent): Promise<void> {
143-
const writer = this.getWriter();
144-
await writer.ready;
145-
await writer.write(["EVENT", event]);
146-
writer.releaseLock();
147-
const [, , accepted, body] = await new Promise<OkMessage>((resolve) => {
148-
this.#published.set(event.id, resolve);
149-
});
166+
await this.#publisher.ready;
167+
// We found this blocks for a long time, so we don't await it.
168+
this.#publisher.write(["EVENT", event]);
169+
const [, , accepted, body] = await new Promise<OkMessage>(
170+
(resolve, reject) => {
171+
this.#published.set(event.id, { resolve, reject });
172+
},
173+
);
174+
await this.#publisher.ready;
150175
if (!accepted) {
151-
throw new Error(`Event rejected: ${body}`, { cause: event });
176+
throw new EventRejectionError(body, { cause: event });
152177
}
153178
this.config.logger?.debug?.("OK", this.config.name, body);
154179
}
@@ -159,27 +184,13 @@ export class Relay extends NostrNode<ClientToRelayMessage> {
159184
sub.lock((writer) => writer.close())
160185
),
161186
);
187+
await Promise.all(
188+
Array.from(this.#published.values()).map(({ reject }) =>
189+
reject("Relay closed")
190+
),
191+
);
162192
await super.close();
163193
}
164-
165-
private async _notify(
166-
sid: SubscriptionId,
167-
msg: EventMessage | EoseMessage,
168-
) {
169-
const messenger = new Lock(this.getWriter());
170-
const sub = this.#subs.get(sid);
171-
const promise = sub
172-
? sub.lock((writer) => writer.write(msg))
173-
// Subscription is already closed. TODO: should we throw an error?
174-
: messenger.lock((writer) => writer.write(["CLOSE", sid]));
175-
await promise.catch((err) => {
176-
if (err instanceof TypeError) {
177-
// Stream is already closing or closed.
178-
return;
179-
}
180-
throw err;
181-
});
182-
}
183194
}
184195

185196
export type RelayConfig = NostrNodeConfig & {
@@ -205,5 +216,3 @@ export interface RelayLike
205216
extends NonExclusiveWritableStream<ClientToRelayMessage> {
206217
subscribe: Relay["subscribe"];
207218
}
208-
209-
type PromiseResolveCallback<T> = (value: T | PromiseLike<T>) => void;

core/types.ts

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ export type Optional<T, K extends keyof T> = Expand<
1818
& Partial<Pick<T, K>>
1919
>;
2020

21+
//
22+
// Promises
23+
//
24+
export type PromiseCallbacks<T> = {
25+
resolve: (value: T | PromiseLike<T>) => void;
26+
reject: (reason?: unknown) => void;
27+
};
28+
2129
// Logger
2230
export interface Logger {
2331
debug?: (...args: unknown[]) => void;

deno.lock

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/std/assert.ts

+2
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ export { assertArrayIncludes } from "https://deno.land/std@0.203.0/assert/assert
33
export { assertEquals } from "https://deno.land/std@0.203.0/assert/assert_equals.ts";
44
export { assertFalse } from "https://deno.land/std@0.203.0/assert/assert_false.ts";
55
export { assertObjectMatch } from "https://deno.land/std@0.203.0/assert/assert_object_match.ts";
6+
export { assertThrows } from "https://deno.land/std@0.203.0/assert/assert_throws.ts";
7+
export { assertRejects } from "https://deno.land/std@0.203.0/assert/assert_rejects.ts";

lib/testing.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export class MockWebSocket extends EventTarget implements WebSocket {
1111
this.url = url?.toString() ?? "";
1212
this.protocol = protocols ? [...protocols].flat()[0] : "";
1313
MockWebSocket.#instances.push(this);
14-
// Simulate a slow opening of a WebSocket as much as possible.
14+
// Simulate async behavior of WebSocket as much as possible.
1515
queueMicrotask(() => {
1616
this.#readyState = 1;
1717
this.dispatchEvent(new Event("open"));
@@ -45,7 +45,7 @@ export class MockWebSocket extends EventTarget implements WebSocket {
4545

4646
close(code?: number, reason?: string): void {
4747
this.#readyState = 2;
48-
// Simulate a slow closing of a WebSocket as much as possible.
48+
// Simulate async behavior of WebSocket as much as possible.
4949
queueMicrotask(() => {
5050
if (this.#remote) {
5151
this.#remote.#readyState = 3;
@@ -57,7 +57,10 @@ export class MockWebSocket extends EventTarget implements WebSocket {
5757
}
5858

5959
send(data: MessageEventData): void {
60-
this.#remote?.dispatchEvent(new MessageEvent("message", { data }));
60+
// Simulate async behavior of WebSocket as much as possible.
61+
queueMicrotask(() =>
62+
this.#remote?.dispatchEvent(new MessageEvent("message", { data }))
63+
);
6164
}
6265

6366
onclose = null;

tests/client_test.ts

+64-21
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1-
import { NostrEvent } from "../core/nips/01.ts";
1+
import {
2+
EventId,
3+
NostrEvent,
4+
OkMessage,
5+
PublishMessage,
6+
} from "../core/nips/01.ts";
27
import { Relay } from "../client.ts";
38
import { afterAll, beforeAll, describe, it } from "../lib/std/testing.ts";
4-
import { assert, assertEquals, assertObjectMatch } from "../lib/std/assert.ts";
9+
import {
10+
assert,
11+
assertEquals,
12+
assertObjectMatch,
13+
assertRejects,
14+
} from "../lib/std/assert.ts";
515
import { MockWebSocket } from "../lib/testing.ts";
616

717
const url = "wss://localhost:8080";
@@ -13,19 +23,15 @@ describe("Relay constructor", () => {
1323
beforeAll(() => {
1424
relay = new Relay(url);
1525
});
16-
1726
it("should be constructable", () => {
1827
assert(relay instanceof Relay);
1928
});
20-
2129
it("should have a url", () => {
2230
assertEquals(relay.config.url, url);
2331
});
24-
2532
it("should have a name", () => {
2633
assertEquals(relay.config.name, "localhost:8080");
2734
});
28-
2935
it("should have default options", () => {
3036
assertObjectMatch(relay.config, {
3137
nbuffer: 10,
@@ -37,7 +43,6 @@ describe("Relay constructor", () => {
3743

3844
describe("called with url and options", () => {
3945
const logger = { info: () => {} };
40-
4146
beforeAll(() => {
4247
relay = new Relay(url, {
4348
name: "test",
@@ -47,15 +52,12 @@ describe("Relay constructor", () => {
4752
logger,
4853
});
4954
});
50-
51-
afterAll(async () => {
52-
await relay.close();
55+
afterAll(() => {
56+
relay.close();
5357
});
54-
5558
it("should be constructable", () => {
5659
assert(relay instanceof Relay);
5760
});
58-
5961
it("should have the given options", () => {
6062
assertObjectMatch(relay.config, {
6163
name: "test",
@@ -79,21 +81,18 @@ describe("Relay", () => {
7981
globalThis.WebSocket = MockWebSocket;
8082
relay = new Relay(url);
8183
});
82-
8384
afterAll(() => {
8485
relay.close();
8586
});
8687

8788
it("should not be connected initially", () => {
8889
assertEquals(relay.status, WebSocket.CLOSED);
8990
});
90-
9191
it("should not connect when a subscription is created", () => {
9292
sub_1 = relay.subscribe({ kinds: [1] }, { id: "test-1" });
9393
assert(sub_1 instanceof ReadableStream);
9494
assertEquals(relay.status, WebSocket.CLOSED);
9595
});
96-
9796
it("should receive text notes", async () => {
9897
const reader = sub_1.getReader();
9998
const read = reader.read();
@@ -108,15 +107,13 @@ describe("Relay", () => {
108107
assertEquals(value.kind, 1);
109108
reader.releaseLock();
110109
});
111-
112110
it("should be able to open multiple subscriptions", () => {
113111
sub_0 = relay.subscribe({ kinds: [0], limit: 1 }, { id: "test-0" });
114112
assert(sub_0 instanceof ReadableStream);
115113
});
116-
117114
it("should recieve metas and notes simultaneously", async () => {
118-
const read_0 = sub_0.getReader().read();
119-
const read_1 = sub_1.getReader().read();
115+
const reader_0 = sub_0.getReader();
116+
const reader_1 = sub_1.getReader();
120117
const ws = MockWebSocket.instances[0];
121118
ws.dispatchEvent(
122119
new MessageEvent("message", {
@@ -129,12 +126,58 @@ describe("Relay", () => {
129126
}),
130127
);
131128
const [{ value: value_0 }, { value: value_1 }] = await Promise.all([
132-
read_0,
133-
read_1,
129+
reader_0.read(),
130+
reader_1.read(),
134131
]);
135132
assert(value_0);
136133
assertEquals(value_0.kind, 0);
137134
assert(value_1);
138135
assertEquals(value_1.kind, 1);
136+
reader_0.releaseLock();
137+
reader_1.releaseLock();
138+
});
139+
it("should publish an event and recieve an accepting OK message", async () => {
140+
const eid = "test-true" as EventId;
141+
const ok = ["OK", eid, true, ""] satisfies OkMessage<true>;
142+
const ws = MockWebSocket.instances[0];
143+
const arrived = new Promise<true>((resolve) => {
144+
ws.remote.addEventListener(
145+
"message",
146+
(ev: MessageEvent<string>) => {
147+
const [, event] = JSON.parse(ev.data) as PublishMessage<1>;
148+
if (event.id === eid) {
149+
assertObjectMatch(event, { kind: 1 });
150+
resolve(true);
151+
ws.remote.send(JSON.stringify(ok));
152+
}
153+
},
154+
);
155+
});
156+
const event = { id: eid, kind: 1 };
157+
// deno-lint-ignore no-explicit-any
158+
await relay.publish(event as any);
159+
assert(await arrived);
160+
});
161+
it("should receieve a rejecting OK message", async () => {
162+
const eid = "test-false" as EventId;
163+
const ok = ["OK", eid, false, "error: test"] satisfies OkMessage<false>;
164+
const ws = MockWebSocket.instances[0];
165+
const arrived = new Promise<true>((resolve) => {
166+
ws.remote.addEventListener(
167+
"message",
168+
(ev: MessageEvent<string>) => {
169+
const [, event] = JSON.parse(ev.data) as PublishMessage<1>;
170+
if (event.id === eid) {
171+
assertEquals(event.kind, 1);
172+
resolve(true);
173+
ws.remote.send(JSON.stringify(ok));
174+
}
175+
},
176+
);
177+
});
178+
const event = { id: eid, kind: 1 };
179+
// deno-lint-ignore no-explicit-any
180+
assertRejects(() => relay.publish(event as any));
181+
await arrived;
139182
});
140183
});

0 commit comments

Comments
 (0)