1 /* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <apr_pools.h>
17 #include <apr_poll.h>
18 #include <apr_version.h>
19 #include <apr_portable.h>
20
21 #include "serf.h"
22 #include "serf_bucket_util.h"
23
24 #include "serf_private.h"
25
26 /* cleanup for sockets */
clean_skt(void * data)27 static apr_status_t clean_skt(void *data)
28 {
29 serf_connection_t *conn = data;
30 apr_status_t status = APR_SUCCESS;
31
32 if (conn->skt) {
33 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, "cleanup - ");
34 status = apr_socket_close(conn->skt);
35 conn->skt = NULL;
36 serf__log_nopref(SOCK_VERBOSE, "closed socket, status %d\n", status);
37 }
38
39 return status;
40 }
41
clean_resp(void * data)42 static apr_status_t clean_resp(void *data)
43 {
44 serf_request_t *request = data;
45
46 /* The request's RESPOOL is being cleared. */
47
48 /* If the response has allocated some buckets, then destroy them (since
49 the bucket may hold resources other than memory in RESPOOL). Also
50 make sure to set their fields to NULL so connection closure does
51 not attempt to free them again. */
52 if (request->resp_bkt) {
53 serf_bucket_destroy(request->resp_bkt);
54 request->resp_bkt = NULL;
55 }
56 if (request->req_bkt) {
57 serf_bucket_destroy(request->req_bkt);
58 request->req_bkt = NULL;
59 }
60
61 /* ### should we worry about debug stuff, like that performed in
62 ### destroy_request()? should we worry about calling req->handler
63 ### to notify this "cancellation" due to pool clearing? */
64
65 /* This pool just got cleared/destroyed. Don't try to destroy the pool
66 (again) when the request is canceled. */
67 request->respool = NULL;
68
69 return APR_SUCCESS;
70 }
71
72 /* cleanup for conns */
clean_conn(void * data)73 static apr_status_t clean_conn(void *data)
74 {
75 serf_connection_t *conn = data;
76
77 serf__log(CONN_VERBOSE, __FILE__, "cleaning up connection 0x%x\n",
78 conn);
79 serf_connection_close(conn);
80
81 return APR_SUCCESS;
82 }
83
84 /* Check if there is data waiting to be sent over the socket. This can happen
85 in two situations:
86 - The connection queue has atleast one request with unwritten data.
87 - All requests are written and the ssl layer wrote some data while reading
88 the response. This can happen when the server triggers a renegotiation,
89 e.g. after the first and only request on that connection was received.
90 Returns 1 if data is pending on CONN, NULL if not.
91 If NEXT_REQ is not NULL, it will be filled in with the next available request
92 with unwritten data. */
93 static int
request_or_data_pending(serf_request_t ** next_req,serf_connection_t * conn)94 request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn)
95 {
96 serf_request_t *request = conn->requests;
97
98 while (request != NULL && request->req_bkt == NULL &&
99 request->writing_started)
100 request = request->next;
101
102 if (next_req)
103 *next_req = request;
104
105 if (request != NULL) {
106 return 1;
107 } else if (conn->ostream_head) {
108 const char *dummy;
109 apr_size_t len;
110 apr_status_t status;
111
112 status = serf_bucket_peek(conn->ostream_head, &dummy,
113 &len);
114 if (!SERF_BUCKET_READ_ERROR(status) && len) {
115 serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
116 "All requests written but still data pending.\n");
117 return 1;
118 }
119 }
120
121 return 0;
122 }
123
124 /* Update the pollset for this connection. We tweak the pollset based on
125 * whether we want to read and/or write, given conditions within the
126 * connection. If the connection is not (yet) in the pollset, then it
127 * will be added.
128 */
serf__conn_update_pollset(serf_connection_t * conn)129 apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
130 {
131 serf_context_t *ctx = conn->ctx;
132 apr_status_t status;
133 apr_pollfd_t desc = { 0 };
134
135 if (!conn->skt) {
136 return APR_SUCCESS;
137 }
138
139 /* Remove the socket from the poll set. */
140 desc.desc_type = APR_POLL_SOCKET;
141 desc.desc.s = conn->skt;
142 desc.reqevents = conn->reqevents;
143
144 status = ctx->pollset_rm(ctx->pollset_baton,
145 &desc, conn);
146 if (status && !APR_STATUS_IS_NOTFOUND(status))
147 return status;
148
149 /* Now put it back in with the correct read/write values. */
150 desc.reqevents = APR_POLLHUP | APR_POLLERR;
151 if (conn->requests &&
152 conn->state != SERF_CONN_INIT) {
153 /* If there are any outstanding events, then we want to read. */
154 /* ### not true. we only want to read IF we have sent some data */
155 desc.reqevents |= APR_POLLIN;
156
157 /* Don't write if OpenSSL told us that it needs to read data first. */
158 if (conn->stop_writing != 1) {
159
160 /* If the connection is not closing down and
161 * has unwritten data or
162 * there are any requests that still have buckets to write out,
163 * then we want to write.
164 */
165 if (conn->vec_len &&
166 conn->state != SERF_CONN_CLOSING)
167 desc.reqevents |= APR_POLLOUT;
168 else {
169
170 if ((conn->probable_keepalive_limit &&
171 conn->completed_requests > conn->probable_keepalive_limit) ||
172 (conn->max_outstanding_requests &&
173 conn->completed_requests - conn->completed_responses >=
174 conn->max_outstanding_requests)) {
175 /* we wouldn't try to write any way right now. */
176 }
177 else if (request_or_data_pending(NULL, conn)) {
178 desc.reqevents |= APR_POLLOUT;
179 }
180 }
181 }
182 }
183
184 /* If we can have async responses, always look for something to read. */
185 if (conn->async_responses) {
186 desc.reqevents |= APR_POLLIN;
187 }
188
189 /* save our reqevents, so we can pass it in to remove later. */
190 conn->reqevents = desc.reqevents;
191
192 /* Note: even if we don't want to read/write this socket, we still
193 * want to poll it for hangups and errors.
194 */
195 return ctx->pollset_add(ctx->pollset_baton,
196 &desc, &conn->baton);
197 }
198
199 #ifdef SERF_DEBUG_BUCKET_USE
200
201 /* Make sure all response buckets were drained. */
check_buckets_drained(serf_connection_t * conn)202 static void check_buckets_drained(serf_connection_t *conn)
203 {
204 serf_request_t *request = conn->requests;
205
206 for ( ; request ; request = request->next ) {
207 if (request->resp_bkt != NULL) {
208 /* ### crap. can't do this. this allocator may have un-drained
209 * ### REQUEST buckets.
210 */
211 /* serf_debug__entered_loop(request->resp_bkt->allocator); */
212 /* ### for now, pretend we closed the conn (resets the tracking) */
213 serf_debug__closed_conn(request->resp_bkt->allocator);
214 }
215 }
216 }
217
218 #endif
219
/* Destroy CONN's output stream head bucket, if any, and clear both the
 * head and tail references. Does nothing when there is no head bucket.
 */
