1 /* $OpenBSD: evbuffer.c,v 1.12 2010/04/21 20:02:40 nicm Exp $ */
2
3 /*
4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <sys/types.h>
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35
36 #ifdef HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #ifdef HAVE_STDARG_H
45 #include <stdarg.h>
46 #endif
47
48 #ifdef WIN32
49 #include <winsock2.h>
50 #endif
51
52 #include "evutil.h"
53 #include "event.h"
54
55 /* prototypes */
56
57 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
58
59 static int
bufferevent_add(struct event * ev,int timeout)60 bufferevent_add(struct event *ev, int timeout)
61 {
62 struct timeval tv, *ptv = NULL;
63
64 if (timeout) {
65 evutil_timerclear(&tv);
66 tv.tv_sec = timeout;
67 ptv = &tv;
68 }
69
70 return (event_add(ev, ptv));
71 }
72
73 /*
74 * This callback is executed when the size of the input buffer changes.
75 * We use it to apply back pressure on the reading side.
76 */
77
78 void
bufferevent_read_pressure_cb(struct evbuffer * buf,size_t old __unused,size_t now,void * arg)79 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old __unused,
80 size_t now, void *arg) {
81 struct bufferevent *bufev = arg;
82 /*
83 * If we are below the watermark then reschedule reading if it's
84 * still enabled.
85 */
86 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
87 evbuffer_setcb(buf, NULL, NULL);
88
89 if (bufev->enabled & EV_READ)
90 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
91 }
92 }
93
94 static void
bufferevent_readcb(int fd,short event,void * arg)95 bufferevent_readcb(int fd, short event, void *arg)
96 {
97 struct bufferevent *bufev = arg;
98 int res = 0;
99 short what = EVBUFFER_READ;
100 size_t len;
101 int howmuch = -1;
102
103 if (event == EV_TIMEOUT) {
104 what |= EVBUFFER_TIMEOUT;
105 goto error;
106 }
107
108 /*
109 * If we have a high watermark configured then we don't want to
110 * read more data than would make us reach the watermark.
111 */
112 if (bufev->wm_read.high != 0) {
113 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
114 /* we might have lowered the watermark, stop reading */
115 if (howmuch <= 0) {
116 struct evbuffer *buf = bufev->input;
117 event_del(&bufev->ev_read);
118 evbuffer_setcb(buf,
119 bufferevent_read_pressure_cb, bufev);
120 return;
121 }
122 }
123
124 res = evbuffer_read(bufev->input, fd, howmuch);
125 if (res == -1) {
126 if (errno == EAGAIN || errno == EINTR)
127 goto reschedule;
128 /* error case */
129 what |= EVBUFFER_ERROR;
130 } else if (res == 0) {
131 /* eof case */
132 what |= EVBUFFER_EOF;
133 }
134
135 if (res <= 0)
136 goto error;
137
138 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
139
140 /* See if this callbacks meets the water marks */
141 len = EVBUFFER_LENGTH(bufev->input);
142 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
143 return;
144 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
145 struct evbuffer *buf = bufev->input;
146 event_del(&bufev->ev_read);
147
148 /* Now schedule a callback for us when the buffer changes */
149 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
150 }
151
152 /* Invoke the user callback - must always be called last */
153 if (bufev->readcb != NULL)
154 (*bufev->readcb)(bufev, bufev->cbarg);
155 return;
156
157 reschedule:
158 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
159 return;
160
161 error:
162 (*bufev->errorcb)(bufev, what, bufev->cbarg);
163 }
164
165 static void
bufferevent_writecb(int fd,short event,void * arg)166 bufferevent_writecb(int fd, short event, void *arg)
167 {
168 struct bufferevent *bufev = arg;
169 int res = 0;
170 short what = EVBUFFER_WRITE;
171
172 if (event == EV_TIMEOUT) {
173 what |= EVBUFFER_TIMEOUT;
174 goto error;
175 }
176
177 if (EVBUFFER_LENGTH(bufev->output)) {
178 res = evbuffer_write(bufev->output, fd);
179 if (res == -1) {
180 #ifndef WIN32
181 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
182 *set errno. thus this error checking is not portable*/
183 if (errno == EAGAIN ||
184 errno == EINTR ||
185 errno == EINPROGRESS)
186 goto reschedule;
187 /* error case */
188 what |= EVBUFFER_ERROR;
189
190 #else
191 goto reschedule;
192 #endif
193
194 } else if (res == 0) {
195 /* eof case */
196 what |= EVBUFFER_EOF;
197 }
198 if (res <= 0)
199 goto error;
200 }
201
202 if (EVBUFFER_LENGTH(bufev->output) != 0)
203 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
204
205 /*
206 * Invoke the user callback if our buffer is drained or below the
207 * low watermark.
208 */
209 if (bufev->writecb != NULL &&
210 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
211 (*bufev->writecb)(bufev, bufev->cbarg);
212
213 return;
214
215 reschedule:
216 if (EVBUFFER_LENGTH(bufev->output) != 0)
217 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
218 return;
219
220 error:
221 (*bufev->errorcb)(bufev, what, bufev->cbarg);
222 }
223
224 /*
225 * Create a new buffered event object.
226 *
227 * The read callback is invoked whenever we read new data.
228 * The write callback is invoked whenever the output buffer is drained.
229 * The error callback is invoked on a write/read error or on EOF.
230 *
231 * Both read and write callbacks maybe NULL. The error callback is not
232 * allowed to be NULL and have to be provided always.
233 */
234
235 struct bufferevent *
bufferevent_new(int fd,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)236 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
237 everrorcb errorcb, void *cbarg)
238 {
239 struct bufferevent *bufev;
240
241 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
242 return (NULL);
243
244 if ((bufev->input = evbuffer_new()) == NULL) {
245 free(bufev);
246 return (NULL);
247 }
248
249 if ((bufev->output = evbuffer_new()) == NULL) {
250 evbuffer_free(bufev->input);
251 free(bufev);
252 return (NULL);
253 }
254
255 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
256 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
257
258 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
259
260 /*
261 * Set to EV_WRITE so that using bufferevent_write is going to
262 * trigger a callback. Reading needs to be explicitly enabled
263 * because otherwise no data will be available.
264 */
265 bufev->enabled = EV_WRITE;
266
267 return (bufev);
268 }
269
270 void
bufferevent_setcb(struct bufferevent * bufev,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)271 bufferevent_setcb(struct bufferevent *bufev,
272 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
273 {
274 bufev->readcb = readcb;
275 bufev->writecb = writecb;
276 bufev->errorcb = errorcb;
277
278 bufev->cbarg = cbarg;
279 }
280
281 void
bufferevent_setfd(struct bufferevent * bufev,int fd)282 bufferevent_setfd(struct bufferevent *bufev, int fd)
283 {
284 event_del(&bufev->ev_read);
285 event_del(&bufev->ev_write);
286
287 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
288 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
289 if (bufev->ev_base != NULL) {
290 event_base_set(bufev->ev_base, &bufev->ev_read);
291 event_base_set(bufev->ev_base, &bufev->ev_write);
292 }
293
294 /* might have to manually trigger event registration */
295 }
296
297 int
bufferevent_priority_set(struct bufferevent * bufev,int priority)298 bufferevent_priority_set(struct bufferevent *bufev, int priority)
299 {
300 if (event_priority_set(&bufev->ev_read, priority) == -1)
301 return (-1);
302 if (event_priority_set(&bufev->ev_write, priority) == -1)
303 return (-1);
304
305 return (0);
306 }
307
308 /* Closing the file descriptor is the responsibility of the caller */
309
310 void
bufferevent_free(struct bufferevent * bufev)311 bufferevent_free(struct bufferevent *bufev)
312 {
313 event_del(&bufev->ev_read);
314 event_del(&bufev->ev_write);
315
316 evbuffer_free(bufev->input);
317 evbuffer_free(bufev->output);
318
319 free(bufev);
320 }
321
322 /*
323 * Returns 0 on success;
324 * -1 on failure.
325 */
326
327 int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)328 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
329 {
330 int res;
331
332 res = evbuffer_add(bufev->output, data, size);
333
334 if (res == -1)
335 return (res);
336
337 /* If everything is okay, we need to schedule a write */
338 if (size > 0 && (bufev->enabled & EV_WRITE))
339 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
340
341 return (res);
342 }
343
344 int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)345 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
346 {
347 int res;
348
349 res = bufferevent_write(bufev, buf->buffer, buf->off);
350 if (res != -1)
351 evbuffer_drain(buf, buf->off);
352
353 return (res);
354 }
355
356 size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)357 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
358 {
359 struct evbuffer *buf = bufev->input;
360
361 if (buf->off < size)
362 size = buf->off;
363
364 /* Copy the available data to the user buffer */
365 memcpy(data, buf->buffer, size);
366
367 if (size)
368 evbuffer_drain(buf, size);
369
370 return (size);
371 }
372
373 int
bufferevent_enable(struct bufferevent * bufev,short event)374 bufferevent_enable(struct bufferevent *bufev, short event)
375 {
376 if (event & EV_READ) {
377 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
378 return (-1);
379 }
380 if (event & EV_WRITE) {
381 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
382 return (-1);
383 }
384
385 bufev->enabled |= event;
386 return (0);
387 }
388
389 int
bufferevent_disable(struct bufferevent * bufev,short event)390 bufferevent_disable(struct bufferevent *bufev, short event)
391 {
392 if (event & EV_READ) {
393 if (event_del(&bufev->ev_read) == -1)
394 return (-1);
395 }
396 if (event & EV_WRITE) {
397 if (event_del(&bufev->ev_write) == -1)
398 return (-1);
399 }
400
401 bufev->enabled &= ~event;
402 return (0);
403 }
404
405 /*
406 * Sets the read and write timeout for a buffered event.
407 */
408
409 void
bufferevent_settimeout(struct bufferevent * bufev,int timeout_read,int timeout_write)410 bufferevent_settimeout(struct bufferevent *bufev,
411 int timeout_read, int timeout_write) {
412 bufev->timeout_read = timeout_read;
413 bufev->timeout_write = timeout_write;
414
415 if (event_pending(&bufev->ev_read, EV_READ, NULL))
416 bufferevent_add(&bufev->ev_read, timeout_read);
417 if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
418 bufferevent_add(&bufev->ev_write, timeout_write);
419 }
420
421 /*
422 * Sets the water marks
423 */
424
425 void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)426 bufferevent_setwatermark(struct bufferevent *bufev, short events,
427 size_t lowmark, size_t highmark)
428 {
429 if (events & EV_READ) {
430 bufev->wm_read.low = lowmark;
431 bufev->wm_read.high = highmark;
432 }
433
434 if (events & EV_WRITE) {
435 bufev->wm_write.low = lowmark;
436 bufev->wm_write.high = highmark;
437 }
438
439 /* If the watermarks changed then see if we should call read again */
440 bufferevent_read_pressure_cb(bufev->input,
441 0, EVBUFFER_LENGTH(bufev->input), bufev);
442 }
443
444 int
bufferevent_base_set(struct event_base * base,struct bufferevent * bufev)445 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
446 {
447 int res;
448
449 bufev->ev_base = base;
450
451 res = event_base_set(base, &bufev->ev_read);
452 if (res == -1)
453 return (res);
454
455 res = event_base_set(base, &bufev->ev_write);
456 return (res);
457 }
458