2 * Copyright (c) 2008,2009 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20 * Author: Dustin J. Mitchell <dustin@zmanda.com>
24 #include "testutils.h"
25 #include "glib-util.h"
/*
 * State shared by test_queue_simple_producer and test_queue_simple_consumer:
 * test_queue_simple hands a pointer to a single instance to both callbacks.
 * The producer advances bytes_produced/producer_counter, the consumer
 * advances bytes_consumed/consumer_counter, and both derive byte values from
 * their own counter with the same formula, so the consumer can check what
 * the producer wrote.
 *
 * NOTE(review): test_queue_simple initializes this struct positionally, so
 * the field order below must not change.
 */
32 struct test_queue_simple_data {
/* total number of bytes the producer is asked to generate */
33 size_t bytes_to_produce;
35 /* The counters here are used to produce a slowly changing
36 * byte sequence, which should not align with block or buffer
38 size_t bytes_produced;
39 guint32 producer_counter;
41 size_t bytes_consumed;
42 guint32 consumer_counter;
/*
 * Producer callback that test_queue_simple passes to
 * do_consumer_producer_queue_full.
 *
 * Starts from hint_size, adds (producer_counter % 50) extra bytes, and caps
 * the result at the number of bytes still to be produced.  buffer->data is
 * allocated on first use or grown when too small, then filled with a
 * pattern derived from producer_counter (one increment per byte).  The
 * amount written is stored in buffer->data_size and added to
 * d->bytes_produced.
 *
 * The return value depends on whether bytes_produced has reached
 * bytes_to_produce.
 *
 * NOTE(review): this listing omits some lines of the function -- the `data`
 * and `hint_size` parameters, the declaration of `i`, closing braces and the
 * two arms of the final conditional are used but not shown.  Confirm against
 * the complete file.
 */
45 static producer_result_t
46 test_queue_simple_producer(
48 queue_buffer_t *buffer,
51 struct test_queue_simple_data *d = (struct test_queue_simple_data *)data;
52 size_t to_write = hint_size;
55 /* just for fun, write a little bit more sometimes */
56 to_write += d->producer_counter % 50;
58 /* but not too much */
59 if (to_write > d->bytes_to_produce - d->bytes_produced)
60 to_write = d->bytes_to_produce - d->bytes_produced;
62 /* make sure the buffer is big enough */
63 if (buffer->data == NULL) {
64 buffer->data = g_malloc(to_write);
65 buffer->alloc_size = to_write;
/* an existing allocation is only replaced when it is too small; a buffer
 * that is already large enough is reused unchanged */
66 } else if (buffer->alloc_size < to_write) {
67 buffer->data = g_realloc(buffer->data, to_write);
68 buffer->alloc_size = to_write;
70 /* g_debug("Producing %zd bytes in %p (@%p)", to_write, buffer, buffer->data); */
72 /* fill in the data with a deterministic pattern the consumer can recompute */
73 for (i = 0; i < to_write; i++) {
/* same expression the consumer evaluates from consumer_counter to obtain
 * the byte it expects */
74 buffer->data[i] = (char)(d->producer_counter / 7 + (d->producer_counter >> 10));
75 d->producer_counter++;
78 /* and call it a day */
80 buffer->data_size = to_write;
81 d->bytes_produced += to_write;
/* the result reports whether the production target has been reached */
82 return d->bytes_produced >= d->bytes_to_produce?
/*
 * Consumer callback that test_queue_simple passes to
 * do_consumer_producer_queue_full.
 *
 * Asserts that the buffer has data, then checks at most 1000 bytes starting
 * at buffer->offset against the value computed from consumer_counter (the
 * same formula the producer uses), advancing consumer_counter as it goes.
 * The number of bytes checked is added to d->bytes_consumed.  A mismatch is
 * reported through tu_dbg.
 *
 * NOTE(review): to_read is set to buffer->data_size by its initializer and
 * then assigned the same value again before the 1000-byte cap; the second
 * assignment is redundant.
 *
 * NOTE(review): this listing omits some lines of the function -- the return
 * type, the `data` parameter, the declaration of `i`, closing braces and
 * all return statements are not shown, so the value returned on success and
 * on mismatch cannot be stated from here.
 */
88 test_queue_simple_consumer(
90 queue_buffer_t *buffer)
92 struct test_queue_simple_data *d = (struct test_queue_simple_data *)data;
93 size_t to_read = buffer->data_size;
96 g_assert(buffer->data != NULL);
97 g_assert(buffer->data_size != 0);
99 /* let's not read it all, to make sure that we get called back with the
101 to_read = buffer->data_size;
102 if (to_read > 1000) to_read = 1000;
104 /* verify the contents of the buffer */
105 /* g_debug("Consuming %zd bytes starting at %d in %p (@%p)", to_read, buffer->offset, buffer, buffer->data); */
106 for (i = 0; i < to_read; i++) {
107 char expected = d->consumer_counter / 7 + (d->consumer_counter >> 10);
108 if (buffer->data[buffer->offset + i] != expected) {
/* NOTE(review): the comparison above reads buffer->data[buffer->offset + i],
 * but the message below prints buffer->data[i].  When buffer->offset is
 * non-zero the reported byte is not the one that failed the check; this
 * looks like it should print buffer->data[buffer->offset + i]. */
109 tu_dbg("expected %d, but got %d at byte position %zd\n",
110 (int)expected, buffer->data[i], i);
113 d->consumer_counter++;
115 d->bytes_consumed += to_read;
/*
 * Runs test_queue_simple_producer and test_queue_simple_consumer against
 * each other through do_consumer_producer_queue_full, sharing one
 * test_queue_simple_data with a target of 10*1024*1024 bytes, and then
 * checks three things, logging through tu_dbg when one fails:
 *   - the queue result equals QUEUE_SUCCESS;
 *   - bytes_produced equals bytes_to_produce;
 *   - bytes_consumed equals bytes_to_produce.
 *
 * sr: streaming requirement under test.  NOTE(review): the trailing
 * arguments of the queue call are not shown in this listing; presumably sr
 * is forwarded there -- confirm.
 *
 * NOTE(review): the return type, the statements executed after each tu_dbg
 * call and the final return are not shown; `success` starts as TRUE and is
 * presumably cleared on failure and returned -- confirm.
 *
 * NOTE(review): the size_t counters are printed with %zd; if tu_dbg is
 * printf-style, %zu is the conversion that matches size_t.
 */
121 test_queue_simple(StreamingRequirement sr)
123 queue_result_flags qr;
124 gboolean success = TRUE;
/* positional initializer: values must follow the field order of
 * struct test_queue_simple_data */
126 struct test_queue_simple_data d = {
127 10*1024*1024, /* bytes_to_produce */
128 0, /* bytes_produced */
129 0, /* producer_counter */
130 0, /* bytes_consumed */
131 0 /* consumer_counter */
/* both callbacks receive the same &d, which is how the consumer's counters
 * can be compared with the producer's target afterwards */
134 qr = do_consumer_producer_queue_full(
135 test_queue_simple_producer, (gpointer)&d,
136 test_queue_simple_consumer, (gpointer)&d,
137 10230, /* almost 10k */
138 3*1024*1024, /* 3M */
141 if (qr != QUEUE_SUCCESS) {
142 tu_dbg("Expected result QUEUE_SUCCESS (%d); got %d\n",
147 if (d.bytes_produced != d.bytes_to_produce) {
148 tu_dbg("Expected to produce %zd bytes; produced %zd\n",
149 d.bytes_to_produce, d.bytes_produced);
153 if (d.bytes_consumed != d.bytes_to_produce) {
154 tu_dbg("Expected to consume %zd bytes; consumed %zd\n",
155 d.bytes_to_produce, d.bytes_consumed);
/*
 * Test-table entry point: runs test_queue_simple with
 * STREAMING_REQUIREMENT_NONE and returns its result unchanged.
 * NOTE(review): the return-type line and braces are not shown in this
 * listing.
 */
163 test_queue_simple_STREAMING_REQUIREMENT_NONE(void)
165 return test_queue_simple(STREAMING_REQUIREMENT_NONE);
/*
 * Test-table entry point: runs test_queue_simple with
 * STREAMING_REQUIREMENT_DESIRED and returns its result unchanged.
 * NOTE(review): the return-type line and braces are not shown in this
 * listing.
 */
169 test_queue_simple_STREAMING_REQUIREMENT_DESIRED(void)
171 return test_queue_simple(STREAMING_REQUIREMENT_DESIRED);
/*
 * Test-table entry point: runs test_queue_simple with
 * STREAMING_REQUIREMENT_REQUIRED and returns its result unchanged.
 * NOTE(review): the return-type line and braces are not shown in this
 * listing.
 */
175 test_queue_simple_STREAMING_REQUIREMENT_REQUIRED(void)
177 return test_queue_simple(STREAMING_REQUIREMENT_REQUIRED);
181 * Test fd_read_producer and fd_write_consumer
/* Number of buffer-sized blocks that data_producer_thread writes and
 * data_consumer_thread reads back and verifies. */
184 #define TEST_FD_CONSUMER_PRODUCER_BLOCKS (1024)
/*
 * Thread body started by test_fd_consumer_producer on the write end of its
 * input pipe.  The file descriptor arrives packed into the pointer argument
 * `d`.  A local buffer `buf` is filled once and then written to the
 * descriptor TEST_FD_CONSUMER_PRODUCER_BLOCKS times.
 *
 * Returns GINT_TO_POINTER(0) on the path that follows the perror call and
 * GINT_TO_POINTER(1) after all blocks have been written; the caller reads
 * this value back from g_thread_join.
 *
 * NOTE(review): this listing omits the return type, the declarations of
 * `buf`, `i`, `block` and `written`, the body of the fill loop, and the
 * condition guarding perror.  data_consumer_thread expects buf[i] == (char)i,
 * so the fill loop presumably stores (char)i -- confirm.
 */
187 data_producer_thread(gpointer d)
189 int fd = GPOINTER_TO_INT(d);
194 /* fill in the buffer with some stuff */
195 for (i = 0; i < (int)sizeof(buf); i++) {
199 /* and write it out in blocks */
200 for (block = 0; block < TEST_FD_CONSUMER_PRODUCER_BLOCKS; block++) {
/* write() may accept fewer bytes than requested, so each call resumes at
 * buf + written and the loop runs until the whole block has gone out */
202 while (written < sizeof(buf)) {
203 int len = write(fd, buf + written, sizeof(buf) - written);
205 perror("writing pipe to fd_read_producer");
207 return GINT_TO_POINTER(0);
214 return GINT_TO_POINTER(1);
/*
 * Thread body started by test_fd_consumer_producer on the read end of its
 * output pipe.  The file descriptor arrives packed into the pointer
 * argument `d`.  Reads TEST_FD_CONSUMER_PRODUCER_BLOCKS blocks of
 * sizeof(buf) bytes and checks that byte i of every block equals (char)i.
 *
 * Returns GINT_TO_POINTER(0) after reporting a data mismatch and
 * GINT_TO_POINTER(1) once every block has been verified; the caller reads
 * this value back from g_thread_join.
 *
 * NOTE(review): this listing omits the return type, the declarations of
 * `buf`, `i` and `block`, the condition guarding perror and whatever
 * follows it.
 */
218 data_consumer_thread(gpointer d)
220 int fd = GPOINTER_TO_INT(d);
225 /* and read it in in blocks */
226 for (block = 0; block < TEST_FD_CONSUMER_PRODUCER_BLOCKS; block++) {
227 size_t bytes_read = 0;
/* read() may return fewer bytes than requested, so each call resumes at
 * buf + bytes_read and the loop runs until a full block has arrived */
228 while (bytes_read < sizeof(buf)) {
229 int len = read(fd, buf + bytes_read, sizeof(buf) - bytes_read);
231 perror("reading pipe from fd_write_consumer");
237 /* verify the block */
238 for (i = 0; i < (int)sizeof(buf); i++) {
239 if (buf[i] != (char)i) {
/* NOTE(review): the loop bound is cast to int, which suggests `i` is an
 * int; if so, %zd does not match it (and if `i` is size_t, %zu would be the
 * matching conversion).  The message also lacks the trailing "\n" that the
 * tu_dbg calls in the queue test use -- confirm intent. */
240 tu_dbg("result data does not match input; block %d byte %zd", block, i);
242 return GINT_TO_POINTER(0);
248 return GINT_TO_POINTER(1);
/*
 * End-to-end test of fd_read_producer and fd_write_consumer.
 *
 * Creates two pipes, starts data_producer_thread on the write end of
 * input_pipe and data_consumer_thread on the read end of output_pipe, then
 * calls do_consumer_producer_queue with fd_read_producer reading
 * input_pipe[0] and fd_write_consumer writing output_pipe[1].  The queue's
 * result is ANDed with the values returned by both threads (obtained via
 * g_thread_join), and all four pipe descriptors are closed afterwards.
 *
 * NOTE(review): this listing omits the return type, the local declarations
 * (input_pipe, output_pipe, wth, rth, success), what happens after each
 * perror call, the condition guarding the tu_dbg call and the final return.
 * If the function returns early when the second pipe() fails, the
 * descriptors of input_pipe would stay open -- confirm.
 *
 * NOTE(review): the visible lines do not check the g_thread_create results
 * (the GError argument is NULL).  g_thread_create is deprecated since GLib
 * 2.32 in favour of g_thread_new / g_thread_try_new.
 */
252 test_fd_consumer_producer(void)
258 queue_fd_t queue_read = {0, NULL};
259 queue_fd_t queue_write = {0, NULL};
261 /* create pipes and hook up threads to them */
262 if (pipe(input_pipe) < 0) {
263 perror("pipe(input_pipe)");
266 if (pipe(output_pipe) < 0) {
267 perror("pipe(output_pipe)");
/* both threads are created joinable (third argument TRUE) so their results
 * can be collected with g_thread_join below */
271 wth = g_thread_create(data_producer_thread, GINT_TO_POINTER(input_pipe[1]), TRUE, NULL);
272 rth = g_thread_create(data_consumer_thread, GINT_TO_POINTER(output_pipe[0]), TRUE, NULL);
/* the queue under test sits between the two pipes: it reads what the
 * producer thread writes and writes what the consumer thread reads */
275 queue_read.fd = input_pipe[0];
276 queue_write.fd = output_pipe[1];
277 success = do_consumer_producer_queue(
278 fd_read_producer, &queue_read,
279 fd_write_consumer, &queue_write);
281 tu_dbg("do_consumer_producer_queue returned FALSE");
283 /* and examine the results */
/* g_thread_join is the left operand, so both threads are always joined even
 * when success is already FALSE */
284 success = GPOINTER_TO_INT(g_thread_join(wth)) && success;
285 success = GPOINTER_TO_INT(g_thread_join(rth)) && success;
288 close(input_pipe[0]);
289 close(input_pipe[1]);
290 close(output_pipe[0]);
291 close(output_pipe[1]);
/*
 * Test driver entry point: builds the static table of tests and passes it,
 * together with argc/argv, to testutils_run_tests, whose result becomes the
 * program's return value.
 *
 * Each TU_TEST entry pairs a test function with a number (90 for the three
 * queue tests, 120 for the fd test).  NOTE(review): given the "runs slowly"
 * remark this number looks like a per-test timeout in seconds -- confirm in
 * testutils.h.
 *
 * NOTE(review): the return type, the end of the table and any setup between
 * the table and the final call are not shown in this listing.
 */
301 main(int argc, char **argv)
303 static TestUtilsTest tests[] = {
304 TU_TEST(test_queue_simple_STREAMING_REQUIREMENT_NONE, 90),
305 TU_TEST(test_queue_simple_STREAMING_REQUIREMENT_DESIRED, 90),
306 TU_TEST(test_queue_simple_STREAMING_REQUIREMENT_REQUIRED, 90),
307 TU_TEST(test_fd_consumer_producer, 120), /* runs slowly on old kernels */
313 return testutils_run_tests(argc, argv, tests);