+
+xfer_status
+wait_until_xfer_cancelled(
+ Xfer *xfer)
+{
+ xfer_status seen_status;
+ g_assert(xfer != NULL);
+
+ g_mutex_lock(xfer->status_mutex);
+ while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
+ g_cond_wait(xfer->status_cond, xfer->status_mutex);
+ seen_status = xfer->status;
+ g_mutex_unlock(xfer->status_mutex);
+
+ return seen_status;
+}
+
+xfer_status
+wait_until_xfer_running(
+ Xfer *xfer)
+{
+ xfer_status seen_status;
+ g_assert(xfer != NULL);
+
+ g_mutex_lock(xfer->status_mutex);
+ while (xfer->status == XFER_START)
+ g_cond_wait(xfer->status_cond, xfer->status_mutex);
+ seen_status = xfer->status;
+ g_mutex_unlock(xfer->status_mutex);
+
+ return seen_status;
+}
+
+void
+xfer_cancel_with_error(
+ XferElement *elt,
+ const char *fmt,
+ ...)
+{
+ va_list argp;
+ XMsg *msg;
+
+ g_assert(elt != NULL);
+ g_assert(elt->xfer != NULL);
+
+ msg = xmsg_new(elt, XMSG_ERROR, 0);
+
+ arglist_start(argp, fmt);
+ msg->message = g_strdup_vprintf(fmt, argp);
+ arglist_end(argp);
+
+ /* send the XMSG_ERROR */
+ xfer_queue_message(elt->xfer, msg);
+
+ /* cancel the transfer */
+ xfer_cancel(elt->xfer);
+}
+
+gint
+xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
+{
+ gint rv;
+
+ if (xfer)
+ g_mutex_lock(xfer->fd_mutex);
+ rv = *fdp;
+ *fdp = newfd;
+ if (xfer)
+ g_mutex_unlock(xfer->fd_mutex);
+
+ return rv;
+}