X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Fxfer-element.c;h=dd94e3ca126faafa4ca3ffd055c5fdae49e08ec5;hb=cd0b924f27312d57bd42f6c4fae2b795139e2d0b;hp=678be8462669350bd611010781ca4c7968a55867;hpb=2627875b7d18858bc1f9f7652811e4d8c15a23eb;p=debian%2Famanda diff --git a/xfer-src/xfer-element.c b/xfer-src/xfer-element.c index 678be84..dd94e3c 100644 --- a/xfer-src/xfer-element.c +++ b/xfer-src/xfer-element.c @@ -1,25 +1,26 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2008 Zmanda Inc. + * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved. * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300 + * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -#include "amxfer.h" #include "amanda.h" -#include "arglist.h" +#include "amxfer.h" /* parent class for XferElement */ static GObjectClass *parent_class = NULL; @@ -38,21 +39,32 @@ xfer_element_init( xe->output_mech = XFER_MECH_NONE; xe->input_mech = XFER_MECH_NONE; xe->upstream = xe->downstream = NULL; - xe->input_fd = xe->output_fd = -1; + xe->_input_fd = xe->_output_fd = -1; xe->repr = NULL; } -static void +static gboolean xfer_element_setup_impl( XferElement *elt G_GNUC_UNUSED) { + return TRUE; /* success */ +} + +static gboolean +xfer_element_set_size_impl( + XferElement *elt G_GNUC_UNUSED, + gint64 size G_GNUC_UNUSED) +{ + elt->size = size; + + return TRUE; /* success */ } static gboolean xfer_element_start_impl( XferElement *elt G_GNUC_UNUSED) { - return FALSE; + return FALSE; /* will not send XMSG_DONE */ } static gboolean @@ -81,12 +93,19 @@ xfer_element_push_buffer_impl( { } +static xfer_element_mech_pair_t * +xfer_element_get_mech_pairs_impl( + XferElement *elt) +{ + return XFER_ELEMENT_GET_CLASS(elt)->mech_pairs; +} + static char * xfer_element_repr_impl( XferElement *elt) { if (!elt->repr) { - elt->repr = newvstrallocf(elt->repr, "<%s@%p>", + elt->repr = newvstrallocf(elt->repr, "<%s@%p>", G_OBJECT_TYPE_NAME(G_OBJECT(elt)), elt); } @@ -99,10 +118,20 @@ xfer_element_finalize( GObject * obj_self) { XferElement *elt = XFER_ELEMENT(obj_self); + gint fd; /* free the repr cache */ if (elt->repr) g_free(elt->repr); + /* close up the input/output file descriptors, being careful to do so + * atomically, and making any errors doing so into mere warnings */ + fd = xfer_element_swap_input_fd(elt, -1); + if (fd != -1 && close(fd) != 0) + g_warning("error closing fd %d: %s", fd, strerror(errno)); + fd = xfer_element_swap_output_fd(elt, -1); + if (fd != -1 && close(fd) != 0) + g_warning("error closing fd %d: %s", fd, strerror(errno)); + /* chain up */ G_OBJECT_CLASS(parent_class)->finalize(obj_self); } @@ -115,10 +144,12 @@ xfer_element_class_init( klass->repr = xfer_element_repr_impl; klass->setup = xfer_element_setup_impl; + klass->set_size = xfer_element_set_size_impl; klass->start = xfer_element_start_impl; klass->cancel = xfer_element_cancel_impl; klass->pull_buffer = xfer_element_pull_buffer_impl; klass->push_buffer = xfer_element_push_buffer_impl; + klass->get_mech_pairs = xfer_element_get_mech_pairs_impl; goc->finalize = xfer_element_finalize; @@ -172,11 +203,19 @@ xfer_element_repr( return XFER_ELEMENT_GET_CLASS(elt)->repr(elt); } -void +gboolean xfer_element_setup( XferElement *elt) { - XFER_ELEMENT_GET_CLASS(elt)->setup(elt); + return XFER_ELEMENT_GET_CLASS(elt)->setup(elt); +} + +gboolean +xfer_element_set_size( + XferElement *elt, + gint64 size) +{ + return XFER_ELEMENT_GET_CLASS(elt)->set_size(elt, size); } gboolean @@ -199,6 +238,13 @@ xfer_element_pull_buffer( XferElement *elt, size_t *size) { + /* Make sure that the xfer is running before calling upstream's + * pull_buffer method; this avoids a race condition where upstream + * hasn't finished its xfer_element_start yet, and isn't ready for + * a pull */ + if (elt->xfer->status == XFER_START) + wait_until_xfer_running(elt->xfer); + return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size); } @@ -208,15 +254,24 @@ xfer_element_push_buffer( gpointer buf, size_t size) { + /* There is no race condition with push_buffer, because downstream + * elements are started first. */ XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size); } +xfer_element_mech_pair_t * +xfer_element_get_mech_pairs( + XferElement *elt) +{ + return XFER_ELEMENT_GET_CLASS(elt)->get_mech_pairs(elt); +} + /**** * Utilities */ void -xfer_element_drain_by_pulling( +xfer_element_drain_buffers( XferElement *upstream) { gpointer buf; @@ -228,7 +283,7 @@ xfer_element_drain_by_pulling( } void -xfer_element_drain_by_reading( +xfer_element_drain_fd( int fd) { size_t len; @@ -241,46 +296,3 @@ xfer_element_drain_by_reading( } } -xfer_status -wait_until_xfer_cancelled( - Xfer *xfer) -{ - xfer_status seen_status; - g_assert(xfer != NULL); - - g_mutex_lock(xfer->status_mutex); - while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE) - g_cond_wait(xfer->status_cond, xfer->status_mutex); - seen_status = xfer->status; - g_mutex_unlock(xfer->status_mutex); - - return seen_status; -} - -void -xfer_element_handle_error( - XferElement *elt, - const char *fmt, - ...) -{ - va_list argp; - XMsg *msg; - - g_assert(elt != NULL); - g_assert(elt->xfer != NULL); - - msg = xmsg_new(elt, XMSG_ERROR, 0); - - arglist_start(argp, fmt); - msg->message = g_strdup_vprintf(fmt, argp); - arglist_end(argp); - - /* send the XMSG_ERROR */ - xfer_queue_message(elt->xfer, msg); - - /* cancel the transfer */ - xfer_cancel(elt->xfer); - - /* and wait for the cancellation to take effect */ - wait_until_xfer_cancelled(elt->xfer); -}