Imported Upstream version 3.3.3
[debian/amanda] / perl / Amanda / MainLoop.swg
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 %module "Amanda::MainLoop"
23 %include "amglue/amglue.swg"
24 %include "exception.i"
25
26 %include "Amanda/MainLoop.pod"
27
28 %{
29 #include "amanda.h"
30 #include "event.h"
31 %}
32
33 %perlcode %{
34
35 use POSIX;
36 use Carp;
37
38 ## basic functions
39
40 BEGIN {
41     my $have_sub_name = eval "use Sub::Name; 1";
42     if (!$have_sub_name) {
43         eval <<'EOF'
44             sub subname {
45                 my ($name, $sub) = @_;
46                 $sub;
47             }
48 EOF
49     }
50 }
51
52 # glib's g_is_main_loop_running() seems inaccurate, so we just
53 # track that information locally..
54 my $mainloop_running = 0;
55 sub run {
56     $mainloop_running = 1;
57     run_c();
58     $mainloop_running = 0;
59 }
60 push @EXPORT_OK, "run";
61
62 sub is_running {
63     return $mainloop_running;
64 }
65 push @EXPORT_OK, "is_running";
66
67 # quit is a direct call to C
68 push @EXPORT_OK, "quit";
69
70 ## utility functions
71
72 my @waiting_to_call_later;
73 sub call_later {
74     my ($sub, @args) = @_;
75
76     confess "undefined sub" unless ($sub);
77
78     # add the callback if nothing is waiting right now
79     if (!@waiting_to_call_later) {
80         timeout_source(0)->set_callback(sub {
81             my ($src) = @_;
82             $src->remove();
83
84             while (@waiting_to_call_later) {
85                 my ($sub, @args) = @{shift @waiting_to_call_later};
86                 $sub->(@args) if $sub;
87             }
88         });
89     }
90
91     push @waiting_to_call_later, [ $sub, @args ];
92 }
93 push @EXPORT_OK, "call_later";
94
95 sub make_cb {
96     my ($name, $sub) = @_;
97
98     if ($sub) {
99         my ($pkg, $filename, $line) = caller;
100         my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
101         $sub = subname($newname => $sub);
102     } else {
103         $sub = $name; # no name => sub is actually in first parameter
104     }
105
106     sub {
107         Amanda::MainLoop::call_later($sub, @_);
108     };
109 }
110 push @EXPORT, 'make_cb';
111
112 sub call_after {
113     my ($delay_ms, $sub, @args) = @_;
114
115     confess "undefined sub" unless ($sub);
116
117     my $src = timeout_source($delay_ms);
118     $src->set_callback(sub {
119         $src->remove();
120         $sub->(@args);
121     });
122
123     return $src;
124 }
125 push @EXPORT_OK, "call_after";
126
127 sub call_on_child_termination {
128     my ($pid, $cb, @args) = @_;
129
130     confess "undefined sub" unless ($cb);
131
132     my $src = child_watch_source($pid);
133     $src->set_callback(sub {
134         my ($src, $pid, $exitstatus) = @_;
135         $src->remove();
136         return $cb->($exitstatus);
137     });
138 }
139 push @EXPORT_OK, "call_on_child_termination";
140
141 sub async_read {
142     my %params = @_;
143     my $fd = $params{'fd'};
144     my $size = $params{'size'} || 0;
145     my $cb = $params{'async_read_cb'};
146     my @args;
147     @args = @{$params{'args'}} if exists $params{'args'};
148
149     my $fd_cb = sub {
150         my ($src) = @_;
151         $src->remove();
152
153         my $buf;
154         my $res = POSIX::read($fd, $buf, $size || 32768);
155         if (!defined $res) {
156             return $cb->($!, undef, @args);
157         } else {
158             return $cb->(undef, $buf, @args);
159         }
160     };
161     my $src = fd_source($fd, $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
162     $src->set_callback($fd_cb);
163     return $src;
164 }
165 push @EXPORT_OK, "async_read";
166
167 my %outstanding_writes;
168 sub async_write {
169     my %params = @_;
170     my $fd = $params{'fd'};
171     my $data = $params{'data'};
172     my $cb = $params{'async_write_cb'};
173     my @args;
174     @args = @{$params{'args'}} if exists $params{'args'};
175
176     # more often than not, writes will not block, so just try it.
177     if (!exists $outstanding_writes{$fd}) {
178         my $res = POSIX::write($fd, $data, length($data));
179         if (!defined $res) {
180             if ($! != POSIX::EAGAIN) {
181                 return $cb->($!, 0, @args);
182             }
183         } elsif ($res eq length($data)) {
184             return $cb->(undef, $res, @args);
185         } else {
186             # chop off whatever data was written
187             $data = substr($data, $res);
188         }
189     }
190
191     if (!exists $outstanding_writes{$fd}) {
192         my $fd_writes = $outstanding_writes{$fd} = [];
193         my $src = fd_source($fd, $G_IO_OUT|$G_IO_HUP|$G_IO_ERR);
194
195         # (note that this does not coalesce consecutive outstanding writes
196         # into a single POSIX::write call)
197         my $fd_cb = sub {
198             my $ow = $fd_writes->[0];
199             my ($buf, $nwritten, $len, $cb, $args) = @$ow;
200
201             my $res = POSIX::write($fd, $buf, $len-$nwritten);
202             if (!defined $res) {
203                 shift @$fd_writes;
204                 $cb->($!, $nwritten, @$args);
205             } else {
206                 $ow->[1] = $nwritten = $nwritten + $res;
207                 if ($nwritten == $len) {
208                     shift @$fd_writes;
209                     $cb->(undef, $nwritten, @$args);
210                 } else {
211                     $ow->[0] = substr($buf, $res);
212                 }
213             }
214
215             # (the following is *intentionally* done after calling $cb, allowing
216             # $cb to add a new message to $fd_writes if desired, and thus avoid
217             # removing and re-adding the source)
218             if (@$fd_writes == 0) {
219                 $src->remove();
220                 delete $outstanding_writes{$fd};
221             }
222         };
223
224         $src->set_callback($fd_cb);
225     }
226     
227     push @{$outstanding_writes{$fd}}, [ $data, 0, length($data), $cb, \@args ];
228 }
229 push @EXPORT_OK, "async_write";
230
231 sub synchronized {
232     my ($lock, $orig_cb, $sub) = @_;
233     my $continuation_cb;
234
235     $continuation_cb = sub {
236         my @args = @_;
237
238         # shift this invocation off the queue
239         my ($last_sub, $last_orig_cb) = @{ shift @$lock };
240
241         # start the next invocation, if the queue isn't empty
242         if (@$lock) {
243             Amanda::MainLoop::call_later($lock->[0][0], $continuation_cb);
244         }
245
246         # call through to the original callback for the last invocation
247         return $last_orig_cb->(@args);
248     };
249
250     # push this sub onto the lock queue
251     if ((push @$lock, [ $sub, $orig_cb ]) == 1) {
252         # if this is the first addition to the queue, start it
253         $sub->($continuation_cb);
254     }
255 }
256 push @EXPORT_OK, "synchronized";
257
258 {   # privat variables to track the "current" step definition
259     my $current_steps;
260     my $immediate;
261     my $first_step;
262
263     sub define_steps (@) {
264         my (%params) = @_;
265         my $cb_ref = $params{'cb_ref'};
266         my $finalize = $params{'finalize'};
267         my %steps;
268
269         croak "cb_ref is undefined" unless defined $cb_ref;
270         croak "cb_ref is not a reference" unless ref($cb_ref) eq 'REF';
271         croak "cb_ref is not a code double-reference" unless ref($$cb_ref) eq 'CODE';
272
273         # arrange to clear out $steps when $exit_cb is called; this eliminates
274         # reference loops (values in %steps are closures which point to %steps).
275         # This also clears $current_steps, which is likely holding a reference to
276         # the steps hash.
277         my $orig_cb = $$cb_ref;
278         $$cb_ref = sub {
279             %steps = ();
280             $current_steps = undef;
281             $finalize->() if defined($finalize);
282             goto $orig_cb;
283         };
284
285         # set up state
286         $current_steps = \%steps;
287         $immediate = $params{'immediate'};
288         $first_step = 1;
289
290         return $current_steps;
291     }
292     push @EXPORT, "define_steps";
293
294     sub step (@) {
295         my (%params) = @_;
296         my $step_immediate = $immediate || $params{'immediate'};
297         delete $params{'immediate'} if $step_immediate;
298
299         my ($name) = keys %params;
300         my $cb = $params{$name};
301
302         croak "expected a sub at key $name" unless ref($cb) eq 'CODE';
303
304         # make the sub delayed
305         unless ($step_immediate) {
306             my $orig_cb = $cb;
307             $cb = sub { Amanda::MainLoop::call_later($orig_cb, @_); }
308         }
309
310         # patch up the callback
311         my ($pkg, $filename, $line) = caller;
312         my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
313         $cb = subname($newname => $cb);
314
315         # store the step for later
316         $current_steps->{$name} = $cb;
317
318         # and invoke it, if it's the first step given
319         if ($first_step) {
320             if ($step_immediate) {
321                 call_later($cb);
322             } else {
323                 $cb->();
324             }
325         }
326         $first_step = 0;
327     }
328     push @EXPORT, "step";
329 }
330 %}
331
332 %inline %{
333 void run_c(void) {
334     g_main_loop_run(default_main_loop());
335 }
336
337 void quit(void) {
338     g_main_loop_quit(default_main_loop());
339 }
340 %}
341
342 /*
343  * Event Sources
344  */
345
346 /* First we wrap the amglue_Source struct, defined in
347  * perl/amglue/mainloop.h, into a Perl object (named
348  * Amanda::MainLoop::Source).  After that appear several 
349  * constructors for various event sources.
350  */
351
352 %{ static void amglue_source_remove(amglue_Source *self); %}
353
354 %rename(Source) amglue_Source;
355 typedef struct amglue_Source {
356     %extend {
357         /* Constructor: use one of the package-level functions, below */
358         amglue_Source() {
359             die("Amanda::MainLoop::Source is an abstract base class");
360         }
361
362         /* Destructor: just unref the object */
363         ~amglue_Source() {
364             amglue_source_unref(self);
365         }
366
367         /* a "cheater's typemap" to just pass the SV along */
368         %typemap(in) SV *callback_sub "$1 = $input;"
369         void set_callback(SV *callback_sub) {
370             /* Attach the source to the default mainloop context, so
371              * that it will start generating events.  If it's already
372              * been destroyed, then bail with a fatal error.
373              */
374             if (self->state == AMGLUE_SOURCE_DESTROYED) {
375                 die("This source has already been removed");
376             } else if (self->state == AMGLUE_SOURCE_NEW) {
377                 self->state = AMGLUE_SOURCE_ATTACHED;
378
379                 g_source_attach(self->src, NULL);
380
381                 /* the link from the GSource to the amglue_Source is
382                  * now in use, so we increment the amglue_Source's 
383                  * refcount. */
384                 amglue_source_ref(self);
385             }
386
387             /* whoever created this Source object conveniently left
388              * the proper C-side callback for us.  This function has
389              * the appropriate calling signature for this GSource, and
390              * knows how to reflect that into Perl.  It expects the SV to
391              * be provided as its 'data' argument.  'perlcall' suggests
392              * that we make a copy of this SV, in case the user later
393              * modifies it. */
394             if (self->callback_sv) {
395                 SvREFCNT_dec(self->callback_sv);
396                 self->callback_sv = NULL;
397             }
398             self->callback_sv = newSVsv(callback_sub);
399             SvREFCNT_inc(self->callback_sv);
400             g_source_set_callback(self->src, self->callback,
401                 (gpointer)self, NULL);
402         }
403         /* delete the cheater's typemap */
404         %typemap(in) SV *sv;
405
406         void remove(void) {
407             amglue_source_remove(self);
408         }
409     }
410 } amglue_Source;
411
412 %{
413 /* Detach a source from the mainloop and remove it from play.  This is broken
414  * out as a separate function because it's also used from some callbacks */
415 static void
416 amglue_source_remove(
417     amglue_Source *self)
418 {
419     /* protect against self being freed out from under us */
420     amglue_source_ref(self);
421
422     if (self->state == AMGLUE_SOURCE_ATTACHED) {
423         /* unref any perl callback */
424         if (self->callback_sv) {
425             SvREFCNT_dec(self->callback_sv);
426             self->callback_sv = NULL;
427         }
428
429         /* undo the ref made in set_callback() */
430         amglue_source_unref(self);
431
432         g_source_destroy(self->src);
433     }
434
435     self->state = AMGLUE_SOURCE_DESTROYED;
436
437     /* reverse the "protection" increment used above */
438     amglue_source_unref(self);
439 }
440 %}
441
442 /* "Generic" callback function for a GSource that actually uses the GSourceFunc
443  * prototype.  The source of this function also serves as a prototype for other,
444  * more advanced callbacks.  Due to perl's heavy use of precompiler macros, it's
445  * not possible to break this down any further. */
446 %{
447 static gboolean
448 amglue_source_callback_simple(
449     gpointer *data)
450 {
451     dSP;
452     amglue_Source *src = (amglue_Source *)data;
453     SV *src_sv = NULL;
454
455     /* keep the source around long enough for the call to finish */
456     amglue_source_ref(src);
457     g_assert(src->callback_sv != NULL);
458
459     ENTER;
460     SAVETMPS;
461
462     /* create a new SV pointing to 'src', and increase our refcount
463      * accordingly.  The SV is mortal, so FREETMPS will decrease the 
464      * refcount, unless the callee keeps a copy of it somewhere */
465     amglue_source_ref(src);
466     src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
467                                  SWIG_OWNER | SWIG_SHADOW);
468
469     PUSHMARK(SP);
470     XPUSHs(src_sv);
471     PUTBACK;
472
473     call_sv(src->callback_sv, G_EVAL|G_DISCARD);
474
475     FREETMPS;
476     LEAVE;
477
478     /* we no longer need the src */
479     amglue_source_unref(src);
480     src = NULL;
481
482     /* this may have been freed, so don't use them after this point */
483     src_sv = NULL;
484
485     /* check for an uncaught 'die'.  If we don't do this, then Perl will longjmp()
486      * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
487      * state. */
488     if (SvTRUE(ERRSV)) {
489         /* We handle this just the way the default 'die' handler in Amanda::Debug 
490          * does, but since Amanda's debug support may not yet be running, we back
491          * it up with an exit() */
492         g_critical("%s", SvPV_nolen(ERRSV));
493         exit(1);
494     }
495
496     return TRUE;
497 }
498 %}
499
500 /* Constructors for some general-purpose sources */
501
502 /* timeout source */
503 %newobject timeout_source;
504 %inline %{
505 amglue_Source *
506 timeout_source(
507     guint interval)
508 {
509     return amglue_source_new(g_timeout_source_new(interval), 
510         (GSourceFunc)amglue_source_callback_simple);
511 }
512 %}
513
514 /* idle source */
515 %newobject idle_source;
516 %inline %{
517 amglue_Source *
518 idle_source(
519     gint priority)
520 {
521     GSource *idle_source = g_idle_source_new();
522     g_source_set_priority(idle_source, priority);
523     return amglue_source_new(idle_source,
524         (GSourceFunc)amglue_source_callback_simple);
525 }
526 %}
527
528 /* child watch source */
529 %{
530 static gboolean
531 child_watch_source_callback(
532     pid_t pid,
533     gint status,
534     gpointer data)
535 {
536     dSP;
537     amglue_Source *src = (amglue_Source *)data;
538     SV *src_sv;
539
540     /* keep the source around long enough for the call to finish */
541     amglue_source_ref(src);
542     g_assert(src->callback_sv != NULL);
543
544     ENTER;
545     SAVETMPS;
546
547     /* create a new SV pointing to 'src', and increase our refcount
548      * accordingly.  The SV is mortal, so FREETMPS will decrease the 
549      * refcount, unless the callee keeps a copy of it somewhere */
550     amglue_source_ref(src);
551     src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
552                                  SWIG_OWNER | SWIG_SHADOW);
553
554     PUSHMARK(SP);
555     XPUSHs(src_sv);
556     XPUSHs(sv_2mortal(newSViv(pid)));
557     XPUSHs(sv_2mortal(newSViv(status)));
558     PUTBACK;
559
560     call_sv(src->callback_sv, G_EVAL|G_DISCARD);
561
562     /* child watch sources automatically destroy themselves after the
563      * child dies, so we mark the amglue_Source as destroyed, too. */
564     amglue_source_remove(src);
565
566     FREETMPS;
567     LEAVE;
568
569     /* we no longer need the src */
570     amglue_source_unref(src);
571     src = NULL;
572
573     /* this may have been freed, so don't use them after this point */
574     src_sv = NULL;
575
576     /* check for an uncaught 'die'.  If we don't do this, then Perl will longjmp()
577      * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
578      * state. */
579     if (SvTRUE(ERRSV)) {
580         /* We handle this just the way the default 'die' handler in Amanda::Debug 
581          * does, but since Amanda's debug support may not yet be running, we back
582          * it up with an exit() */
583         g_critical("%s", SvPV_nolen(ERRSV));
584         exit(1);
585     }
586
587     return TRUE;
588 }
589 %}
590 %newobject child_watch_source;
591 %inline %{
592 amglue_Source *
593 child_watch_source(
594     gint pid)
595 {
596     GSource *child_watch_source = new_child_watch_source(pid);
597     return amglue_source_new(child_watch_source,
598         (GSourceFunc)child_watch_source_callback);
599 }
600 %}
601
602 /* fd source */
603 %apply gint { GIOCondition };
604 amglue_add_flag_tag_fns(GIOCondition);
605 amglue_add_constant(G_IO_IN, GIOCondition);
606 amglue_add_constant(G_IO_OUT, GIOCondition);
607 amglue_add_constant(G_IO_PRI, GIOCondition);
608 amglue_add_constant(G_IO_ERR, GIOCondition);
609 amglue_add_constant(G_IO_HUP, GIOCondition);
610 amglue_add_constant(G_IO_NVAL, GIOCondition);
611 amglue_copy_to_tag(GIOCondition, constants);
612
613 %newobject fd_source;
614 %inline %{
615 amglue_Source *
616 fd_source(
617     int fd,
618     GIOCondition events)
619 {
620     GSource *fdsource = new_fdsource(fd, events);
621     return amglue_source_new(fdsource,
622         (GSourceFunc)amglue_source_callback_simple);
623 }
624 %}