Skip to content
This repository was archived by the owner on Oct 30, 2018. It is now read-only.

[WIP] workaround added for multiple restarting crashed proccesses crashing … #118

Merged
merged 3 commits into from
May 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions bin/storjshare-daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

const daemonize = require('daemon');
const dnode = require('dnode');
const config = require('../lib/config/daemon');
const RPC = require('../lib/api');
const utils = require('../lib/utils');
const api = new RPC({ logVerbosity: config.daemonLogVerbosity });
const {createWriteStream} = require('fs');
const config = require('../lib/config/daemon');
const { createWriteStream } = require('fs');
const logFile = createWriteStream(config.daemonLogFilePath, { flags: 'a' });
const storjshare_daemon = require('commander');

Expand All @@ -17,6 +16,10 @@ storjshare_daemon
.option('-F, --foreground', 'keeps the process in the foreground')
.parse(process.argv);

const api = new RPC({
logVerbosity: config.daemonLogVerbosity
});

function startDaemonRpcServer() {
dnode(api.methods)
.on('error', (err) => api.logger.warn(err.message))
Expand Down
5 changes: 2 additions & 3 deletions bin/storjshare-start.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ storjshare_start
.description('starts a new network share')
.option('-c, --config <path>', 'specify the configuration path')
.option('-d, --detached', 'run share without management from daemon')
.option('-u, --unsafe', 'ignore system resource guards')
.parse(process.argv);

if (!storjshare_start.config) {
Expand Down Expand Up @@ -42,7 +43,7 @@ function runManagedShare() {
}
console.info(`\n * starting share with config at ${configPath}`);
sock.end();
});
}, storjshare_start.unsafe);
});
}

Expand All @@ -51,5 +52,3 @@ if (storjshare_start.detached) {
} else {
runManagedShare();
}


4 changes: 2 additions & 2 deletions example/daemon.config.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
// Bind Dnode RPC server to this port
"daemonRpcPort": 45015,
// Interface to bind Dnode RPC server, if your host is public, be sure to
// Interface to bind Dnode RPC server, if your host is public, be sure to
// leave this as "127.0.0.1" to prevent others from controlling your nodes
// You can set this to a public address if you'd like to control your shares
// remotely, however you must secure access on your own - you have been
// remotely, however you must secure access on your own - you have been
// warned
"daemonRpcAddress": "127.0.0.1",
// Path to write daemon log file to disk, leave blank to default to:
Expand Down
28 changes: 18 additions & 10 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const JsonLogger = require('kad-logger-json');
const {fork} = require('child_process');
const utils = require('./utils');
const path = require('path');
const { cpus } = require('os');

/** Class representing a local RPC API's handlers */
class RPC {
Expand Down Expand Up @@ -75,19 +76,23 @@ class RPC {
/**
* Starts a share process with the given configuration
* @param {String} configPath
* @param {Boolean} unsafeFlag
* @param {RPC~startCallback}
* @see https://storj.github.io/core/FarmerInterface.html
*/
start(configPath, callback) {
start(configPath, callback, unsafeFlag=false) {
let config = null;

if (this.shares.size + 1 >= cpus().length && !unsafeFlag) {
return callback(new Error('insufficient system resources available'));
}

try {
config = this._readConfig(configPath);
} catch (err) {
return callback(err);
}

const self = this;
const nodeId = storj.KeyPair(config.networkPrivateKey).getNodeID();
const share = this.shares.get(nodeId) || {
config: config,
Expand All @@ -103,7 +108,7 @@ class RPC {

this._log(`attempting to start share with config at path ${configPath}`);

if (self.shares.has(nodeId) && self.shares.get(nodeId).readyState === 1) {
if (this.shares.has(nodeId) && this.shares.get(nodeId).readyState === 1) {
return callback(new Error(`share ${nodeId} is already running`));
}

Expand Down Expand Up @@ -137,7 +142,7 @@ class RPC {
// NB: Listen for state changes to update the share's record
share.process.on('error', (err) => {
share.readyState = RPC.SHARE_ERRORED;
self._log(err.message, 'error');
this._log(err.message, 'error');
clearInterval(uptimeCounter);
});

Expand All @@ -146,17 +151,20 @@ class RPC {
let maxRestartsReached = share.meta.numRestarts >= RPC.MAX_RESTARTS;
share.readyState = RPC.SHARE_STOPPED;

self._log(`share ${nodeId} exited with code ${code}`);
this._log(`share ${nodeId} exited with code ${code}`);
clearInterval(uptimeCounter);

if (signal !== 'SIGINT' && !maxRestartsReached) {
if (signal !== 'SIGINT' &&
!maxRestartsReached &&
share.meta.uptimeMs >= 5000
) {
share.meta.numRestarts++;
self.restart(nodeId, () => null);
this.restart(nodeId, () => null);
}
});

share.process.on('message', (msg) => self._processShareIpc(share, msg));
self.shares.set(nodeId, share);
share.process.on('message', (msg) => this._processShareIpc(share, msg));
this.shares.set(nodeId, share);
callback(null);
});
}
Expand Down Expand Up @@ -324,7 +332,7 @@ class RPC {
return callback(new Error('failed to parse snapshot'));
}

async.eachLimit(snapshot, 3, (share, next) => {
async.eachLimit(snapshot, 1, (share, next) => {
this.start(share.path, (err) => {
/* istanbul ignore if */
if (err) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "storjshare-daemon",
"version": "2.5.1",
"version": "2.5.2",
"description": "daemon + process manager for sharing space on the storj network",
"main": "index.js",
"bin": {
Expand Down Expand Up @@ -71,7 +71,7 @@
"pretty-ms": "^2.1.0",
"rc": "^1.1.6",
"readable-stream": "^2.2.2",
"storj-lib": "^6.3.2",
"storj-lib": "^6.4.2",
"storj-telemetry-reporter": "^5.0.0",
"strip-json-comments": "^2.0.1",
"tail": "^1.2.1"
Expand Down
32 changes: 32 additions & 0 deletions test/api.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ describe('class:RPC', function() {

describe('#start', function() {

it('should callback error if too many shares', function(done) {
let _RPC = proxyquire('../lib/api', {
os: {
cpus: sinon.stub().returns([])
}
});
let rpc = new _RPC({ loggerVerbosity: 0 });
rpc.start('path/to/config', function(err) {
expect(err.message).to.equal(
'insufficient system resources available'
);
done();
});
});

it('should fall through is unsafe flag', function(done) {
let _RPC = proxyquire('../lib/api', {
os: {
cpus: sinon.stub().returns([])
}
});
let rpc = new _RPC({ loggerVerbosity: 0 });
rpc.start('path/to/config', function(err) {
expect(err.message).to.equal(
'failed to read config at path/to/config'
);
done();
}, true);
});

it('should callback error if no config given', function(done) {
let _RPC = proxyquire('../lib/api', {
fs: {
Expand Down Expand Up @@ -148,6 +178,8 @@ describe('class:RPC', function() {
let _ipc = sinon.stub(rpc, '_processShareIpc');
rpc.start('path/to/config', function() {
let id = rpc.shares.keys().next().value;
let share = rpc.shares.get(id);
share.meta.uptimeMs = 6000;
_proc.emit('message', {});
setImmediate(() => {
expect(_ipc.called).to.equal(true);
Expand Down