Imported Upstream version 3.1.0
[debian/amanda] / perl / Amanda / MainLoop.swg
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 %module "Amanda::MainLoop"
22 %include "amglue/amglue.swg"
23 %include "exception.i"
24
25 %include "Amanda/MainLoop.pod"
26
27 %{
28 #include "amanda.h"
29 #include "event.h"
30 %}
31
32 %perlcode %{
33
34 use POSIX;
35 use Carp;
36
37 ## basic functions
38
39 BEGIN {
40     my $have_sub_name = eval "use Sub::Name; 1";
41     if (!$have_sub_name) {
42         eval <<'EOF'
43             sub subname {
44                 my ($name, $sub) = @_;
45                 $sub;
46             }
47 EOF
48     }
49 }
50
51 # glib's g_is_main_loop_running() seems inaccurate, so we just
52 # track that information locally..
53 my $mainloop_running = 0;
54 sub run {
55     $mainloop_running = 1;
56     run_c();
57     $mainloop_running = 0;
58 }
59 push @EXPORT_OK, "run";
60
61 sub is_running {
62     return $mainloop_running;
63 }
64 push @EXPORT_OK, "is_running";
65
66 # quit is a direct call to C
67 push @EXPORT_OK, "quit";
68
69 ## utility functions
70
71 my @waiting_to_call_later;
72 sub call_later {
73     my ($sub, @args) = @_;
74
75     confess "undefined sub" unless ($sub);
76
77     # add the callback if nothing is waiting right now
78     if (!@waiting_to_call_later) {
79         timeout_source(0)->set_callback(sub {
80             my ($src) = @_;
81             $src->remove();
82
83             while (@waiting_to_call_later) {
84                 my ($sub, @args) = @{shift @waiting_to_call_later};
85                 $sub->(@args) if $sub;
86             }
87         });
88     }
89
90     push @waiting_to_call_later, [ $sub, @args ];
91 }
92 push @EXPORT_OK, "call_later";
93
94 sub make_cb {
95     my ($name, $sub) = @_;
96
97     if ($sub) {
98         my ($pkg, $filename, $line) = caller;
99         my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
100         $sub = subname($newname => $sub);
101     } else {
102         $sub = $name; # no name => sub is actually in first parameter
103     }
104
105     sub {
106         Amanda::MainLoop::call_later($sub, @_);
107     };
108 }
109 push @EXPORT, 'make_cb';
110
111 sub call_after {
112     my ($delay_ms, $sub, @args) = @_;
113
114     confess "undefined sub" unless ($sub);
115
116     my $src = timeout_source($delay_ms);
117     $src->set_callback(sub {
118         $src->remove();
119         $sub->(@args);
120     });
121
122     return $src;
123 }
124 push @EXPORT_OK, "call_after";
125
126 sub call_on_child_termination {
127     my ($pid, $cb, @args) = @_;
128
129     confess "undefined sub" unless ($cb);
130
131     my $src = child_watch_source($pid);
132     $src->set_callback(sub {
133         my ($src, $pid, $exitstatus) = @_;
134         $src->remove();
135         return $cb->($exitstatus);
136     });
137 }
138 push @EXPORT_OK, "call_on_child_termination";
139
140 sub async_read {
141     my %params = @_;
142     my $fd = $params{'fd'};
143     my $size = $params{'size'} || 0;
144     my $cb = $params{'async_read_cb'};
145     my @args;
146     @args = @{$params{'args'}} if exists $params{'args'};
147
148     my $fd_cb = sub {
149         my ($src) = @_;
150         $src->remove();
151
152         my $buf;
153         my $res = POSIX::read($fd, $buf, $size || 32768);
154         if (!defined $res) {
155             return $cb->($!, undef, @args);
156         } else {
157             return $cb->(undef, $buf, @args);
158         }
159     };
160     my $src = fd_source($fd, $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
161     $src->set_callback($fd_cb);
162     return $src;
163 }
164 push @EXPORT_OK, "async_read";
165
166 my %outstanding_writes;
167 sub async_write {
168     my %params = @_;
169     my $fd = $params{'fd'};
170     my $data = $params{'data'};
171     my $cb = $params{'async_write_cb'};
172     my @args;
173     @args = @{$params{'args'}} if exists $params{'args'};
174
175     # more often than not, writes will not block, so just try it.
176     if (!exists $outstanding_writes{$fd}) {
177         my $res = POSIX::write($fd, $data, length($data));
178         if (!defined $res) {
179             if ($! != POSIX::EAGAIN) {
180                 return $cb->($!, 0, @args);
181             }
182         } elsif ($res eq length($data)) {
183             return $cb->(undef, $res, @args);
184         } else {
185             # chop off whatever data was written
186             $data = substr($data, $res);
187         }
188     }
189
190     if (!exists $outstanding_writes{$fd}) {
191         my $fd_writes = $outstanding_writes{$fd} = [];
192         my $src = fd_source($fd, $G_IO_OUT|$G_IO_HUP|$G_IO_ERR);
193
194         # (note that this does not coalesce consecutive outstanding writes
195         # into a single POSIX::write call)
196         my $fd_cb = sub {
197             my $ow = $fd_writes->[0];
198             my ($buf, $nwritten, $len, $cb, $args) = @$ow;
199
200             my $res = POSIX::write($fd, $buf, $len-$nwritten);
201             if (!defined $res) {
202                 shift @$fd_writes;
203                 $cb->($!, $nwritten, @$args);
204             } else {
205                 $ow->[1] = $nwritten = $nwritten + $res;
206                 if ($nwritten == $len) {
207                     shift @$fd_writes;
208                     $cb->(undef, $nwritten, @$args);
209                 } else {
210                     $ow->[0] = substr($buf, $res);
211                 }
212             }
213
214             # (the following is *intentionally* done after calling $cb, allowing
215             # $cb to add a new message to $fd_writes if desired, and thus avoid
216             # removing and re-adding the source)
217             if (@$fd_writes == 0) {
218                 $src->remove();
219                 delete $outstanding_writes{$fd};
220             }
221         };
222
223         $src->set_callback($fd_cb);
224     }
225     
226     push @{$outstanding_writes{$fd}}, [ $data, 0, length($data), $cb, \@args ];
227 }
228 push @EXPORT_OK, "async_write";
229
230 sub synchronized {
231     my ($lock, $orig_cb, $sub) = @_;
232     my $continuation_cb;
233
234     $continuation_cb = sub {
235         my @args = @_;
236
237         # shift this invocation off the queue
238         my ($last_sub, $last_orig_cb) = @{ shift @$lock };
239
240         # start the next invocation, if the queue isn't empty
241         if (@$lock) {
242             Amanda::MainLoop::call_later($lock->[0][0], $continuation_cb);
243         }
244
245         # call through to the original callback for the last invocation
246         return $last_orig_cb->(@args);
247     };
248
249     # push this sub onto the lock queue
250     if ((push @$lock, [ $sub, $orig_cb ]) == 1) {
251         # if this is the first addition to the queue, start it
252         $sub->($continuation_cb);
253     }
254 }
255 push @EXPORT_OK, "synchronized";
256
257 {   # privat variables to track the "current" step definition
258     my $current_steps;
259     my $immediate;
260     my $first_step;
261
262     sub define_steps (@) {
263         my (%params) = @_;
264         my $cb_ref = $params{'cb_ref'};
265         my %steps;
266
267         croak "cb_ref is undefined" unless defined $cb_ref;
268         croak "cb_ref is not a reference" unless ref($cb_ref) eq 'REF';
269         croak "cb_ref is not a code double-reference" unless ref($$cb_ref) eq 'CODE';
270
271         # arrange to clear out $steps when $exit_cb is called; this eliminates
272         # reference loops (values in %steps are closures which point to %steps).
273         # This also clears $current_steps, which is likely holding a reference to
274         # the steps hash.
275         my $orig_cb = $$cb_ref;
276         $$cb_ref = sub {
277             %steps = ();
278             $current_steps = undef;
279             goto $orig_cb;
280         };
281
282         # set up state
283         $current_steps = \%steps;
284         $immediate = $params{'immediate'};
285         $first_step = 1;
286
287         return $current_steps;
288     }
289     push @EXPORT, "define_steps";
290
291     sub step (@) {
292         my (%params) = @_;
293         my $step_immediate = $immediate || $params{'immediate'};
294         delete $params{'immediate'} if $step_immediate;
295
296         my ($name) = keys %params;
297         my $cb = $params{$name};
298
299         croak "expected a sub at key $name" unless ref($cb) eq 'CODE';
300
301         # make the sub delayed
302         unless ($step_immediate) {
303             my $orig_cb = $cb;
304             $cb = sub { Amanda::MainLoop::call_later($orig_cb, @_); }
305         }
306
307         # patch up the callback
308         my ($pkg, $filename, $line) = caller;
309         my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
310         $cb = subname($newname => $cb);
311
312         # store the step for later
313         $current_steps->{$name} = $cb;
314
315         # and invoke it, if it's the first step given
316         if ($first_step) {
317             if ($step_immediate) {
318                 call_later($cb);
319             } else {
320                 $cb->();
321             }
322         }
323         $first_step = 0;
324     }
325     push @EXPORT, "step";
326 }
327 %}
328
329 %inline %{
330 void run_c(void) {
331     g_main_loop_run(default_main_loop());
332 }
333
334 void quit(void) {
335     g_main_loop_quit(default_main_loop());
336 }
337 %}
338
339 /*
340  * Event Sources
341  */
342
343 /* First we wrap the amglue_Source struct, defined in
344  * perl/amglue/mainloop.h, into a Perl object (named
345  * Amanda::MainLoop::Source).  After that appear several 
346  * constructors for various event sources.
347  */
348
349 %{ static void amglue_source_remove(amglue_Source *self); %}
350
351 %rename(Source) amglue_Source;
352 typedef struct amglue_Source {
353     %extend {
354         /* Constructor: use one of the package-level functions, below */
355         amglue_Source() {
356             die("Amanda::MainLoop::Source is an abstract base class");
357         }
358
359         /* Destructor: just unref the object */
360         ~amglue_Source() {
361             amglue_source_unref(self);
362         }
363
364         /* a "cheater's typemap" to just pass the SV along */
365         %typemap(in) SV *callback_sub "$1 = $input;"
366         void set_callback(SV *callback_sub) {
367             /* Attach the source to the default mainloop context, so
368              * that it will start generating events.  If it's already
369              * been destroyed, then bail with a fatal error.
370              */
371             if (self->state == AMGLUE_SOURCE_DESTROYED) {
372                 die("This source has already been removed");
373             } else if (self->state == AMGLUE_SOURCE_NEW) {
374                 self->state = AMGLUE_SOURCE_ATTACHED;
375
376                 g_source_attach(self->src, NULL);
377
378                 /* the link from the GSource to the amglue_Source is
379                  * now in use, so we increment the amglue_Source's 
380                  * refcount. */
381                 amglue_source_ref(self);
382             }
383
384             /* whoever created this Source object conveniently left
385              * the proper C-side callback for us.  This function has
386              * the appropriate calling signature for this GSource, and
387              * knows how to reflect that into Perl.  It expects the SV to
388              * be provided as its 'data' argument.  'perlcall' suggests
389              * that we make a copy of this SV, in case the user later
390              * modifies it. */
391             if (self->callback_sv) {
392                 SvREFCNT_dec(self->callback_sv);
393                 self->callback_sv = NULL;
394             }
395             self->callback_sv = newSVsv(callback_sub);
396             SvREFCNT_inc(self->callback_sv);
397             g_source_set_callback(self->src, self->callback,
398                 (gpointer)self, NULL);
399         }
400         /* delete the cheater's typemap */
401         %typemap(in) SV *sv;
402
403         void remove(void) {
404             amglue_source_remove(self);
405         }
406     }
407 } amglue_Source;
408
409 %{
410 /* Detach a source from the mainloop and remove it from play.  This is broken
411  * out as a separate function because it's also used from some callbacks */
412 static void
413 amglue_source_remove(
414     amglue_Source *self)
415 {
416     /* protect against self being freed out from under us */
417     amglue_source_ref(self);
418
419     if (self->state == AMGLUE_SOURCE_ATTACHED) {
420         /* unref any perl callback */
421         if (self->callback_sv) {
422             SvREFCNT_dec(self->callback_sv);
423             self->callback_sv = NULL;
424         }
425
426         /* undo the ref made in set_callback() */
427         amglue_source_unref(self);
428
429         g_source_destroy(self->src);
430     }
431
432     self->state = AMGLUE_SOURCE_DESTROYED;
433
434     /* reverse the "protection" increment used above */
435     amglue_source_unref(self);
436 }
437 %}
438
439 /* "Generic" callback function for a GSource that actually uses the GSourceFunc
440  * prototype.  The source of this function also serves as a prototype for other,
441  * more advanced callbacks.  Due to perl's heavy use of precompiler macros, it's
442  * not possible to break this down any further. */
443 %{
444 static gboolean
445 amglue_source_callback_simple(
446     gpointer *data)
447 {
448     dSP;
449     amglue_Source *src = (amglue_Source *)data;
450     SV *src_sv = NULL;
451
452     /* keep the source around long enough for the call to finish */
453     amglue_source_ref(src);
454     g_assert(src->callback_sv != NULL);
455
456     ENTER;
457     SAVETMPS;
458
459     /* create a new SV pointing to 'src', and increase our refcount
460      * accordingly.  The SV is mortal, so FREETMPS will decrease the 
461      * refcount, unless the callee keeps a copy of it somewhere */
462     amglue_source_ref(src);
463     src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
464                                  SWIG_OWNER | SWIG_SHADOW);
465
466     PUSHMARK(SP);
467     XPUSHs(src_sv);
468     PUTBACK;
469
470     call_sv(src->callback_sv, G_EVAL|G_DISCARD);
471
472     FREETMPS;
473     LEAVE;
474
475     /* we no longer need the src */
476     amglue_source_unref(src);
477     src = NULL;
478
479     /* this may have been freed, so don't use them after this point */
480     src_sv = NULL;
481
482     /* check for an uncaught 'die'.  If we don't do this, then Perl will longjmp()
483      * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
484      * state. */
485     if (SvTRUE(ERRSV)) {
486         /* We handle this just the way the default 'die' handler in Amanda::Debug 
487          * does, but since Amanda's debug support may not yet be running, we back
488          * it up with an exit() */
489         g_critical("%s", SvPV_nolen(ERRSV));
490         exit(1);
491     }
492
493     return TRUE;
494 }
495 %}
496
497 /* Constructors for some general-purpose sources */
498
499 /* timeout source */
500 %newobject timeout_source;
501 %inline %{
502 amglue_Source *
503 timeout_source(
504     guint interval)
505 {
506     return amglue_source_new(g_timeout_source_new(interval), 
507         (GSourceFunc)amglue_source_callback_simple);
508 }
509 %}
510
511 /* idle source */
512 %newobject idle_source;
513 %inline %{
514 amglue_Source *
515 idle_source(
516     gint priority)
517 {
518     GSource *idle_source = g_idle_source_new();
519     g_source_set_priority(idle_source, priority);
520     return amglue_source_new(idle_source,
521         (GSourceFunc)amglue_source_callback_simple);
522 }
523 %}
524
525 /* child watch source */
526 %{
527 static gboolean
528 child_watch_source_callback(
529     pid_t pid,
530     gint status,
531     gpointer data)
532 {
533     dSP;
534     amglue_Source *src = (amglue_Source *)data;
535     SV *src_sv;
536
537     /* keep the source around long enough for the call to finish */
538     amglue_source_ref(src);
539     g_assert(src->callback_sv != NULL);
540
541     ENTER;
542     SAVETMPS;
543
544     /* create a new SV pointing to 'src', and increase our refcount
545      * accordingly.  The SV is mortal, so FREETMPS will decrease the 
546      * refcount, unless the callee keeps a copy of it somewhere */
547     amglue_source_ref(src);
548     src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
549                                  SWIG_OWNER | SWIG_SHADOW);
550
551     PUSHMARK(SP);
552     XPUSHs(src_sv);
553     XPUSHs(sv_2mortal(newSViv(pid)));
554     XPUSHs(sv_2mortal(newSViv(status)));
555     PUTBACK;
556
557     call_sv(src->callback_sv, G_EVAL|G_DISCARD);
558
559     /* child watch sources automatically destroy themselves after the
560      * child dies, so we mark the amglue_Source as destroyed, too. */
561     amglue_source_remove(src);
562
563     FREETMPS;
564     LEAVE;
565
566     /* we no longer need the src */
567     amglue_source_unref(src);
568     src = NULL;
569
570     /* this may have been freed, so don't use them after this point */
571     src_sv = NULL;
572
573     /* check for an uncaught 'die'.  If we don't do this, then Perl will longjmp()
574      * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
575      * state. */
576     if (SvTRUE(ERRSV)) {
577         /* We handle this just the way the default 'die' handler in Amanda::Debug 
578          * does, but since Amanda's debug support may not yet be running, we back
579          * it up with an exit() */
580         g_critical("%s", SvPV_nolen(ERRSV));
581         exit(1);
582     }
583
584     return TRUE;
585 }
586 %}
587 %newobject child_watch_source;
588 %inline %{
589 amglue_Source *
590 child_watch_source(
591     gint pid)
592 {
593     GSource *child_watch_source = new_child_watch_source(pid);
594     return amglue_source_new(child_watch_source,
595         (GSourceFunc)child_watch_source_callback);
596 }
597 %}
598
599 /* fd source */
600 %apply gint { GIOCondition };
601 amglue_add_flag_tag_fns(GIOCondition);
602 amglue_add_constant(G_IO_IN, GIOCondition);
603 amglue_add_constant(G_IO_OUT, GIOCondition);
604 amglue_add_constant(G_IO_PRI, GIOCondition);
605 amglue_add_constant(G_IO_ERR, GIOCondition);
606 amglue_add_constant(G_IO_HUP, GIOCondition);
607 amglue_add_constant(G_IO_NVAL, GIOCondition);
608 amglue_copy_to_tag(GIOCondition, constants);
609
610 %newobject fd_source;
611 %inline %{
612 amglue_Source *
613 fd_source(
614     int fd,
615     GIOCondition events)
616 {
617     GSource *fdsource = new_fdsource(fd, events);
618     return amglue_source_new(fdsource,
619         (GSourceFunc)amglue_source_callback_simple);
620 }
621 %}