Skip to content

Commit b02b0c8

Browse files
committed
feat(telemetry): add @agoric/telemetry/src/flight-recorder.js
1 parent 2892da9 commit b02b0c8

File tree

3 files changed

+239
-3
lines changed

3 files changed

+239
-3
lines changed
+205
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// @ts-check
2+
/* global BigUint64Array */
3+
/// <reference types="ses" />
4+
5+
// https://github.com/Agoric/agoric-sdk/issues/3742#issuecomment-1028451575
6+
// I'd mmap() a 100MB file, reserve a few bytes for offsets, then use the rest
7+
// as a circular buffer to hold length-prefixed records. The agd process would
8+
// keep writing new events into the RAM window and updating the start/end
9+
// pointers, with some sequencing to make sure the record gets written before
10+
// the pointer is updated. Then, no mattter how abruptly the process is
11+
// terminated, as long as the host computer itself is still running, the on-disk
12+
// file would contain the most recent state, and anybody who reads the file will
13+
// get the most recent state. The host kernel (linux) is under no obligation to
14+
// flush it to disk any particular time, but knows when reads happen, so there's
15+
// no coherency problem, and the speed is unaffected by disk write speeds.
16+
17+
import BufferFromFile from 'bufferfromfile';
18+
import fs from 'fs';
19+
import path from 'path';
20+
21+
const { details: X } = assert;
22+
23+
export const DEFAULT_CIRCULAR_BUFFER_SIZE = 100 * 1024 * 1024;
24+
export const DEFAULT_CIRCULAR_BUFFER_FILE = 'flight-recorder.bin';
25+
export const SLOG_MAGIC = 0x21474f4c532d4741n; // 'AG-SLOG!'
26+
27+
const I_MAGIC = 0;
28+
const I_ARENA_SIZE = 1;
29+
const I_CIRC_START = 2;
30+
const I_CIRC_END = 3;
31+
const HEADER_LENGTH = 4;
32+
33+
export const makeMemoryMappedCircularBuffer = ({
34+
circularBufferSize = DEFAULT_CIRCULAR_BUFFER_SIZE,
35+
stateDir = '/tmp',
36+
circularBufferFile,
37+
}) => {
38+
const bufferFile =
39+
circularBufferFile || `${stateDir}/${DEFAULT_CIRCULAR_BUFFER_FILE}`;
40+
// console.log({ circularBufferFile, bufferFile });
41+
42+
// If the file doesn't exist, or is not large enough, create it.
43+
let stbuf;
44+
try {
45+
stbuf = fs.statSync(bufferFile);
46+
} catch (e) {
47+
if (e.code !== 'ENOENT') {
48+
throw e;
49+
}
50+
}
51+
const arenaSize = BigInt(
52+
circularBufferSize - HEADER_LENGTH * BigUint64Array.BYTES_PER_ELEMENT,
53+
);
54+
if (!stbuf || stbuf.size < BigUint64Array.BYTES_PER_ELEMENT * 3) {
55+
// Write the header.
56+
const header = new Array(HEADER_LENGTH).fill(0n);
57+
header[I_MAGIC] = SLOG_MAGIC;
58+
header[I_ARENA_SIZE] = arenaSize;
59+
fs.mkdirSync(path.dirname(bufferFile), { recursive: true });
60+
fs.writeFileSync(bufferFile, BigUint64Array.from(header));
61+
}
62+
if (!stbuf || stbuf.size < circularBufferSize) {
63+
fs.truncateSync(bufferFile, circularBufferSize);
64+
}
65+
66+
/** @type {Uint8Array} */
67+
const fileBuf = BufferFromFile(bufferFile).Uint8Array();
68+
const header = new BigUint64Array(fileBuf.buffer, 0, HEADER_LENGTH);
69+
70+
assert.equal(
71+
SLOG_MAGIC,
72+
header[I_MAGIC],
73+
X`${bufferFile} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${header[I_MAGIC]}`,
74+
);
75+
assert.equal(
76+
arenaSize,
77+
header[I_ARENA_SIZE],
78+
X`${bufferFile} arena size mismatch; wanted ${arenaSize}, got ${header[I_ARENA_SIZE]}`,
79+
);
80+
const arena = new Uint8Array(
81+
fileBuf.buffer,
82+
header.byteLength,
83+
Number(arenaSize),
84+
);
85+
86+
/**
87+
* @param {Uint8Array} data
88+
* @param {number} [offset]
89+
*/
90+
const readCircBuf = (data, offset = 0) => {
91+
assert(
92+
offset + data.byteLength <= arenaSize,
93+
X`Reading past end of circular buffer`,
94+
);
95+
96+
// Read the data to the end of the arena.
97+
let firstReadLength = data.byteLength;
98+
const circStart = Number(header[I_CIRC_START]);
99+
const readStart = (circStart + offset) % Number(arenaSize);
100+
if (readStart > header[I_CIRC_END]) {
101+
// The data is wrapped around the end of the arena, like BBB---AAA
102+
firstReadLength = Math.min(
103+
firstReadLength,
104+
Number(arenaSize) - readStart,
105+
);
106+
}
107+
data.set(arena.subarray(readStart, readStart + firstReadLength));
108+
if (firstReadLength < data.byteLength) {
109+
data.set(
110+
arena.subarray(0, data.byteLength - firstReadLength),
111+
firstReadLength,
112+
);
113+
}
114+
return data;
115+
};
116+
117+
/** @param {Uint8Array} data */
118+
const writeCircBuf = data => {
119+
if (BigUint64Array.BYTES_PER_ELEMENT + data.byteLength > arena.byteLength) {
120+
// The data is too big to fit in the arena, so skip it.
121+
const tooBigRecord = JSON.stringify({
122+
type: 'slog-record-too-big',
123+
size: data.byteLength,
124+
});
125+
data = new TextEncoder().encode(tooBigRecord);
126+
}
127+
128+
const record = new Uint8Array(
129+
BigUint64Array.BYTES_PER_ELEMENT + data.byteLength,
130+
);
131+
const lengthPrefix = new BigUint64Array(record.buffer, 0, 1);
132+
lengthPrefix[0] = BigInt(data.byteLength);
133+
record.set(data, BigUint64Array.BYTES_PER_ELEMENT);
134+
135+
// Check if we need to wrap around.
136+
/** @type {bigint} */
137+
let capacity;
138+
if (header[I_CIRC_START] <= header[I_CIRC_END]) {
139+
// ---AAAABBBB----
140+
capacity =
141+
header[I_ARENA_SIZE] - header[I_CIRC_END] + header[I_CIRC_START];
142+
} else {
143+
// BBB---AAAA
144+
capacity = header[I_CIRC_START] - header[I_CIRC_END];
145+
}
146+
147+
let overlap = BigInt(record.byteLength) - capacity;
148+
while (overlap > 0n) {
149+
// Advance the start pointer.
150+
const startRecordLength = new BigUint64Array(1);
151+
readCircBuf(new Uint8Array(startRecordLength.buffer));
152+
153+
const totalRecordLength =
154+
BigInt(startRecordLength.byteLength) + // size of the length field
155+
startRecordLength[0]; // size of the record
156+
157+
header[I_CIRC_START] =
158+
(header[I_CIRC_START] + totalRecordLength) % header[I_ARENA_SIZE];
159+
overlap -= totalRecordLength;
160+
}
161+
162+
// Append the record.
163+
let firstWriteLength = record.byteLength;
164+
if (header[I_CIRC_START] < header[I_CIRC_END]) {
165+
// May need to wrap, it's ---AAAABBBB---
166+
firstWriteLength = Math.min(
167+
firstWriteLength,
168+
Number(header[I_ARENA_SIZE] - header[I_CIRC_END]),
169+
);
170+
}
171+
172+
const circEnd = Number(header[I_CIRC_END]);
173+
arena.set(record.subarray(0, firstWriteLength), circEnd);
174+
if (firstWriteLength < record.byteLength) {
175+
// Write to the beginning of the arena.
176+
arena.set(record.subarray(firstWriteLength, record.byteLength), 0);
177+
}
178+
header[I_CIRC_END] =
179+
(header[I_CIRC_END] + BigInt(record.byteLength)) % header[I_ARENA_SIZE];
180+
};
181+
182+
const writeJSON = obj => {
183+
const text = JSON.stringify(obj, (key, value) => {
184+
if (typeof value === 'bigint') {
185+
return Number(value);
186+
}
187+
if (key === 'endoZipBase64') {
188+
// Abridge the source bundle, since it's pretty huge.
189+
return `[${value.length} characters...]`;
190+
}
191+
return value;
192+
});
193+
// Prepend a newline so that the file can be more easily manipulated.
194+
const data = new TextEncoder().encode(`\n${text}`);
195+
// console.log('have obj', obj);
196+
writeCircBuf(data);
197+
};
198+
199+
return { readCircBuf, writeCircBuf, writeJSON };
200+
};
201+
202+
export const makeSlogSender = opts => {
203+
const { writeJSON } = makeMemoryMappedCircularBuffer(opts);
204+
return writeJSON;
205+
};

