Merge branch 'upstream'
[debian/amanda] / server-src / taper.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1998, 2000 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26 /* $Id: taper.c 6512 2007-05-24 17:00:24Z ian $
27  *
28  * moves files from holding disk to tape, or from a socket to tape
29  */
30
31 /* FIXME: This file needs to use gettext. */
32
33 #include <glib.h>
34 #include "physmem.h"
35
36 #include "changer.h"
37 #include "clock.h"
38 #include "conffile.h"
39 #include "device.h"
40 #include "logfile.h"
41 #include "server_util.h"
42 #include "stream.h"
43 #include "tapefile.h"
44 #include "taperscan.h"
45 #include "taper-source.h"
46 #include "timestamp.h"
47 #include "token.h"
48 #include "version.h"
49
50 /* FIXME: This should not be here. */
51 #define CONNECT_TIMEOUT (2*60)
52
53 /* Use this instead of global variables, so that we are reentrant. */
54 typedef struct {
55     Device * device;
56     char * driver_start_time;
57     int    cur_tape;
58     char * next_tape_label;
59     char * next_tape_device;
60     taper_scan_tracker_t * taper_scan_tracker;
61 } taper_state_t;
62
63 typedef struct {
64     char * handle;
65     char * hostname;
66     char * diskname;
67     int level;
68     char * timestamp;
69     char * id_string;
70     TaperSource * source;
71     int current_part;
72     GTimeVal total_time;
73     guint64 total_bytes;
74 } dump_info_t;
75
76 static gboolean label_new_tape(taper_state_t * state, dump_info_t * dump_info);
77
78 static void init_taper_state(taper_state_t* state) {
79     state->device = NULL;
80     state->driver_start_time = NULL;
81     state->taper_scan_tracker = taper_scan_tracker_new();
82 }
83
84 static void cleanup(taper_state_t * state) {
85     amfree(state->driver_start_time);
86     amfree(state->next_tape_label);
87     amfree(state->next_tape_device);
88     taper_scan_tracker_free(state->taper_scan_tracker);
89     if (state->device != NULL) {
90         g_object_unref(state->device);
91         state->device = NULL;
92     }
93 }
94
95 static void free_dump_info(dump_info_t * info) {
96     amfree(info->handle);
97     amfree(info->hostname);
98     amfree(info->diskname);
99     amfree(info->timestamp);
100     amfree(info->id_string);
101     if (info->source != NULL) {
102         g_object_unref(info->source);
103         info->source = NULL;
104     }
105 }
106
107 /* Validate that a command has the proper number of arguments, and
108    print a meaningful error message if not. It returns only if the
109    check is successful. */
110 static void validate_args(cmd_t cmd, struct cmdargs * args,
111                           char ** argnames) {
112     int i;
113     
114     for (i = 0; argnames[i] != NULL; i ++) {
115         if (i > args->argc) {
116             error("error [taper %s: not enough args: %s]",
117                   cmdstr[cmd], argnames[i]);
118         }
119     }
120     if (i < args->argc) {
121         error("error [taper %s: Too many args: Got %d, expected %d.]",
122               cmdstr[cmd], args->argc, i);
123     }
124 }
125
126 /* Open a socket to the dumper. Returns TRUE if everything is happy, FALSE
127    otherwise. */
128 static gboolean open_read_socket(dump_info_t * info, char * split_diskbuffer,
129                              guint64 splitsize, guint64 fallback_splitsize) {
130     in_port_t port = 0;
131     int socket;
132     int fd;
133     int result;
134     struct addrinfo *res;
135
136     if ((result = resolve_hostname("localhost", 0, &res, NULL) != 0)) {
137         char *m;
138         char *q;
139         int save_errno = errno;
140         char *qdiskname = quote_string(info->diskname);
141
142         m = vstralloc("[localhost resolve failure: ",
143                       strerror(save_errno),
144                       "]",
145                       NULL);
146         q = squote(m);
147         putresult(TAPE_ERROR, "%s %s\n", info->handle, q);
148         log_add(L_FAIL, "%s %s %s %d %s",
149                 info->hostname, qdiskname, info->timestamp,
150                 info->level, q);
151         amfree(qdiskname);
152         amfree(m);
153         amfree(q);
154         return FALSE;
155     }
156
157     socket = stream_server(res->ai_family, &port, 0, STREAM_BUFSIZE, 0);
158     freeaddrinfo(res);
159
160     if (socket < 0) {
161         char *m;
162         char *q;
163         int save_errno = errno;
164         char *qdiskname = quote_string(info->diskname);
165
166         m = vstralloc("[port create failure: ",
167                       strerror(save_errno),
168                       "]",
169                       NULL);
170         q = squote(m);
171         putresult(TAPE_ERROR, "%s %s\n", info->handle, q);
172         log_add(L_FAIL, "%s %s %s %d %s",
173                 info->hostname, qdiskname, info->timestamp,
174                 info->level, q);
175         amfree(qdiskname);
176         amfree(m);
177         amfree(q);
178         return FALSE;
179     }
180
181     putresult(PORT, "%d\n", port);
182
183     fd = stream_accept(socket, CONNECT_TIMEOUT, 0, STREAM_BUFSIZE);
184
185     if (fd < 0) {
186         char *m, *q;
187         int save_errno = errno;
188         char *qdiskname = quote_string(info->diskname);
189         m = vstralloc("[port connect failure: ",
190                       strerror(save_errno),
191                       "]",
192                       NULL);
193         q = squote(m);
194         putresult(TAPE_ERROR, "%s %s\n", info->handle, q);
195         log_add(L_FAIL, "%s %s %s %d %s",
196                 info->hostname, qdiskname, info->timestamp,
197                 info->level, q);
198         amfree(qdiskname);
199         aclose(socket);
200         amfree(m);
201         amfree(q);
202         return FALSE;
203     } else {
204         aclose(socket);
205     }
206
207     info->source = taper_source_new(info->handle, PORT_WRITE, NULL, fd,
208                                     split_diskbuffer, splitsize,
209                                     fallback_splitsize);
210     /* FIXME: This should be handled properly. */
211     g_assert(info->source != NULL);
212     return TRUE;
213 }
214
215 typedef struct {
216     ConsumerFunctor next_consumer;
217     gpointer next_consumer_data;
218     guint64 bytes_written;
219 } CountingConsumerData;
220
221 /* A ConsumerFunctor. This consumer just passes its arguments on to a
222    second consumer, but counts the number of bytes successfully
223    written. */
224 static int counting_consumer(gpointer user_data, queue_buffer_t * buffer) {
225     int result;
226     CountingConsumerData * data = user_data;
227
228     result = data->next_consumer(data->next_consumer_data, buffer);
229     
230     if (result > 0) {
231         data->bytes_written += result;
232     }
233
234     return result;
235 }
236
237 static gboolean boolean_prolong(void * data) {
238     if (data == NULL) {
239         return TRUE; /* Do not interrupt. */
240     } else {
241         return *(gboolean*)data;
242     }
243 }
244
245 /* A (simpler) wrapper around taper_scan(). */
246 static gboolean simple_taper_scan(taper_state_t * state,
247                                   gboolean* prolong, char ** error_message) {
248     char ** label = &(state->next_tape_label);
249     char ** device = &(state->next_tape_device);
250     char *timestamp = NULL;
251     int result;
252     result = taper_scan(NULL, label, &timestamp, device,
253                         state->taper_scan_tracker,
254                         CHAR_taperscan_output_callback,
255                         error_message, boolean_prolong, prolong);
256     if (prolong != NULL && !*prolong) {
257         g_fprintf(stderr, _("Cancelled taper scan.\n"));
258         return FALSE;
259     } else if (result < 0) {
260         g_fprintf(stderr, _("Failed taper scan: %s\n"), (*error_message)?(*error_message):_("(no error message)"));
261         amfree(timestamp);
262         return FALSE;
263     } else {
264         g_fprintf(stderr, _("taper: using label `%s' date `%s'\n"), *label,
265                 state->driver_start_time);
266         if (result == 3) {
267             log_add(L_INFO,
268             _("Will write new label `%s' to new (previously non-amanda) tape"),
269                     *label);
270         }
271
272     }
273     amfree(timestamp);
274     return TRUE;
275 }
276
277 typedef struct {
278     taper_state_t * state;
279     gboolean prolong; /* scan stops when this is FALSE. */
280     char *errmsg;
281 } tape_search_request_t;
282
283 /* A GThread that runs taper_scan. */
284 static gpointer tape_search_thread(gpointer data) {
285     tape_search_request_t * request = data;
286
287     if (request->state->next_tape_label != NULL &&
288         request->state->next_tape_device != NULL) {
289         return GINT_TO_POINTER(TRUE);
290     } else {
291         amfree(request->state->next_tape_label);
292         amfree(request->state->next_tape_device);
293     }
294
295     return GINT_TO_POINTER
296         (simple_taper_scan(request->state,
297                            &(request->prolong),
298                            &(request->errmsg)));
299 }
300
301 static void log_taper_scan_errmsg(char * errmsg) {
302     char *c, *c1;
303     if (errmsg == NULL)
304         return;
305
306     c = c1 = errmsg;
307     while (*c != '\0') {
308         if (*c == '\n') {
309             *c = '\0';
310             log_add(L_WARNING,"%s", c1);
311             c1 = c+1;
312         }
313         c++;
314     }
315     if (strlen(c1) > 1 )
316         log_add(L_WARNING,"%s", c1);
317     amfree(errmsg);
318 }
319
320 /* If handle is NULL, then this function assumes that we are in startup mode.
321  * In that case it will wait for a command from driver. If handle is not NULL,
322  * this this function will ask for permission with REQUEST-NEW-TAPE. */
323 static gboolean find_new_tape(taper_state_t * state, dump_info_t * dump) {
324     GThread * tape_search = NULL;
325     tape_search_request_t search_request;
326     gboolean use_threads;
327     cmd_t cmd;
328     struct cmdargs args;
329
330     if (state->device != NULL) {
331         return TRUE;
332     }
333
334     /* We save the value here in case it changes while we're running. */
335     use_threads = g_thread_supported();
336
337     search_request.state = state;
338     search_request.prolong = TRUE;
339     search_request.errmsg = NULL;
340     if (use_threads) {
341         tape_search = g_thread_create(tape_search_thread,
342                                       &search_request, TRUE, NULL);
343     }
344     
345     putresult(REQUEST_NEW_TAPE, "%s\n", dump->handle);
346     cmd = getcmd(&args);
347     switch (cmd) {
348     default:
349         g_fprintf(stderr, "taper: Got odd message from driver, expected NEW-TAPE or NO-NEW-TAPE.\n");
350         /* FALLTHROUGH. */
351     case NEW_TAPE: {
352         gboolean search_result;
353         if (use_threads) {
354             search_result = GPOINTER_TO_INT(g_thread_join(tape_search));
355         } else {
356             search_result =
357                 GPOINTER_TO_INT(tape_search_thread(&search_request));
358         }
359         if (search_result) {
360             /* We don't say NEW_TAPE until we actually write the label. */
361             amfree(search_request.errmsg);
362             return TRUE;
363         } else {
364             putresult(NO_NEW_TAPE, "%s\n", dump->handle);
365             log_taper_scan_errmsg(search_request.errmsg);
366             return FALSE;
367         }
368     }
369     case NO_NEW_TAPE:
370         search_request.prolong = FALSE;
371         if (use_threads) {
372             g_thread_join(tape_search);
373         }
374         return FALSE;
375     }
376 }
377
378 /* Returns TRUE if the old volume details are not the same as the new ones. */
379 static gboolean check_volume_changed(Device * device,
380                                      char * old_label, char * old_timestamp) {
381     /* If one is NULL and the other is not, something changed. */
382     if ((old_label == NULL) != (device->volume_label == NULL))
383         return TRUE;
384     if ((old_timestamp == NULL) != (device->volume_time == NULL))
385         return TRUE;
386     /* If details were not NULL and is now different, we have a difference. */
387     if (old_label != NULL && strcmp(old_label, device->volume_label) != 0)
388         return TRUE;
389     if (old_timestamp != NULL &&
390         strcmp(old_timestamp, device->volume_time) != 0)
391         return TRUE;
392
393     /* If we got here, everything is cool. */
394     return FALSE;
395 }
396
397 static void
398 update_tapelist(
399     taper_state_t *state)
400 {
401     char *tapelist_name = NULL;
402     char *tapelist_name_old = NULL;
403
404     tapelist_name = config_dir_relative(getconf_str(CNF_TAPELIST));
405     if (state->cur_tape == 0) {
406         tapelist_name_old = stralloc2(tapelist_name, ".yesterday");
407     } else {
408         char cur_str[NUM_STR_SIZE];
409         g_snprintf(cur_str, SIZEOF(cur_str), "%d", state->cur_tape - 1);
410         tapelist_name_old = vstralloc(tapelist_name,
411                                       ".today.", cur_str, NULL);
412     }
413
414     if (write_tapelist(tapelist_name_old)) {
415         error("could not write tapelist: %s", strerror(errno));
416         /*NOTREACHED*/
417     }
418     amfree(tapelist_name_old);
419
420     remove_tapelabel(state->device->volume_label);
421     add_tapelabel(state->driver_start_time,
422                   state->device->volume_label);
423     if (write_tapelist(tapelist_name)) {
424         error("could not write tapelist: %s", strerror(errno));
425         /*NOTREACHED*/
426     }
427     amfree(tapelist_name);
428 }
429
430 /* Find and label a new tape, if one is not already open. Returns TRUE
431  * if a tape could be written. */
432 static gboolean find_and_label_new_tape(taper_state_t * state,
433                                         dump_info_t * dump_info) {
434     if (state->device != NULL) {
435         return TRUE;
436     }
437     
438     if (!find_new_tape(state, dump_info)) {
439         return FALSE;
440     }
441
442     return label_new_tape(state, dump_info);
443 }
444
445 static gboolean label_new_tape(taper_state_t * state, dump_info_t * dump_info) {
446     char *old_volume_name = NULL;
447     char *old_volume_time = NULL;
448     tape_search_request_t request;
449     gboolean search_result;
450     ReadLabelStatusFlags status;
451
452     /* If we got here, it means that we have found a tape to label and
453      * have gotten permission from the driver to write it. But we
454      * still can say NO-NEW-TAPE if a problem shows up, and must still
455      * say NEW-TAPE if one doesn't. */
456
457     state->device = device_open(state->next_tape_device);
458     amfree(state->next_tape_device);
459     if (state->device == NULL)
460         goto skip_volume;
461
462     device_set_startup_properties_from_config(state->device);
463
464     /* if we have an error, and are sure it isn't just an unlabeled volume,
465      * then skip this volume */
466     status = device_read_label(state->device);
467     if ((status & ~READ_LABEL_STATUS_VOLUME_UNLABELED) &&
468         !(status & READ_LABEL_STATUS_VOLUME_UNLABELED))
469         goto skip_volume;
470
471     old_volume_name = g_strdup(state->device->volume_label);
472     old_volume_time = g_strdup(state->device->volume_time);
473
474     if (!device_start(state->device, ACCESS_WRITE, state->next_tape_label,
475                       state->driver_start_time)) {
476         gboolean tape_used;
477
478         /* Something broke, see if we can tell if the volume was erased or
479          * not. */
480         g_fprintf(stderr, "taper: Error writing label %s to device %s.\n",
481                 state->next_tape_label, state->device->device_name);
482
483         if (!device_finish(state->device))
484             goto request_new_volume;
485
486         /* This time, if we can't read the label, assume we've overwritten
487          * the volume or otherwise corrupted it */
488         status = device_read_label(state->device);
489         if ((status & ~READ_LABEL_STATUS_VOLUME_UNLABELED) &&
490             !(status & READ_LABEL_STATUS_VOLUME_UNLABELED))
491             goto request_new_volume;
492
493         tape_used = check_volume_changed(state->device, old_volume_name, 
494                                          old_volume_time);
495         if (tape_used) {
496             goto request_new_volume;
497         } else {
498             goto skip_volume;
499         }
500     }
501
502     amfree(old_volume_name);
503     amfree(old_volume_time);
504     amfree(state->next_tape_label);
505
506     update_tapelist(state);
507     state->cur_tape++;
508
509     log_add(L_START, "datestamp %s label %s tape %d",
510             state->driver_start_time, state->device->volume_label,
511             state->cur_tape);
512     putresult(NEW_TAPE, "%s %s\n", dump_info->handle,
513               state->device->volume_label);
514
515     return TRUE;
516
517 request_new_volume:
518     /* Tell the driver we overwrote this volume, even if it was empty, and request
519      * a new volume. */
520     if (state->device) {
521         g_object_unref(state->device);
522         state->device = NULL;
523     }
524
525     putresult(NEW_TAPE, "%s %s\n", dump_info->handle,
526               state->next_tape_label);
527     if (old_volume_name) {
528         log_add(L_WARNING, "Problem writing label '%s' to volume %s, "
529                 "volume may be erased.\n",
530                 state->next_tape_label, old_volume_name);
531     } else {
532         log_add(L_WARNING, "Problem writing label '%s' to new volume, "
533                 "volume may be erased.\n", state->next_tape_label);
534     }
535
536     amfree(state->next_tape_label);
537     amfree(old_volume_name);
538     amfree(old_volume_time);
539
540     return find_and_label_new_tape(state, dump_info);
541
542 skip_volume:
543     /* grab a new volume without talking to the driver again -- we do this if we're
544      * confident we didn't overwrite the last tape we got. */
545     if (state->device) {
546         g_object_unref(state->device);
547         state->device = NULL;
548     }
549
550     if (old_volume_name) {
551         log_add(L_WARNING, "Problem writing label '%s' to volume '%s', "
552                 "old volume data intact\n",
553                 state->next_tape_label, old_volume_name);
554     } else {
555         log_add(L_WARNING, "Problem writing label '%s' to new volume, "
556                 "old volume data intact\n", state->next_tape_label);
557     }
558
559     amfree(state->next_tape_label);
560     amfree(old_volume_name);
561     amfree(old_volume_time);
562
563     request.state = state;
564     request.prolong = TRUE;
565     request.errmsg = NULL;
566     search_result = GPOINTER_TO_INT(tape_search_thread(&request));
567     if (search_result) {
568         amfree(request.errmsg);
569         return label_new_tape(state, dump_info);
570     } else {
571         /* Problem finding a new tape! */
572         log_taper_scan_errmsg(request.errmsg);
573         putresult(NO_NEW_TAPE, "%s\n", dump_info->handle);
574         return FALSE;
575     }
576
577 /* Find out if the dump is PARTIAL or not, and set the proper driver
578    and logfile tags for the dump. */
579 static void find_completion_tags(dump_info_t * dump_info, /* IN */
580                                  cmd_t * result_cmd,      /* OUT */
581                                  logtype_t * result_log   /* OUT */) {
582     if (taper_source_is_partial(dump_info->source)) {
583         *result_cmd = PARTIAL;
584         *result_log = L_PARTIAL;
585     } else {
586         *result_cmd = DONE;
587         *result_log = L_DONE;
588     }
589
590     memcpy(intp, &buf[1], SIZEOF(int));
591     return (int)buf[0];
592 }
593
594 /* Put an L_PARTIAL message to the logfile. */
595 static void put_partial_log(dump_info_t * dump_info, double dump_time,
596                             guint64 dump_kbytes) {
597     char * qdiskname = quote_string(dump_info->diskname);
598
599     log_add(L_PARTIAL, "%s %s %s %d %d [sec %f kb %ju kps %f] \"\"",
600             dump_info->hostname, qdiskname, dump_info->timestamp,
601             dump_info->current_part, dump_info->level, dump_time,
602             (uintmax_t)dump_kbytes, dump_kbytes / dump_time);
603     amfree(qdiskname);
604 }
605
606 /* Figure out what to do after a part attempt. Returns TRUE if another
607    attempt should proceed for this dump; FALSE if we are done. */
608 static gboolean finish_part_attempt(taper_state_t * taper_state,
609                                     dump_info_t * dump_info,
610                                     queue_result_flags queue_result,
611                                     GTimeVal run_time, guint64 run_bytes) {
612     double part_time = g_timeval_to_double(run_time);
613     guint64 part_kbytes = run_bytes / 1024;
614     double part_kbps = run_bytes / (1024 * part_time);
615         
616     char * qdiskname = quote_string(dump_info->diskname);
617
618     if (queue_result == QUEUE_SUCCESS) {
619         dump_info->total_time = timesadd(run_time, dump_info->total_time);
620         dump_info->total_bytes += run_bytes;
621
622         log_add(L_PART, "%s %d %s %s %s %d/%d %d [sec %f kb %ju kps %f]",
623                 taper_state->device->volume_label,
624                 taper_state->device->file, dump_info->hostname, qdiskname,
625                 dump_info->timestamp, dump_info->current_part,
626                 taper_source_predict_parts(dump_info->source),
627                 dump_info->level, part_time, (uintmax_t)part_kbytes, part_kbps);
628         putresult(PARTDONE, "%s %s %d %ju \"[sec %f kb %ju kps %f]\"\n",
629                   dump_info->handle, taper_state->device->volume_label,
630                   taper_state->device->file, (uintmax_t)part_kbytes, part_time,
631                   (uintmax_t)part_kbytes, part_kbps);
632         
633         if (taper_source_get_end_of_data(dump_info->source)) {
634             cmd_t result_cmd;
635             logtype_t result_log;
636             double dump_time = g_timeval_to_double(dump_info->total_time);
637             guint64 dump_kbytes = dump_info->total_bytes / 1024;
638             double dump_kbps = dump_info->total_bytes / (1024 * dump_time);
639
640             find_completion_tags(dump_info, &result_cmd, &result_log);
641
642             g_object_unref(dump_info->source);
643             dump_info->source = NULL;
644         
645             log_add(result_log, "%s %s %s %d %d [sec %f kb %ju kps %f]",
646                     dump_info->hostname, qdiskname, dump_info->timestamp,
647                     dump_info->current_part, dump_info->level, dump_time,
648                     (uintmax_t)dump_kbytes, dump_kbps);
649             putresult(result_cmd, "%s INPUT-GOOD TAPE-GOOD "
650                       "\"[sec %f kb %ju kps %f]\" \"\" \"\"\n",
651                       dump_info->handle, dump_time, (uintmax_t)dump_kbytes,
652                       dump_kbps);
653             
654             amfree(qdiskname);
655             return FALSE;
656         } else if (taper_source_get_end_of_part(dump_info->source)) {
657             taper_source_start_new_part(dump_info->source);
658             dump_info->current_part ++;
659             amfree(qdiskname);
660             return TRUE;
661         }
662         /* If we didn't read EOF or EOP, then an error
663            occured. But we read QUEUE_SUCCESS, so something is
664            b0rked. */
665         g_assert_not_reached();
666     } else {
667         char * volume_label = strdup(taper_state->device->volume_label);
668         int file_number = taper_state->device->file;
669         double dump_time, dump_kbps;
670         guint64 dump_kbytes;
671
672         /* A problem occured. */
673         if (queue_result & QUEUE_CONSUMER_ERROR) {
674             /* Close the device. */
675             device_finish(taper_state->device);
676             g_object_unref(taper_state->device);
677             taper_state->device = NULL;
678         }
679         
680         log_add(L_PARTPARTIAL,
681                 "%s %d %s %s %s %d/%d %d [sec %f kb %ju kps %f] \"\"",
682                 volume_label, file_number, dump_info->hostname, qdiskname,
683                 dump_info->timestamp, dump_info->current_part,
684                 taper_source_predict_parts(dump_info->source),
685                 dump_info->level, part_time, (uintmax_t)part_kbytes, part_kbps);
686         amfree(volume_label);
687         
688         if ((queue_result & QUEUE_CONSUMER_ERROR) &&
689             (!(queue_result & QUEUE_PRODUCER_ERROR)) &&
690             taper_source_seek_to_part_start(dump_info->source)) {
691             /* It is recoverable. */
692             log_add(L_INFO, "Will request retry of failed split part.");
693             if (find_and_label_new_tape(taper_state, dump_info)) {
694                 /* dump_info->current_part is unchanged. */
695                 amfree(qdiskname);
696                 return TRUE;
697             }
698         }
699
700         dump_info->total_time = timesadd(run_time, dump_info->total_time);
701         dump_info->total_bytes += run_bytes;
702         dump_time = g_timeval_to_double(dump_info->total_time);
703         dump_kbytes = dump_info->total_bytes / 1024;
704         dump_kbps = dump_info->total_bytes / (1024 * dump_time);
705         
706         putresult(PARTIAL,
707                   "%s INPUT-%s TAPE-%s "
708                   "\"[sec %f kb %ju kps %f]\" \"\" \"\"\n",
709                   dump_info->handle,
710                   (queue_result & QUEUE_PRODUCER_ERROR) ? "ERROR" : "GOOD",
711                   (queue_result & QUEUE_CONSUMER_ERROR) ? "ERROR" : "GOOD",
712                   dump_time, (uintmax_t)dump_kbytes, dump_kbps);
713         put_partial_log(dump_info, dump_time, dump_kbytes);
714     }
715
716     amfree(qdiskname);
717     return FALSE;
718 }
719
720 /* Generate the actual header structure to write to tape. This means dropping
721  * bits related to the holding disk, and adding bits for split dumps. */
722 static dumpfile_t * munge_headers(dump_info_t * dump_info) {
723     dumpfile_t * rval;
724     int expected_splits;
725     
726     rval = taper_source_get_first_header(dump_info->source);
727
728     if (rval == NULL) {
729         return NULL;
730     }
731
732     rval->cont_filename[0] = '\0';
733
734     expected_splits = taper_source_predict_parts(dump_info->source);
735
736     if (expected_splits != 1) {
737         rval->type = F_SPLIT_DUMPFILE;
738         rval->partnum = dump_info->current_part;
739         rval->totalparts = expected_splits;
740     }
741
742     return rval;
743 }
744
745 /* We call this when we can't find a tape to write data to. This could
746    happen with the first (or only) part of a file, but it could also
747    happen with an intermediate part of a split dump. dump_bytes
748    is 0 if this is the first part of a dump. */
749 static void bail_no_volume(dump_info_t * dump_info) {
750     if (dump_info->total_bytes > 0) {
751         /* Second or later part of a split dump, so PARTIAL message. */
752         double dump_time = g_timeval_to_double(dump_info->total_time);
753         guint64 dump_kbytes = dump_info->total_bytes / 1024;
754         double dump_kbps = dump_kbytes / dump_time;
755         putresult(PARTIAL,
756                   "%s INPUT-GOOD TAPE-ERROR "
757                   "\"[sec %f kb %ju kps %f]\" \"\" \"no new tape\"\n",
758                   dump_info->handle, 
759                   dump_time, (uintmax_t)dump_kbytes, dump_kbps);
760         put_partial_log(dump_info, dump_time, dump_kbytes);
761     } else {
762         char * qdiskname = quote_string(dump_info->diskname);
763         putresult(FAILED,
764                   "%s INPUT-GOOD TAPE-ERROR \"\" \"No new tape.\"\n",
765                   dump_info->handle);
766         log_add(L_FAIL, "%s %s %s %d \"No new tape.\"",
767                 dump_info->hostname, qdiskname, dump_info->timestamp,
768                 dump_info->level);
769         amfree(qdiskname);
770     }
771 }
772
773 /* Link up the TaperSource with the Device, including retries etc. */
774 static void run_device_output(taper_state_t * taper_state,
775                               dump_info_t * dump_info) {
776     GValue val;
777     guint file_number;
778     dump_info->current_part = 1;
779     dump_info->total_time.tv_sec = 0;
780     dump_info->total_time.tv_usec = 0;
781     dump_info->total_bytes = 0;
782
783     for (;;) {
784         GTimeVal start_time, end_time, run_time;
785         StreamingRequirement streaming_mode;
786         queue_result_flags queue_result;
787         CountingConsumerData consumer_data;
788         dumpfile_t *this_header;
789         size_t max_memory;
790         
791         this_header = munge_headers(dump_info);
792         if (this_header == NULL) {
793             char * qdiskname = quote_string(dump_info->diskname);
794             putresult(FAILED,
795              "%s INPUT-ERROR TAPE-GOOD \"Failed reading dump header.\" \"\"\n",
796                       dump_info->handle);
797             log_add(L_FAIL, "%s %s %s %d \"Failed reading dump header.\"",
798                     dump_info->hostname, qdiskname, dump_info->timestamp,
799                     dump_info->level);
800             amfree(qdiskname);
801             return;
802         }            
803
804         if (!find_and_label_new_tape(taper_state, dump_info)) {
805             bail_no_volume(dump_info);
806             amfree(this_header);
807             return;
808         }
809
810         if (!device_start_file(taper_state->device, this_header)) {
811             bail_no_volume(dump_info);
812             amfree(this_header);
813             return;
814         }
815         amfree(this_header);
816
817         bzero(&val, sizeof(val));
818         if (!device_property_get(taper_state->device, PROPERTY_STREAMING, &val)
819             || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
820             g_fprintf(stderr, "taper: Couldn't get streaming type!\n");
821             streaming_mode = STREAMING_REQUIREMENT_REQUIRED;
822         } else {
823             streaming_mode = g_value_get_enum(&val);
824         }
825     
826         file_number = taper_state->device->file;
827
828         consumer_data.next_consumer = device_write_consumer;
829         consumer_data.next_consumer_data = taper_state->device;
830         consumer_data.bytes_written = 0;
831
832         g_get_current_time(&start_time);
833
834         if (getconf_seen(CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
835             max_memory = getconf_size(CNF_DEVICE_OUTPUT_BUFFER_SIZE);
836             if (getconf_seen(CNF_TAPEBUFS)) {
837                 g_fprintf(stderr,
838                         "Configuration directives 'device_output_buffer_size' "
839                         "and \n"
840                         "'tapebufs' are incompatible; using former.\n");
841             }
842         } else if (getconf_seen(CNF_TAPEBUFS)) {
843             max_memory = getconf_int(CNF_TAPEBUFS) *
844                 device_write_max_size(taper_state->device);
845         } else {
846             /* Use default. */
847             max_memory = getconf_size(CNF_DEVICE_OUTPUT_BUFFER_SIZE);
848         }
849
850         queue_result = do_consumer_producer_queue_full
851             (taper_source_producer,
852              dump_info->source,
853              counting_consumer,
854              &consumer_data,
855              device_write_max_size(taper_state->device), max_memory,
856              streaming_mode);
857
858         g_get_current_time(&end_time);
859         run_time = timesub(end_time, start_time);
860
861         /* The device_write_consumer may have closed the file with a short
862          * write, so we only finish here if it needs it. */
863         if (taper_state->device->in_file &&
864             !device_finish_file(taper_state->device)) {
865             queue_result = queue_result | QUEUE_CONSUMER_ERROR;
866         }
867
868         if (!finish_part_attempt(taper_state, dump_info, queue_result,
869                                  run_time, consumer_data.bytes_written)) {
870             break;
871         }
872     }
873 }
874
875 /* Handle a PORT_WRITE command. */
876 static void process_port_write(taper_state_t * state,
877                                struct cmdargs * cmdargs) {
878     dump_info_t dump_state;
879     guint64 splitsize;
880     guint64 fallback_splitsize;
881     char * split_diskbuffer;
882     char * argnames[] = {"command",               /* 1 */
883                          "handle",                /* 2 */
884                          "hostname",              /* 3 */
885                          "diskname",              /* 4 */
886                          "level",                 /* 5 */
887                          "datestamp",             /* 6 */
888                          "splitsize",             /* 7 */
889                          "split_diskbuffer",      /* 8 */
890                          "fallback_splitsize",    /* 9 */
891                           NULL };
892
893     validate_args(PORT_WRITE, cmdargs, argnames);
894
895     dump_state.handle = g_strdup(cmdargs->argv[2]);
896     dump_state.hostname = g_strdup(cmdargs->argv[3]);
897     dump_state.diskname = unquote_string(cmdargs->argv[4]);
898     
899     errno = 0;
900     dump_state.level = strtol(cmdargs->argv[5], NULL, 10);
901     if (errno != 0) {
902         error("error [taper PORT-WRITE: Invalid dump level %s]",
903               cmdargs->argv[5]);
904         g_assert_not_reached();
905     }
906     
907     dump_state.timestamp = strdup(cmdargs->argv[6]);
908
909     errno = 0;
910     splitsize = g_ascii_strtoull(cmdargs->argv[7], NULL, 10);
911     if (errno != 0) {
912         error("error [taper PORT-WRITE: Invalid splitsize %s]",
913               cmdargs->argv[7]);
914         g_assert_not_reached();
915     }
916     
917     if (strcmp(cmdargs->argv[8], "NULL") == 0) {
918         split_diskbuffer = NULL;
919     } else {
920         split_diskbuffer = g_strdup(cmdargs->argv[8]);
921     }
922     
923     errno = 0;
924     fallback_splitsize = g_ascii_strtoull(cmdargs->argv[9], NULL, 10);
925     if (errno != 0) {
926         error("error [taper PORT-WRITE: Invalid fallback_splitsize %s]",
927               cmdargs->argv[9]);
928         g_assert_not_reached();
929     }
930
931     dump_state.id_string = g_strdup_printf("%s:%s.%d", dump_state.hostname,
932                                            dump_state.diskname,
933                                            dump_state.level);
934     
935     if (!open_read_socket(&dump_state, split_diskbuffer, splitsize,
936                           fallback_splitsize)) {
937         free(split_diskbuffer);
938         return;
939     }
940     free(split_diskbuffer);
941
942     run_device_output(state, &dump_state);
943
944     free_dump_info(&dump_state);
945 }
946
947 /* Handle a FILE_WRITE command. */
948 static void process_file_write(taper_state_t * state,
949                                struct cmdargs * cmdargs) {
950     dump_info_t dump_state;
951     char * holding_disk_file;
952     guint64 splitsize;
953     char * argnames[] = {"command",               /* 1 */
954                          "handle",                /* 2 */
955                          "filename",              /* 3 */
956                          "hostname",              /* 4 */
957                          "diskname",              /* 5 */
958                          "level",                 /* 6 */
959                          "datestamp",             /* 7 */
960                          "splitsize",             /* 8 */
961                           NULL };
962
963     validate_args(FILE_WRITE, cmdargs, argnames);
964
965     dump_state.handle = g_strdup(cmdargs->argv[2]);
966     holding_disk_file = unquote_string(cmdargs->argv[3]);
967     dump_state.hostname = g_strdup(cmdargs->argv[4]);
968     dump_state.diskname = unquote_string(cmdargs->argv[5]);
969     
970     errno = 0;
971     dump_state.level = strtol(cmdargs->argv[6], NULL, 10);
972     if (errno != 0) {
973         error("error [taper FILE-WRITE: Invalid dump level %s]",
974               cmdargs->argv[5]);
975         g_assert_not_reached();
976     }
977     
978     dump_state.timestamp = strdup(cmdargs->argv[7]);
979
980     errno = 0;
981     splitsize = g_ascii_strtoull(cmdargs->argv[8], NULL, 10);
982     if (errno != 0) {
983         error("error [taper FILE-WRITE: Invalid splitsize %s]",
984               cmdargs->argv[8]);
985         g_assert_not_reached();
986     }
987
988     dump_state.id_string = g_strdup_printf("%s:%s.%d", dump_state.hostname,
989                                            dump_state.diskname,
990                                            dump_state.level);
991     
992     dump_state.source = taper_source_new(dump_state.handle, FILE_WRITE,
993                                          holding_disk_file, -1,
994                                          NULL, splitsize, -1);
995     /* FIXME: This should be handled properly. */
996     g_assert(dump_state.source != NULL);
997
998     run_device_output(state, &dump_state);
999
1000     free_dump_info(&dump_state);
1001     amfree(holding_disk_file);
1002 }
1003
1004 /* Send QUITTING message to driver and associated logging. Always
1005    returns false. */
1006 static gboolean send_quitting(taper_state_t * state) {
1007     putresult(QUITTING, "\n");
1008     g_fprintf(stderr,"taper: DONE\n");
1009     cleanup(state);
1010     return FALSE;
1011 }
1012
1013 /* This function recieves the START_TAPER command from driver, and
1014    returns the attached timestamp. */
1015 static gboolean find_first_tape(taper_state_t * state) {
1016     cmd_t cmd;
1017     /* Note: cmdargs.argv is never freed. In the entire Amanda codebase. */
1018     struct cmdargs cmdargs;
1019     tape_search_request_t search_request;
1020     GThread * tape_search = NULL;
1021     gboolean use_threads;
1022
1023     /* We save the value here in case it changes while we're running. */
1024     use_threads = g_thread_supported();
1025
1026     search_request.state = state;
1027     search_request.prolong = TRUE;
1028     search_request.errmsg = NULL;
1029     
1030     if (use_threads) {
1031         tape_search = g_thread_create(tape_search_thread,
1032                                       &search_request, TRUE, NULL);
1033     }
1034
1035     cmd = getcmd(&cmdargs);
1036
1037     switch (cmd) {
1038     case START_TAPER: {
1039         gboolean search_result;
1040         state->driver_start_time = strdup(cmdargs.argv[2]);
1041         if (use_threads) {
1042             search_result = GPOINTER_TO_INT(g_thread_join(tape_search));
1043         } else {
1044             search_result =
1045                 GPOINTER_TO_INT(tape_search_thread(&search_request));
1046         }
1047         if (search_result) {
1048             putresult(TAPER_OK, "\n");
1049         } else {
1050             putresult(TAPE_ERROR, "Could not find a tape to use.\n");
1051             log_add(L_ERROR, "no-tape [%s]", "Could not find a tape to use");
1052             if (search_request.errmsg != NULL) {
1053                 char *c, *c1;
1054                 c = c1 = search_request.errmsg;
1055                 while (*c != '\0') {
1056                     if (*c == '\n') {
1057                         *c = '\0';
1058                         log_add(L_WARNING,"%s", c1);
1059                         c1 = c+1;
1060                     }
1061                     c++;
1062                 }
1063                 if (strlen(c1) > 1 )
1064                     log_add(L_WARNING,"%s", c1);
1065             }
1066         }
1067         amfree(search_request.errmsg);
1068         return TRUE;
1069     }
1070     case QUIT:
1071         search_request.prolong = FALSE;
1072         if (use_threads) {
1073             g_thread_join(tape_search);
1074         }
1075         return send_quitting(state);
1076     default:
1077         error("error [file_reader_side cmd %d argc %d]", cmd, cmdargs.argc);
1078     }
1079
1080     g_assert_not_reached();
1081 }
1082
1083 /* In running mode (not startup mode), get a command from driver and
1084    deal with it. */
1085 static gboolean process_driver_command(taper_state_t * state) {
1086     cmd_t cmd;
1087     struct cmdargs cmdargs;
1088     char * q;
1089
1090     /* This will return QUIT if driver has died. */
1091     cmd = getcmd(&cmdargs);
1092     switch (cmd) {
1093     case PORT_WRITE:
1094         /*
1095          * PORT-WRITE
1096          *   handle
1097          *   hostname
1098          *   features
1099          *   diskname
1100          *   level
1101          *   datestamp
1102          *   splitsize
1103          *   split_diskbuffer
1104          */
1105         process_port_write(state, &cmdargs);
1106         break;
1107         
1108     case FILE_WRITE:
1109         /*
1110          * FILE-WRITE
1111          *   handle
1112          *   filename
1113          *   hostname
1114          *   features
1115          *   diskname
1116          *   level
1117          *   datestamp
1118          *   splitsize
1119          */
1120         process_file_write(state, &cmdargs);
1121         break;
1122         
1123     case QUIT:
1124         return send_quitting(state);
1125     default:
1126         if (cmdargs.argc >= 1) {
1127             q = squote(cmdargs.argv[1]);
1128         } else if (cmdargs.argc >= 0) {
1129             q = squote(cmdargs.argv[0]);
1130         } else {
1131             q = stralloc("(no input?)");
1132         }
1133         putresult(BAD_COMMAND, "%s\n", q);
1134         amfree(q);
1135         break;
1136     }
1137
1138     return TRUE;
1139 }
1140
1141 int main(int argc, char ** argv) {
1142     char * tapelist_name;
1143     int have_changer;
1144     taper_state_t state;
1145     config_overwrites_t *cfg_ovr = NULL;
1146     char *cfg_opt = NULL;
1147
1148     /*
1149      * Configure program for internationalization:
1150      *   1) Only set the message locale for now.
1151      *   2) Set textdomain for all amanda related programs to "amanda"
1152      *      We don't want to be forced to support dozens of message catalogs.
1153      */
1154     setlocale(LC_MESSAGES, "C");
1155     textdomain("amanda");
1156     
1157     safe_fd(-1, 0);
1158     set_pname("taper");
1159
1160     dbopen("server");
1161
1162     device_api_init();
1163     init_taper_state(&state);
1164
1165     /* Don't die when child closes pipe */
1166     signal(SIGPIPE, SIG_IGN);
1167
1168     g_fprintf(stderr, _("%s: pid %ld executable %s version %s\n"),
1169             get_pname(), (long) getpid(), argv[0], version());
1170     dbprintf(_("%s: pid %ld executable %s version %s\n"),
1171               get_pname(), (long) getpid(), argv[0], version());
1172
1173     /* Process options */
1174
1175     cfg_ovr = extract_commandline_config_overwrites(&argc, &argv);
1176
1177     if(argc > 2) {
1178         error("Too many arguments!\n");
1179         g_assert_not_reached();
1180     }
1181     if (argc > 1)
1182         cfg_opt = argv[1];
1183     config_init(CONFIG_INIT_EXPLICIT_NAME | CONFIG_INIT_USE_CWD | CONFIG_INIT_FATAL,
1184                 cfg_opt);
1185     apply_config_overwrites(cfg_ovr);
1186
1187     safe_cd();
1188
1189     set_logerror(logerror);
1190
1191     check_running_as(RUNNING_AS_DUMPUSER);
1192
1193     dbrename(config_name, DBG_SUBDIR_SERVER);
1194
1195     tapelist_name = config_dir_relative(getconf_str(CNF_TAPELIST));
1196
1197     if (read_tapelist(tapelist_name) != 0) {
1198         error("could not load tapelist \"%s\"", tapelist_name);
1199         g_assert_not_reached();
1200     }
1201     amfree(tapelist_name);
1202
1203     have_changer = changer_init();
1204     if (have_changer < 0) {
1205         error("changer initialization failed: %s", strerror(errno));
1206         g_assert_not_reached();
1207     }
1208
1209     state.next_tape_label = NULL;
1210     state.next_tape_device = NULL;
1211     state.cur_tape = 0;
1212     
1213     if (!find_first_tape(&state)) {
1214         return EXIT_SUCCESS;
1215     }
1216
1217     while (process_driver_command(&state));
1218     return EXIT_SUCCESS;
1219 }