]>
jfr.im git - irc/SurrealServices/srsv.git/blob - branches/0.4.3/SrSv/Process/Worker.pm
1 # This file is part of SurrealServices.
3 # SurrealServices is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # SurrealServices is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with SurrealServices; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 package SrSv
::Process
::Worker
;
23 use Exporter
'import';
25 our @EXPORT_OK = qw(spawn write_pidfiles
26 ima_worker $ima_worker
28 call_in_parent call_all_child do_callback_in_child
29 shutdown_worker shutdown_all_workers kill_all_workers)
33 use English
qw( -no_match_vars );
36 use Storable
qw(fd_retrieve store_fd);
# Returns whatever the main program's PREFIX sub returns.  Elsewhere in this
# file the value is used as the leading path component of
# "data/worker.pids".
#
# The empty prototype is kept so that a bare PREFIX (no parentheses) still
# parses as a call with no arguments.
sub PREFIX () {
    # Call main::PREFIX explicitly.  A bareword main::PREFIX is only
    # compiled as a call when that sub has already been declared by the time
    # this file is compiled; otherwise it is a strict-subs error, or (without
    # strict) silently evaluates to the string "main::PREFIX".
    return main::PREFIX();
}
44 require Data
::Dumper
; import Data
::Dumper
();
48 use SrSv
::Message
qw(message call_callback unit_finished);
49 use SrSv
::Process
::Call
qw(safe_call);
50 use SrSv
::RunLevel
qw(:levels $runlevel);
52 use SrSv
::Process
::InParent
qw(shutdown_worker shutdown_all_workers kill_all_workers);
54 use SrSv
::Process
::Init
();
64 ### Public interface ###
69 my ($parent, $child) = IO
::Socket-
>socketpair(AF_UNIX
, SOCK_STREAM
, PF_UNSPEC
);
71 if(my $pid = fork()) {
74 NUMBER
=> scalar(@workers),
79 push @workers, $worker;
80 $worker->{WATCHER
} = Event-
>io (
81 cb
=> \
&SrSv
::Process
::Worker
::req_from_child
,
91 sub write_pidfiles
() {
92 my $fh = IO
::File-
>new("@{[PREFIX]}/data/worker.pids", 'w', '0600');
93 for(my $i = scalar(@workers); $i; $i--) {
94 my $pid = $workers[$i-1]->{PID
};
114 sub call_in_parent
(@) {
127 store_fd
(\
%call, $parent_sock);
130 return @{ fd_retrieve
($parent_sock) };
132 return @{ fd_retrieve
($parent_sock) }[-1];
136 sub call_all_child
(@) {
137 croak
"call_all_child is not functional.\n";
142 foreach my $worker (@workers) {
143 store_fd(\@args, $worker->{SOCKET});
151 sub shutdown_worker
($) {
154 print "Shutting down worker $worker->{NUMBER}\n" if DEBUG
;
155 store_fd
({ _SHUTDOWN
=> 1 }, $worker->{SOCKET
});
156 $worker->{WATCHER
}->cancel; undef $worker->{WATCHER
};
157 $worker->{SOCKET
}->close; undef $worker->{SOCKET
};
158 undef($workers[$worker->{NUMBER
}]);
160 unless(grep defined($_), @workers) {
161 print "All workers shut down.\n" if DEBUG
;
162 $callback->() if $callback;
166 sub shutdown_all_workers
($) {
169 while(my $worker = pop @free_workers) {
170 shutdown_worker
($worker);
# Send SIGKILL to every worker process still recorded in @workers.
#
# Takes no arguments.  Returns the value of kill(), i.e. the number of
# processes that were successfully signalled.
sub kill_all_workers () {
    # shutdown_worker() undefs a worker's slot in @workers, so the array can
    # contain holes.  Drop them first: an undefined entry must never reach
    # kill() as a PID.
    my @pids = map { $_->{PID} } grep { defined } @workers;

    # 'KILL' is signal 9, spelled by name for readability.
    return kill 'KILL', @pids;
}
179 ### Semi-private Functions ###
# Hand one unit of work to a worker process, or queue it if none is free.
#
# Arguments:
#   $callback - hash ref describing the callback.  The keys read here are
#               TRIGGER_COND (a hash ref that may hold DST and SRC entries)
#               and CALL (used only in debug output).
#   $message  - passed through unchanged together with $callback.
#
# When @free_workers is non-empty, a worker is popped from it, the unit
# [$callback, $message] is remembered in $worker->{UNIT} and written to
# $worker->{SOCKET} with store_fd.  Otherwise the unit is appended to @queue.
#
# Note: regexp trigger conditions are replaced by their string form inside
# the caller's $callback hash (the hash is modified in place).
sub do_callback_in_child {
    my ($callback, $message) = @_;

    # this whole thing is a workaround for perl 5.12's Storable.
    # Can't pass a regexp through Storable.
    # Only genuine regexp objects are stringified; plain strings need no
    # conversion and any other kind of reference is left untouched rather
    # than being flattened into a useless "ARRAY(0x...)"-style string.
    foreach my $k (qw(DST SRC)) {
        my $v = $callback->{TRIGGER_COND}->{$k};
        next unless defined $v and re::is_regexp($v);
        $callback->{TRIGGER_COND}->{$k} = "$v"; # convert regexp to string
    }
    #ircd::debug( split($/, Data::Dumper::Dumper($worker->{UNIT})) );

    if(my $worker = pop @free_workers) {
        print "Asking worker ".$worker->{NUMBER}." to call ".$callback->{CALL}."\n" if DEBUG;
        #store_fd([$unit], $worker->{SOCKET});
        # Keep the unit on the worker record as well as sending it.
        $worker->{UNIT} = [$callback, $message];

        store_fd($worker->{UNIT}, $worker->{SOCKET});
    }
    else {
        # No idle worker: park the unit on @queue.
        push @queue, [$callback, $message];
        print "Added to queue, length is now " . @queue . "\n" if DEBUG;
    }
}
208 ### Internal Functions ###
210 sub req_from_child
($) {
212 my $nr = $event->w->data;
213 my $worker = $workers[$nr];
214 my $fd = $worker->{SOCKET
};
216 my $req = eval { fd_retrieve
($fd) };
217 die "Couldn't read the request: $@" if $@;
219 print "Got a ".$req->{CLASS
}." message from worker ".$worker->{NUMBER
}."\n" if DEBUG
;
221 if($req->{CLASS
} eq 'CALL') {
222 my @reply = safe_call
($req->{FUNCTION
}, $req->{ARGS
});
223 store_fd
(\
@reply, $fd);
225 elsif($req->{CLASS
} eq 'FINISHED') {
226 my $unit = $worker->{UNIT
};
227 $worker->{UNIT
} = undef;
229 print "Worker ".$worker->{NUMBER
}." is now finished.\n" if DEBUG
;
231 if($runlevel == ST_SHUTDOWN
) {
232 shutdown_worker
($worker);
236 push @free_workers, $worker;
239 print "About to dequeue, length is now " . @queue if DEBUG
;
240 do_callback_in_child
(@{ shift @queue });
243 unit_finished
($unit->[0], $unit->[1]);
245 elsif($runlevel != ST_SHUTDOWN
) {
246 store_fd
({ACK
=> 1}, $fd);
252 print "Worker ".@workers." shutting down.\n" if DEBUG
;
261 $parent_sock = $parent;
263 SrSv
::Process
::Init
::do_init
();
266 store_fd
({ CLASS
=> 'FINISHED' }, $parent);
268 while(my $unit = fd_retrieve
($parent)) {
269 if(ref $unit eq 'HASH' and $unit->{_SHUTDOWN
}) {
272 print "Worker ".@workers." is now busy.\n" if DEBUG
;
273 call_callback
(@$unit);
275 print "Worker ".@workers." is now free.\n" if DEBUG
;
276 store_fd
({ CLASS
=> 'FINISHED' }, $parent);
279 die "Lost contact with the mothership";