]> jfr.im git - irc/SurrealServices/srsv.git/blob - branches/0.4.3/SrSv/Process/Worker.pm
better longterm fix for perl/Storable vs regexes
[irc/SurrealServices/srsv.git] / branches / 0.4.3 / SrSv / Process / Worker.pm
1 # This file is part of SurrealServices.
2 #
3 # SurrealServices is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # SurrealServices is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with SurrealServices; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16
17 package SrSv::Process::Worker;
18
19 use strict;
20
21 use Carp 'croak';
22
23 use Exporter 'import';
# Symbols that other packages may import on request; nothing is exported
# by default. Declared in BEGIN so the list exists at compile time.
BEGIN {
    our @EXPORT_OK = (
        qw(spawn write_pidfiles),
        qw(ima_worker $ima_worker),
        qw(multi get_socket),
        qw(call_in_parent call_all_child do_callback_in_child),
        qw(shutdown_worker shutdown_all_workers kill_all_workers),
    );
}
31
32 use Event;
33 use English qw( -no_match_vars );
34 use IO::Socket;
35 use IO::File;
36 use Storable qw(fd_retrieve store_fd);
37
38 use SrSv::Debug;
39
# Local shorthand that forwards to PREFIX in package main
# (presumably the installation prefix; it is used to build the pidfile path).
sub PREFIX() {
    return main::PREFIX();
}
41
# Data::Dumper is only pulled in (at compile time) when DEBUG is enabled.
BEGIN {
    if (DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}
47
48 use SrSv::Message qw(message call_callback unit_finished);
49 use SrSv::Process::Call qw(safe_call);
50 use SrSv::RunLevel qw(:levels $runlevel);
51
52 use SrSv::Process::InParent qw(shutdown_worker shutdown_all_workers kill_all_workers);
53
54 use SrSv::Process::Init ();
55
# Package-level process state.
our (
    $parent_sock,     # worker side: socket to the parent, set by loop()
    @workers,         # parent side: worker records, indexed by their NUMBER
    @free_workers,    # parent side: worker records currently idle
    @queue,           # parent side: [callback, message] pairs awaiting a free worker
);

our $multi      = 0;  # becomes 1 once spawn() has been called
our $ima_worker = 0;  # becomes 1 inside a worker process (see loop())
63
64 ### Public interface ###
65
# Fork one worker process and register it with the parent.
#
# A UNIX-domain socket pair is created before forking:
#   - the parent keeps one end, records the worker (SOCKET, NUMBER, PID) in
#     @workers and installs an Event io watcher that passes incoming data on
#     that socket to req_from_child(), with the worker's index as watcher data;
#   - the child keeps the other end, enters loop() and never returns.
#
# Sets $multi to 1. Croaks if the socket pair or the fork cannot be created.
# In the parent, returns the newly created watcher.
sub spawn() {
    $multi = 1;

    my ($parent, $child) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or croak "spawn: socketpair failed: $!";

    my $pid = fork();

    # fork() returns undef on failure; without this check the parent itself
    # would fall into the child branch and run the worker loop.
    croak "spawn: fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: drop our copy of the parent's end so that a dying parent
        # is seen as EOF on our own end.
        $child->close;
        loop($parent);
        exit;
    }

    # Parent: the child's end is of no use here.
    $parent->close;

    my $nr     = scalar @workers;
    my $worker = {
        SOCKET => $child,
        NUMBER => $nr,
        PID    => $pid,
    };
    push @workers, $worker;

    $worker->{WATCHER} = Event->io(
        cb   => \&SrSv::Process::Worker::req_from_child,
        fd   => $child,
        data => $nr,
    );

    return $worker->{WATCHER};
}
90
# Write the PIDs of all workers (highest worker number first), followed by
# the PID of the current process, one per line, to PREFIX/data/worker.pids.
#
# The file is opened with a numeric mode so that IO::File uses sysopen and
# actually honours the 0600 permission argument; with a symbolic mode such
# as 'w' the permission argument is ignored. The permissions only apply
# when the file is created; an existing file keeps its current mode.
#
# Croaks if the file cannot be opened or closed. Returns 1 on success.
sub write_pidfiles() {
    my $path = PREFIX() . '/data/worker.pids';

    my $fh = IO::File->new($path, O_WRONLY | O_CREAT | O_TRUNC, 0600)
        or croak "write_pidfiles: cannot open $path for writing: $!";

    for my $worker (reverse @workers) {
        # Slots of workers that were shut down are undefined; skip them.
        next unless defined $worker;
        print {$fh} $worker->{PID}, "\n";
    }
    print {$fh} $PID, "\n";

    # Buffered write errors only surface at close time.
    $fh->close
        or croak "write_pidfiles: cannot close $path: $!";

    return 1;
}
99
# Accessor for $ima_worker: 0 by default, 1 once loop() has run in this process.
sub ima_worker { return $ima_worker }
103
# Accessor for $multi: 0 by default, 1 once spawn() has been called.
sub multi { return $multi }
107
# Return the socket connected to the parent when running inside a worker
# process, and undef otherwise.
#
# The non-worker case is returned explicitly instead of falling off the end
# of an `if`, whose implicit value is not something callers should rely on.
# A single (false) value is still returned in list context.
sub get_socket {
    return ima_worker() ? $parent_sock : undef;
}
113
# Call function $f with @args in the parent process and return its result.
#
# Arguments:
#   $f    - the function to call; may be a sub name (hence `no strict 'refs'`)
#   @args - arguments passed through to it
#
# In the parent the function is simply called in place. In a worker a
# { CLASS => 'CALL', FUNCTION, ARGS } request is sent over $parent_sock and
# the reply (an array ref) is read back.
#
# Returns the full reply list in list context, its last element otherwise.
# Croaks if the request cannot be sent or no reply can be read.
sub call_in_parent(@) {
    my ($f, @args) = @_;

    if (!ima_worker()) {
        no strict 'refs';
        return $f->(@args);
    }

    my %call = (
        CLASS    => 'CALL',
        FUNCTION => $f,
        ARGS     => \@args,
    );

    # store_fd returns undef on I/O errors; waiting for a reply to a request
    # that was never delivered would block forever.
    store_fd(\%call, $parent_sock)
        or croak "call_in_parent: could not send request to parent: $!";

    # fd_retrieve returns undef on I/O errors (e.g. the parent went away).
    my $reply = fd_retrieve($parent_sock);
    croak "call_in_parent: could not read reply from parent: $!"
        unless defined $reply;

    return wantarray ? @{$reply} : $reply->[-1];
}
135
# Not implemented: always croaks.
#
# A disabled draft of the body, kept for reference, read:
#
#     my (@args) = @_;
#
#     foreach my $worker (@workers) {
#         store_fd(\@args, $worker->{SOCKET});
#     }
sub call_all_child(@) {
    croak "call_all_child is not functional.\n";
}
147
{
    # Code ref to run once no defined entry is left in @workers;
    # set by shutdown_all_workers().
    my $callback;

    # Shut down a single worker.
    #
    # Sends the { _SHUTDOWN => 1 } marker over the worker's socket, cancels
    # its Event watcher, closes the socket and undefines the worker's slot in
    # @workers. When that was the last remaining worker, the callback
    # registered by shutdown_all_workers() (if any) is invoked.
    sub shutdown_worker($) {
        my $worker = shift;
        my $nr     = $worker->{NUMBER};

        print "Shutting down worker $nr\n" if DEBUG;
        store_fd({ _SHUTDOWN => 1 }, $worker->{SOCKET});

        $worker->{WATCHER}->cancel;
        undef $worker->{WATCHER};

        $worker->{SOCKET}->close;
        undef $worker->{SOCKET};

        undef($workers[$nr]);

        # A worker that has been shut down must never be handed work again:
        # make sure it is not left behind in the free list, where
        # do_callback_in_child() would pick it up and write to a closed socket.
        @free_workers = grep { $_->{NUMBER} != $nr } @free_workers;

        unless (grep { defined } @workers) {
            print "All workers shut down.\n" if DEBUG;
            $callback->() if $callback;
        }
    }

    # Register $callback to be run once all workers are gone and shut down
    # every worker that is currently free. Workers that are still busy are
    # not touched here.
    sub shutdown_all_workers($) {
        $callback = shift;

        while (my $worker = pop @free_workers) {
            shutdown_worker($worker);
        }
    }
}
174
# Send SIGKILL to every worker that is still registered in @workers.
#
# Slots of workers that were already shut down are undefined and are
# skipped: an undefined or zero PID must never reach kill(), since PID 0
# addresses the caller's whole process group.
#
# Returns the number of processes successfully signalled (0 if none).
sub kill_all_workers() {
    my @pids = grep { defined $_ && $_ > 0 }
               map  { $_->{PID} }
               grep { defined } @workers;

    return 0 unless @pids;
    return kill 'KILL', @pids;
}
178
179 ### Semi-private Functions ###
180
# Hand a [callback, message] unit to a free worker, or queue it when every
# worker is busy.
#
# Arguments:
#   $callback - callback description (hash ref; CALL and TRIGGER_COND are read)
#   $message  - the message to pass along with it
#
# Returns the result of store_fd when the unit was dispatched, nothing when
# it was queued.
sub do_callback_in_child {
    my ($callback, $message) = @_;

    # Workaround for perl 5.12's Storable, which can't pass a regexp:
    # when DST or SRC is a reference, both (if defined) are replaced by their
    # string form before the callback is serialised. TRIGGER_COND is fetched
    # once so that merely testing it does not create it on a callback that
    # has none.
    my $cond = $callback->{TRIGGER_COND};
    if ($cond && (ref($cond->{DST}) || ref($cond->{SRC}))) {
        foreach my $k (qw(DST SRC)) {
            next unless defined $cond->{$k};
            my $v = $cond->{$k};
            $cond->{$k} = "$v";    # convert regexp to string
        }
    }

    my $worker = pop @free_workers;

    unless ($worker) {
        push @queue, [$callback, $message];
        print "Added to queue, length is now " . @queue . "\n" if DEBUG;
        return;
    }

    print "Asking worker ".$worker->{NUMBER}." to call ".$callback->{CALL}."\n" if DEBUG;

    # Remember the unit so req_from_child() can report it as finished later.
    $worker->{UNIT} = [$callback, $message];

    return store_fd($worker->{UNIT}, $worker->{SOCKET});
}
207
208 ### Internal Functions ###
209
# Event io callback: handle one request arriving from a worker.
#
# $event->w->data holds the worker's index into @workers (set in spawn()).
# Three kinds of request are handled:
#   CLASS 'CALL'     - run the requested function via safe_call() and send
#                      the result list back to the worker;
#   CLASS 'FINISHED' - the worker completed its unit: shut it down when the
#                      runlevel is ST_SHUTDOWN, otherwise mark it free, start
#                      the next queued unit (if any) and report the finished
#                      unit through unit_finished();
#   anything else    - unless shutting down, acknowledge it and pass it to
#                      message().
#
# Dies if the request cannot be read from the worker's socket.
sub req_from_child($) {
    my $event  = shift;
    my $nr     = $event->w->data;
    my $worker = $workers[$nr];
    my $fd     = $worker->{SOCKET};

    my $req = eval { fd_retrieve($fd) };
    my $err = $@;    # capture immediately, $@ is easily clobbered
    die "Couldn't read the request: $err" if $err;

    # fd_retrieve can also fail by returning undef (I/O error, closed
    # socket) without throwing; that must not be mistaken for a message.
    die "Couldn't read the request: no data from worker ".$worker->{NUMBER}
        unless defined $req;

    print "Got a ".$req->{CLASS}." message from worker ".$worker->{NUMBER}."\n" if DEBUG;

    if ($req->{CLASS} eq 'CALL') {
        my @reply = safe_call($req->{FUNCTION}, $req->{ARGS});
        store_fd(\@reply, $fd);
    }
    elsif ($req->{CLASS} eq 'FINISHED') {
        my $unit = $worker->{UNIT};
        $worker->{UNIT} = undef;

        print "Worker ".$worker->{NUMBER}." is now finished.\n" if DEBUG;

        if ($runlevel == ST_SHUTDOWN) {
            shutdown_worker($worker);
            return;
        }

        push @free_workers, $worker;

        if (@queue) {
            print "About to dequeue, length is now " . @queue . "\n" if DEBUG;
            # May hand new work to the worker that was just freed; its
            # previous unit was saved in $unit above.
            do_callback_in_child(@{ shift @queue });
        }

        unit_finished($unit->[0], $unit->[1]);
    }
    elsif ($runlevel != ST_SHUTDOWN) {
        store_fd({ ACK => 1 }, $fd);
        message($req);
    }

    return;
}
250
# Worker side: close the socket to the parent and terminate this process.
sub do_exit() {
    if (DEBUG) {
        print "Worker " . scalar(@workers) . " shutting down.\n";
    }

    $parent_sock->close;
    exit;
}
256
# Main loop of a worker process; $parent is the socket connected to the parent.
#
# Marks this process as a worker, runs the per-process initialisation, then
# announces itself to the parent with a FINISHED message. After that it reads
# units from the parent one at a time: a hash carrying _SHUTDOWN ends the
# process through do_exit(); anything else is expanded into the argument list
# of call_callback(), followed by another FINISHED message.
#
# Dies once nothing more can be read from the parent.
sub loop($) {
    my ($parent) = @_;

    $parent_sock = $parent;
    $ima_worker  = 1;

    SrSv::Process::Init::do_init();
    module::begin();

    store_fd({ CLASS => 'FINISHED' }, $parent);

    UNIT:
    while (1) {
        my $unit = fd_retrieve($parent) or last UNIT;

        do_exit() if ref($unit) eq 'HASH' && $unit->{_SHUTDOWN};

        if (DEBUG) {
            print "Worker ".@workers." is now busy.\n";
        }
        call_callback(@{$unit});

        if (DEBUG) {
            print "Worker ".@workers." is now free.\n";
        }
        store_fd({ CLASS => 'FINISHED' }, $parent);
    }

    die "Lost contact with the mothership";
}
281
282 1;