Imported Upstream version 3.1.0
[debian/amanda] / common-src / queueing-test.c
1 /*
2  * Copyright (c) 2008,2009 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  *
20  * Author: Dustin J. Mitchell <dustin@zmanda.com>
21  */
22
23 #include "amanda.h"
24 #include "testutils.h"
25 #include "glib-util.h"
26 #include "queueing.h"
27
28 /****
29  * Test a simple queue
30  */
31
32 struct test_queue_simple_data {
33     size_t bytes_to_produce;
34
35     /* The counters here are used to produce a slowly changing 
36      * bytesequence, which should not align with block or buffer
37      * boundaries. */
38     size_t bytes_produced;
39     guint32 producer_counter;
40
41     size_t bytes_consumed;
42     guint32 consumer_counter;
43 };
44
45 static producer_result_t
46 test_queue_simple_producer(
47     gpointer data,
48     queue_buffer_t *buffer,
49     size_t hint_size)
50 {
51     struct test_queue_simple_data *d = (struct test_queue_simple_data *)data;
52     size_t to_write = hint_size;
53     size_t i;
54
55     /* just for fun, write a little bit more sometimes */
56     to_write += d->producer_counter % 50;
57
58     /* but not too much */
59     if (to_write > d->bytes_to_produce - d->bytes_produced)
60         to_write = d->bytes_to_produce - d->bytes_produced;
61
62     /* make sure the buffer is big enough */
63     if (buffer->data == NULL) {
64         buffer->data = g_malloc(to_write);
65         buffer->alloc_size = to_write;
66     } else if (buffer->alloc_size < to_write) {
67         buffer->data = g_realloc(buffer->data, to_write);
68         buffer->alloc_size = to_write;
69     }
70     /* g_debug("Producing %zd bytes in %p (@%p)", to_write, buffer, buffer->data); */
71
72     /* fill in the data with some random junk */
73     for (i = 0; i < to_write; i++) {
74         buffer->data[i] = (char)(d->producer_counter / 7 + (d->producer_counter >> 10));
75         d->producer_counter++;
76     }
77
78     /* and call it a day */
79     buffer->offset = 0;
80     buffer->data_size = to_write;
81     d->bytes_produced += to_write;
82     return d->bytes_produced >= d->bytes_to_produce?
83          PRODUCER_FINISHED
84        : PRODUCER_MORE;
85 }
86
87 static ssize_t
88 test_queue_simple_consumer(
89     gpointer data,
90     queue_buffer_t *buffer)
91 {
92     struct test_queue_simple_data *d = (struct test_queue_simple_data *)data;
93     size_t to_read = buffer->data_size;
94     size_t i;
95
96     g_assert(buffer->data != NULL);
97     g_assert(buffer->data_size != 0);
98
99     /* let's not read it all, to make sure that we get called back with the
100      * remainder */
101     to_read = buffer->data_size;
102     if (to_read > 1000) to_read = 1000;
103
104     /* verify the contents of the buffer */
105     /* g_debug("Consuming %zd bytes starting at %d in %p (@%p)", to_read, buffer->offset, buffer, buffer->data); */
106     for (i = 0; i < to_read; i++) {
107         char expected = d->consumer_counter / 7 + (d->consumer_counter >> 10);
108         if (buffer->data[buffer->offset + i] != expected) {
109             tu_dbg("expected %d, but got %d at byte position %zd\n",
110                 (int)expected, buffer->data[i], i);
111             return -1;
112         }
113         d->consumer_counter++;
114     }
115     d->bytes_consumed += to_read;
116
117     return to_read;
118 }
119
120 static int
121 test_queue_simple(StreamingRequirement sr)
122 {
123     queue_result_flags qr;
124     gboolean success = TRUE;
125
126     struct test_queue_simple_data d = {
127         10*1024*1024, /* bytes_to_produce */
128         0, /* bytes_produced */
129         0, /* producer_counter */
130         0, /* bytes_consumed */
131         0 /* consumer_counter */
132     };
133
134     qr = do_consumer_producer_queue_full(
135         test_queue_simple_producer, (gpointer)&d,
136         test_queue_simple_consumer, (gpointer)&d,
137         10230, /* almost 10k */
138         3*1024*1024, /* 3M */
139         sr);
140
141     if (qr != QUEUE_SUCCESS) {
142         tu_dbg("Expected result QUEUE_SUCCESS (%d); got %d\n",
143             QUEUE_SUCCESS, qr);
144         success = FALSE;
145     }
146
147     if (d.bytes_produced != d.bytes_to_produce) {
148         tu_dbg("Expected to produce %zd bytes; produced %zd\n",
149             d.bytes_to_produce, d.bytes_produced);
150         success = FALSE;
151     }
152
153     if (d.bytes_consumed != d.bytes_to_produce) {
154         tu_dbg("Expected to consume %zd bytes; consumed %zd\n",
155             d.bytes_to_produce, d.bytes_consumed);
156         success = FALSE;
157     }
158
159     return success;
160 }
161
162 static int
163 test_queue_simple_STREAMING_REQUIREMENT_NONE(void)
164 {
165     return test_queue_simple(STREAMING_REQUIREMENT_NONE);
166 }
167
168 static int
169 test_queue_simple_STREAMING_REQUIREMENT_DESIRED(void)
170 {
171     return test_queue_simple(STREAMING_REQUIREMENT_DESIRED);
172 }
173
174 static int
175 test_queue_simple_STREAMING_REQUIREMENT_REQUIRED(void)
176 {
177     return test_queue_simple(STREAMING_REQUIREMENT_REQUIRED);
178 }
179
180 /****
181  * Test fd_reader and fd_writer
182  */
183
184 #define TEST_FD_CONSUMER_PRODUCER_BLOCKS (1024)
185
186 static gpointer
187 data_producer_thread(gpointer d)
188 {
189     int fd = GPOINTER_TO_INT(d);
190     char buf[1024];
191     size_t i;
192     int block;
193
194     /* fill in the buffer with some stuff */
195     for (i = 0; i < (int)sizeof(buf); i++) {
196         buf[i] = (char)i;
197     }
198
199     /* and write it out in blocks */
200     for (block = 0; block < TEST_FD_CONSUMER_PRODUCER_BLOCKS; block++) {
201         size_t written = 0;
202         while (written < sizeof(buf)) {
203             int len = write(fd, buf + written, sizeof(buf) - written);
204             if (len < 0) {
205                 perror("writing pipe to fd_read_producer");
206                 close(fd);
207                 return GINT_TO_POINTER(0);
208             }
209             written += len;
210         }
211     }
212
213     close(fd);
214     return GINT_TO_POINTER(1);
215 }
216
217 static gpointer
218 data_consumer_thread(gpointer d)
219 {
220     int fd = GPOINTER_TO_INT(d);
221     char buf[1024];
222     size_t i;
223     int block;
224
225     /* and read it in in blocks */
226     for (block = 0; block < TEST_FD_CONSUMER_PRODUCER_BLOCKS; block++) {
227         size_t bytes_read = 0;
228         while (bytes_read < sizeof(buf)) {
229             int len = read(fd, buf + bytes_read, sizeof(buf) - bytes_read);
230             if (len < 0) {
231                 perror("reading pipe from fd_write_consumer");
232                 return NULL;
233             }
234             bytes_read += len;
235         }
236
237         /* verify the block */
238         for (i = 0; i < (int)sizeof(buf); i++) {
239             if (buf[i] != (char)i) {
240                 tu_dbg("result data does not match input; block %d byte %zd", block, i);
241                 close(fd);
242                 return GINT_TO_POINTER(0);
243             }
244         }
245     }
246
247     close(fd);
248     return GINT_TO_POINTER(1);
249 }
250
251 static int
252 test_fd_consumer_producer(void)
253 {
254     gboolean success;
255     GThread *rth, *wth;
256     int input_pipe[2];
257     int output_pipe[2];
258     queue_fd_t queue_read = {0, NULL};
259     queue_fd_t queue_write = {0, NULL};
260
261     /* create pipes and hook up threads to them */
262     if (pipe(input_pipe) < 0) {
263         perror("pipe(input_pipe)");
264         return FALSE;
265     }
266     if (pipe(output_pipe) < 0) {
267         perror("pipe(output_pipe)");
268         return FALSE;
269     }
270
271     wth = g_thread_create(data_producer_thread, GINT_TO_POINTER(input_pipe[1]), TRUE, NULL);
272     rth = g_thread_create(data_consumer_thread, GINT_TO_POINTER(output_pipe[0]), TRUE, NULL);
273
274     /* run the queue */
275     queue_read.fd = input_pipe[0];
276     queue_write.fd = output_pipe[1];
277     success = do_consumer_producer_queue(
278         fd_read_producer, &queue_read,
279         fd_write_consumer, &queue_write);
280     if (!success)
281         tu_dbg("do_consumer_producer_queue returned FALSE");
282
283     /* and examine the results */
284     success = GPOINTER_TO_INT(g_thread_join(wth)) && success;
285     success = GPOINTER_TO_INT(g_thread_join(rth)) && success;
286
287     /* close stuff up */
288     close(input_pipe[0]);
289     close(input_pipe[1]);
290     close(output_pipe[0]);
291     close(output_pipe[1]);
292
293     return success;
294 }
295
296 /*
297  * Main driver
298  */
299
300 int
301 main(int argc, char **argv)
302 {
303     static TestUtilsTest tests[] = {
304         TU_TEST(test_queue_simple_STREAMING_REQUIREMENT_NONE, 90),
305         TU_TEST(test_queue_simple_STREAMING_REQUIREMENT_DESIRED, 90),
306         TU_TEST(test_queue_simple_STREAMING_REQUIREMENT_REQUIRED, 90),
307         TU_TEST(test_fd_consumer_producer, 120), /* runs slowly on old kernels */
308         TU_END()
309     };
310
311     glib_init();
312
313     return testutils_run_tests(argc, argv, tests);
314 }