Imported Upstream version 3.1.0
[debian/amanda] / device-src / xfer-dest-device.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amxfer.h"
23 #include "amanda.h"
24 #include "device.h"
25 #include "property.h"
26 #include "xfer-device.h"
27 #include "queueing.h"
28 #include "device-queueing.h"
29
30 /*
31  * Class declaration
32  *
33  * This declaration is entirely private; nothing but xfer_dest_device() references
34  * it directly.
35  */
36
37 GType xfer_dest_device_get_type(void);
38 #define XFER_DEST_DEVICE_TYPE (xfer_dest_device_get_type())
39 #define XFER_DEST_DEVICE(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_device_get_type(), XferDestDevice)
40 #define XFER_DEST_DEVICE_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_device_get_type(), XferDestDevice const)
41 #define XFER_DEST_DEVICE_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_device_get_type(), XferDestDeviceClass)
42 #define IS_XFER_DEST_DEVICE(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_device_get_type ())
43 #define XFER_DEST_DEVICE_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_device_get_type(), XferDestDeviceClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 /*
48  * Main object structure
49  */
50
51 typedef struct XferDestDevice {
52     XferElement __parent__;
53
54     Device *device;
55     size_t max_memory;
56
57     GThread *thread;
58 } XferDestDevice;
59
60 /*
61  * Class definition
62  */
63
64 typedef struct {
65     XferElementClass __parent__;
66 } XferDestDeviceClass;
67
68 /*
69  * Implementation
70  */
71
72 static producer_result_t
73 pull_buffer_producer(gpointer data,
74     queue_buffer_t *buffer,
75     size_t hint_size G_GNUC_UNUSED)
76 {
77     XferDestDevice *self = (XferDestDevice *)data;
78     XferElement *elt = XFER_ELEMENT(self);
79     gpointer buf;
80     size_t size;
81
82     if (elt->cancelled) {
83         /* drain our upstream only if we're expecting an EOF */
84         if (elt->expect_eof) {
85             xfer_element_drain_by_pulling(XFER_ELEMENT(self)->upstream);
86         }
87
88         return PRODUCER_FINISHED;
89     }
90
91     buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size);
92     if (!buf) {
93         return PRODUCER_FINISHED;
94     }
95
96     /* queueing recycles allocated buffers back to us, but we don't need them.. */
97     amfree(buffer->data);
98     buffer->data = buf;
99     buffer->alloc_size = buffer->data_size = size;
100     buffer->offset = 0;
101
102     return PRODUCER_MORE;
103 }
104
105 static gpointer
106 queueing_thread(
107     gpointer data)
108 {
109     XferDestDevice *self = (XferDestDevice *)data;
110     XferElement *elt = (XferElement *)self;
111     queue_result_flags result;
112     GValue val;
113     StreamingRequirement streaming_mode;
114     size_t block_size;
115     size_t max_memory;
116
117     /* Get the device's parameters */
118     bzero(&val, sizeof(val));
119     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
120         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
121         g_warning("XferDestDevice Couldn't get streaming type for %s", self->device->device_name);
122         streaming_mode = STREAMING_REQUIREMENT_REQUIRED;
123     } else {
124         streaming_mode = g_value_get_enum(&val);
125     }
126
127     block_size = self->device->block_size;
128
129     max_memory = self->max_memory || DEFAULT_MAX_BUFFER_MEMORY;
130
131     /* this thread creates two other threads (consumer and producer) and
132      * blocks waiting for them to finish.  TODO: when taper no longer uses
133      * queueing, merge the queueing functionality here */
134     result =
135         do_consumer_producer_queue_full(pull_buffer_producer, data,
136                                         device_write_consumer, self->device,
137                                         block_size, max_memory,
138                                         streaming_mode);
139
140     /* finish the file explicitly */
141     if (!(self->device->status & DEVICE_STATUS_DEVICE_ERROR))
142         device_finish_file(self->device);
143
144     if (result != QUEUE_SUCCESS) {
145         /* note that our producer never returns an error */
146
147         if ((result & QUEUE_CONSUMER_ERROR)
148                 && (self->device->status != DEVICE_STATUS_SUCCESS)) {
149             xfer_cancel_with_error(elt, "%s: %s",
150                     self->device->device_name, device_error_or_status(self->device));
151             wait_until_xfer_cancelled(elt->xfer);
152         } else {
153             xfer_cancel_with_error(elt, _("%s: internal error"),
154                     xfer_element_repr(elt));
155             wait_until_xfer_cancelled(elt->xfer);
156         }
157
158         /* and drain our upstream, since the queueing loop is done */
159         if (elt->expect_eof)
160             xfer_element_drain_by_pulling(elt->upstream);
161     }
162
163     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
164
165     return NULL;
166 }
167
168 static gboolean
169 start_impl(
170     XferElement *elt)
171 {
172     XferDestDevice *self = (XferDestDevice *)elt;
173     self->thread = g_thread_create(queueing_thread, (gpointer)self, FALSE, NULL);
174     return TRUE;
175 }
176
177 static void
178 instance_init(
179     XferElement *elt)
180 {
181     elt->can_generate_eof = TRUE;
182 }
183
184 static void
185 class_init(
186     XferDestDeviceClass * selfc)
187 {
188     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
189     static xfer_element_mech_pair_t mech_pairs[] = {
190         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 0, 0},
191         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
192     };
193
194     klass->start = start_impl;
195
196     klass->perl_class = "Amanda::Xfer::Dest::Device";
197     klass->mech_pairs = mech_pairs;
198
199     parent_class = g_type_class_peek_parent(selfc);
200 }
201
202 GType
203 xfer_dest_device_get_type (void)
204 {
205     static GType type = 0;
206
207     if G_UNLIKELY(type == 0) {
208         static const GTypeInfo info = {
209             sizeof (XferDestDeviceClass),
210             (GBaseInitFunc) NULL,
211             (GBaseFinalizeFunc) NULL,
212             (GClassInitFunc) class_init,
213             (GClassFinalizeFunc) NULL,
214             NULL /* class_data */,
215             sizeof (XferDestDevice),
216             0 /* n_preallocs */,
217             (GInstanceInitFunc) instance_init,
218             NULL
219         };
220
221         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestDevice", &info, 0);
222     }
223
224     return type;
225 }
226
227 /* create an element of this class; prototype is in xfer-device.h */
228 XferElement *
229 xfer_dest_device(
230     Device *device,
231     size_t max_memory)
232 {
233     XferDestDevice *self = (XferDestDevice *)g_object_new(XFER_DEST_DEVICE_TYPE, NULL);
234     XferElement *elt = XFER_ELEMENT(self);
235
236     g_assert(device != NULL);
237
238     self->device = device;
239     self->max_memory = max_memory;
240
241     return elt;
242 }