]> jfr.im git - irc/SurrealServices/srsv.git/blob - branches/0.4.3/SrSv/Process/Worker.pm
6869fc05560aaaf9d459ac9497f1ac20b5ab7114
[irc/SurrealServices/srsv.git] / branches / 0.4.3 / SrSv / Process / Worker.pm
1 # This file is part of SurrealServices.
2 #
3 # SurrealServices is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # SurrealServices is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with SurrealServices; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16
17 package SrSv::Process::Worker;
18
19 use strict;
20
21 use Carp 'croak';
22
23 use Exporter 'import';
24 BEGIN {
25 our @EXPORT_OK = qw(spawn write_pidfiles
26 ima_worker $ima_worker
27 multi get_socket
28 call_in_parent call_all_child do_callback_in_child
29 shutdown_worker shutdown_all_workers kill_all_workers)
30 }
31
32 use Event;
33 use English qw( -no_match_vars );
34 use IO::Socket;
35 use IO::File;
36 use Storable qw(fd_retrieve store_fd);
37
38 use SrSv::Debug;
39
40 sub PREFIX() { return main::PREFIX }
41
42 BEGIN {
43 if(DEBUG) {
44 require Data::Dumper; import Data::Dumper ();
45 }
46 }
47
48 use SrSv::Message qw(message call_callback unit_finished);
49 use SrSv::Process::Call qw(safe_call);
50 use SrSv::RunLevel qw(:levels $runlevel);
51
52 use SrSv::Process::InParent qw(shutdown_worker shutdown_all_workers kill_all_workers);
53
54 use SrSv::Process::Init ();
55
56 our $parent_sock;
57 our $multi = 0;
58 our @workers;
59 our @free_workers;
60 our @queue;
61
62 our $ima_worker = 0;
63
64 ### Public interface ###
65
66 sub spawn() {
67 $multi = 1;
68
69 my ($parent, $child) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
70
71 if(my $pid = fork()) {
72 my $worker = {
73 SOCKET => $child,
74 NUMBER => scalar(@workers),
75 PID => $pid,
76 };
77
78 my $nr = @workers;
79 push @workers, $worker;
80 $worker->{WATCHER} = Event->io (
81 cb => \&SrSv::Process::Worker::req_from_child,
82 fd => $child,
83 data => $nr,
84 );
85 } else {
86 loop($parent);
87 exit;
88 }
89 }
90
91 sub write_pidfiles() {
92 my $fh = IO::File->new("@{[PREFIX]}/data/worker.pids", 'w', '0600');
93 for(my $i = scalar(@workers); $i; $i--) {
94 my $pid = $workers[$i-1]->{PID};
95 print $fh $pid,"\n";
96 }
97 print $fh $PID,"\n";
98 }
99
100 sub ima_worker {
101 return $ima_worker;
102 }
103
104 sub multi {
105 return $multi;
106 }
107
108 sub get_socket {
109 if(ima_worker) {
110 return $parent_sock;
111 }
112 }
113
114 sub call_in_parent(@) {
115 my ($f, @args) = @_;
116 if(!ima_worker) {
117 no strict 'refs';
118 return &$f(@args);
119 }
120
121 my %call = (
122 CLASS => 'CALL',
123 FUNCTION => $f,
124 ARGS => \@args
125 );
126
127 store_fd(\%call, $parent_sock);
128
129 if(wantarray) {
130 return @{ fd_retrieve($parent_sock) };
131 } else {
132 return @{ fd_retrieve($parent_sock) }[-1];
133 }
134 }
135
136 sub call_all_child(@) {
137 croak "call_all_child is not functional.\n";
138
139 =for comment
140 my (@args) = @_;
141
142 foreach my $worker (@workers) {
143 store_fd(\@args, $worker->{SOCKET});
144 }
145 =cut
146 }
147
148 {
149 my $callback;
150
151 sub shutdown_worker($) {
152 my $worker = shift;
153
154 print "Shutting down worker $worker->{NUMBER}\n" if DEBUG;
155 store_fd({ _SHUTDOWN => 1 }, $worker->{SOCKET});
156 $worker->{WATCHER}->cancel; undef $worker->{WATCHER};
157 $worker->{SOCKET}->close; undef $worker->{SOCKET};
158 undef($workers[$worker->{NUMBER}]);
159
160 unless(grep defined($_), @workers) {
161 print "All workers shut down.\n" if DEBUG;
162 $callback->() if $callback;
163 }
164 }
165
166 sub shutdown_all_workers($) {
167 $callback = shift;
168
169 while(my $worker = pop @free_workers) {
170 shutdown_worker($worker);
171 }
172 }
173 }
174
175 sub kill_all_workers() {
176 kill 9, map($_->{PID}, @workers);
177 }
178
179 ### Semi-private Functions ###
180
181 sub do_callback_in_child {
182 my ($callback, $message) = @_;
183
184 if(my $worker = pop @free_workers) {
185 print "Asking worker ".$worker->{NUMBER}." to call ".$callback->{CALL}."\n" if DEBUG;
186 #store_fd([$unit], $worker->{SOCKET});
187 $worker->{UNIT} = [$callback, $message];
188
189 store_fd($worker->{UNIT}, $worker->{SOCKET});
190 } else {
191 push @queue, [$callback, $message];
192 print "Added to queue, length is now" . @queue if DEBUG;
193 }
194 }
195
196 ### Internal Functions ###
197
198 sub req_from_child($) {
199 my $event = shift;
200 my $nr = $event->w->data;
201 my $worker = $workers[$nr];
202 my $fd = $worker->{SOCKET};
203
204 my $req = eval { fd_retrieve($fd) };
205 die "Couldn't read the request: $@" if $@;
206
207 print "Got a ".$req->{CLASS}." message from worker ".$worker->{NUMBER}."\n" if DEBUG;
208
209 if($req->{CLASS} eq 'CALL') {
210 my @reply = safe_call($req->{FUNCTION}, $req->{ARGS});
211 store_fd(\@reply, $fd);
212 }
213 elsif($req->{CLASS} eq 'FINISHED') {
214 my $unit = $worker->{UNIT};
215 $worker->{UNIT} = undef;
216
217 print "Worker ".$worker->{NUMBER}." is now finished.\n" if DEBUG;
218
219 if($runlevel == ST_SHUTDOWN) {
220 shutdown_worker($worker);
221 return;
222 }
223
224 push @free_workers, $worker;
225
226 if(@queue) {
227 print "About to dequeue, length is now " . @queue if DEBUG;
228 do_callback_in_child(@{ shift @queue });
229 }
230
231 unit_finished($unit->[0], $unit->[1]);
232 }
233 elsif($runlevel != ST_SHUTDOWN) {
234 store_fd({ACK => 1}, $fd);
235 message($req);
236 }
237 }
238
239 sub do_exit() {
240 print "Worker ".@workers." shutting down.\n" if DEBUG;
241 $parent_sock->close;
242 exit;
243 }
244
245 sub loop($) {
246 my ($parent) = @_;
247
248 $ima_worker = 1;
249 $parent_sock = $parent;
250
251 SrSv::Process::Init::do_init();
252 module::begin();
253
254 store_fd({ CLASS => 'FINISHED' }, $parent);
255
256 while(my $unit = fd_retrieve($parent)) {
257 if(ref $unit eq 'HASH' and $unit->{_SHUTDOWN}) {
258 do_exit;
259 }
260 print "Worker ".@workers." is now busy.\n" if DEBUG;
261 call_callback(@$unit);
262
263 print "Worker ".@workers." is now free.\n" if DEBUG;
264 store_fd({ CLASS => 'FINISHED' }, $parent);
265 }
266
267 die "Lost contact with the mothership";
268 }
269
270 1;