Skip to content

Commit

Permalink
feat: add forceSyncHistory options (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
killagu authored Jul 25, 2022
1 parent 4303c8a commit af6a75a
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 8 deletions.
3 changes: 3 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type SyncPackageTaskOptions = {
tips?: string;
skipDependencies?: boolean;
syncDownloadData?: boolean;
// force sync history version
forceSyncHistory?: boolean;
};

export class Task extends Entity {
Expand Down Expand Up @@ -82,6 +84,7 @@ export class Task extends Entity {
tips: options?.tips,
skipDependencies: options?.skipDependencies,
syncDownloadData: options?.syncDownloadData,
forceSyncHistory: options?.forceSyncHistory,
},
};
const task = this.create(data);
Expand Down
18 changes: 15 additions & 3 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ export class PackageSyncerService extends AbstractService {

public async executeTask(task: Task) {
const fullname = task.targetName;
const { tips, skipDependencies: originSkipDependencies, syncDownloadData } = task.data as SyncPackageTaskOptions;
const { tips, skipDependencies: originSkipDependencies, syncDownloadData, forceSyncHistory } = task.data as SyncPackageTaskOptions;
const registry = this.npmRegistry.registry;
let logs: string[] = [];
if (tips) {
Expand All @@ -206,7 +206,7 @@ export class PackageSyncerService extends AbstractService {
const logUrl = `${this.config.cnpmcore.registry}/-/package/${fullname}/syncs/${task.taskId}/log`;
this.logger.info('[PackageSyncerService.executeTask:start] taskId: %s, targetName: %s, attempts: %s, taskQueue: %s/%s, syncUpstream: %s, log: %s',
task.taskId, task.targetName, task.attempts, taskQueueLength, taskQueueHighWaterSize, syncUpstream, logUrl);
logs.push(`[${isoNow()}] 🚧🚧🚧🚧🚧 Syncing from ${registry}/${fullname}, skipDependencies: ${skipDependencies}, syncUpstream: ${syncUpstream}, syncDownloadData: ${!!syncDownloadData}, attempts: ${task.attempts}, worker: "${os.hostname()}/${process.pid}", taskQueue: ${taskQueueLength}/${taskQueueHighWaterSize} 🚧🚧🚧🚧🚧`);
logs.push(`[${isoNow()}] 🚧🚧🚧🚧🚧 Syncing from ${registry}/${fullname}, skipDependencies: ${skipDependencies}, syncUpstream: ${syncUpstream}, syncDownloadData: ${!!syncDownloadData}, forceSyncHistory: ${!!forceSyncHistory} attempts: ${task.attempts}, worker: "${os.hostname()}/${process.pid}", taskQueue: ${taskQueueLength}/${taskQueueHighWaterSize} 🚧🚧🚧🚧🚧`);
logs.push(`[${isoNow()}] 🚧 log: ${logUrl}`);

const [ scope, name ] = getScopeAndName(fullname);
Expand Down Expand Up @@ -391,7 +391,7 @@ export class PackageSyncerService extends AbstractService {
const version: string = item.version;
if (!version) continue;
let existsItem = existsVersionMap[version];
const existsAbbreviatedItem = abbreviatedVersionMap[version];
let existsAbbreviatedItem = abbreviatedVersionMap[version];
const shouldDeleteReadme = !!(existsItem && 'readme' in existsItem);
if (pkg) {
if (existsItem) {
Expand All @@ -412,6 +412,18 @@ export class PackageSyncerService extends AbstractService {
logs.push(`[${isoNow()}] 🐛 Remote version ${version} not exists on local manifests, need to refresh`);
}
}

if (existsItem && forceSyncHistory === true) {
const pkgVer = await this.packageRepository.findPackageVersion(pkg.packageId, version);
if (pkgVer) {
logs.push(`[${isoNow()}] 🚧 [${syncIndex}] Remove version ${version} for force sync history`);
await this.packageManagerService.removePackageVersion(pkg, pkgVer, true);
existsItem = undefined;
existsAbbreviatedItem = undefined;
existsVersionMap[version] = undefined;
abbreviatedVersionMap[version] = undefined;
}
}
}

if (existsItem) {
Expand Down
22 changes: 17 additions & 5 deletions app/port/controller/PackageSyncController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
EggContext,
Inject,
HTTPQuery,
BackgroundTaskHelper,
} from '@eggjs/tegg';
import { ForbiddenError, NotFoundError } from 'egg-errors';
import { AbstractController } from './AbstractController';
Expand All @@ -22,6 +23,9 @@ export class PackageSyncController extends AbstractController {
@Inject()
private packageSyncerService: PackageSyncerService;

@Inject()
private backgroundTaskHelper: BackgroundTaskHelper;

private async _executeTaskAsync(task: Task) {
const startTime = Date.now();
this.logger.info('[PackageSyncController:executeTask:start] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms',
Expand Down Expand Up @@ -50,12 +54,15 @@ export class PackageSyncController extends AbstractController {
throw new ForbiddenError('Not allow to sync package');
}
const tips = data.tips || `Sync cause by "${ctx.href}", parent traceId: ${ctx.tracer.traceId}`;
const isAdmin = await this.userRoleManager.isAdmin(ctx);
const params = {
fullname,
tips,
skipDependencies: !!data.skipDependencies,
syncDownloadData: !!data.syncDownloadData,
force: !!data.force,
// only admin allow to sync history version
forceSyncHistory: !!data.forceSyncHistory && isAdmin,
};
ctx.tValidate(SyncPackageTaskRule, params);
const [ scope, name ] = getScopeAndName(params.fullname);
Expand All @@ -73,16 +80,20 @@ export class PackageSyncController extends AbstractController {
tips: params.tips,
skipDependencies: params.skipDependencies,
syncDownloadData: params.syncDownloadData,
forceSyncHistory: params.forceSyncHistory,
});
ctx.logger.info('[PackageSyncController.createSyncTask:success] taskId: %s, fullname: %s',
task.taskId, fullname);
if (data.force) {
const isAdmin = await this.userRoleManager.isAdmin(ctx);
if (isAdmin) {
// execute task in background
this._executeTaskAsync(task);
ctx.logger.info('[PackageSyncController.createSyncTask:execute-immediately] taskId: %s',
task.taskId);
// set background task timeout to 5min
this.backgroundTaskHelper.timeout = 1000 * 60 * 5;
this.backgroundTaskHelper.run(async () => {
ctx.logger.info('[PackageSyncController.createSyncTask:execute-immediately] taskId: %s',
task.taskId);
// execute task in background
await this._executeTaskAsync(task);
});
}
}
ctx.status = 201;
Expand Down Expand Up @@ -153,6 +164,7 @@ export class PackageSyncController extends AbstractController {
skipDependencies: nodeps === 'true',
syncDownloadData: false,
force: false,
forceSyncHistory: false,
};
const task = await this.createSyncTask(ctx, fullname, options);
return {
Expand Down
2 changes: 2 additions & 0 deletions app/port/typebox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export const SyncPackageTaskRule = Type.Object({
syncDownloadData: Type.Boolean(),
// force sync immediately, only allow by admin
force: Type.Boolean(),
// sync history version
forceSyncHistory: Type.Boolean(),
});
export type SyncPackageTaskType = Static<typeof SyncPackageTaskRule>;

Expand Down
18 changes: 18 additions & 0 deletions test/core/service/PackageSyncerService/executeTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
assert.equal(abbreviatedManifests.data.name, manifests.data.name);
});

it('should resync history version if forceSyncHistory is true', async () => {
await packageSyncerService.createTask('foo', { skipDependencies: true });
let task = await packageSyncerService.findExecuteTask();
assert(task);
await packageSyncerService.executeTask(task);

await packageSyncerService.createTask('foo', { forceSyncHistory: true, skipDependencies: true });
task = await packageSyncerService.findExecuteTask();
assert(task);
await packageSyncerService.executeTask(task);
const stream2 = await packageSyncerService.findTaskLog(task);
assert(stream2);
const log2 = await TestUtil.readStreamToLog(stream2);
// console.log(log2);
assert(/Remove version 1\.0\.0 for force sync history/.test(log2));
assert(/Syncing version 1\.0\.0/.test(log2));
});

it('should not sync dependencies where task queue length too high', async () => {
mock(app.config.cnpmcore, 'taskQueueHighWaterSize', 2);
await packageSyncerService.createTask('foo', { skipDependencies: false });
Expand Down

0 comments on commit af6a75a

Please sign in to comment.