packages/telemetry/test/prepare-test-env-ava.js

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
/* eslint-disable import/no-extraneous-dependencies */
2-
import '@endo/init/pre-bundle-source.js';
3-
import '@endo/lockdown/commit-debug.js';
1+
// @ts-check
2+
import '@endo/init';
43

54
import { wrapTest } from '@endo/ses-ava';
65
import rawTest from 'ava';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// @ts-check
2+
/* global BigUint64Array */
3+
import tmp from 'tmp';
4+
import { test } from './prepare-test-env-ava.js';
5+
6+
import { makeMemoryMappedCircularBuffer } from '../src/flight-recorder.js';
7+
8+
test('flight-recorder sanity', t => {
9+
const { name: tmpFile } = tmp.fileSync();
10+
console.log(tmpFile);
11+
const { writeJSON: slogSender, readCircBuf } = makeMemoryMappedCircularBuffer(
12+
{
13+
circularBufferSize: 512,
14+
circularBufferFile: tmpFile,
15+
},
16+
);
17+
slogSender({ type: 'start' });
18+
19+
const len0 = new BigUint64Array(readCircBuf(new Uint8Array(8)).buffer);
20+
const buf0 = readCircBuf(new Uint8Array(Number(len0[0])), 8);
21+
const buf0Str = new TextDecoder().decode(buf0);
22+
t.is(buf0Str, `\n{"type":"start"}`);
23+
24+
for (let i = 0; i < 500; i += 1) {
25+
slogSender({ type: 'iteration', iteration: i });
26+
}
27+
28+
const len1 = new BigUint64Array(readCircBuf(new Uint8Array(8)).buffer);
29+
const buf1 = readCircBuf(new Uint8Array(Number(len1[0])), 8);
30+
const buf1Str = new TextDecoder().decode(buf1);
31+
t.is(buf1Str, `\n{"type":"iteration","iteration":490}`);
32+
});

0 commit comments

Comments
 (0)