Skip to content

Commit 06c6e1a

Browse files
author
Mike Raynham
committed
Add retry code to client and worker
If a Gearman server dies and another server is available, Gearman clients and workers will automatically try connecting to the next server in the list. But a dying server can still cause the next request from a client or worker to fail. To help prevent work from being lost, this revision adds code that will retry connections up to 10 times, using an exponential back-off algorithm to determine delays between retries.
1 parent d7dad06 commit 06c6e1a

File tree

4 files changed

+66
-28
lines changed

4 files changed

+66
-28
lines changed

Changes

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ Revision history for Gearman-Mesh
22

33
0.01 Date/time
44
First version, released on an unsuspecting world.
5-
5+
0.02 2015-09-21
6+
Added automatic retries to worker and client.

lib/Gearman/Mesh.pm

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ Gearman::Mesh - thin wrappers around Gearman::XS modules
1414
1515
=head1 VERSION
1616
17-
Version 0.01
17+
Version 0.02
1818
1919
=cut
2020

21-
our $VERSION = '0.01';
21+
our $VERSION = '0.02';
2222

2323
=head1 DESCRIPTION
2424

lib/Gearman/Mesh/Client.pm

+32-19
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use Gearman::Mesh qw(
3232

3333
use Exporter 5.57 qw(import);
3434
use Gearman::XS 0.16 qw(:constants);
35+
use Time::HiRes ();
3536

3637
our @EXPORT_OK = @Gearman::XS::EXPORT_OK;
3738
our %EXPORT_TAGS = %Gearman::XS::EXPORT_TAGS;
@@ -59,28 +60,44 @@ BEGIN {
5960
eval join("\n",
6061
qq`package $pkg;`,
6162
qq`sub $job {`,
62-
q`my $self = shift;`,
63-
q`my $task = shift;`,
64-
q`my $args = $self->serialize([@_]);`,
65-
qq`my \@ret = \$self->{_delegate}->$job(\$task, \$args);`,
66-
q`return if $ret[0] ne GEARMAN_SUCCESS;`,
67-
q`return $ret[1];`,
63+
q` my $self = shift;`,
64+
q` my $task = shift;`,
65+
q` my $args = $self->serialize([@_]);`,
66+
q` my $backoff = _backoff(10, 1000);`,
67+
q` my @ret;`,
68+
q` while (1) {`,
69+
qq` \@ret = \$self->{_delegate}->$job(\$task, \$args);`,
70+
q` last if defined $ret[0] && $ret[0] eq GEARMAN_SUCCESS;`,
71+
q` last unless $backoff->();`,
72+
q` }`,
73+
q` return $ret[1];`,
6874
q`}`,
6975
);
7076
}
7177
}
7278

79+
sub _backoff {
80+
my $tries = shift;
81+
my $scale = shift;
82+
my $try = 0;
83+
84+
sub {
85+
return if ++$try > $tries;
86+
Time::HiRes::usleep( $scale << int rand($try) );
87+
}
88+
}
89+
7390
=head1 NAME
7491
7592
Gearman::Mesh::Client - A wrapper around Gearman::XS::Client
7693
7794
=head1 VERSION
7895
79-
Version 0.01
96+
Version 0.02
8097
8198
=cut
8299
83-
our $VERSION = '0.01';
100+
our $VERSION = '0.02';
84101
85102
86103
=head1 SYNOPSIS
@@ -112,7 +129,9 @@ This module optionally re-exports the constants from L<Gearman::XS>:
112129
=cut
113130
114131
sub new {
115-
shift->SUPER::new('Gearman::XS::Client', @_);
132+
my $self = shift->SUPER::new('Gearman::XS::Client', @_);
133+
$self->set_timeout(1_000);
134+
$self;
116135
}
117136
118137
=head1 METHODS
@@ -126,16 +145,10 @@ sub new {
126145
my $task = $client->add_task($function_name => $workload);
127146
128147
Each of these methods are wrapped in a method that serializes C<$workload>
129-
before calling the equivalent L<Gearman::XS::Client> method:
130-
131-
sub do {
132-
my $self = shift;
133-
my $task = shift;
134-
my $args = $self->serialize([@_]);
135-
my @ret = $self->{gearman_client}->do($task, $args);
136-
return if $ret[0] ne GEARMAN_SUCCESS;
137-
return $ret[1];
138-
}
148+
before calling the equivalent L<Gearman::XS::Client> method.
149+
150+
Each method will retry up to ten times, using an exponential backoff algorithm
151+
to determine the delay between retries.
139152
140153
Job methods (those whose names begin with C<do>) will return the job handle on
141154
success. Task methods (those whose name begin with C<add_task>) will return a

lib/Gearman/Mesh/Worker.pm

+30-6
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ Gearman::Mesh::Worker - The great new Gearman::Mesh::Worker!
4141
4242
=head1 VERSION
4343
44-
Version 0.01
44+
Version 0.02
4545
4646
=cut
4747

48-
our $VERSION = '0.01';
48+
our $VERSION = '0.02';
4949

5050
=head1 SYNOPSIS
5151
@@ -172,19 +172,43 @@ sub work_loop {
172172
my $options = $worker->options;
173173
my $timeout = $worker->timeout;
174174

175-
$worker->add_options(GEARMAN_WORKER_NON_BLOCKING);
176175
$worker->set_timeout(1_000);
177176

177+
my $work_ok = sub {
178+
my $code = shift;
179+
return 1 if $code eq GEARMAN_IO_WAIT
180+
|| $code eq GEARMAN_NO_JOBS
181+
|| $code eq GEARMAN_TIMEOUT;
182+
};
183+
184+
my $wait_ok = sub {
185+
my $code = shift;
186+
return 1 if $code eq GEARMAN_SUCCESS
187+
|| $code eq GEARMAN_NO_ACTIVE_FDS
188+
|| $code eq GEARMAN_TIMEOUT;
189+
};
190+
178191
{
179192
my $continue = 1;
180193
my $handler = sub {$continue = 0};
181194

182195
local $SIG{TERM} = $handler;
183196
local $SIG{INT} = $handler;
184197

185-
while ($continue) {
186-
$worker->work;
187-
$worker->wait;
198+
TRY: while ($continue) {
199+
200+
my $work = $worker->work;
201+
next TRY if $work == GEARMAN_SUCCESS;
202+
203+
sleep 1;
204+
205+
if ($work_ok->($work)) {
206+
my $wait = $worker->wait;
207+
next TRY if $wait_ok->($wait);
208+
}
209+
210+
warn $worker->error;
211+
sleep 5;
188212
}
189213
}
190214

0 commit comments

Comments
 (0)