2 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21 %module "Amanda::MainLoop"
22 %include "amglue/amglue.swg"
23 %include "exception.i"
25 %include "Amanda/MainLoop.pod"
# Probe for the optional Sub::Name module: the string eval yields 1 only when
# the "use" succeeds, so $have_sub_name is false when the module is missing.
40 my $have_sub_name = eval "use Sub::Name; 1";
# Fallback branch, taken when Sub::Name could not be loaded.
41 if (!$have_sub_name) {
# NOTE(review): ($name, $sub) matches the argument order of the
# subname($newname => $sub) calls later in this file, so this is presumably a
# stand-in definition of subname; the rest of its body is not visible here.
44 my ($name, $sub) = @_;
51 # glib's g_is_main_loop_running() seems inaccurate, so we just
52 # track that information locally.
# Locally tracked "main loop is running" flag (see the note above).
53 my $mainloop_running = 0;
# The flag is raised and then lowered again; presumably the call into the C
# main loop sits between these two assignments (that line is not visible here).
55 $mainloop_running = 1;
57 $mainloop_running = 0;
59 push @EXPORT_OK, "run";
# is_running reports the locally tracked flag rather than asking glib.
62 return $mainloop_running;
64 push @EXPORT_OK, "is_running";
66 # quit is a direct call to C
67 push @EXPORT_OK, "quit";
# Pending call_later requests, stored as [ $sub, @args ] array refs and
# consumed from the front (FIFO).
71 my @waiting_to_call_later;
# call_later($sub, @args): queue $sub to be invoked with @args from a
# zero-delay timeout source instead of calling it directly.
73 my ($sub, @args) = @_;
# confess (die with a stack trace) when no callback was supplied.
75 confess "undefined sub" unless ($sub);
77 # add the callback if nothing is waiting right now
78 if (!@waiting_to_call_later) {
# A 0 ms timeout source whose callback drains the queue.
79 timeout_source(0)->set_callback(sub {
# The queue is re-tested on every pass, so the loop runs until it is empty.
83 while (@waiting_to_call_later) {
84 my ($sub, @args) = @{shift @waiting_to_call_later};
85 $sub->(@args) if $sub;
# Record this request at the tail of the queue.
90 push @waiting_to_call_later, [ $sub, @args ];
92 push @EXPORT_OK, "call_later";
# make_cb([$name,] $sub): two calling forms are handled below -- a name
# followed by the sub, or the sub alone as the first argument.
95 my ($name, $sub) = @_;
# Named form: label the sub "$<package>::<name>@l<line>" using the caller's
# package and line number, and attach that label with subname().
98 my ($pkg, $filename, $line) = caller;
99 my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
100 $sub = subname($newname => $sub);
102 $sub = $name; # no name => sub is actually in first parameter
# NOTE(review): the invocation arguments are forwarded to call_later, so the
# sub is scheduled rather than called directly; the enclosing closure and the
# conditions selecting each branch are not visible here.
106 Amanda::MainLoop::call_later($sub, @_);
109 push @EXPORT, 'make_cb';
# call_after($delay_ms, $sub, @args): arrange for $sub to be triggered by a
# timeout source created with $delay_ms as its interval.
112 my ($delay_ms, $sub, @args) = @_;
# confess (die with a stack trace) when no callback was supplied.
114 confess "undefined sub" unless ($sub);
116 my $src = timeout_source($delay_ms);
# NOTE(review): the callback body is not visible here; presumably it removes
# the source and calls $sub->(@args) -- confirm against the full file.
117 $src->set_callback(sub {
124 push @EXPORT_OK, "call_after";
# call_on_child_termination($pid, $cb, @args): watch child process $pid with a
# child-watch source and invoke $cb with the child's exit status.
126 sub call_on_child_termination {
127 my ($pid, $cb, @args) = @_;
# confess (die with a stack trace) when no callback was supplied.
129 confess "undefined sub" unless ($cb);
131 my $src = child_watch_source($pid);
132 $src->set_callback(sub {
# Arguments unpacked from the source's callback: the source object, the pid
# and the exit status.
133 my ($src, $pid, $exitstatus) = @_;
# NOTE(review): only $exitstatus is passed on; the @args accepted above are
# not forwarded in the visible code -- confirm whether that is intended.
135 return $cb->($exitstatus);
138 push @EXPORT_OK, "call_on_child_termination";
# async_read(%params): read from 'fd' when its fd source fires and report the
# outcome through 'async_read_cb'.
142 my $fd = $params{'fd'};
# 'size' is optional; a missing or zero value selects the 32768-byte default
# used in the read call below.
143 my $size = $params{'size'} || 0;
144 my $cb = $params{'async_read_cb'};
# Optional extra arguments, appended to every callback invocation.
146 @args = @{$params{'args'}} if exists $params{'args'};
# Single read attempt of at most $size bytes (32768 when no size was given).
153 my $res = POSIX::read($fd, $buf, $size || 32768);
# Callback convention is ($error, $data, @args): here the error case, passing
# $! and no data ...
155 return $cb->($!, undef, @args);
# ... and here the success case, passing no error and the buffer.
157 return $cb->(undef, $buf, @args);
# Watch for readable data as well as hangup and error conditions.
160 my $src = fd_source($fd, $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
161 $src->set_callback($fd_cb);
164 push @EXPORT_OK, "async_read";
# Per-fd queues of pending writes. Each queue entry is
# [ $data, $nwritten, $len, $cb, \@args ], as pushed at the end of async_write.
166 my %outstanding_writes;
# async_write(%params): write 'data' to 'fd' and report the outcome through
# 'async_write_cb', which is called as ($error, $bytes_written, @args).
169 my $fd = $params{'fd'};
170 my $data = $params{'data'};
171 my $cb = $params{'async_write_cb'};
# Optional extra arguments, appended to every callback invocation.
173 @args = @{$params{'args'}} if exists $params{'args'};
175 # more often than not, writes will not block, so just try it.
# The direct attempt is only made when no write is already queued for this fd.
176 if (!exists $outstanding_writes{$fd}) {
177 my $res = POSIX::write($fd, $data, length($data));
# Any errno other than EAGAIN is reported immediately with a byte count of 0.
179 if ($! != POSIX::EAGAIN) {
180 return $cb->($!, 0, @args);
# Everything went out in one call: report success right away.
# NOTE(review): 'eq' compares the two numbers as strings; '==' (as used for
# the $nwritten == $len test further down) would be the consistent choice.
182 } elsif ($res eq length($data)) {
183 return $cb->(undef, $res, @args);
185 # chop off whatever data was written
186 $data = substr($data, $res);
# First queued write for this fd: create its queue and a source that fires
# when the fd is writable, hung up or in error.
190 if (!exists $outstanding_writes{$fd}) {
191 my $fd_writes = $outstanding_writes{$fd} = [];
192 my $src = fd_source($fd, $G_IO_OUT|$G_IO_HUP|$G_IO_ERR);
194 # (note that this does not coalesce consecutive outstanding writes
195 # into a single POSIX::write call)
# The head of the queue is the write being worked on.
197 my $ow = $fd_writes->[0];
198 my ($buf, $nwritten, $len, $cb, $args) = @$ow;
# $buf holds only the unwritten remainder, whose length is $len-$nwritten.
200 my $res = POSIX::write($fd, $buf, $len-$nwritten);
# Failure: report errno together with the bytes written so far.
203 $cb->($!, $nwritten, @$args);
# Record the progress in the queue entry itself.
205 $ow->[1] = $nwritten = $nwritten + $res;
206 if ($nwritten == $len) {
208 $cb->(undef, $nwritten, @$args);
# Partial write: keep only the unwritten tail for the next attempt.
210 $ow->[0] = substr($buf, $res);
214 # (the following is *intentionally* done after calling $cb, allowing
215 # $cb to add a new message to $fd_writes if desired, and thus avoid
216 # removing and re-adding the source)
217 if (@$fd_writes == 0) {
219 delete $outstanding_writes{$fd};
223 $src->set_callback($fd_cb);
# Queue the (remaining) data with a written-count of 0 for the fd source
# callback to work through.
226 push @{$outstanding_writes{$fd}}, [ $data, 0, length($data), $cb, \@args ];
228 push @EXPORT_OK, "async_write";
# synchronized($lock, $orig_cb, $sub): serialize invocations through the array
# referenced by $lock, which is used as a queue of [ $sub, $orig_cb ] pairs.
231 my ($lock, $orig_cb, $sub) = @_;
# Callback handed to $sub in place of $orig_cb; it advances the queue and then
# calls the original callback.
234 $continuation_cb = sub {
237 # shift this invocation off the queue
238 my ($last_sub, $last_orig_cb) = @{ shift @$lock };
240 # start the next invocation, if the queue isn't empty
# The next queued sub is scheduled through call_later and receives this same
# continuation as its argument.
242 Amanda::MainLoop::call_later($lock->[0][0], $continuation_cb);
245 # call through to the original callback for the last invocation
246 return $last_orig_cb->(@args);
249 # push this sub onto the lock queue
# push returns the new queue length, so 1 means the queue was empty before.
250 if ((push @$lock, [ $sub, $orig_cb ]) == 1) {
251 # if this is the first addition to the queue, start it
252 $sub->($continuation_cb);
255 push @EXPORT_OK, "synchronized";
257 { # private variables to track the "current" step definition
# define_steps(%params): begin a new set of steps and return the hash that
# will hold them. Keys read in the visible code: 'cb_ref', 'finalize' and
# 'immediate'.
262 sub define_steps (@) {
264 my $cb_ref = $params{'cb_ref'};
# Optional sub, invoked with no arguments during the cleanup below.
265 my $finalize = $params{'finalize'};
# cb_ref must be a reference to a scalar that itself holds a code reference;
# ref() reports 'REF' for a reference to a reference.
268 croak "cb_ref is undefined" unless defined $cb_ref;
269 croak "cb_ref is not a reference" unless ref($cb_ref) eq 'REF';
270 croak "cb_ref is not a code double-reference" unless ref($$cb_ref) eq 'CODE';
272 # arrange to clear out $steps when $exit_cb is called; this eliminates
273 # reference loops (values in %steps are closures which point to %steps).
274 # This also clears $current_steps, which is likely holding a reference to
# Remember the caller's callback before it is wrapped.
276 my $orig_cb = $$cb_ref;
279 $current_steps = undef;
280 $finalize->() if defined($finalize);
# Publish the new step table and the default 'immediate' setting, both of
# which are consulted by step().
285 $current_steps = \%steps;
286 $immediate = $params{'immediate'};
289 return $current_steps;
291 push @EXPORT, "define_steps";
# step(%params): register one named step in the current step set.
# A step is immediate when either the enclosing define_steps call or this call
# asked for it.
295 my $step_immediate = $immediate || $params{'immediate'};
# NOTE(review): 'immediate' is removed only when $step_immediate is true; if a
# caller passes a false 'immediate' value the key stays in %params and the
# name lookup below may pick it -- confirm callers never do that.
296 delete $params{'immediate'} if $step_immediate;
# The remaining key is the step name and its value is the step's sub.
298 my ($name) = keys %params;
299 my $cb = $params{$name};
301 croak "expected a sub at key $name" unless ref($cb) eq 'CODE';
303 # make the sub delayed
304 unless ($step_immediate) {
# Calling a delayed step only schedules the real sub through call_later.
306 $cb = sub { Amanda::MainLoop::call_later($orig_cb, @_); }
309 # patch up the callback
# Label the sub "$<package>::<name>@l<line>" from the caller's package and line.
310 my ($pkg, $filename, $line) = caller;
311 my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
312 $cb = subname($newname => $cb);
314 # store the step for later
315 $current_steps->{$name} = $cb;
317 # and invoke it, if it's the first step given
319 if ($step_immediate) {
327 push @EXPORT, "step";
/* run the loop returned by default_main_loop() until it is told to quit */
333 g_main_loop_run(default_main_loop());
/* ask the loop returned by default_main_loop() to stop running */
337 g_main_loop_quit(default_main_loop());
/* NOTE(review): set_callback() below stores a copy made with newSVsv() and
 * then also calls SvREFCNT_inc() on it, whereas the visible release paths
 * call SvREFCNT_dec() once -- confirm the extra reference is intentional. */
345 /* First we wrap the amglue_Source struct, defined in
346 * perl/amglue/mainloop.h, into a Perl object (named
347 * Amanda::MainLoop::Source). After that appear several
348 * constructors for various event sources.
351 %{ static void amglue_source_remove(amglue_Source *self); %}
353 %rename(Source) amglue_Source;
354 typedef struct amglue_Source {
356 /* Constructor: use one of the package-level functions, below */
358 die("Amanda::MainLoop::Source is an abstract base class");
361 /* Destructor: just unref the object */
363 amglue_source_unref(self);
366 /* a "cheater's typemap" to just pass the SV along */
367 %typemap(in) SV *callback_sub "$1 = $input;"
368 void set_callback(SV *callback_sub) {
369 /* Attach the source to the default mainloop context, so
370 * that it will start generating events. If it's already
371 * been destroyed, then bail with a fatal error.
373 if (self->state == AMGLUE_SOURCE_DESTROYED) {
374 die("This source has already been removed");
375 } else if (self->state == AMGLUE_SOURCE_NEW) {
376 self->state = AMGLUE_SOURCE_ATTACHED;
378 g_source_attach(self->src, NULL);
380 /* the link from the GSource to the amglue_Source is
381 * now in use, so we increment the amglue_Source's
383 amglue_source_ref(self);
386 /* whoever created this Source object conveniently left
387 * the proper C-side callback for us. This function has
388 * the appropriate calling signature for this GSource, and
389 * knows how to reflect that into Perl. It expects the SV to
390 * be provided as its 'data' argument. 'perlcall' suggests
391 * that we make a copy of this SV, in case the user later
393 if (self->callback_sv) {
394 SvREFCNT_dec(self->callback_sv);
395 self->callback_sv = NULL;
397 self->callback_sv = newSVsv(callback_sub);
398 SvREFCNT_inc(self->callback_sv);
399 g_source_set_callback(self->src, self->callback,
400 (gpointer)self, NULL);
402 /* delete the cheater's typemap */
/* hand off to the removal helper that is also used from some callbacks */
406 amglue_source_remove(self);
412 /* Detach a source from the mainloop and remove it from play. This is broken
413 * out as a separate function because it's also used from some callbacks */
415 amglue_source_remove(
418 /* protect against self being freed out from under us */
419 amglue_source_ref(self);
421 if (self->state == AMGLUE_SOURCE_ATTACHED) {
422 /* unref any perl callback */
423 if (self->callback_sv) {
424 SvREFCNT_dec(self->callback_sv);
425 self->callback_sv = NULL;
428 /* undo the ref made in set_callback() */
429 amglue_source_unref(self);
/* destroy the underlying GSource */
431 g_source_destroy(self->src);
/* record the removal; set_callback() dies on a source in this state */
434 self->state = AMGLUE_SOURCE_DESTROYED;
436 /* reverse the "protection" increment used above */
437 amglue_source_unref(self);
441 /* "Generic" callback function for a GSource that actually uses the GSourceFunc
442 * prototype. The source of this function also serves as a prototype for other,
443 * more advanced callbacks. Due to perl's heavy use of precompiler macros, it's
444 * not possible to break this down any further. */
447 amglue_source_callback_simple(
451 amglue_Source *src = (amglue_Source *)data;
454 /* keep the source around long enough for the call to finish */
455 amglue_source_ref(src);
456 g_assert(src->callback_sv != NULL);
461 /* create a new SV pointing to 'src', and increase our refcount
462 * accordingly. The SV is mortal, so FREETMPS will decrease the
463 * refcount, unless the callee keeps a copy of it somewhere */
464 amglue_source_ref(src);
465 src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
466 SWIG_OWNER | SWIG_SHADOW);
/* G_EVAL traps a die() raised inside the Perl callback so it can be checked
 * afterwards; G_DISCARD throws away whatever the callback returns */
472 call_sv(src->callback_sv, G_EVAL|G_DISCARD);
477 /* we no longer need the src */
478 amglue_source_unref(src);
481 /* this may have been freed, so don't use them after this point */
484 /* check for an uncaught 'die'. If we don't do this, then Perl will longjmp()
485 * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
488 /* We handle this just the way the default 'die' handler in Amanda::Debug
489 * does, but since Amanda's debug support may not yet be running, we back
490 * it up with an exit() */
/* report the Perl error message (ERRSV) at critical level */
491 g_critical("%s", SvPV_nolen(ERRSV));
499 /* Constructors for some general-purpose sources */
/* %newobject tells SWIG that the function returns a newly created object */
502 %newobject timeout_source;
/* wrap a glib timeout source created with 'interval'; its events are
 * delivered through amglue_source_callback_simple */
508 return amglue_source_new(g_timeout_source_new(interval),
509 (GSourceFunc)amglue_source_callback_simple);
514 %newobject idle_source;
/* wrap a glib idle source, set to the requested 'priority'; its events are
 * delivered through amglue_source_callback_simple */
520 GSource *idle_source = g_idle_source_new();
521 g_source_set_priority(idle_source, priority);
522 return amglue_source_new(idle_source,
523 (GSourceFunc)amglue_source_callback_simple);
527 /* child watch source */
530 child_watch_source_callback(
536 amglue_Source *src = (amglue_Source *)data;
539 /* keep the source around long enough for the call to finish */
540 amglue_source_ref(src);
541 g_assert(src->callback_sv != NULL);
546 /* create a new SV pointing to 'src', and increase our refcount
547 * accordingly. The SV is mortal, so FREETMPS will decrease the
548 * refcount, unless the callee keeps a copy of it somewhere */
549 amglue_source_ref(src);
550 src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
551 SWIG_OWNER | SWIG_SHADOW);
/* push the child's pid and status as mortal integer arguments */
555 XPUSHs(sv_2mortal(newSViv(pid)));
556 XPUSHs(sv_2mortal(newSViv(status)));
/* G_EVAL traps a die() raised inside the Perl callback so it can be checked
 * afterwards; G_DISCARD throws away whatever the callback returns */
559 call_sv(src->callback_sv, G_EVAL|G_DISCARD);
561 /* child watch sources automatically destroy themselves after the
562 * child dies, so we mark the amglue_Source as destroyed, too. */
563 amglue_source_remove(src);
568 /* we no longer need the src */
569 amglue_source_unref(src);
572 /* this may have been freed, so don't use them after this point */
575 /* check for an uncaught 'die'. If we don't do this, then Perl will longjmp()
576 * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
579 /* We handle this just the way the default 'die' handler in Amanda::Debug
580 * does, but since Amanda's debug support may not yet be running, we back
581 * it up with an exit() */
/* report the Perl error message (ERRSV) at critical level */
582 g_critical("%s", SvPV_nolen(ERRSV));
589 %newobject child_watch_source;
/* wrap a child-watch source for 'pid'; its events are delivered through
 * child_watch_source_callback */
595 GSource *child_watch_source = new_child_watch_source(pid);
596 return amglue_source_new(child_watch_source,
597 (GSourceFunc)child_watch_source_callback);
/* reuse the gint typemaps for GIOCondition values */
602 %apply gint { GIOCondition };
/* NOTE(review): the amglue_* macros are defined elsewhere; presumably they
 * export the GIOCondition flag values to Perl under a "constants" tag --
 * confirm against amglue. */
603 amglue_add_flag_tag_fns(GIOCondition);
604 amglue_add_constant(G_IO_IN, GIOCondition);
605 amglue_add_constant(G_IO_OUT, GIOCondition);
606 amglue_add_constant(G_IO_PRI, GIOCondition);
607 amglue_add_constant(G_IO_ERR, GIOCondition);
608 amglue_add_constant(G_IO_HUP, GIOCondition);
609 amglue_add_constant(G_IO_NVAL, GIOCondition);
610 amglue_copy_to_tag(GIOCondition, constants);
612 %newobject fd_source;
/* wrap a source built by new_fdsource() for 'fd' and 'events'; its events
 * are delivered through amglue_source_callback_simple */
619 GSource *fdsource = new_fdsource(fd, events);
620 return amglue_source_new(fdsource,
621 (GSourceFunc)amglue_source_callback_simple);