/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2008 Zmanda Inc.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
*
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
*
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
+ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
-#include "amxfer.h"
#include "amanda.h"
-#include "arglist.h"
+#include "amxfer.h"
/* parent class for XferElement */
static GObjectClass *parent_class = NULL;
xe->output_mech = XFER_MECH_NONE;
xe->input_mech = XFER_MECH_NONE;
xe->upstream = xe->downstream = NULL;
- xe->input_fd = xe->output_fd = -1;
+ xe->_input_fd = xe->_output_fd = -1;
xe->repr = NULL;
}
-static void
+static gboolean
xfer_element_setup_impl(
XferElement *elt G_GNUC_UNUSED)
{
+ return TRUE; /* success */
+}
+
+static gboolean
+xfer_element_set_size_impl(
+ XferElement *elt G_GNUC_UNUSED,
+ gint64 size G_GNUC_UNUSED)
+{
+ elt->size = size;
+
+ return TRUE; /* success */
}
static gboolean
xfer_element_start_impl(
XferElement *elt G_GNUC_UNUSED)
{
- return FALSE;
+ return FALSE; /* will not send XMSG_DONE */
}
static gboolean
{
}
+static xfer_element_mech_pair_t *
+xfer_element_get_mech_pairs_impl(
+ XferElement *elt)
+{
+ return XFER_ELEMENT_GET_CLASS(elt)->mech_pairs;
+}
+
static char *
xfer_element_repr_impl(
XferElement *elt)
{
if (!elt->repr) {
- elt->repr = newvstrallocf(elt->repr, "<%s@%p>",
+ elt->repr = newvstrallocf(elt->repr, "<%s@%p>",
G_OBJECT_TYPE_NAME(G_OBJECT(elt)),
elt);
}
GObject * obj_self)
{
XferElement *elt = XFER_ELEMENT(obj_self);
+ gint fd;
/* free the repr cache */
if (elt->repr) g_free(elt->repr);
+ /* close up the input/output file descriptors, being careful to do so
+ * atomically, and making any errors doing so into mere warnings */
+ fd = xfer_element_swap_input_fd(elt, -1);
+ if (fd != -1 && close(fd) != 0)
+ g_warning("error closing fd %d: %s", fd, strerror(errno));
+ fd = xfer_element_swap_output_fd(elt, -1);
+ if (fd != -1 && close(fd) != 0)
+ g_warning("error closing fd %d: %s", fd, strerror(errno));
+
/* chain up */
G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}
klass->repr = xfer_element_repr_impl;
klass->setup = xfer_element_setup_impl;
+ klass->set_size = xfer_element_set_size_impl;
klass->start = xfer_element_start_impl;
klass->cancel = xfer_element_cancel_impl;
klass->pull_buffer = xfer_element_pull_buffer_impl;
klass->push_buffer = xfer_element_push_buffer_impl;
+ klass->get_mech_pairs = xfer_element_get_mech_pairs_impl;
goc->finalize = xfer_element_finalize;
return XFER_ELEMENT_GET_CLASS(elt)->repr(elt);
}
-void
+gboolean
xfer_element_setup(
XferElement *elt)
{
- XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
+ return XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
+}
+
+gboolean
+xfer_element_set_size(
+ XferElement *elt,
+ gint64 size)
+{
+ return XFER_ELEMENT_GET_CLASS(elt)->set_size(elt, size);
}
gboolean
XferElement *elt,
size_t *size)
{
+ /* Make sure that the xfer is running before calling upstream's
+ * pull_buffer method; this avoids a race condition where upstream
+ * hasn't finished its xfer_element_start yet, and isn't ready for
+ * a pull */
+ if (elt->xfer->status == XFER_START)
+ wait_until_xfer_running(elt->xfer);
+
return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size);
}
gpointer buf,
size_t size)
{
+ /* There is no race condition with push_buffer, because downstream
+ * elements are started first. */
XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size);
}
+xfer_element_mech_pair_t *
+xfer_element_get_mech_pairs(
+ XferElement *elt)
+{
+ return XFER_ELEMENT_GET_CLASS(elt)->get_mech_pairs(elt);
+}
+
/****
* Utilities
*/
void
-xfer_element_drain_by_pulling(
+xfer_element_drain_buffers(
XferElement *upstream)
{
gpointer buf;
}
void
-xfer_element_drain_by_reading(
+xfer_element_drain_fd(
int fd)
{
size_t len;
}
}
-xfer_status
-wait_until_xfer_cancelled(
- Xfer *xfer)
-{
- xfer_status seen_status;
- g_assert(xfer != NULL);
-
- g_mutex_lock(xfer->status_mutex);
- while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
- g_cond_wait(xfer->status_cond, xfer->status_mutex);
- seen_status = xfer->status;
- g_mutex_unlock(xfer->status_mutex);
-
- return seen_status;
-}
-
-void
-xfer_element_handle_error(
- XferElement *elt,
- const char *fmt,
- ...)
-{
- va_list argp;
- XMsg *msg;
-
- g_assert(elt != NULL);
- g_assert(elt->xfer != NULL);
-
- msg = xmsg_new(elt, XMSG_ERROR, 0);
-
- arglist_start(argp, fmt);
- msg->message = g_strdup_vprintf(fmt, argp);
- arglist_end(argp);
-
- /* send the XMSG_ERROR */
- xfer_queue_message(elt->xfer, msg);
-
- /* cancel the transfer */
- xfer_cancel(elt->xfer);
-
- /* and wait for the cancellation to take effect */
- wait_until_xfer_cancelled(elt->xfer);
-}