static void destroy_ostream(serf_connection_t *conn)
{
    if (conn->ostream_head == NULL)
        return;

    serf_bucket_destroy(conn->ostream_head);
    conn->ostream_head = NULL;
    conn->ostream_tail = NULL;
}
228
detect_eof(void * baton,serf_bucket_t * aggregate_bucket)229 static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
230 {
231 serf_connection_t *conn = baton;
232 conn->hit_eof = 1;
233 return APR_EAGAIN;
234 }
235
do_conn_setup(serf_connection_t * conn)236 static apr_status_t do_conn_setup(serf_connection_t *conn)
237 {
238 apr_status_t status;
239 serf_bucket_t *ostream;
240
241 if (conn->ostream_head == NULL) {
242 conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
243 }
244
245 if (conn->ostream_tail == NULL) {
246 conn->ostream_tail = serf__bucket_stream_create(conn->allocator,
247 detect_eof,
248 conn);
249 }
250
251 ostream = conn->ostream_tail;
252
253 status = (*conn->setup)(conn->skt,
254 &conn->stream,
255 &ostream,
256 conn->setup_baton,
257 conn->pool);
258 if (status) {
259 /* extra destroy here since it wasn't added to the head bucket yet. */
260 serf_bucket_destroy(conn->ostream_tail);
261 destroy_ostream(conn);
262 return status;
263 }
264
265 serf_bucket_aggregate_append(conn->ostream_head,
266 ostream);
267
268 return status;
269 }
270
271 /* Set up the input and output stream buckets.
272 When a tunnel over an http proxy is needed, create a socket bucket and
273 empty aggregate bucket for sending and receiving unencrypted requests
274 over the socket.
275
276 After the tunnel is there, or no tunnel was needed, ask the application
277 to create the input and output buckets, which should take care of the
278 [en/de]cryption.
279 */
280
prepare_conn_streams(serf_connection_t * conn,serf_bucket_t ** istream,serf_bucket_t ** ostreamt,serf_bucket_t ** ostreamh)281 static apr_status_t prepare_conn_streams(serf_connection_t *conn,
282 serf_bucket_t **istream,
283 serf_bucket_t **ostreamt,
284 serf_bucket_t **ostreamh)
285 {
286 apr_status_t status;
287
288 if (conn->stream == NULL) {
289 conn->latency = apr_time_now() - conn->connect_time;
290 }
291
292 /* Do we need a SSL tunnel first? */
293 if (conn->state == SERF_CONN_CONNECTED) {
294 /* If the connection does not have an associated bucket, then
295 * call the setup callback to get one.
296 */
297 if (conn->stream == NULL) {
298 status = do_conn_setup(conn);
299 if (status) {
300 return status;
301 }
302 }
303 *ostreamt = conn->ostream_tail;
304 *ostreamh = conn->ostream_head;
305 *istream = conn->stream;
306 } else {
307 /* SSL tunnel needed and not set up yet, get a direct unencrypted
308 stream for this socket */
309 if (conn->stream == NULL) {
310 *istream = serf_bucket_socket_create(conn->skt,
311 conn->allocator);
312 }
313 /* Don't create the ostream bucket chain including the ssl_encrypt
314 bucket yet. This ensure the CONNECT request is sent unencrypted
315 to the proxy. */
316 *ostreamt = *ostreamh = conn->ssltunnel_ostream;
317 }
318
319 return APR_SUCCESS;
320 }
321
322 /* Create and connect sockets for any connections which don't have them
323 * yet. This is the core of our lazy-connect behavior.
324 */
serf__open_connections(serf_context_t * ctx)325 apr_status_t serf__open_connections(serf_context_t *ctx)
326 {
327 int i;
328
329 for (i = ctx->conns->nelts; i--; ) {
330 serf_connection_t *conn = GET_CONN(ctx, i);
331 serf__authn_info_t *authn_info;
332 apr_status_t status;
333 apr_socket_t *skt;
334
335 conn->seen_in_pollset = 0;
336
337 if (conn->skt != NULL) {
338 #ifdef SERF_DEBUG_BUCKET_USE
339 check_buckets_drained(conn);
340 #endif
341 continue;
342 }
343
344 /* Delay opening until we have something to deliver! */
345 if (conn->requests == NULL) {
346 continue;
347 }
348
349 apr_pool_clear(conn->skt_pool);
350 apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt);
351
352 status = apr_socket_create(&skt, conn->address->family,
353 SOCK_STREAM,
354 #if APR_MAJOR_VERSION > 0
355 APR_PROTO_TCP,
356 #endif
357 conn->skt_pool);
358 serf__log(SOCK_VERBOSE, __FILE__,
359 "created socket for conn 0x%x, status %d\n", conn, status);
360 if (status != APR_SUCCESS)
361 return status;
362
363 /* Set the socket to be non-blocking */
364 if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
365 return status;
366
367 /* Disable Nagle's algorithm */
368 if ((status = apr_socket_opt_set(skt,
369 APR_TCP_NODELAY, 1)) != APR_SUCCESS)
370 return status;
371
372 /* Configured. Store it into the connection now. */
373 conn->skt = skt;
374
375 /* Remember time when we started connecting to server to calculate
376 network latency. */
377 conn->connect_time = apr_time_now();
378
379 /* Now that the socket is set up, let's connect it. This should
380 * return immediately.
381 */
382 status = apr_socket_connect(skt, conn->address);
383 serf__log_skt(SOCK_VERBOSE, __FILE__, skt,
384 "connected socket for conn 0x%x, status %d\n",
385 conn, status);
386 if (status != APR_SUCCESS) {
387 if (!APR_STATUS_IS_EINPROGRESS(status))
388 return status;
389 }
390
391 /* Flag our pollset as dirty now that we have a new socket. */
392 conn->dirty_conn = 1;
393 ctx->dirty_pollset = 1;
394
395 /* If the authentication was already started on another connection,
396 prepare this connection (it might be possible to skip some
397 part of the handshaking). */
398 if (ctx->proxy_address) {
399 authn_info = &ctx->proxy_authn_info;
400 if (authn_info->scheme) {
401 authn_info->scheme->init_conn_func(authn_info->scheme, 407,
402 conn, conn->pool);
403 }
404 }
405
406 authn_info = serf__get_authn_info_for_server(conn);
407 if (authn_info->scheme) {
408 authn_info->scheme->init_conn_func(authn_info->scheme, 401,
409 conn, conn->pool);
410 }
411
412 /* Does this connection require a SSL tunnel over the proxy? */
413 if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0)
414 serf__ssltunnel_connect(conn);
415 else {
416 serf_bucket_t *dummy1, *dummy2;
417
418 conn->state = SERF_CONN_CONNECTED;
419
420 status = prepare_conn_streams(conn, &conn->stream,
421 &dummy1, &dummy2);
422 if (status) {
423 return status;
424 }
425 }
426 }
427
428 return APR_SUCCESS;
429 }
430
no_more_writes(serf_connection_t * conn)431 static apr_status_t no_more_writes(serf_connection_t *conn)
432 {
433 /* Note that we should hold new requests until we open our new socket. */
434 conn->state = SERF_CONN_CLOSING;
435 serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
436 "stop writing on conn 0x%x\n", conn);
437
438 /* Clear our iovec. */
439 conn->vec_len = 0;
440
441 /* Update the pollset to know we don't want to write on this socket any
442 * more.
443 */
444 conn->dirty_conn = 1;
445 conn->ctx->dirty_pollset = 1;
446 return APR_SUCCESS;
447 }
448
449 /* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
450 * the header contains value 'close' indicating the server is closing the
451 * connection right after this response.
452 * Otherwise returns APR_SUCCESS.
453 */
is_conn_closing(serf_bucket_t * response)454 static apr_status_t is_conn_closing(serf_bucket_t *response)
455 {
456 serf_bucket_t *hdrs;
457 const char *val;
458
459 hdrs = serf_bucket_response_get_headers(response);
460 val = serf_bucket_headers_get(hdrs, "Connection");
461 if (val && strcasecmp("close", val) == 0)
462 {
463 return SERF_ERROR_CLOSING;
464 }
465
466 return APR_SUCCESS;
467 }
468
/* Append REQUEST to the singly linked list described by *LIST (head) and
 * *TAIL (last element). When the list is empty REQUEST becomes the head;
 * in all cases it becomes the new tail. REQUEST's own next pointer is not
 * modified.
 */
