2 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; either version 2
7 * of the License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
22 %module "Amanda::MainLoop"
23 %include "amglue/amglue.swg"
24 %include "exception.i"
26 %include "Amanda/MainLoop.pod"
41 my $have_sub_name = eval "use Sub::Name; 1";
42 if (!$have_sub_name) {
45 my ($name, $sub) = @_;
52 # glib's g_is_main_loop_running() seems inaccurate, so we just
53 # track that information locally..
54 my $mainloop_running = 0;
56 $mainloop_running = 1;
58 $mainloop_running = 0;
60 push @EXPORT_OK, "run";
63 return $mainloop_running;
65 push @EXPORT_OK, "is_running";
67 # quit is a direct call to C
68 push @EXPORT_OK, "quit";
72 my @waiting_to_call_later;
74 my ($sub, @args) = @_;
76 confess "undefined sub" unless ($sub);
78 # add the callback if nothing is waiting right now
79 if (!@waiting_to_call_later) {
80 timeout_source(0)->set_callback(sub {
84 while (@waiting_to_call_later) {
85 my ($sub, @args) = @{shift @waiting_to_call_later};
86 $sub->(@args) if $sub;
91 push @waiting_to_call_later, [ $sub, @args ];
93 push @EXPORT_OK, "call_later";
96 my ($name, $sub) = @_;
99 my ($pkg, $filename, $line) = caller;
100 my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
101 $sub = subname($newname => $sub);
103 $sub = $name; # no name => sub is actually in first parameter
107 Amanda::MainLoop::call_later($sub, @_);
110 push @EXPORT, 'make_cb';
113 my ($delay_ms, $sub, @args) = @_;
115 confess "undefined sub" unless ($sub);
117 my $src = timeout_source($delay_ms);
118 $src->set_callback(sub {
125 push @EXPORT_OK, "call_after";
127 sub call_on_child_termination {
128 my ($pid, $cb, @args) = @_;
130 confess "undefined sub" unless ($cb);
132 my $src = child_watch_source($pid);
133 $src->set_callback(sub {
134 my ($src, $pid, $exitstatus) = @_;
136 return $cb->($exitstatus);
139 push @EXPORT_OK, "call_on_child_termination";
143 my $fd = $params{'fd'};
144 my $size = $params{'size'} || 0;
145 my $cb = $params{'async_read_cb'};
147 @args = @{$params{'args'}} if exists $params{'args'};
154 my $res = POSIX::read($fd, $buf, $size || 32768);
156 return $cb->($!, undef, @args);
158 return $cb->(undef, $buf, @args);
161 my $src = fd_source($fd, $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
162 $src->set_callback($fd_cb);
165 push @EXPORT_OK, "async_read";
167 my %outstanding_writes;
170 my $fd = $params{'fd'};
171 my $data = $params{'data'};
172 my $cb = $params{'async_write_cb'};
174 @args = @{$params{'args'}} if exists $params{'args'};
176 # more often than not, writes will not block, so just try it.
177 if (!exists $outstanding_writes{$fd}) {
178 my $res = POSIX::write($fd, $data, length($data));
180 if ($! != POSIX::EAGAIN) {
181 return $cb->($!, 0, @args);
183 } elsif ($res eq length($data)) {
184 return $cb->(undef, $res, @args);
186 # chop off whatever data was written
187 $data = substr($data, $res);
191 if (!exists $outstanding_writes{$fd}) {
192 my $fd_writes = $outstanding_writes{$fd} = [];
193 my $src = fd_source($fd, $G_IO_OUT|$G_IO_HUP|$G_IO_ERR);
195 # (note that this does not coalesce consecutive outstanding writes
196 # into a single POSIX::write call)
198 my $ow = $fd_writes->[0];
199 my ($buf, $nwritten, $len, $cb, $args) = @$ow;
201 my $res = POSIX::write($fd, $buf, $len-$nwritten);
204 $cb->($!, $nwritten, @$args);
206 $ow->[1] = $nwritten = $nwritten + $res;
207 if ($nwritten == $len) {
209 $cb->(undef, $nwritten, @$args);
211 $ow->[0] = substr($buf, $res);
215 # (the following is *intentionally* done after calling $cb, allowing
216 # $cb to add a new message to $fd_writes if desired, and thus avoid
217 # removing and re-adding the source)
218 if (@$fd_writes == 0) {
220 delete $outstanding_writes{$fd};
224 $src->set_callback($fd_cb);
227 push @{$outstanding_writes{$fd}}, [ $data, 0, length($data), $cb, \@args ];
229 push @EXPORT_OK, "async_write";
232 my ($lock, $orig_cb, $sub) = @_;
235 $continuation_cb = sub {
238 # shift this invocation off the queue
239 my ($last_sub, $last_orig_cb) = @{ shift @$lock };
241 # start the next invocation, if the queue isn't empty
243 Amanda::MainLoop::call_later($lock->[0][0], $continuation_cb);
246 # call through to the original callback for the last invocation
247 return $last_orig_cb->(@args);
250 # push this sub onto the lock queue
251 if ((push @$lock, [ $sub, $orig_cb ]) == 1) {
252 # if this is the first addition to the queue, start it
253 $sub->($continuation_cb);
256 push @EXPORT_OK, "synchronized";
258 { # privat variables to track the "current" step definition
263 sub define_steps (@) {
265 my $cb_ref = $params{'cb_ref'};
266 my $finalize = $params{'finalize'};
269 croak "cb_ref is undefined" unless defined $cb_ref;
270 croak "cb_ref is not a reference" unless ref($cb_ref) eq 'REF';
271 croak "cb_ref is not a code double-reference" unless ref($$cb_ref) eq 'CODE';
273 # arrange to clear out $steps when $exit_cb is called; this eliminates
274 # reference loops (values in %steps are closures which point to %steps).
275 # This also clears $current_steps, which is likely holding a reference to
277 my $orig_cb = $$cb_ref;
280 $current_steps = undef;
281 $finalize->() if defined($finalize);
286 $current_steps = \%steps;
287 $immediate = $params{'immediate'};
290 return $current_steps;
292 push @EXPORT, "define_steps";
296 my $step_immediate = $immediate || $params{'immediate'};
297 delete $params{'immediate'} if $step_immediate;
299 my ($name) = keys %params;
300 my $cb = $params{$name};
302 croak "expected a sub at key $name" unless ref($cb) eq 'CODE';
304 # make the sub delayed
305 unless ($step_immediate) {
307 $cb = sub { Amanda::MainLoop::call_later($orig_cb, @_); }
310 # patch up the callback
311 my ($pkg, $filename, $line) = caller;
312 my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
313 $cb = subname($newname => $cb);
315 # store the step for later
316 $current_steps->{$name} = $cb;
318 # and invoke it, if it's the first step given
320 if ($step_immediate) {
328 push @EXPORT, "step";
334 g_main_loop_run(default_main_loop());
338 g_main_loop_quit(default_main_loop());
346 /* First we wrap the amglue_Source struct, defined in
347 * perl/amglue/mainloop.h, into a Perl object (named
348 * Amanda::MainLoop::Source). After that appear several
349 * constructors for various event sources.
352 %{ static void amglue_source_remove(amglue_Source *self); %}
354 %rename(Source) amglue_Source;
355 typedef struct amglue_Source {
357 /* Constructor: use one of the package-level functions, below */
359 die("Amanda::MainLoop::Source is an abstract base class");
362 /* Destructor: just unref the object */
364 amglue_source_unref(self);
367 /* a "cheater's typemap" to just pass the SV along */
368 %typemap(in) SV *callback_sub "$1 = $input;"
369 void set_callback(SV *callback_sub) {
370 /* Attach the source to the default mainloop context, so
371 * that it will start generating events. If it's already
372 * been destroyed, then bail with a fatal error.
374 if (self->state == AMGLUE_SOURCE_DESTROYED) {
375 die("This source has already been removed");
376 } else if (self->state == AMGLUE_SOURCE_NEW) {
377 self->state = AMGLUE_SOURCE_ATTACHED;
379 g_source_attach(self->src, NULL);
381 /* the link from the GSource to the amglue_Source is
382 * now in use, so we increment the amglue_Source's
384 amglue_source_ref(self);
387 /* whoever created this Source object conveniently left
388 * the proper C-side callback for us. This function has
389 * the appropriate calling signature for this GSource, and
390 * knows how to reflect that into Perl. It expects the SV to
391 * be provided as its 'data' argument. 'perlcall' suggests
392 * that we make a copy of this SV, in case the user later
394 if (self->callback_sv) {
395 SvREFCNT_dec(self->callback_sv);
396 self->callback_sv = NULL;
398 self->callback_sv = newSVsv(callback_sub);
399 SvREFCNT_inc(self->callback_sv);
400 g_source_set_callback(self->src, self->callback,
401 (gpointer)self, NULL);
403 /* delete the cheater's typemap */
407 amglue_source_remove(self);
413 /* Detach a source from the mainloop and remove it from play. This is broken
414 * out as a separate function because it's also used from some callbacks */
416 amglue_source_remove(
419 /* protect against self being freed out from under us */
420 amglue_source_ref(self);
422 if (self->state == AMGLUE_SOURCE_ATTACHED) {
423 /* unref any perl callback */
424 if (self->callback_sv) {
425 SvREFCNT_dec(self->callback_sv);
426 self->callback_sv = NULL;
429 /* undo the ref made in set_callback() */
430 amglue_source_unref(self);
432 g_source_destroy(self->src);
435 self->state = AMGLUE_SOURCE_DESTROYED;
437 /* reverse the "protection" increment used above */
438 amglue_source_unref(self);
442 /* "Generic" callback function for a GSource that actually uses the GSourceFunc
443 * prototype. The source of this function also serves as a prototype for other,
444 * more advanced callbacks. Due to perl's heavy use of precompiler macros, it's
445 * not possible to break this down any further. */
448 amglue_source_callback_simple(
452 amglue_Source *src = (amglue_Source *)data;
455 /* keep the source around long enough for the call to finish */
456 amglue_source_ref(src);
457 g_assert(src->callback_sv != NULL);
462 /* create a new SV pointing to 'src', and increase our refcount
463 * accordingly. The SV is mortal, so FREETMPS will decrease the
464 * refcount, unless the callee keeps a copy of it somewhere */
465 amglue_source_ref(src);
466 src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
467 SWIG_OWNER | SWIG_SHADOW);
473 call_sv(src->callback_sv, G_EVAL|G_DISCARD);
478 /* we no longer need the src */
479 amglue_source_unref(src);
482 /* this may have been freed, so don't use them after this point */
485 /* check for an uncaught 'die'. If we don't do this, then Perl will longjmp()
486 * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
489 /* We handle this just the way the default 'die' handler in Amanda::Debug
490 * does, but since Amanda's debug support may not yet be running, we back
491 * it up with an exit() */
492 g_critical("%s", SvPV_nolen(ERRSV));
500 /* Constructors for some general-purpose sources */
503 %newobject timeout_source;
509 return amglue_source_new(g_timeout_source_new(interval),
510 (GSourceFunc)amglue_source_callback_simple);
515 %newobject idle_source;
521 GSource *idle_source = g_idle_source_new();
522 g_source_set_priority(idle_source, priority);
523 return amglue_source_new(idle_source,
524 (GSourceFunc)amglue_source_callback_simple);
528 /* child watch source */
531 child_watch_source_callback(
537 amglue_Source *src = (amglue_Source *)data;
540 /* keep the source around long enough for the call to finish */
541 amglue_source_ref(src);
542 g_assert(src->callback_sv != NULL);
547 /* create a new SV pointing to 'src', and increase our refcount
548 * accordingly. The SV is mortal, so FREETMPS will decrease the
549 * refcount, unless the callee keeps a copy of it somewhere */
550 amglue_source_ref(src);
551 src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
552 SWIG_OWNER | SWIG_SHADOW);
556 XPUSHs(sv_2mortal(newSViv(pid)));
557 XPUSHs(sv_2mortal(newSViv(status)));
560 call_sv(src->callback_sv, G_EVAL|G_DISCARD);
562 /* child watch sources automatically destroy themselves after the
563 * child dies, so we mark the amglue_Source as destroyed, too. */
564 amglue_source_remove(src);
569 /* we no longer need the src */
570 amglue_source_unref(src);
573 /* this may have been freed, so don't use them after this point */
576 /* check for an uncaught 'die'. If we don't do this, then Perl will longjmp()
577 * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
580 /* We handle this just the way the default 'die' handler in Amanda::Debug
581 * does, but since Amanda's debug support may not yet be running, we back
582 * it up with an exit() */
583 g_critical("%s", SvPV_nolen(ERRSV));
590 %newobject child_watch_source;
596 GSource *child_watch_source = new_child_watch_source(pid);
597 return amglue_source_new(child_watch_source,
598 (GSourceFunc)child_watch_source_callback);
603 %apply gint { GIOCondition };
604 amglue_add_flag_tag_fns(GIOCondition);
605 amglue_add_constant(G_IO_IN, GIOCondition);
606 amglue_add_constant(G_IO_OUT, GIOCondition);
607 amglue_add_constant(G_IO_PRI, GIOCondition);
608 amglue_add_constant(G_IO_ERR, GIOCondition);
609 amglue_add_constant(G_IO_HUP, GIOCondition);
610 amglue_add_constant(G_IO_NVAL, GIOCondition);
611 amglue_copy_to_tag(GIOCondition, constants);
613 %newobject fd_source;
620 GSource *fdsource = new_fdsource(fd, events);
621 return amglue_source_new(fdsource,
622 (GSourceFunc)amglue_source_callback_simple);