6b66c457b09f18d771e6b02a8978cc7116207a04
[debian/amanda] / perl / Amanda / MainLoop.swg
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 %module "Amanda::MainLoop"
22 %include "amglue/amglue.swg"
23 %include "exception.i"
24
25 %include "Amanda/MainLoop.pod"
26
27 %{
28 #include "amanda.h"
29 #include "event.h"
30 %}
31
32 %perlcode %{
33
34 use POSIX;
35 use Carp;
36
37 ## basic functions
38
39 BEGIN {
40     my $have_sub_name = eval "use Sub::Name; 1";
41     if (!$have_sub_name) {
42         eval <<'EOF'
43             sub subname {
44                 my ($name, $sub) = @_;
45                 $sub;
46             }
47 EOF
48     }
49 }
50
51 # glib's g_is_main_loop_running() seems inaccurate, so we just
52 # track that information locally..
53 my $mainloop_running = 0;
54 sub run {
55     $mainloop_running = 1;
56     run_c();
57     $mainloop_running = 0;
58 }
59 push @EXPORT_OK, "run";
60
61 sub is_running {
62     return $mainloop_running;
63 }
64 push @EXPORT_OK, "is_running";
65
66 # quit is a direct call to C
67 push @EXPORT_OK, "quit";
68
69 ## utility functions
70
71 my @waiting_to_call_later;
72 sub call_later {
73     my ($sub, @args) = @_;
74
75     confess "undefined sub" unless ($sub);
76
77     # add the callback if nothing is waiting right now
78     if (!@waiting_to_call_later) {
79         timeout_source(0)->set_callback(sub {
80             my ($src) = @_;
81             $src->remove();
82
83             while (@waiting_to_call_later) {
84                 my ($sub, @args) = @{shift @waiting_to_call_later};
85                 $sub->(@args) if $sub;
86             }
87         });
88     }
89
90     push @waiting_to_call_later, [ $sub, @args ];
91 }
92 push @EXPORT_OK, "call_later";
93
94 sub make_cb {
95     my ($name, $sub) = @_;
96
97     if ($sub) {
98         my ($pkg, $filename, $line) = caller;
99         my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
100         $sub = subname($newname => $sub);
101     } else {
102         $sub = $name; # no name => sub is actually in first parameter
103     }
104
105     sub {
106         Amanda::MainLoop::call_later($sub, @_);
107     };
108 }
109 push @EXPORT, 'make_cb';
110
111 sub call_after {
112     my ($delay_ms, $sub, @args) = @_;
113
114     confess "undefined sub" unless ($sub);
115
116     my $src = timeout_source($delay_ms);
117     $src->set_callback(sub {
118         $src->remove();
119         $sub->(@args);
120     });
121
122     return $src;
123 }
124 push @EXPORT_OK, "call_after";
125
126 sub call_on_child_termination {
127     my ($pid, $cb, @args) = @_;
128
129     confess "undefined sub" unless ($cb);
130
131     my $src = child_watch_source($pid);
132     $src->set_callback(sub {
133         my ($src, $pid, $exitstatus) = @_;
134         $src->remove();
135         return $cb->($exitstatus);
136     });
137 }
138 push @EXPORT_OK, "call_on_child_termination";
139
140 sub async_read {
141     my %params = @_;
142     my $fd = $params{'fd'};
143     my $size = $params{'size'} || 0;
144     my $cb = $params{'async_read_cb'};
145     my @args;
146     @args = @{$params{'args'}} if exists $params{'args'};
147
148     my $fd_cb = sub {
149         my ($src) = @_;
150         $src->remove();
151
152         my $buf;
153         my $res = POSIX::read($fd, $buf, $size || 32768);
154         if (!defined $res) {
155             return $cb->($!, undef, @args);
156         } else {
157             return $cb->(undef, $buf, @args);
158         }
159     };
160     my $src = fd_source($fd, $G_IO_IN|$G_IO_HUP|$G_IO_ERR);
161     $src->set_callback($fd_cb);
162     return $src;
163 }
164 push @EXPORT_OK, "async_read";
165
166 my %outstanding_writes;
167 sub async_write {
168     my %params = @_;
169     my $fd = $params{'fd'};
170     my $data = $params{'data'};
171     my $cb = $params{'async_write_cb'};
172     my @args;
173     @args = @{$params{'args'}} if exists $params{'args'};
174
175     # more often than not, writes will not block, so just try it.
176     if (!exists $outstanding_writes{$fd}) {
177         my $res = POSIX::write($fd, $data, length($data));
178         if (!defined $res) {
179             if ($! != POSIX::EAGAIN) {
180                 return $cb->($!, 0, @args);
181             }
182         } elsif ($res eq length($data)) {
183             return $cb->(undef, $res, @args);
184         } else {
185             # chop off whatever data was written
186             $data = substr($data, $res);
187         }
188     }
189
190     if (!exists $outstanding_writes{$fd}) {
191         my $fd_writes = $outstanding_writes{$fd} = [];
192         my $src = fd_source($fd, $G_IO_OUT|$G_IO_HUP|$G_IO_ERR);
193
194         # (note that this does not coalesce consecutive outstanding writes
195         # into a single POSIX::write call)
196         my $fd_cb = sub {
197             my $ow = $fd_writes->[0];
198             my ($buf, $nwritten, $len, $cb, $args) = @$ow;
199
200             my $res = POSIX::write($fd, $buf, $len-$nwritten);
201             if (!defined $res) {
202                 shift @$fd_writes;
203                 $cb->($!, $nwritten, @$args);
204             } else {
205                 $ow->[1] = $nwritten = $nwritten + $res;
206                 if ($nwritten == $len) {
207                     shift @$fd_writes;
208                     $cb->(undef, $nwritten, @$args);
209                 } else {
210                     $ow->[0] = substr($buf, $res);
211                 }
212             }
213
214             # (the following is *intentionally* done after calling $cb, allowing
215             # $cb to add a new message to $fd_writes if desired, and thus avoid
216             # removing and re-adding the source)
217             if (@$fd_writes == 0) {
218                 $src->remove();
219                 delete $outstanding_writes{$fd};
220             }
221         };
222
223         $src->set_callback($fd_cb);
224     }
225     
226     push @{$outstanding_writes{$fd}}, [ $data, 0, length($data), $cb, \@args ];
227 }
228 push @EXPORT_OK, "async_write";
229
230 sub synchronized {
231     my ($lock, $orig_cb, $sub) = @_;
232     my $continuation_cb;
233
234     $continuation_cb = sub {
235         my @args = @_;
236
237         # shift this invocation off the queue
238         my ($last_sub, $last_orig_cb) = @{ shift @$lock };
239
240         # start the next invocation, if the queue isn't empty
241         if (@$lock) {
242             Amanda::MainLoop::call_later($lock->[0][0], $continuation_cb);
243         }
244
245         # call through to the original callback for the last invocation
246         return $last_orig_cb->(@args);
247     };
248
249     # push this sub onto the lock queue
250     if ((push @$lock, [ $sub, $orig_cb ]) == 1) {
251         # if this is the first addition to the queue, start it
252         $sub->($continuation_cb);
253     }
254 }
255 push @EXPORT_OK, "synchronized";
256
257 {   # privat variables to track the "current" step definition
258     my $current_steps;
259     my $immediate;
260     my $first_step;
261
262     sub define_steps (@) {
263         my (%params) = @_;
264         my $cb_ref = $params{'cb_ref'};
265         my $finalize = $params{'finalize'};
266         my %steps;
267
268         croak "cb_ref is undefined" unless defined $cb_ref;
269         croak "cb_ref is not a reference" unless ref($cb_ref) eq 'REF';
270         croak "cb_ref is not a code double-reference" unless ref($$cb_ref) eq 'CODE';
271
272         # arrange to clear out $steps when $exit_cb is called; this eliminates
273         # reference loops (values in %steps are closures which point to %steps).
274         # This also clears $current_steps, which is likely holding a reference to
275         # the steps hash.
276         my $orig_cb = $$cb_ref;
277         $$cb_ref = sub {
278             %steps = ();
279             $current_steps = undef;
280             $finalize->() if defined($finalize);
281             goto $orig_cb;
282         };
283
284         # set up state
285         $current_steps = \%steps;
286         $immediate = $params{'immediate'};
287         $first_step = 1;
288
289         return $current_steps;
290     }
291     push @EXPORT, "define_steps";
292
293     sub step (@) {
294         my (%params) = @_;
295         my $step_immediate = $immediate || $params{'immediate'};
296         delete $params{'immediate'} if $step_immediate;
297
298         my ($name) = keys %params;
299         my $cb = $params{$name};
300
301         croak "expected a sub at key $name" unless ref($cb) eq 'CODE';
302
303         # make the sub delayed
304         unless ($step_immediate) {
305             my $orig_cb = $cb;
306             $cb = sub { Amanda::MainLoop::call_later($orig_cb, @_); }
307         }
308
309         # patch up the callback
310         my ($pkg, $filename, $line) = caller;
311         my $newname = sprintf('$%s::%s@l%s', $pkg, $name, $line);
312         $cb = subname($newname => $cb);
313
314         # store the step for later
315         $current_steps->{$name} = $cb;
316
317         # and invoke it, if it's the first step given
318         if ($first_step) {
319             if ($step_immediate) {
320                 call_later($cb);
321             } else {
322                 $cb->();
323             }
324         }
325         $first_step = 0;
326     }
327     push @EXPORT, "step";
328 }
329 %}
330
331 %inline %{
332 void run_c(void) {
333     g_main_loop_run(default_main_loop());
334 }
335
336 void quit(void) {
337     g_main_loop_quit(default_main_loop());
338 }
339 %}
340
341 /*
342  * Event Sources
343  */
344
345 /* First we wrap the amglue_Source struct, defined in
346  * perl/amglue/mainloop.h, into a Perl object (named
347  * Amanda::MainLoop::Source).  After that appear several 
348  * constructors for various event sources.
349  */
350
351 %{ static void amglue_source_remove(amglue_Source *self); %}
352
353 %rename(Source) amglue_Source;
354 typedef struct amglue_Source {
355     %extend {
356         /* Constructor: use one of the package-level functions, below */
357         amglue_Source() {
358             die("Amanda::MainLoop::Source is an abstract base class");
359         }
360
361         /* Destructor: just unref the object */
362         ~amglue_Source() {
363             amglue_source_unref(self);
364         }
365
366         /* a "cheater's typemap" to just pass the SV along */
367         %typemap(in) SV *callback_sub "$1 = $input;"
368         void set_callback(SV *callback_sub) {
369             /* Attach the source to the default mainloop context, so
370              * that it will start generating events.  If it's already
371              * been destroyed, then bail with a fatal error.
372              */
373             if (self->state == AMGLUE_SOURCE_DESTROYED) {
374                 die("This source has already been removed");
375             } else if (self->state == AMGLUE_SOURCE_NEW) {
376                 self->state = AMGLUE_SOURCE_ATTACHED;
377
378                 g_source_attach(self->src, NULL);
379
380                 /* the link from the GSource to the amglue_Source is
381                  * now in use, so we increment the amglue_Source's 
382                  * refcount. */
383                 amglue_source_ref(self);
384             }
385
386             /* whoever created this Source object conveniently left
387              * the proper C-side callback for us.  This function has
388              * the appropriate calling signature for this GSource, and
389              * knows how to reflect that into Perl.  It expects the SV to
390              * be provided as its 'data' argument.  'perlcall' suggests
391              * that we make a copy of this SV, in case the user later
392              * modifies it. */
393             if (self->callback_sv) {
394                 SvREFCNT_dec(self->callback_sv);
395                 self->callback_sv = NULL;
396             }
397             self->callback_sv = newSVsv(callback_sub);
398             SvREFCNT_inc(self->callback_sv);
399             g_source_set_callback(self->src, self->callback,
400                 (gpointer)self, NULL);
401         }
402         /* delete the cheater's typemap */
403         %typemap(in) SV *sv;
404
405         void remove(void) {
406             amglue_source_remove(self);
407         }
408     }
409 } amglue_Source;
410
411 %{
412 /* Detach a source from the mainloop and remove it from play.  This is broken
413  * out as a separate function because it's also used from some callbacks */
414 static void
415 amglue_source_remove(
416     amglue_Source *self)
417 {
418     /* protect against self being freed out from under us */
419     amglue_source_ref(self);
420
421     if (self->state == AMGLUE_SOURCE_ATTACHED) {
422         /* unref any perl callback */
423         if (self->callback_sv) {
424             SvREFCNT_dec(self->callback_sv);
425             self->callback_sv = NULL;
426         }
427
428         /* undo the ref made in set_callback() */
429         amglue_source_unref(self);
430
431         g_source_destroy(self->src);
432     }
433
434     self->state = AMGLUE_SOURCE_DESTROYED;
435
436     /* reverse the "protection" increment used above */
437     amglue_source_unref(self);
438 }
439 %}
440
441 /* "Generic" callback function for a GSource that actually uses the GSourceFunc
442  * prototype.  The source of this function also serves as a prototype for other,
443  * more advanced callbacks.  Due to perl's heavy use of precompiler macros, it's
444  * not possible to break this down any further. */
445 %{
446 static gboolean
447 amglue_source_callback_simple(
448     gpointer *data)
449 {
450     dSP;
451     amglue_Source *src = (amglue_Source *)data;
452     SV *src_sv = NULL;
453
454     /* keep the source around long enough for the call to finish */
455     amglue_source_ref(src);
456     g_assert(src->callback_sv != NULL);
457
458     ENTER;
459     SAVETMPS;
460
461     /* create a new SV pointing to 'src', and increase our refcount
462      * accordingly.  The SV is mortal, so FREETMPS will decrease the 
463      * refcount, unless the callee keeps a copy of it somewhere */
464     amglue_source_ref(src);
465     src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
466                                  SWIG_OWNER | SWIG_SHADOW);
467
468     PUSHMARK(SP);
469     XPUSHs(src_sv);
470     PUTBACK;
471
472     call_sv(src->callback_sv, G_EVAL|G_DISCARD);
473
474     FREETMPS;
475     LEAVE;
476
477     /* we no longer need the src */
478     amglue_source_unref(src);
479     src = NULL;
480
481     /* this may have been freed, so don't use them after this point */
482     src_sv = NULL;
483
484     /* check for an uncaught 'die'.  If we don't do this, then Perl will longjmp()
485      * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
486      * state. */
487     if (SvTRUE(ERRSV)) {
488         /* We handle this just the way the default 'die' handler in Amanda::Debug 
489          * does, but since Amanda's debug support may not yet be running, we back
490          * it up with an exit() */
491         g_critical("%s", SvPV_nolen(ERRSV));
492         exit(1);
493     }
494
495     return TRUE;
496 }
497 %}
498
499 /* Constructors for some general-purpose sources */
500
501 /* timeout source */
502 %newobject timeout_source;
503 %inline %{
504 amglue_Source *
505 timeout_source(
506     guint interval)
507 {
508     return amglue_source_new(g_timeout_source_new(interval), 
509         (GSourceFunc)amglue_source_callback_simple);
510 }
511 %}
512
513 /* idle source */
514 %newobject idle_source;
515 %inline %{
516 amglue_Source *
517 idle_source(
518     gint priority)
519 {
520     GSource *idle_source = g_idle_source_new();
521     g_source_set_priority(idle_source, priority);
522     return amglue_source_new(idle_source,
523         (GSourceFunc)amglue_source_callback_simple);
524 }
525 %}
526
527 /* child watch source */
528 %{
529 static gboolean
530 child_watch_source_callback(
531     pid_t pid,
532     gint status,
533     gpointer data)
534 {
535     dSP;
536     amglue_Source *src = (amglue_Source *)data;
537     SV *src_sv;
538
539     /* keep the source around long enough for the call to finish */
540     amglue_source_ref(src);
541     g_assert(src->callback_sv != NULL);
542
543     ENTER;
544     SAVETMPS;
545
546     /* create a new SV pointing to 'src', and increase our refcount
547      * accordingly.  The SV is mortal, so FREETMPS will decrease the 
548      * refcount, unless the callee keeps a copy of it somewhere */
549     amglue_source_ref(src);
550     src_sv = SWIG_NewPointerObj(src, SWIGTYPE_p_amglue_Source,
551                                  SWIG_OWNER | SWIG_SHADOW);
552
553     PUSHMARK(SP);
554     XPUSHs(src_sv);
555     XPUSHs(sv_2mortal(newSViv(pid)));
556     XPUSHs(sv_2mortal(newSViv(status)));
557     PUTBACK;
558
559     call_sv(src->callback_sv, G_EVAL|G_DISCARD);
560
561     /* child watch sources automatically destroy themselves after the
562      * child dies, so we mark the amglue_Source as destroyed, too. */
563     amglue_source_remove(src);
564
565     FREETMPS;
566     LEAVE;
567
568     /* we no longer need the src */
569     amglue_source_unref(src);
570     src = NULL;
571
572     /* this may have been freed, so don't use them after this point */
573     src_sv = NULL;
574
575     /* check for an uncaught 'die'.  If we don't do this, then Perl will longjmp()
576      * over the GMainLoop mechanics, leaving GMainLoop in an inconsistent (locked)
577      * state. */
578     if (SvTRUE(ERRSV)) {
579         /* We handle this just the way the default 'die' handler in Amanda::Debug 
580          * does, but since Amanda's debug support may not yet be running, we back
581          * it up with an exit() */
582         g_critical("%s", SvPV_nolen(ERRSV));
583         exit(1);
584     }
585
586     return TRUE;
587 }
588 %}
589 %newobject child_watch_source;
590 %inline %{
591 amglue_Source *
592 child_watch_source(
593     gint pid)
594 {
595     GSource *child_watch_source = new_child_watch_source(pid);
596     return amglue_source_new(child_watch_source,
597         (GSourceFunc)child_watch_source_callback);
598 }
599 %}
600
601 /* fd source */
602 %apply gint { GIOCondition };
603 amglue_add_flag_tag_fns(GIOCondition);
604 amglue_add_constant(G_IO_IN, GIOCondition);
605 amglue_add_constant(G_IO_OUT, GIOCondition);
606 amglue_add_constant(G_IO_PRI, GIOCondition);
607 amglue_add_constant(G_IO_ERR, GIOCondition);
608 amglue_add_constant(G_IO_HUP, GIOCondition);
609 amglue_add_constant(G_IO_NVAL, GIOCondition);
610 amglue_copy_to_tag(GIOCondition, constants);
611
612 %newobject fd_source;
613 %inline %{
614 amglue_Source *
615 fd_source(
616     int fd,
617     GIOCondition events)
618 {
619     GSource *fdsource = new_fdsource(fd, events);
620     return amglue_source_new(fdsource,
621         (GSourceFunc)amglue_source_callback_simple);
622 }
623 %}