static void link_requests(serf_request_t **list, serf_request_t **tail,
                          serf_request_t *request)
{
    if (*list != NULL)
        (*tail)->next = request;
    else
        *list = request;

    *tail = request;
}
481
destroy_request(serf_request_t * request)482 static apr_status_t destroy_request(serf_request_t *request)
483 {
484 serf_connection_t *conn = request->conn;
485
486 /* The request and response buckets are no longer needed,
487 nor is the request's pool. */
488 if (request->resp_bkt) {
489 serf_debug__closed_conn(request->resp_bkt->allocator);
490 serf_bucket_destroy(request->resp_bkt);
491 request->resp_bkt = NULL;
492 }
493 if (request->req_bkt) {
494 serf_debug__closed_conn(request->req_bkt->allocator);
495 serf_bucket_destroy(request->req_bkt);
496 request->req_bkt = NULL;
497 }
498
499 serf_debug__bucket_alloc_check(request->allocator);
500 if (request->respool) {
501 /* ### unregister the pool cleanup for self? */
502 apr_pool_destroy(request->respool);
503 }
504
505 serf_bucket_mem_free(conn->allocator, request);
506
507 return APR_SUCCESS;
508 }
509
cancel_request(serf_request_t * request,serf_request_t ** list,int notify_request)510 static apr_status_t cancel_request(serf_request_t *request,
511 serf_request_t **list,
512 int notify_request)
513 {
514 /* If we haven't run setup, then we won't have a handler to call. */
515 if (request->handler && notify_request) {
516 /* We actually don't care what the handler returns.
517 * We have bigger matters at hand.
518 */
519 (*request->handler)(request, NULL, request->handler_baton,
520 request->respool);
521 }
522
523 if (*list == request) {
524 *list = request->next;
525 }
526 else {
527 serf_request_t *scan = *list;
528
529 while (scan->next && scan->next != request)
530 scan = scan->next;
531
532 if (scan->next) {
533 scan->next = scan->next->next;
534 }
535 }
536
537 return destroy_request(request);
538 }
539
remove_connection(serf_context_t * ctx,serf_connection_t * conn)540 static apr_status_t remove_connection(serf_context_t *ctx,
541 serf_connection_t *conn)
542 {
543 apr_pollfd_t desc = { 0 };
544
545 desc.desc_type = APR_POLL_SOCKET;
546 desc.desc.s = conn->skt;
547 desc.reqevents = conn->reqevents;
548
549 return ctx->pollset_rm(ctx->pollset_baton,
550 &desc, conn);
551 }
552
553 /* A socket was closed, inform the application. */
handle_conn_closed(serf_connection_t * conn,apr_status_t status)554 static void handle_conn_closed(serf_connection_t *conn, apr_status_t status)
555 {
556 (*conn->closed)(conn, conn->closed_baton, status,
557 conn->pool);
558 }
559
reset_connection(serf_connection_t * conn,int requeue_requests)560 static apr_status_t reset_connection(serf_connection_t *conn,
561 int requeue_requests)
562 {
563 serf_context_t *ctx = conn->ctx;
564 apr_status_t status;
565 serf_request_t *old_reqs;
566
567 conn->probable_keepalive_limit = conn->completed_responses;
568 conn->completed_requests = 0;
569 conn->completed_responses = 0;
570
571 old_reqs = conn->requests;
572
573 conn->requests = NULL;
574 conn->requests_tail = NULL;
575
576 /* Handle all outstanding requests. These have either not been written yet,
577 or have been written but the expected reply wasn't received yet. */
578 while (old_reqs) {
579 /* If we haven't started to write the connection, bring it over
580 * unchanged to our new socket.
581 * Do not copy a CONNECT request to the new connection, the ssl tunnel
582 * setup code will create a new CONNECT request already.
583 */
584 if (requeue_requests && !old_reqs->writing_started &&
585 !old_reqs->ssltunnel) {
586
587 serf_request_t *req = old_reqs;
588 old_reqs = old_reqs->next;
589 req->next = NULL;
590 link_requests(&conn->requests, &conn->requests_tail, req);
591 }
592 else {
593 /* Request has been consumed, or we don't want to requeue the
594 request. Either way, inform the application that the request
595 is cancelled. */
596 cancel_request(old_reqs, &old_reqs, requeue_requests);
597 }
598 }
599
600 /* Requests queue has been prepared for a new socket, close the old one. */
601 if (conn->skt != NULL) {
602 remove_connection(ctx, conn);
603 status = apr_socket_close(conn->skt);
604 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
605 "closed socket, status %d\n", status);
606 if (conn->closed != NULL) {
607 handle_conn_closed(conn, status);
608 }
609 conn->skt = NULL;
610 }
611
612 if (conn->stream != NULL) {
613 serf_bucket_destroy(conn->stream);
614 conn->stream = NULL;
615 }
616
617 destroy_ostream(conn);
618
619 /* Don't try to resume any writes */
620 conn->vec_len = 0;
621
622 conn->dirty_conn = 1;
623 conn->ctx->dirty_pollset = 1;
624 conn->state = SERF_CONN_INIT;
625
626 serf__log(CONN_VERBOSE, __FILE__, "reset connection 0x%x\n", conn);
627
628 conn->status = APR_SUCCESS;
629
630 /* Let our context know that we've 'reset' the socket already. */
631 conn->seen_in_pollset |= APR_POLLHUP;
632
633 /* Found the connection. Closed it. All done. */
634 return APR_SUCCESS;
635 }
636
socket_writev(serf_connection_t * conn)637 static apr_status_t socket_writev(serf_connection_t *conn)
638 {
639 apr_size_t written;
640 apr_status_t status;
641
642 status = apr_socket_sendv(conn->skt, conn->vec,
643 conn->vec_len, &written);
644 if (status && !APR_STATUS_IS_EAGAIN(status))
645 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
646 "socket_sendv error %d\n", status);
647
648 /* did we write everything? */
649 if (written) {
650 apr_size_t len = 0;
651 int i;
652
653 serf__log_skt(SOCK_MSG_VERBOSE, __FILE__, conn->skt,
654 "--- socket_sendv:\n");
655
656 for (i = 0; i < conn->vec_len; i++) {
657 len += conn->vec[i].iov_len;
658 if (written < len) {
659 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
660 conn->vec[i].iov_len - (len - written),
661 conn->vec[i].iov_base);
662 if (i) {
663 memmove(conn->vec, &conn->vec[i],
664 sizeof(struct iovec) * (conn->vec_len - i));
665 conn->vec_len -= i;
666 }
667 conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
668 conn->vec[0].iov_len = len - written;
669 break;
670 } else {
671 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
672 conn->vec[i].iov_len, conn->vec[i].iov_base);
673 }
674 }
675 if (len == written) {
676 conn->vec_len = 0;
677 }
678 serf__log_nopref(SOCK_MSG_VERBOSE, "-(%d)-\n", written);
679
680 /* Log progress information */
681 serf__context_progress_delta(conn->ctx, 0, written);
682 }
683
684 return status;
685 }
686
setup_request(serf_request_t * request)687 static apr_status_t setup_request(serf_request_t *request)
688 {
689 serf_connection_t *conn = request->conn;
690 apr_status_t status;
691
692 /* Now that we are about to serve the request, allocate a pool. */
693 apr_pool_create(&request->respool, conn->pool);
694 request->allocator = serf_bucket_allocator_create(request->respool,
695 NULL, NULL);
696 apr_pool_cleanup_register(request->respool, request,
697 clean_resp, clean_resp);
698
699 /* Fill in the rest of the values for the request. */
700 status = request->setup(request, request->setup_baton,
701 &request->req_bkt,
702 &request->acceptor,
703 &request->acceptor_baton,
704 &request->handler,
705 &request->handler_baton,
706 request->respool);
707 return status;
708 }
709
710 /* write data out to the connection */
write_to_connection(serf_connection_t * conn)711 static apr_status_t write_to_connection(serf_connection_t *conn)
712 {
713 if (conn->probable_keepalive_limit &&
714 conn->completed_requests > conn->probable_keepalive_limit) {
715
716 conn->dirty_conn = 1;
717 conn->ctx->dirty_pollset = 1;
718
719 /* backoff for now. */
720 return APR_SUCCESS;
721 }
722
723 /* Keep reading and sending until we run out of stuff to read, or
724 * writing would block.
725 */
726 while (1) {
727 serf_request_t *request;
728 int stop_reading = 0;
729 apr_status_t status;
730 apr_status_t read_status;
731 serf_bucket_t *ostreamt;
732 serf_bucket_t *ostreamh;
733 int max_outstanding_requests = conn->max_outstanding_requests;
734
735 /* If we're setting up an ssl tunnel, we can't send real requests
736 at yet, as they need to be encrypted and our encrypt buckets
737 aren't created yet as we still need to read the unencrypted
738 response of the CONNECT request. */
739 if (conn->state != SERF_CONN_CONNECTED)
740 max_outstanding_requests = 1;
741
742 if (max_outstanding_requests &&
743 conn->completed_requests -
744 conn->completed_responses >= max_outstanding_requests) {
745 /* backoff for now. */
746 return APR_SUCCESS;
747 }
748
749 /* If we have unwritten data, then write what we can. */
750 while (conn->vec_len) {
751 status = socket_writev(conn);
752
753 /* If the write would have blocked, then we're done. Don't try
754 * to write anything else to the socket.
755 */
756 if (APR_STATUS_IS_EAGAIN(status))
757 return APR_SUCCESS;
758 if (APR_STATUS_IS_EPIPE(status) ||
759 APR_STATUS_IS_ECONNRESET(status) ||
760 APR_STATUS_IS_ECONNABORTED(status))
761 return no_more_writes(conn);
762 if (status)
763 return status;
764 }
765 /* ### can we have a short write, yet no EAGAIN? a short write
766 ### would imply unwritten_len > 0 ... */
767 /* assert: unwritten_len == 0. */
768
769 /* We may need to move forward to a request which has something
770 * to write.
771 */
772 if (!request_or_data_pending(&request, conn)) {
773 /* No more requests (with data) are registered with the
774 * connection, and no data is pending on the outgoing stream.
775 * Let's update the pollset so that we don't try to write to this
776 * socket again.
777 */
778 conn->dirty_conn = 1;
779 conn->ctx->dirty_pollset = 1;
780 return APR_SUCCESS;
781 }
782
783 status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh);
784 if (status) {
785 return status;
786 }
787
788 if (request) {
789 if (request->req_bkt == NULL) {
790 read_status = setup_request(request);
791 if (read_status) {
792 /* Something bad happened. Propagate any errors. */
793 return read_status;
794 }
795 }
796
797 if (!request->writing_started) {
798 request->writing_started = 1;
799 serf_bucket_aggregate_append(ostreamt, request->req_bkt);
800 }
801 }
802
803 /* ### optimize at some point by using read_for_sendfile */
804 /* TODO: now that read_iovec will effectively try to return as much
805 data as available, we probably don't want to read ALL_AVAIL, but
806 a lower number, like the size of one or a few TCP packets, the
807 available TCP buffer size ... */
808 read_status = serf_bucket_read_iovec(ostreamh,
809 SERF_READ_ALL_AVAIL,
810 IOV_MAX,
811 conn->vec,
812 &conn->vec_len);
813
814 if (!conn->hit_eof) {
815 if (APR_STATUS_IS_EAGAIN(read_status)) {
816 /* We read some stuff, but should not try to read again. */
817 stop_reading = 1;
818 }
819 else if (read_status == SERF_ERROR_WAIT_CONN) {
820 /* The bucket told us that it can't provide more data until
821 more data is read from the socket. This normally happens
822 during a SSL handshake.
823
824 We should avoid looking for writability for a while so
825 that (hopefully) something will appear in the bucket so
826 we can actually write something. otherwise, we could
827 end up in a CPU spin: socket wants something, but we
828 don't have anything (and keep returning EAGAIN)
829 */
830 conn->stop_writing = 1;
831 conn->dirty_conn = 1;
832 conn->ctx->dirty_pollset = 1;
833 }
834 else if (read_status && !APR_STATUS_IS_EOF(read_status)) {
835 /* Something bad happened. Propagate any errors. */
836 return read_status;
837 }
838 }
839
840 /* If we got some data, then deliver it. */
841 /* ### what to do if we got no data?? is that a problem? */
842 if (conn->vec_len > 0) {
843 status = socket_writev(conn);
844
845 /* If we can't write any more, or an error occurred, then
846 * we're done here.
847 */
848 if (APR_STATUS_IS_EAGAIN(status))
849 return APR_SUCCESS;
850 if (APR_STATUS_IS_EPIPE(status))
851 return no_more_writes(conn);
852 if (APR_STATUS_IS_ECONNRESET(status) ||
853 APR_STATUS_IS_ECONNABORTED(status)) {
854 return no_more_writes(conn);
855 }
856 if (status)
857 return status;
858 }
859
860 if (read_status == SERF_ERROR_WAIT_CONN) {
861 stop_reading = 1;
862 conn->stop_writing = 1;
863 conn->dirty_conn = 1;
864 conn->ctx->dirty_pollset = 1;
865 }
866 else if (request && read_status && conn->hit_eof &&
867 conn->vec_len == 0) {
868 /* If we hit the end of the request bucket and all of its data has
869 * been written, then clear it out to signify that we're done
870 * sending the request. On the next iteration through this loop:
871 * - if there are remaining bytes they will be written, and as the
872 * request bucket will be completely read it will be destroyed then.
873 * - we'll see if there are other requests that need to be sent
874 * ("pipelining").
875 */
876 conn->hit_eof = 0;
877 serf_bucket_destroy(request->req_bkt);
878 request->req_bkt = NULL;
879
880 /* If our connection has async responses enabled, we're not
881 * going to get a reply back, so kill the request.
882 */
883 if (conn->async_responses) {
884 conn->requests = request->next;
885 destroy_request(request);
886 }
887
888 conn->completed_requests++;
889
890 if (conn->probable_keepalive_limit &&
891 conn->completed_requests > conn->probable_keepalive_limit) {
892 /* backoff for now. */
893 stop_reading = 1;
894 }
895 }
896
897 if (stop_reading) {
898 return APR_SUCCESS;
899 }
900 }
901 /* NOTREACHED */
902 }
903
904 /* A response message was received from the server, so call
905 the handler as specified on the original request. */
handle_response(serf_request_t * request,apr_pool_t * pool)906 static apr_status_t handle_response(serf_request_t *request,
907 apr_pool_t *pool)
908 {
909 apr_status_t status = APR_SUCCESS;
910 int consumed_response = 0;
911
912 /* Only enable the new authentication framework if the program has
913 * registered an authentication credential callback.
914 *
915 * This permits older Serf apps to still handle authentication
916 * themselves by not registering credential callbacks.
917 */
918 if (request->conn->ctx->cred_cb) {
919 status = serf__handle_auth_response(&consumed_response,
920 request,
921 request->resp_bkt,
922 request->handler_baton,
923 pool);
924
925 /* If there was an error reading the response (maybe there wasn't
926 enough data available), don't bother passing the response to the
927 application.
928
929 If the authentication was tried, but failed, pass the response
930 to the application, maybe it can do better. */
931 if (status) {
932 return status;
933 }
934 }
935
936 if (!consumed_response) {
937 return (*request->handler)(request,
938 request->resp_bkt,
939 request->handler_baton,
940 pool);
941 }
942
943 return status;
944 }
945
946 /* An async response message was received from the server. */
handle_async_response(serf_connection_t * conn,apr_pool_t * pool)947 static apr_status_t handle_async_response(serf_connection_t *conn,
948 apr_pool_t *pool)
949 {
950 apr_status_t status;
951
952 if (conn->current_async_response == NULL) {
953 conn->current_async_response =
954 (*conn->async_acceptor)(NULL, conn->stream,
955 conn->async_acceptor_baton, pool);
956 }
957
958 status = (*conn->async_handler)(NULL, conn->current_async_response,
959 conn->async_handler_baton, pool);
960
961 if (APR_STATUS_IS_EOF(status)) {
962 serf_bucket_destroy(conn->current_async_response);
963 conn->current_async_response = NULL;
964 status = APR_SUCCESS;
965 }
966
967 return status;
968 }
969
970
971 apr_status_t
serf__provide_credentials(serf_context_t * ctx,char ** username,char ** password,serf_request_t * request,void * baton,int code,const char * authn_type,const char * realm,apr_pool_t * pool)972 serf__provide_credentials(serf_context_t *ctx,
973 char **username,
974 char **password,
975 serf_request_t *request, void *baton,
976 int code, const char *authn_type,
977 const char *realm,
978 apr_pool_t *pool)
979 {
980 serf_connection_t *conn = request->conn;
981 serf_request_t *authn_req = request;
982 apr_status_t status;
983
984 if (request->ssltunnel == 1 &&
985 conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
986 /* This is a CONNECT request to set up an SSL tunnel over a proxy.
987 This request is created by serf, so if the proxy requires
988 authentication, we can't ask the application for credentials with
989 this request.
990
991 Solution: setup the first request created by the application on
992 this connection, and use that request and its handler_baton to
993 call back to the application. */
994
995 authn_req = request->next;
996 /* assert: app_request != NULL */
997 if (!authn_req)
998 return APR_EGENERAL;
999
1000 if (!authn_req->req_bkt) {
1001 apr_status_t status;
1002
1003 status = setup_request(authn_req);
1004 /* If we can't setup a request, don't bother setting up the
1005 ssl tunnel. */
1006 if (status)
1007 return status;
1008 }
1009 }
1010
1011 /* Ask the application. */
1012 status = (*ctx->cred_cb)(username, password,
1013 authn_req, authn_req->handler_baton,
1014 code, authn_type, realm, pool);
1015 if (status)
1016 return status;
1017
1018 return APR_SUCCESS;
1019 }
1020
1021 /* read data from the connection */
read_from_connection(serf_connection_t * conn)1022 static apr_status_t read_from_connection(serf_connection_t *conn)
1023 {
1024 apr_status_t status;
1025 apr_pool_t *tmppool;
1026 int close_connection = FALSE;
1027
1028 /* Whatever is coming in on the socket corresponds to the first request
1029 * on our chain.
1030 */
1031 serf_request_t *request = conn->requests;
1032
1033 /* If the stop_writing flag was set on the connection, reset it now because
1034 there is some data to read. */
1035 if (conn->stop_writing) {
1036 conn->stop_writing = 0;
1037 conn->dirty_conn = 1;
1038 conn->ctx->dirty_pollset = 1;
1039 }
1040
1041 /* assert: request != NULL */
1042
1043 if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
1044 goto error;
1045
1046 /* Invoke response handlers until we have no more work. */
1047 while (1) {
1048 serf_bucket_t *dummy1, *dummy2;
1049
1050 apr_pool_clear(tmppool);
1051
1052 /* Only interested in the input stream here. */
1053 status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2);
1054 if (status) {
1055 goto error;
1056 }
1057
1058 /* We have a different codepath when we can have async responses. */
1059 if (conn->async_responses) {
1060 /* TODO What about socket errors? */
1061 status = handle_async_response(conn, tmppool);
1062 if (APR_STATUS_IS_EAGAIN(status)) {
1063 status = APR_SUCCESS;
1064 goto error;
1065 }
1066 if (status) {
1067 goto error;
1068 }
1069 continue;
1070 }
1071
1072 /* We are reading a response for a request we haven't
1073 * written yet!
1074 *
1075 * This shouldn't normally happen EXCEPT:
1076 *
1077 * 1) when the other end has closed the socket and we're
1078 * pending an EOF return.
1079 * 2) Doing the initial SSL handshake - we'll get EAGAIN
1080 * as the SSL buckets will hide the handshake from us
1081 * but not return any data.
1082 * 3) When the server sends us an SSL alert.
1083 *
1084 * In these cases, we should not receive any actual user data.
1085 *
1086 * 4) When the server sends a error response, like 408 Request timeout.
1087 * This response should be passed to the application.
1088 *
1089 * If we see an EOF (due to either an expired timeout or the server
1090 * sending the SSL 'close notify' shutdown alert), we'll reset the
1091 * connection and open a new one.
1092 */
1093 if (request->req_bkt || !request->writing_started) {
1094 const char *data;
1095 apr_size_t len;
1096
1097 status = serf_bucket_peek(conn->stream, &data, &len);
1098
1099 if (APR_STATUS_IS_EOF(status)) {
1100 reset_connection(conn, 1);
1101 status = APR_SUCCESS;
1102 goto error;
1103 }
1104 else if (APR_STATUS_IS_EAGAIN(status) && !len) {
1105 status = APR_SUCCESS;
1106 goto error;
1107 } else if (status && !APR_STATUS_IS_EAGAIN(status)) {
1108 /* Read error */
1109 goto error;
1110 }
1111
1112 /* Unexpected response from the server */
1113
1114 }
1115
1116 /* If the request doesn't have a response bucket, then call the
1117 * acceptor to get one created.
1118 */
1119 if (request->resp_bkt == NULL) {
1120 request->resp_bkt = (*request->acceptor)(request, conn->stream,
1121 request->acceptor_baton,
1122 tmppool);
1123 apr_pool_clear(tmppool);
1124 }
1125
1126 status = handle_response(request, tmppool);
1127
1128 /* Some systems will not generate a HUP poll event so we have to
1129 * handle the ECONNRESET issue and ECONNABORT here.
1130 */
1131 if (APR_STATUS_IS_ECONNRESET(status) ||
1132 APR_STATUS_IS_ECONNABORTED(status) ||
1133 status == SERF_ERROR_REQUEST_LOST) {
1134 /* If the connection had ever been good, be optimistic & try again.
1135 * If it has never tried again (incl. a retry), fail.
1136 */
1137 if (conn->completed_responses) {
1138 reset_connection(conn, 1);
1139 status = APR_SUCCESS;
1140 }
1141 else if (status == SERF_ERROR_REQUEST_LOST) {
1142 status = SERF_ERROR_ABORTED_CONNECTION;
1143 }
1144 goto error;
1145 }
1146
1147 /* If our response handler says it can't do anything more, we now
1148 * treat that as a success.
1149 */
1150 if (APR_STATUS_IS_EAGAIN(status)) {
1151 /* It is possible that while reading the response, the ssl layer
1152 has prepared some data to send. If this was the last request,
1153 serf will not check for socket writability, so force this here.
1154 */
1155 if (request_or_data_pending(&request, conn) && !request) {
1156 conn->dirty_conn = 1;
1157 conn->ctx->dirty_pollset = 1;
1158 }
1159 status = APR_SUCCESS;
1160 goto error;
1161 }
1162
1163 /* If we received APR_SUCCESS, run this loop again. */
1164 if (!status) {
1165 continue;
1166 }
1167
1168 close_connection = is_conn_closing(request->resp_bkt);
1169
1170 if (!APR_STATUS_IS_EOF(status) &&
1171 close_connection != SERF_ERROR_CLOSING) {
1172 /* Whether success, or an error, there is no more to do unless
1173 * this request has been completed.
1174 */
1175 goto error;
1176 }
1177
1178 /* The response has been fully-read, so that means the request has
1179 * either been fully-delivered (most likely), or that we don't need to
1180 * write the rest of it anymore, e.g. when a 408 Request timeout was
1181 $ received.
1182 * Remove it from our queue and loop to read another response.
1183 */
1184 conn->requests = request->next;
1185
1186 destroy_request(request);
1187
1188 request = conn->requests;
1189
1190 /* If we're truly empty, update our tail. */
1191 if (request == NULL) {
1192 conn->requests_tail = NULL;
1193 }
1194
1195 conn->completed_responses++;
1196
1197 /* We've to rebuild pollset since completed_responses is changed. */
1198 conn->dirty_conn = 1;
1199 conn->ctx->dirty_pollset = 1;
1200
1201 /* This means that we're being advised that the connection is done. */
1202 if (close_connection == SERF_ERROR_CLOSING) {
1203 reset_connection(conn, 1);
1204 if (APR_STATUS_IS_EOF(status))
1205 status = APR_SUCCESS;
1206 goto error;
1207 }
1208
1209 /* The server is suddenly deciding to serve more responses than we've
1210 * seen before.
1211 *
1212 * Let our requests go.
1213 */
1214 if (conn->probable_keepalive_limit &&
1215 conn->completed_responses > conn->probable_keepalive_limit) {
1216 conn->probable_keepalive_limit = 0;
1217 }
1218
1219 /* If we just ran out of requests or have unwritten requests, then
1220 * update the pollset. We don't want to read from this socket any
1221 * more. We are definitely done with this loop, too.
1222 */
1223 if (request == NULL || !request->writing_started) {
1224 conn->dirty_conn = 1;
1225 conn->ctx->dirty_pollset = 1;
1226 status = APR_SUCCESS;
1227 goto error;
1228 }
1229 }
1230
1231 error:
1232 apr_pool_destroy(tmppool);
1233 return status;
1234 }
1235
1236 /* process all events on the connection */
serf__process_connection(serf_connection_t * conn,apr_int16_t events)1237 apr_status_t serf__process_connection(serf_connection_t *conn,
1238 apr_int16_t events)
1239 {
1240 apr_status_t status;
1241
1242 /* POLLHUP/ERR should come after POLLIN so if there's an error message or
1243 * the like sitting on the connection, we give the app a chance to read
1244 * it before we trigger a reset condition.
1245 */
1246 if ((events & APR_POLLIN) != 0) {
1247 if ((status = read_from_connection(conn)) != APR_SUCCESS)
1248 return status;
1249
1250 /* If we decided to reset our connection, return now as we don't
1251 * want to write.
1252 */
1253 if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
1254 return APR_SUCCESS;
1255 }
1256 }
1257 if ((events & APR_POLLHUP) != 0) {
1258 /* The connection got reset by the server. On Windows this can happen
1259 when all data is read, so just cleanup the connection and open
1260 a new one.
1261 If we haven't had any successful responses on this connection,
1262 then error out as it is likely a server issue. */
1263 if (conn->completed_responses) {
1264 return reset_connection(conn, 1);
1265 }
1266 return SERF_ERROR_ABORTED_CONNECTION;
1267 }
1268 if ((events & APR_POLLERR) != 0) {
1269 /* We might be talking to a buggy HTTP server that doesn't
1270 * do lingering-close. (httpd < 2.1.8 does this.)
1271 *
1272 * See:
1273 *
1274 * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
1275 */
1276 if (conn->completed_requests && !conn->probable_keepalive_limit) {
1277 return reset_connection(conn, 1);
1278 }
1279 #ifdef SO_ERROR
1280 /* If possible, get the error from the platform's socket layer and
1281 convert it to an APR status code. */
1282 {
1283 apr_os_sock_t osskt;
1284 if (!apr_os_sock_get(&osskt, conn->skt)) {
1285 int error;
1286 apr_socklen_t l = sizeof(error);
1287
1288 if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error,
1289 &l)) {
1290 status = APR_FROM_OS_ERROR(error);
1291
1292 /* Handle fallback for multi-homed servers.
1293
1294 ### Improve algorithm to find better than just 'next'?
1295
1296 Current Windows versions already handle re-ordering for
1297 api users by using statistics on the recently failed
1298 connections to order the list of addresses. */
1299 if (conn->completed_requests == 0
1300 && conn->address->next != NULL
1301 && (APR_STATUS_IS_ECONNREFUSED(status)
1302 || APR_STATUS_IS_TIMEUP(status)
1303 || APR_STATUS_IS_ENETUNREACH(status))) {
1304
1305 conn->address = conn->address->next;
1306 return reset_connection(conn, 1);
1307 }
1308
1309 return status;
1310 }
1311 }
1312 }
1313 #endif
1314 return APR_EGENERAL;
1315 }
1316 if ((events & APR_POLLOUT) != 0) {
1317 if ((status = write_to_connection(conn)) != APR_SUCCESS)
1318 return status;
1319 }
1320 return APR_SUCCESS;
1321 }
1322
serf_connection_create(serf_context_t * ctx,apr_sockaddr_t * address,serf_connection_setup_t setup,void * setup_baton,serf_connection_closed_t closed,void * closed_baton,apr_pool_t * pool)1323 serf_connection_t *serf_connection_create(
1324 serf_context_t *ctx,
1325 apr_sockaddr_t *address,
1326 serf_connection_setup_t setup,
1327 void *setup_baton,
1328 serf_connection_closed_t closed,
1329 void *closed_baton,
1330 apr_pool_t *pool)
1331 {
1332 serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
1333
1334 conn->ctx = ctx;
1335 conn->status = APR_SUCCESS;
1336 /* Ignore server address if proxy was specified. */
1337 conn->address = ctx->proxy_address ? ctx->proxy_address : address;
1338 conn->setup = setup;
1339 conn->setup_baton = setup_baton;
1340 conn->closed = closed;
1341 conn->closed_baton = closed_baton;
1342 conn->pool = pool;
1343 conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
1344 conn->stream = NULL;
1345 conn->ostream_head = NULL;
1346 conn->ostream_tail = NULL;
1347 conn->baton.type = SERF_IO_CONN;
1348 conn->baton.u.conn = conn;
1349 conn->hit_eof = 0;
1350 conn->state = SERF_CONN_INIT;
1351 conn->latency = -1; /* unknown */
1352
1353 /* Create a subpool for our connection. */
1354 apr_pool_create(&conn->skt_pool, conn->pool);
1355
1356 /* register a cleanup */
1357 apr_pool_cleanup_register(conn->pool, conn, clean_conn,
1358 apr_pool_cleanup_null);
1359
1360 /* Add the connection to the context. */
1361 *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
1362
1363 serf__log(CONN_VERBOSE, __FILE__, "created connection 0x%x\n",
1364 conn);
1365
1366 return conn;
1367 }
1368
serf_connection_create2(serf_connection_t ** conn,serf_context_t * ctx,apr_uri_t host_info,serf_connection_setup_t setup,void * setup_baton,serf_connection_closed_t closed,void * closed_baton,apr_pool_t * pool)1369 apr_status_t serf_connection_create2(
1370 serf_connection_t **conn,
1371 serf_context_t *ctx,
1372 apr_uri_t host_info,
1373 serf_connection_setup_t setup,
1374 void *setup_baton,
1375 serf_connection_closed_t closed,
1376 void *closed_baton,
1377 apr_pool_t *pool)
1378 {
1379 apr_status_t status = APR_SUCCESS;
1380 serf_connection_t *c;
1381 apr_sockaddr_t *host_address = NULL;
1382
1383 /* Set the port number explicitly, needed to create the socket later. */
1384 if (!host_info.port) {
1385 host_info.port = apr_uri_port_of_scheme(host_info.scheme);
1386 }
1387
1388 /* Only lookup the address of the server if no proxy server was
1389 configured. */
1390 if (!ctx->proxy_address) {
1391 status = apr_sockaddr_info_get(&host_address,
1392 host_info.hostname,
1393 APR_UNSPEC, host_info.port, 0, pool);
1394 if (status)
1395 return status;
1396 }
1397
1398 c = serf_connection_create(ctx, host_address, setup, setup_baton,
1399 closed, closed_baton, pool);
1400
1401 /* We're not interested in the path following the hostname. */
1402 c->host_url = apr_uri_unparse(c->pool,
1403 &host_info,
1404 APR_URI_UNP_OMITPATHINFO |
1405 APR_URI_UNP_OMITUSERINFO);
1406
1407 /* Store the host info without the path on the connection. */
1408 (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info));
1409 if (!c->host_info.port) {
1410 c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme);
1411 }
1412
1413 *conn = c;
1414
1415 return status;
1416 }
1417
serf_connection_reset(serf_connection_t * conn)1418 apr_status_t serf_connection_reset(
1419 serf_connection_t *conn)
1420 {
1421 return reset_connection(conn, 0);
1422 }
1423
1424
serf_connection_close(serf_connection_t * conn)1425 apr_status_t serf_connection_close(
1426 serf_connection_t *conn)
1427 {
1428 int i;
1429 serf_context_t *ctx = conn->ctx;
1430 apr_status_t status;
1431
1432 for (i = ctx->conns->nelts; i--; ) {
1433 serf_connection_t *conn_seq = GET_CONN(ctx, i);
1434
1435 if (conn_seq == conn) {
1436 while (conn->requests) {
1437 serf_request_cancel(conn->requests);
1438 }
1439 if (conn->skt != NULL) {
1440 remove_connection(ctx, conn);
1441 status = apr_socket_close(conn->skt);
1442 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
1443 "closed socket, status %d\n",
1444 status);
1445 if (conn->closed != NULL) {
1446 handle_conn_closed(conn, status);
1447 }
1448 conn->skt = NULL;
1449 }
1450 if (conn->stream != NULL) {
1451 serf_bucket_destroy(conn->stream);
1452 conn->stream = NULL;
1453 }
1454
1455 destroy_ostream(conn);
1456
1457 /* Remove the connection from the context. We don't want to
1458 * deal with it any more.
1459 */
1460 if (i < ctx->conns->nelts - 1) {
1461 /* move later connections over this one. */
1462 memmove(
1463 &GET_CONN(ctx, i),
1464 &GET_CONN(ctx, i + 1),
1465 (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
1466 }
1467 --ctx->conns->nelts;
1468
1469 serf__log(CONN_VERBOSE, __FILE__, "closed connection 0x%x\n",
1470 conn);
1471
1472 /* Found the connection. Closed it. All done. */
1473 return APR_SUCCESS;
1474 }
1475 }
1476
1477 /* We didn't find the specified connection. */
1478 /* ### doc talks about this w.r.t poll structures. use something else? */
1479 return APR_NOTFOUND;
1480 }
1481
1482
/* Store MAX_REQUESTS as the maximum number of outstanding requests on CONN
 * and log the new setting.  A value of 0 is logged as "unlimited".
 */
void serf_connection_set_max_outstanding_requests(
    serf_connection_t *conn,
    unsigned int max_requests)
{
    if (max_requests != 0) {
        serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
                      "Limit max. nr. of outstanding requests for this "
                      "connection to %u.\n", max_requests);
    }
    else {
        serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
                      "Set max. nr. of outstanding requests for this "
                      "connection to unlimited.\n");
    }

    conn->max_outstanding_requests = max_requests;
}
1498
1499
/* Enable async responses on CONN and remember the ACCEPTOR and HANDLER
 * (with their batons) to use for them.
 */
