1 /*
2 * stream_bucket.c : a serf bucket that wraps a readable svn_stream_t
3 *
4 * ====================================================================
5 * Licensed to the Apache Software Foundation (ASF) under one
6 * or more contributor license agreements. See the NOTICE file
7 * distributed with this work for additional information
8 * regarding copyright ownership. The ASF licenses this file
9 * to you under the Apache License, Version 2.0 (the
10 * "License"); you may not use this file except in compliance
11 * with the License. You may obtain a copy of the License at
12 *
13 * http://www.apache.org/licenses/LICENSE-2.0
14 *
15 * Unless required by applicable law or agreed to in writing,
16 * software distributed under the License is distributed on an
17 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18 * KIND, either express or implied. See the License for the
19 * specific language governing permissions and limitations
20 * under the License.
21 * ====================================================================
22 */
23
24 #include <serf.h>
25 #include <serf_bucket_util.h>
26
27 #include "ra_serf.h"
28
29 typedef struct stream_bucket_ctx_t
30 {
31 svn_stream_t *stream;
32 svn_ra_serf__stream_bucket_errfunc_t errfunc;
33 void *errfunc_baton;
34 serf_databuf_t databuf;
35 } stream_bucket_ctx_t;
36
37 static apr_status_t
stream_reader(void * baton,apr_size_t bufsize,char * buf,apr_size_t * len)38 stream_reader(void *baton, apr_size_t bufsize, char *buf, apr_size_t *len)
39 {
40 stream_bucket_ctx_t *ctx = baton;
41 svn_error_t *err;
42
43 *len = bufsize;
44
45 err = svn_stream_read_full(ctx->stream, buf, len);
46 if (err)
47 {
48 if (ctx->errfunc)
49 ctx->errfunc(ctx->errfunc_baton, err);
50 svn_error_clear(err);
51
52 return SVN_ERR_RA_SERF_STREAM_BUCKET_READ_ERROR;
53 }
54
55 if (*len == bufsize)
56 {
57 return APR_SUCCESS;
58 }
59 else
60 {
61 svn_error_clear(svn_stream_close(ctx->stream));
62 return APR_EOF;
63 }
64 }
65
66 static apr_status_t
stream_bucket_read(serf_bucket_t * bucket,apr_size_t requested,const char ** data,apr_size_t * len)67 stream_bucket_read(serf_bucket_t *bucket, apr_size_t requested,
68 const char **data, apr_size_t *len)
69 {
70 stream_bucket_ctx_t *ctx = bucket->data;
71
72 return serf_databuf_read(&ctx->databuf, requested, data, len);
73 }
74
75 static apr_status_t
stream_bucket_readline(serf_bucket_t * bucket,int acceptable,int * found,const char ** data,apr_size_t * len)76 stream_bucket_readline(serf_bucket_t *bucket, int acceptable,
77 int *found, const char **data, apr_size_t *len)
78 {
79 stream_bucket_ctx_t *ctx = bucket->data;
80
81 return serf_databuf_readline(&ctx->databuf, acceptable, found, data, len);
82 }
83
84 static apr_status_t
stream_bucket_peek(serf_bucket_t * bucket,const char ** data,apr_size_t * len)85 stream_bucket_peek(serf_bucket_t *bucket, const char **data, apr_size_t *len)
86 {
87 stream_bucket_ctx_t *ctx = bucket->data;
88
89 return serf_databuf_peek(&ctx->databuf, data, len);
90 }
91
92 static const serf_bucket_type_t stream_bucket_vtable = {
93 "SVNSTREAM",
94 stream_bucket_read,
95 stream_bucket_readline,
96 serf_default_read_iovec,
97 serf_default_read_for_sendfile,
98 serf_default_read_bucket,
99 stream_bucket_peek,
100 serf_default_destroy_and_data
101 };
102
103 serf_bucket_t *
svn_ra_serf__create_stream_bucket(svn_stream_t * stream,serf_bucket_alloc_t * allocator,svn_ra_serf__stream_bucket_errfunc_t errfunc,void * errfunc_baton)104 svn_ra_serf__create_stream_bucket(svn_stream_t *stream,
105 serf_bucket_alloc_t *allocator,
106 svn_ra_serf__stream_bucket_errfunc_t errfunc,
107 void *errfunc_baton)
108 {
109 stream_bucket_ctx_t *ctx;
110
111 ctx = serf_bucket_mem_calloc(allocator, sizeof(*ctx));
112 ctx->stream = stream;
113 ctx->errfunc = errfunc;
114 ctx->errfunc_baton = errfunc_baton;
115 serf_databuf_init(&ctx->databuf);
116 ctx->databuf.read = stream_reader;
117 ctx->databuf.read_baton = ctx;
118
119 return serf_bucket_create(&stream_bucket_vtable, allocator, ctx);
120 }
121