Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a single SharedArrayBuffer #154

Merged
merged 5 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,18 @@ export interface GlobalShim {

### Options

1. `bufferSize` same as env `SYNCKIT_BUFFER_SIZE`
2. `timeout` same as env `SYNCKIT_TIMEOUT`
3. `execArgv` same as env `SYNCKIT_EXEC_ARGV`
4. `tsRunner` same as env `SYNCKIT_TS_RUNNER`
5. `transferList`: Please refer Node.js [`worker_threads`](https://nodejs.org/api/worker_threads.html#:~:text=Default%3A%20true.-,transferList,-%3CObject%5B%5D%3E%20If) documentation
6. `globalShims`: Similar like env `SYNCKIT_GLOBAL_SHIMS` but much more flexible which can be a `GlobalShim` `Array`, see `GlobalShim`'s [definition](#types) for more details
1. `timeout` same as env `SYNCKIT_TIMEOUT`
2. `execArgv` same as env `SYNCKIT_EXEC_ARGV`
3. `tsRunner` same as env `SYNCKIT_TS_RUNNER`
4. `transferList`: Please refer Node.js [`worker_threads`](https://nodejs.org/api/worker_threads.html#:~:text=Default%3A%20true.-,transferList,-%3CObject%5B%5D%3E%20If) documentation
5. `globalShims`: Similar like env `SYNCKIT_GLOBAL_SHIMS` but much more flexible which can be a `GlobalShim` `Array`, see `GlobalShim`'s [definition](#types) for more details

### Envs

1. `SYNCKIT_BUFFER_SIZE`: `bufferSize` to create `SharedArrayBuffer` for `worker_threads` (default as `1024`)
2. `SYNCKIT_TIMEOUT`: `timeout` for performing the async job (no default)
3. `SYNCKIT_EXEC_ARGV`: List of node CLI options passed to the worker, split with comma `,`. (default as `[]`), see also [`node` docs](https://nodejs.org/api/worker_threads.html)
4. `SYNCKIT_TS_RUNNER`: Which TypeScript runner to be used, it could be very useful for development, could be `'ts-node' | 'esbuild-register' | 'esbuild-runner' | 'swc' | 'tsx'`, `'ts-node'` is used by default, make sure you have installed them already
5. `SYNCKIT_GLOBAL_SHIMS`: Whether to enable the default `DEFAULT_GLOBAL_SHIMS_PRESET` as `globalShims`
1. `SYNCKIT_TIMEOUT`: `timeout` for performing the async job (no default)
2. `SYNCKIT_EXEC_ARGV`: List of node CLI options passed to the worker, split with comma `,`. (default as `[]`), see also [`node` docs](https://nodejs.org/api/worker_threads.html)
3. `SYNCKIT_TS_RUNNER`: Which TypeScript runner to be used, it could be very useful for development, could be `'ts-node' | 'esbuild-register' | 'esbuild-runner' | 'swc' | 'tsx'`, `'ts-node'` is used by default, make sure you have installed them already
4. `SYNCKIT_GLOBAL_SHIMS`: Whether to enable the default `DEFAULT_GLOBAL_SHIMS_PRESET` as `globalShims`

### TypeScript

Expand Down
57 changes: 22 additions & 35 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import type {
WorkerToMainMessage,
} from './types.js'

const INT32_BYTES = 4

export * from './types.js'

export const TsRunner = {
Expand All @@ -44,22 +46,15 @@ export const TsRunner = {
export type TsRunner = ValueOf<typeof TsRunner>

const {
SYNCKIT_BUFFER_SIZE,
SYNCKIT_TIMEOUT,
SYNCKIT_EXEC_ARGV,
SYNCKIT_TS_RUNNER,
SYNCKIT_GLOBAL_SHIMS,
NODE_OPTIONS,
} = process.env

export const DEFAULT_BUFFER_SIZE = SYNCKIT_BUFFER_SIZE
? +SYNCKIT_BUFFER_SIZE
: undefined

export const DEFAULT_TIMEOUT = SYNCKIT_TIMEOUT ? +SYNCKIT_TIMEOUT : undefined

export const DEFAULT_WORKER_BUFFER_SIZE = DEFAULT_BUFFER_SIZE || 1024

/* istanbul ignore next */
export const DEFAULT_EXEC_ARGV = SYNCKIT_EXEC_ARGV?.split(',') || []

Expand All @@ -86,7 +81,6 @@ export const MTS_SUPPORTED_NODE_VERSION = 16
const syncFnCache = new Map<string, AnyFn>()

export interface SynckitOptions {
bufferSize?: number
timeout?: number
execArgv?: string[]
tsRunner?: TsRunner
Expand All @@ -109,40 +103,30 @@ export function extractProperties<T>(object?: T) {
}
}

export function createSyncFn<T extends AnyAsyncFn>(
workerPath: string,
bufferSize?: number,
timeout?: number,
): Syncify<T>
export function createSyncFn<T extends AnyAsyncFn>(
workerPath: string,
options?: SynckitOptions,
): Syncify<T>
export function createSyncFn<R, T extends AnyAsyncFn<R>>(
export function createSyncFn<T extends AnyAsyncFn<R>, R = unknown>(
workerPath: string,
bufferSizeOrOptions?: SynckitOptions | number,
timeout?: number,
) {
timeoutOrOptions?: SynckitOptions | number,
): Syncify<T> {
if (!path.isAbsolute(workerPath)) {
throw new Error('`workerPath` must be absolute')
}

const cachedSyncFn = syncFnCache.get(workerPath)

if (cachedSyncFn) {
return cachedSyncFn
return cachedSyncFn as Syncify<T>
}

const syncFn = startWorkerThread<R, T>(
workerPath,
/* istanbul ignore next */ typeof bufferSizeOrOptions === 'number'
? { bufferSize: bufferSizeOrOptions, timeout }
: bufferSizeOrOptions,
/* istanbul ignore next */ typeof timeoutOrOptions === 'number'
? { timeout: timeoutOrOptions }
: timeoutOrOptions,
)

syncFnCache.set(workerPath, syncFn)

return syncFn
return syncFn as Syncify<T>
}

const cjsRequire =
Expand Down Expand Up @@ -421,7 +405,6 @@ export const generateGlobals = (
function startWorkerThread<R, T extends AnyAsyncFn<R>>(
workerPath: string,
{
bufferSize = DEFAULT_WORKER_BUFFER_SIZE,
timeout = DEFAULT_TIMEOUT,
execArgv = DEFAULT_EXEC_ARGV,
tsRunner = DEFAULT_TS_RUNNER,
Expand Down Expand Up @@ -480,6 +463,11 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
: []
).filter(({ moduleName }) => isPkgAvailable(moduleName))

// We store a single Byte in the SharedArrayBuffer
// for the notification, we can used a fixed size
const sharedBuffer = new SharedArrayBuffer(INT32_BYTES)
const sharedBufferView = new Int32Array(sharedBuffer, 0, 1)

const useGlobals = finalGlobalShims.length > 0

const useEval = isTs ? !tsUseEsm : !jsUseEsm && useGlobals
Expand All @@ -501,7 +489,7 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
: workerPathUrl,
{
eval: useEval,
workerData: { workerPort },
workerData: { sharedBuffer, workerPort },
transferList: [workerPort, ...transferList],
execArgv: finalExecArgv,
},
Expand All @@ -512,13 +500,12 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
const syncFn = (...args: Parameters<T>): R => {
const id = nextID++

const sharedBuffer = new SharedArrayBuffer(bufferSize)
const sharedBufferView = new Int32Array(sharedBuffer)

const msg: MainToWorkerMessage<Parameters<T>> = { sharedBuffer, id, args }
const msg: MainToWorkerMessage<Parameters<T>> = { id, args }
worker.postMessage(msg)

const status = Atomics.wait(sharedBufferView, 0, 0, timeout)
// Reset SharedArrayBuffer for next call
Atomics.store(sharedBufferView, 0, 0)

/* istanbul ignore if */
if (!['ok', 'not-equal'].includes(status)) {
Expand Down Expand Up @@ -560,14 +547,14 @@ export function runAsWorker<
return
}

const { workerPort } = workerData as WorkerData
const { workerPort, sharedBuffer } = workerData as WorkerData
const sharedBufferView = new Int32Array(sharedBuffer, 0, 1)

parentPort!.on(
'message',
({ sharedBuffer, id, args }: MainToWorkerMessage<Parameters<T>>) => {
({ id, args }: MainToWorkerMessage<Parameters<T>>) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
;(async () => {
const sharedBufferView = new Int32Array(sharedBuffer)
let msg: WorkerToMainMessage<R>
try {
msg = { id, result: await fn(...args) }
Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ export type PromiseType<T extends AnyPromise> = T extends Promise<infer R>
export type ValueOf<T> = T[keyof T]

export interface MainToWorkerMessage<T extends unknown[]> {
sharedBuffer: SharedArrayBuffer
id: number
args: T
}

export interface WorkerData {
sharedBuffer: SharedArrayBuffer
workerPort: MessagePort
}

Expand Down
2 changes: 0 additions & 2 deletions test/fn.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const { SYNCKIT_TIMEOUT } = process.env
beforeEach(() => {
jest.resetModules()

delete process.env.SYNCKIT_BUFFER_SIZE
delete process.env.SYNCKIT_GLOBAL_SHIMS

if (SYNCKIT_TIMEOUT) {
Expand Down Expand Up @@ -95,7 +94,6 @@ test('createSyncFn', () => {
})

test('timeout', async () => {
process.env.SYNCKIT_BUFFER_SIZE = '0'
process.env.SYNCKIT_TIMEOUT = '1'

const { createSyncFn } = await import('synckit')
Expand Down
2 changes: 1 addition & 1 deletion test/ts-runner.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ test('unknown ts runner', async () => {
const { createSyncFn } = await import('synckit')

expect(() =>
// @ts-expect-error
createSyncFn<AsyncWorkerFn>(path.resolve(_dirname, 'worker.js'), {
// @ts-expect-error
tsRunner: 'unknown',
}),
).toThrowErrorMatchingInlineSnapshot(`"Unknown ts runner: unknown"`)
Expand Down