void serf_connection_set_async_responses(
    serf_connection_t *conn,
    serf_response_acceptor_t acceptor,
    void *acceptor_baton,
    serf_response_handler_t handler,
    void *handler_baton)
{
    /* Callback used to accept an async response, plus its baton. */
    conn->async_acceptor = acceptor;
    conn->async_acceptor_baton = acceptor_baton;

    /* Callback used to handle an async response, plus its baton. */
    conn->async_handler = handler;
    conn->async_handler_baton = handler_baton;

    /* Make read_from_connection() take the async code path. */
    conn->async_responses = 1;
}
1513
1514 static serf_request_t *
create_request(serf_connection_t * conn,serf_request_setup_t setup,void * setup_baton,int priority,int ssltunnel)1515 create_request(serf_connection_t *conn,
1516 serf_request_setup_t setup,
1517 void *setup_baton,
1518 int priority,
1519 int ssltunnel)
1520 {
1521 serf_request_t *request;
1522
1523 request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
1524 request->conn = conn;
1525 request->setup = setup;
1526 request->setup_baton = setup_baton;
1527 request->handler = NULL;
1528 request->respool = NULL;
1529 request->req_bkt = NULL;
1530 request->resp_bkt = NULL;
1531 request->priority = priority;
1532 request->writing_started = 0;
1533 request->ssltunnel = ssltunnel;
1534 request->next = NULL;
1535 request->auth_baton = NULL;
1536
1537 return request;
1538 }
1539
serf_connection_request_create(serf_connection_t * conn,serf_request_setup_t setup,void * setup_baton)1540 serf_request_t *serf_connection_request_create(
1541 serf_connection_t *conn,
1542 serf_request_setup_t setup,
1543 void *setup_baton)
1544 {
1545 serf_request_t *request;
1546
1547 request = create_request(conn, setup, setup_baton,
1548 0, /* priority */
1549 0 /* ssl tunnel */);
1550
1551 /* Link the request to the end of the request chain. */
1552 link_requests(&conn->requests, &conn->requests_tail, request);
1553
1554 /* Ensure our pollset becomes writable in context run */
1555 conn->ctx->dirty_pollset = 1;
1556 conn->dirty_conn = 1;
1557
1558 return request;
1559 }
1560
1561 static serf_request_t *
priority_request_create(serf_connection_t * conn,int ssltunnelreq,serf_request_setup_t setup,void * setup_baton)1562 priority_request_create(serf_connection_t *conn,
1563 int ssltunnelreq,
1564 serf_request_setup_t setup,
1565 void *setup_baton)
1566 {
1567 serf_request_t *request;
1568 serf_request_t *iter, *prev;
1569
1570 request = create_request(conn, setup, setup_baton,
1571 1, /* priority */
1572 ssltunnelreq);
1573
1574 /* Link the new request after the last written request. */
1575 iter = conn->requests;
1576 prev = NULL;
1577
1578 /* Find a request that has data which needs to be delivered. */
1579 while (iter != NULL && iter->req_bkt == NULL && iter->writing_started) {
1580 prev = iter;
1581 iter = iter->next;
1582 }
1583
1584 /* A CONNECT request to setup an ssltunnel has absolute priority over all
1585 other requests on the connection, so:
1586 a. add it first to the queue
1587 b. ensure that other priority requests are added after the CONNECT
1588 request */
1589 if (!request->ssltunnel) {
1590 /* Advance to next non priority request */
1591 while (iter != NULL && iter->priority) {
1592 prev = iter;
1593 iter = iter->next;
1594 }
1595 }
1596
1597 if (prev) {
1598 request->next = iter;
1599 prev->next = request;
1600 } else {
1601 request->next = iter;
1602 conn->requests = request;
1603 }
1604
1605 /* Ensure our pollset becomes writable in context run */
1606 conn->ctx->dirty_pollset = 1;
1607 conn->dirty_conn = 1;
1608
1609 return request;
1610 }
1611
serf_connection_priority_request_create(serf_connection_t * conn,serf_request_setup_t setup,void * setup_baton)1612 serf_request_t *serf_connection_priority_request_create(
1613 serf_connection_t *conn,
1614 serf_request_setup_t setup,
1615 void *setup_baton)
1616 {
1617 return priority_request_create(conn,
1618 0, /* not a ssltunnel CONNECT request */
1619 setup, setup_baton);
1620 }
1621
serf__ssltunnel_request_create(serf_connection_t * conn,serf_request_setup_t setup,void * setup_baton)1622 serf_request_t *serf__ssltunnel_request_create(serf_connection_t *conn,
1623 serf_request_setup_t setup,
1624 void *setup_baton)
1625 {
1626 return priority_request_create(conn,
1627 1, /* This is a ssltunnel CONNECT request */
1628 setup, setup_baton);
1629 }
1630
serf_request_cancel(serf_request_t * request)1631 apr_status_t serf_request_cancel(serf_request_t *request)
1632 {
1633 return cancel_request(request, &request->conn->requests, 0);
1634 }
1635
serf_request_is_written(serf_request_t * request)1636 apr_status_t serf_request_is_written(serf_request_t *request)
1637 {
1638 if (request->writing_started && !request->req_bkt)
1639 return APR_SUCCESS;
1640
1641 return APR_EBUSY;
1642 }
1643
serf_request_get_pool(const serf_request_t * request)1644 apr_pool_t *serf_request_get_pool(const serf_request_t *request)
1645 {
1646 return request->respool;
1647 }
1648
1649
serf_request_get_alloc(const serf_request_t * request)1650 serf_bucket_alloc_t *serf_request_get_alloc(
1651 const serf_request_t *request)
1652 {
1653 return request->allocator;
1654 }
1655
1656
serf_request_get_conn(const serf_request_t * request)1657 serf_connection_t *serf_request_get_conn(
1658 const serf_request_t *request)
1659 {
1660 return request->conn;
1661 }
1662
1663
/* Store HANDLER and HANDLER_BATON on REQUEST as its response handler and
 * the baton that goes with it.
 */
