385fb76ee1eaecb03916d27894254fa642f67b45
[debian/amanda] / xfer-src / dest-device.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008 Zmanda Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #include "amxfer.h"
21 #include "amanda.h"
22 #include "queueing.h"
23 #include "device-queueing.h"
24
25 /*
26  * Class declaration
27  *
28  * This declaration is entirely private; nothing but xfer_dest_device() references
29  * it directly.
30  */
31
32 GType xfer_dest_device_get_type(void);
33 #define XFER_DEST_DEVICE_TYPE (xfer_dest_device_get_type())
34 #define XFER_DEST_DEVICE(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_device_get_type(), XferDestDevice)
35 #define XFER_DEST_DEVICE_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_device_get_type(), XferDestDevice const)
36 #define XFER_DEST_DEVICE_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_device_get_type(), XferDestDeviceClass)
37 #define IS_XFER_DEST_DEVICE(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_device_get_type ())
38 #define XFER_DEST_DEVICE_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_device_get_type(), XferDestDeviceClass)
39
40 static GObjectClass *parent_class = NULL;
41
42 /*
43  * Main object structure
44  */
45
46 typedef struct XferDestDevice {
47     XferElement __parent__;
48
49     Device *device;
50     size_t max_memory;
51
52     GThread *thread;
53 } XferDestDevice;
54
55 /*
56  * Class definition
57  */
58
59 typedef struct {
60     XferElementClass __parent__;
61 } XferDestDeviceClass;
62
63 /*
64  * Implementation
65  */
66
67 static producer_result_t
68 pull_buffer_producer(gpointer data,
69     queue_buffer_t *buffer,
70     size_t hint_size G_GNUC_UNUSED)
71 {
72     XferDestDevice *self = (XferDestDevice *)data;
73     XferElement *elt = XFER_ELEMENT(self);
74     gpointer buf;
75     size_t size;
76
77     if (elt->cancelled) {
78         /* drain our upstream only if we're expecting an EOF */
79         if (elt->expect_eof) {
80             xfer_element_drain_by_pulling(XFER_ELEMENT(self)->upstream);
81         }
82
83         return PRODUCER_FINISHED;
84     }
85
86     buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size);
87     if (!buf) {
88         return PRODUCER_FINISHED;
89     }
90
91     /* queueing recycles allocated buffers back to us, but we don't need them.. */
92     amfree(buffer->data);
93     buffer->data = buf;
94     buffer->alloc_size = buffer->data_size = size;
95     buffer->offset = 0;
96
97     return PRODUCER_MORE;
98 }
99
100 static gpointer
101 queueing_thread(
102     gpointer data)
103 {
104     XferDestDevice *self = (XferDestDevice *)data;
105     XferElement *elt = (XferElement *)self;
106     queue_result_flags result;
107     GValue val;
108     StreamingRequirement streaming_mode;
109     size_t block_size;
110     size_t max_memory;
111
112     /* Get the device's parameters */
113     bzero(&val, sizeof(val));
114     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
115         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
116         g_warning("XferDestDevice Couldn't get streaming type for %s", self->device->device_name);
117         streaming_mode = STREAMING_REQUIREMENT_REQUIRED;
118     } else {
119         streaming_mode = g_value_get_enum(&val);
120     }
121
122     block_size = self->device->block_size;
123
124     max_memory = self->max_memory || DEFAULT_MAX_BUFFER_MEMORY;
125
126     /* this thread creates two other threads (consumer and producer) and
127      * blocks waiting for them to finish.  TODO: when taper no longer uses
128      * queueing, merge the queueing functionality here */
129     result =
130         do_consumer_producer_queue_full(pull_buffer_producer, data,
131                                         device_write_consumer, self->device,
132                                         block_size, max_memory,
133                                         streaming_mode);
134
135     /* finish the file explicitly */
136     if (!(self->device->status & DEVICE_STATUS_DEVICE_ERROR))
137         device_finish_file(self->device);
138
139     if (result != QUEUE_SUCCESS) {
140         /* note that our producer never returns an error */
141
142         if ((result & QUEUE_CONSUMER_ERROR)
143                 && (self->device->status != DEVICE_STATUS_SUCCESS)) {
144             xfer_element_handle_error(elt, "%s: %s",
145                     self->device->device_name, device_error_or_status(self->device));
146         } else {
147             xfer_element_handle_error(elt, _("%s: internal error"),
148                     xfer_element_repr(elt));
149         }
150
151         /* and drain our upstream, since the queueing loop is done */
152         if (elt->expect_eof)
153             xfer_element_drain_by_pulling(elt->upstream);
154     }
155
156     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
157
158     return NULL;
159 }
160
161 static gboolean
162 start_impl(
163     XferElement *elt)
164 {
165     XferDestDevice *self = (XferDestDevice *)elt;
166     self->thread = g_thread_create(queueing_thread, (gpointer)self, FALSE, NULL);
167     return TRUE;
168 }
169
170 static void
171 instance_init(
172     XferElement *elt)
173 {
174     elt->can_generate_eof = TRUE;
175 }
176
177 static void
178 class_init(
179     XferDestDeviceClass * selfc)
180 {
181     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
182     static xfer_element_mech_pair_t mech_pairs[] = {
183         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 0, 0},
184         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
185     };
186
187     klass->start = start_impl;
188
189     klass->perl_class = "Amanda::Xfer::Dest::Device";
190     klass->mech_pairs = mech_pairs;
191
192     parent_class = g_type_class_peek_parent(selfc);
193 }
194
195 GType
196 xfer_dest_device_get_type (void)
197 {
198     static GType type = 0;
199
200     if G_UNLIKELY(type == 0) {
201         static const GTypeInfo info = {
202             sizeof (XferDestDeviceClass),
203             (GBaseInitFunc) NULL,
204             (GBaseFinalizeFunc) NULL,
205             (GClassInitFunc) class_init,
206             (GClassFinalizeFunc) NULL,
207             NULL /* class_data */,
208             sizeof (XferDestDevice),
209             0 /* n_preallocs */,
210             (GInstanceInitFunc) instance_init,
211             NULL
212         };
213
214         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestDevice", &info, 0);
215     }
216
217     return type;
218 }
219
220 /* create an element of this class; prototype is in xfer-element.h */
221 XferElement *
222 xfer_dest_device(
223     Device *device,
224     size_t max_memory)
225 {
226     XferDestDevice *self = (XferDestDevice *)g_object_new(XFER_DEST_DEVICE_TYPE, NULL);
227     XferElement *elt = XFER_ELEMENT(self);
228
229     g_assert(device != NULL);
230
231     self->device = device;
232     self->max_memory = max_memory;
233
234     return elt;
235 }