void serf_request_set_handler(
    serf_request_t *request,
    const serf_response_handler_t handler,
    const void **handler_baton)
{
    request->handler_baton = handler_baton;
    request->handler = handler;
}
1672
1673
/* Create a request bucket for REQUEST with the given METHOD, URI and BODY,
 * allocated from ALLOCATOR, and add the connection-level headers to it.
 *
 * - The request root is set to the connection's host URL for requests that
 *   go to a proxy (but not for requests sent through an SSL tunnel).
 * - A "Host" header is set when the connection has host info.
 * - Server authorization headers are set up, unless REQUEST is the CONNECT
 *   request of an SSL tunnel.
 * - Proxy authorization headers are set up for plain proxied requests and
 *   for the CONNECT request of an SSL tunnel only.
 *
 * Returns the new request bucket.
 */
serf_bucket_t *serf_request_bucket_request_create(
    serf_request_t *request,
    const char *method,
    const char *uri,
    serf_bucket_t *body,
    serf_bucket_alloc_t *allocator)
{
    serf_bucket_t *req_bkt, *hdrs_bkt;
    serf_connection_t *conn = request->conn;
    serf_context_t *ctx = conn->ctx;
    int is_https;
    int ssltunnel;

    /* The scheme is only known when the connection has host info; a
       connection without it has a NULL scheme, which must not be passed
       to strcmp().  Treat such a connection as not being https. */
    is_https = conn->host_info.scheme != NULL &&
               strcmp(conn->host_info.scheme, "https") == 0;

    ssltunnel = ctx->proxy_address && is_https;

    req_bkt = serf_bucket_request_create(method, uri, body, allocator);
    hdrs_bkt = serf_bucket_request_get_headers(req_bkt);

    /* Use absolute uri's in requests to a proxy. Use relative uri's in
       requests directly to a server or sent through an SSL tunnel. */
    if (ctx->proxy_address && conn->host_url &&
        !(ssltunnel && !request->ssltunnel)) {

        serf_bucket_request_set_root(req_bkt, conn->host_url);
    }

    if (conn->host_info.hostinfo)
        serf_bucket_headers_setn(hdrs_bkt, "Host",
                                 conn->host_info.hostinfo);

    /* Setup server authorization headers, unless this is a CONNECT request. */
    if (!request->ssltunnel) {
        serf__authn_info_t *authn_info;
        authn_info = serf__get_authn_info_for_server(conn);
        if (authn_info->scheme)
            authn_info->scheme->setup_request_func(HOST, 0, conn, request,
                                                   method, uri,
                                                   hdrs_bkt);
    }

    /* Setup proxy authorization headers.
       Don't set these headers on the requests to the server if we're using
       an SSL tunnel, only on the CONNECT request to setup the tunnel. */
    if (ctx->proxy_authn_info.scheme &&
        (!is_https || request->ssltunnel)) {
        ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
                                                         request,
                                                         method, uri,
                                                         hdrs_bkt);
    }

    return req_bkt;
}
1734
serf_connection_get_latency(serf_connection_t * conn)1735 apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn)
1736 {
1737 if (conn->ctx->proxy_address) {
1738 /* Detecting network latency for proxied connection is not implemented
1739 yet. */
1740 return -1;
1741 }
1742
1743 return conn->latency;
1744 }
1745