1 /*        $NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $   */
2 
3 /*-
4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 /*
30  * a file system server which stores the data in a PostgreSQL database.
31  */
32 
33 /*
34  * we use large objects to store file contents.  there are a few XXXs wrt it.
35  *
36  * - large objects don't obey the normal transaction semantics.
37  *
38  * - we use large object server-side functions directly (instead of via the
39  *   libpq large object api) because:
40  *        - we want to use asynchronous (in the sense of PQsendFoo) operations
41  *          which is not available with the libpq large object api.
42  *        - with the libpq large object api, there's no way to know details of
43  *          an error because PGresult is freed in the library without saving
44  *          PG_DIAG_SQLSTATE etc.
45  */
46 
47 #include <sys/cdefs.h>
48 #ifndef lint
49 __RCSID("$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $");
50 #endif /* not lint */
51 
52 #include <assert.h>
53 #include <err.h>
54 #include <errno.h>
55 #include <puffs.h>
56 #include <inttypes.h>
57 #include <stdarg.h>
58 #include <stdbool.h>
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <time.h>
62 #include <util.h>
63 
64 #include <libpq-fe.h>
65 #include <libpq/libpq-fs.h>   /* INV_* */
66 
67 #include "pgfs.h"
68 #include "pgfs_db.h"
69 #include "pgfs_debug.h"
70 #include "pgfs_waitq.h"
71 #include "pgfs_subs.h"
72 
73 const char * const vtype_table[] = {
74           [VREG] = "regular",
75           [VDIR] = "directory",
76           [VLNK] = "link",
77 };
78 
79 static unsigned int
tovtype(const char * type)80 tovtype(const char *type)
81 {
82           unsigned int i;
83 
84           for (i = 0; i < __arraycount(vtype_table); i++) {
85                     if (vtype_table[i] == NULL) {
86                               continue;
87                     }
88                     if (!strcmp(type, vtype_table[i])) {
89                               return i;
90                     }
91           }
92           assert(0);
93           return 0;
94 }
95 
96 static const char *
fromvtype(enum vtype vtype)97 fromvtype(enum vtype vtype)
98 {
99 
100           if (vtype < __arraycount(vtype_table)) {
101                     assert(vtype_table[vtype] != NULL);
102                     return vtype_table[vtype];
103           }
104           return NULL;
105 }
106 
107 /*
108  * fileid_lock stuff below is to keep ordering of operations for a file.
109  * it is a workaround for the lack of operation barriers in the puffs
110  * protocol.
111  *
112  * currently we do this locking only for SETATTR, GETATTR, and WRITE as
113  * they are known to be reorder-unsafe.  they are sensitive to the file
114  * attributes, mainly the file size.  note that as the kernel issues async
115  * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
116  * the stale attributes.
117  *
118  * we are relying on waiton/wakeup being a FIFO.
119  */
120 
121 struct fileid_lock_handle {
122           TAILQ_ENTRY(fileid_lock_handle) list;
123           fileid_t fileid;
124           struct puffs_cc *owner;       /* diagnostic only */
125           struct waitq waitq;
126 };
127 
128 TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
129     TAILQ_HEAD_INITIALIZER(fileid_lock_list);
130 struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
131 
132 /*
133  * fileid_lock: serialize requests for the fileid.
134  *
135  * this function should be the first yieldable point in a puffs callback.
136  */
137 
138 struct fileid_lock_handle *
fileid_lock(fileid_t fileid,struct puffs_cc * cc)139 fileid_lock(fileid_t fileid, struct puffs_cc *cc)
140 {
141           struct fileid_lock_handle *lock;
142 
143           TAILQ_FOREACH(lock, &fileid_lock_list, list) {
144                     if (lock->fileid == fileid) {
145                               DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
146                               assert(lock->owner != cc);
147                               waiton(&lock->waitq, cc);     /* enter FIFO */
148                               assert(lock->owner == cc);
149                               return lock;
150                     }
151           }
152           lock = emalloc(sizeof(*lock));
153           lock->fileid = fileid;
154           lock->owner = cc;
155           DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
156           waitq_init(&lock->waitq);
157           TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
158           return lock;
159 }
160 
161 void
fileid_unlock(struct fileid_lock_handle * lock)162 fileid_unlock(struct fileid_lock_handle *lock)
163 {
164 
165           DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
166           assert(lock != NULL);
167           assert(lock->owner != NULL);
168           /*
169            * perform direct-handoff to the first waiter.
170            *
171            * a handoff is essential to keep the order of requests.
172            */
173           lock->owner = wakeup_one(&lock->waitq);
174           if (lock->owner != NULL) {
175                     return;
176           }
177           /*
178            * no one is waiting this fileid.
179            */
180           TAILQ_REMOVE(&fileid_lock_list, lock, list);
181           free(lock);
182 }
183 
/*
 * timespec_to_pgtimestamp: create a text representation of timestamp which
 * can be recognized by the database server.
 *
 * the result is the UTC broken-down time of tv in the form of
 * "YYYYMMDDTHHMMSS.uuuuuu", where uuuuuu is the microseconds part.
 * sub-microsecond precision is discarded.
 *
 * returns 0 on success.  if gmtime_r fails, returns its errno and
 * leaves *resultp untouched.
 *
 * it's caller's responsibility to free(3) the result.
 */

int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the microseconds are the digits after the decimal point, so
	 * they have to be zero-padded to 6 digits.  otherwise, eg.
	 * 5 microseconds would be written as ".5", which means half a
	 * second.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
213 
214 int
my_lo_truncate(struct Xconn * xc,int32_t fd,int32_t size)215 my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
216 {
217           static struct cmd *c;
218           int32_t ret;
219           int error;
220 
221           CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
222           error = sendcmd(xc, c, fd, size);
223           if (error != 0) {
224                     return error;
225           }
226           error = simplefetch(xc, INT4OID, &ret);
227           if (error != 0) {
228                     if (error == EEXIST) {
229                               /*
230                                * probably the insertion of the new-sized page
231                                * caused a duplicated key error.  retry.
232                                */
233                               DPRINTF("map EEXIST to EAGAIN\n");
234                               error = EAGAIN;
235                     }
236                     return error;
237           }
238           assert(ret == 0);
239           return 0;
240 }
241 
242 int
my_lo_lseek(struct Xconn * xc,int32_t fd,int32_t offset,int32_t whence,int32_t * retp)243 my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
244     int32_t *retp)
245 {
246           static struct cmd *c;
247           int32_t ret;
248           int error;
249 
250           CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
251           error = sendcmd(xc, c, fd, offset, whence);
252           if (error != 0) {
253                     return error;
254           }
255           error = simplefetch(xc, INT4OID, &ret);
256           if (error != 0) {
257                     return error;
258           }
259           if (retp != NULL) {
260                     *retp = ret;
261           }
262           return 0;
263 }
264 
265 int
my_lo_read(struct Xconn * xc,int32_t fd,void * buf,size_t size,size_t * resultsizep)266 my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
267     size_t *resultsizep)
268 {
269           static struct cmd *c;
270           size_t resultsize;
271           int error;
272 
273           CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
274           error = sendcmdx(xc, 1, c, fd, (int32_t)size);
275           if (error != 0) {
276                     return error;
277           }
278           error = simplefetch(xc, BYTEA, buf, &resultsize);
279           if (error != 0) {
280                     return error;
281           }
282           *resultsizep = resultsize;
283           if (size != resultsize) {
284                     DPRINTF("shortread? %zu != %zu\n", size, resultsize);
285           }
286           return 0;
287 }
288 
289 int
my_lo_write(struct Xconn * xc,int32_t fd,const void * buf,size_t size,size_t * resultsizep)290 my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
291     size_t *resultsizep)
292 {
293           static struct cmd *c;
294           int32_t resultsize;
295           int error;
296 
297           CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
298           error = sendcmd(xc, c, fd, buf, (int32_t)size);
299           if (error != 0) {
300                     return error;
301           }
302           error = simplefetch(xc, INT4OID, &resultsize);
303           if (error != 0) {
304                     if (error == EEXIST) {
305                               /*
306                                * probably the insertion of the new data page
307                                * caused a duplicated key error.  retry.
308                                */
309                               DPRINTF("map EEXIST to EAGAIN\n");
310                               error = EAGAIN;
311                     }
312                     return error;
313           }
314           *resultsizep = resultsize;
315           if (size != (size_t)resultsize) {
316                     DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
317           }
318           return 0;
319 }
320 
321 int
my_lo_open(struct Xconn * xc,Oid loid,int32_t mode,int32_t * fdp)322 my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
323 {
324           static struct cmd *c;
325           int error;
326 
327           CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
328           error = sendcmd(xc, c, loid, mode);
329           if (error != 0) {
330                     return error;
331           }
332           return simplefetch(xc, INT4OID, fdp);
333 }
334 
/*
 * my_lo_close: close the large object descriptor fd.
 *
 * this is currently a no-op and always returns 0.
 */

int
my_lo_close(struct Xconn *xc, int32_t fd)
{
	/*
	 * do nothing.
	 *
	 * LO handles are automatically closed at the end of transactions.
	 * our transactions are small enough.
	 *
	 * the explicit version is kept below, disabled.
	 */
#if 0
	static struct cmd *c;
	int32_t ret;
	int error;

	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
	error = sendcmd(xc, c, fd);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, INT4OID, &ret);
	if (error != 0) {
		return error;
	}
	assert(ret == 0);
#endif
	return 0;
}
363 
364 static int
lo_lookup_by_fileid(struct Xconn * xc,fileid_t fileid,Oid * idp)365 lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
366 {
367           static struct cmd *c;
368           static const Oid types[] = { OIDOID, };
369           struct fetchstatus s;
370           int error;
371 
372           CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
373           error = sendcmd(xc, c, fileid);
374           if (error != 0) {
375                     return error;
376           }
377           fetchinit(&s, xc);
378           error = FETCHNEXT(&s, types, idp);
379           fetchdone(&s);
380           DPRINTF("error %d\n", error);
381           return error;
382 }
383 
384 int
lo_open_by_fileid(struct Xconn * xc,fileid_t fileid,int mode,int * fdp)385 lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
386 {
387           Oid loid;
388           int fd;
389           int error;
390 
391           error = lo_lookup_by_fileid(xc, fileid, &loid);
392           if (error != 0) {
393                     return error;
394           }
395           error = my_lo_open(xc, loid, mode, &fd);
396           if (error != 0) {
397                     return error;
398           }
399           *fdp = fd;
400           return 0;
401 }
402 
403 static int
getsize(struct Xconn * xc,fileid_t fileid,int * resultp)404 getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
405 {
406           int32_t size;
407           int fd;
408           int error;
409 
410           error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
411           if (error != 0) {
412                     return error;
413           }
414           error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
415           if (error != 0) {
416                     return error;
417           }
418           error = my_lo_close(xc, fd);
419           if (error != 0) {
420                     return error;
421           }
422           *resultp = size;
423           return 0;
424 }
425 
/*
 * flags for the mask argument of getattr.
 *
 * getattr in this file examines only GETATTR_TIME and GETATTR_SIZE.
 */
#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
/* all of the above. */
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
436 
437 int
getattr(struct Xconn * xc,fileid_t fileid,struct vattr * va,unsigned int mask)438 getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
439 {
440           char *type;
441           long long atime_s;
442           long long atime_us;
443           long long ctime_s;
444           long long ctime_us;
445           long long mtime_s;
446           long long mtime_us;
447           long long btime_s;
448           long long btime_us;
449           uint64_t mode;
450           long long uid;
451           long long gid;
452           long long nlink;
453           long long rev;
454           struct fetchstatus s;
455           int error;
456 
457           if (mask == 0) {
458                     return 0;
459           }
460           /*
461            * unless explicitly requested, avoid fetching timestamps as they
462            * are a little more expensive than other simple attributes.
463            */
464           if ((mask & GETATTR_TIME) != 0) {
465                     static struct cmd *c;
466                     static const Oid types[] = {
467                               TEXTOID,
468                               INT8OID,
469                               INT8OID,
470                               INT8OID,
471                               INT8OID,
472                               INT8OID,
473                               INT8OID,
474                               INT8OID,
475                               INT8OID,
476                               INT8OID,
477                               INT8OID,
478                               INT8OID,
479                               INT8OID,
480                               INT8OID,
481                     };
482 
483                     CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
484                         "extract(epoch from date_trunc('second', atime))::int8, "
485                         "extract(microseconds from atime)::int8, "
486                         "extract(epoch from date_trunc('second', ctime))::int8, "
487                         "extract(microseconds from ctime)::int8, "
488                         "extract(epoch from date_trunc('second', mtime))::int8, "
489                         "extract(microseconds from mtime)::int8, "
490                         "extract(epoch from date_trunc('second', btime))::int8, "
491                         "extract(microseconds from btime)::int8 "
492                         "FROM file "
493                         "WHERE fileid = $1", INT8OID);
494                     error = sendcmd(xc, c, fileid);
495                     if (error != 0) {
496                               return error;
497                     }
498                     fetchinit(&s, xc);
499                     error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
500                         &rev,
501                         &atime_s, &atime_us,
502                         &ctime_s, &ctime_us,
503                         &mtime_s, &mtime_us,
504                         &btime_s, &btime_us);
505           } else {
506                     static struct cmd *c;
507                     static const Oid types[] = {
508                               TEXTOID,
509                               INT8OID,
510                               INT8OID,
511                               INT8OID,
512                               INT8OID,
513                               INT8OID,
514                     };
515 
516                     CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
517                         "FROM file "
518                         "WHERE fileid = $1", INT8OID);
519                     error = sendcmd(xc, c, fileid);
520                     if (error != 0) {
521                               return error;
522                     }
523                     fetchinit(&s, xc);
524                     error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
525                         &rev);
526           }
527           fetchdone(&s);
528           if (error != 0) {
529                     return error;
530           }
531           memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
532           va->va_type = tovtype(type);
533           free(type);
534           va->va_mode = mode;
535           va->va_uid = uid;
536           va->va_gid = gid;
537           if (nlink > 0 && va->va_type == VDIR) {
538                     nlink++; /* "." */
539           }
540           va->va_nlink = nlink;
541           va->va_fileid = fileid;
542           va->va_atime.tv_sec = atime_s;
543           va->va_atime.tv_nsec = atime_us * 1000;
544           va->va_ctime.tv_sec = ctime_s;
545           va->va_ctime.tv_nsec = ctime_us * 1000;
546           va->va_mtime.tv_sec = mtime_s;
547           va->va_mtime.tv_nsec = mtime_us * 1000;
548           va->va_birthtime.tv_sec = btime_s;
549           va->va_birthtime.tv_nsec = btime_us * 1000;
550           va->va_blocksize = LOBLKSIZE;
551           va->va_gen = 1;
552           va->va_filerev = rev;
553           if ((mask & GETATTR_SIZE) != 0) {
554                     int size;
555 
556                     size = 0;
557                     if (va->va_type == VREG || va->va_type == VLNK) {
558                               error = getsize(xc, fileid, &size);
559                               if (error != 0) {
560                                         return error;
561                               }
562                     } else if (va->va_type == VDIR) {
563                               size = 100; /* XXX */
564                     }
565                     va->va_size = size;
566           }
567           /*
568            * XXX va_bytes: likely wrong due to toast compression.
569            * there's no cheap way to get the compressed size of LO.
570            */
571           va->va_bytes = va->va_size;
572           va->va_flags = 0;
573           return 0;
574 }
575 
576 int
update_mctime(struct Xconn * xc,fileid_t fileid)577 update_mctime(struct Xconn *xc, fileid_t fileid)
578 {
579           static struct cmd *c;
580 
581           CREATECMD(c,
582               "UPDATE file "
583               "SET mtime = current_timestamp, ctime = current_timestamp, "
584                     "rev = rev + 1 "
585               "WHERE fileid = $1", INT8OID);
586           return simplecmd(xc, c, fileid);
587 }
588 
589 int
update_atime(struct Xconn * xc,fileid_t fileid)590 update_atime(struct Xconn *xc, fileid_t fileid)
591 {
592           static struct cmd *c;
593 
594           CREATECMD(c,
595               "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
596               INT8OID);
597           return simplecmd(xc, c, fileid);
598 }
599 
600 int
update_mtime(struct Xconn * xc,fileid_t fileid)601 update_mtime(struct Xconn *xc, fileid_t fileid)
602 {
603           static struct cmd *c;
604 
605           CREATECMD(c,
606               "UPDATE file "
607               "SET mtime = current_timestamp, rev = rev + 1 "
608               "WHERE fileid = $1", INT8OID);
609           return simplecmd(xc, c, fileid);
610 }
611 
612 int
update_ctime(struct Xconn * xc,fileid_t fileid)613 update_ctime(struct Xconn *xc, fileid_t fileid)
614 {
615           static struct cmd *c;
616 
617           CREATECMD(c,
618               "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
619               INT8OID);
620           return simplecmd(xc, c, fileid);
621 }
622 
623 int
update_nlink(struct Xconn * xc,fileid_t fileid,int delta)624 update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
625 {
626           static struct cmd *c;
627 
628           CREATECMD(c,
629               "UPDATE file "
630               "SET nlink = nlink + $1 "
631               "WHERE fileid = $2",
632               INT8OID, INT8OID);
633           return simplecmd(xc, c, (int64_t)delta, fileid);
634 }
635 
636 int
lookupp(struct Xconn * xc,fileid_t fileid,fileid_t * parent)637 lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
638 {
639           static struct cmd *c;
640           static const Oid types[] = { INT8OID, };
641           struct fetchstatus s;
642           int error;
643 
644           CREATECMD(c, "SELECT parent_fileid FROM dirent "
645                     "WHERE child_fileid = $1 LIMIT 1", INT8OID);
646           error = sendcmd(xc, c, fileid);
647           if (error != 0) {
648                     return error;
649           }
650           fetchinit(&s, xc);
651           error = FETCHNEXT(&s, types, parent);
652           fetchdone(&s);
653           if (error != 0) {
654                     return error;
655           }
656           return 0;
657 }
658 
659 int
mkfile(struct Xconn * xc,enum vtype vtype,mode_t mode,uid_t uid,gid_t gid,fileid_t * idp)660 mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
661     fileid_t *idp)
662 {
663           static struct cmd *c;
664           const char *type;
665           int error;
666 
667           type = fromvtype(vtype);
668           if (type == NULL) {
669                     return EOPNOTSUPP;
670           }
671           CREATECMD(c,
672                     "INSERT INTO file "
673                     "(fileid, type, mode, uid, gid, nlink, rev, "
674                     "atime, ctime, mtime, btime) "
675                     "VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
676                     "current_timestamp, "
677                     "current_timestamp, "
678                     "current_timestamp, "
679                     "current_timestamp) "
680                     "RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
681           error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
682               (uint64_t)gid);
683           if (error != 0) {
684                     return error;
685           }
686           return simplefetch(xc, INT8OID, idp);
687 }
688 
689 int
linkfile(struct Xconn * xc,fileid_t parent,const char * name,fileid_t child)690 linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
691 {
692           static struct cmd *c;
693           int error;
694 
695           CREATECMD(c,
696                     "INSERT INTO dirent "
697                     "(parent_fileid, name, child_fileid) "
698                     "VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
699           error = simplecmd(xc, c, parent, name, child);
700           if (error != 0) {
701                     return error;
702           }
703           error = update_nlink(xc, child, 1);
704           if (error != 0) {
705                     return error;
706           }
707           return update_mtime(xc, parent);
708 }
709 
710 int
unlinkfile(struct Xconn * xc,fileid_t parent,const char * name,fileid_t child)711 unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
712 {
713           static struct cmd *c;
714           int error;
715 
716           /*
717            * in addition to the primary key, we check child_fileid as well here
718            * to avoid removing an entry which was appeared after our VOP_LOOKUP.
719            */
720           CREATECMD(c,
721                     "DELETE FROM dirent "
722                     "WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
723                     INT8OID, TEXTOID, INT8OID);
724           error = simplecmd(xc, c, parent, name, child);
725           if (error != 0) {
726                     return error;
727           }
728           error = update_nlink(xc, child, -1);
729           if (error != 0) {
730                     return error;
731           }
732           error = update_mtime(xc, parent);
733           if (error != 0) {
734                     return error;
735           }
736           return update_ctime(xc, child);
737 }
738 
739 int
mklinkfile(struct Xconn * xc,fileid_t parent,const char * name,enum vtype vtype,mode_t mode,uid_t uid,gid_t gid,fileid_t * idp)740 mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
741     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
742 {
743           fileid_t fileid;
744           int error;
745 
746           error = mkfile(xc, vtype, mode, uid, gid, &fileid);
747           if (error != 0) {
748                     return error;
749           }
750           error = linkfile(xc, parent, name, fileid);
751           if (error != 0) {
752                     return error;
753           }
754           if (idp != NULL) {
755                     *idp = fileid;
756           }
757           return 0;
758 }
759 
760 int
mklinkfile_lo(struct Xconn * xc,fileid_t parent_fileid,const char * name,enum vtype vtype,mode_t mode,uid_t uid,gid_t gid,fileid_t * fileidp,int * loidp)761 mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
762     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
763     int *loidp)
764 {
765           static struct cmd *c;
766           fileid_t new_fileid;
767           int loid;
768           int error;
769 
770           error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
771               &new_fileid);
772           if (error != 0) {
773                     return error;
774           }
775           CREATECMD(c,
776                     "INSERT INTO datafork (fileid, loid) "
777                     "VALUES($1, lo_creat(-1)) "
778                     "RETURNING loid", INT8OID);
779           error = sendcmd(xc, c, new_fileid);
780           if (error != 0) {
781                     return error;
782           }
783           error = simplefetch(xc, OIDOID, &loid);
784           if (error != 0) {
785                     return error;
786           }
787           if (fileidp != NULL) {
788                     *fileidp = new_fileid;
789           }
790           if (loidp != NULL) {
791                     *loidp = loid;
792           }
793           return 0;
794 }
795 
796 int
cleanupfile(struct Xconn * xc,fileid_t fileid)797 cleanupfile(struct Xconn *xc, fileid_t fileid)
798 {
799           static struct cmd *c;
800           char *type;
801           unsigned int vtype;
802           int error;
803 
804           CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 "
805                     "RETURNING type::text", INT8OID);
806           error = sendcmd(xc, c, fileid);
807           if (error != 0) {
808                     return error;
809           }
810           error = simplefetch(xc, TEXTOID, &type);
811           if (error == ENOENT) {
812                     return 0; /* probably nlink > 0 */
813           }
814           if (error != 0) {
815                     return error;
816           }
817           vtype = tovtype(type);
818           free(type);
819           if (vtype == VREG || vtype == VLNK) {
820                     static struct cmd *c_datafork;
821                     int32_t ret;
822 
823                     CREATECMD(c_datafork,
824                               "WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
825                               "RETURNING loid) SELECT lo_unlink(loid) FROM loids",
826                               INT8OID);
827                     error = sendcmd(xc, c_datafork, fileid);
828                     if (error != 0) {
829                               return error;
830                     }
831                     error = simplefetch(xc, INT4OID, &ret);
832                     if (error != 0) {
833                               return error;
834                     }
835                     if (ret != 1) {
836                               return EIO; /* lo_unlink failed */
837                     }
838           }
839           return 0;
840 }
841 
842 /*
843  * check_path: do locking and check to prevent a rename from creating loop.
844  *
845  * lock the dirents between child_fileid and the root directory.
846  * if gate_fileid is appeared in the path, return EINVAL.
847  * caller should ensure that child_fileid is of VDIR beforehand.
848  *
849  * we uses FOR SHARE row level locks as poor man's predicate locks.
850  *
851  * the following is an example to show why we need to lock the path.
852  *
853  * consider:
854  * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
855  * and then
856  * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
857  * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
858  *
859  * a possible consequence:
860  *        thread 1: check_path -> success
861  *        thread 2: check_path -> success
862  *        thread 1: modify directories -> block on row-level lock
863  *        thread 2: modify directories -> block on row-level lock
864  *                            -> deadlock detected
865  *                            -> rollback and retry
866  *
867  * another possible consequence:
868  *        thread 1: check_path -> success
869  *        thread 1: modify directory entries -> success
870  *        thread 2: check_path -> block on row-level lock
871  *        thread 1: commit
872  *        thread 2: acquire the lock and notices the row is updated
873  *                            -> serialization error
874  *                            -> rollback and retry
875  *
876  * XXX it might be better to use real serializable transactions,
877  * which will be available for PostgreSQL 9.1
878  */
879 
880 int
check_path(struct Xconn * xc,fileid_t gate_fileid,fileid_t child_fileid)881 check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
882 {
883           static struct cmd *c;
884           fileid_t parent_fileid;
885           struct fetchstatus s;
886           int error;
887 
888           CREATECMD(c,
889                     "WITH RECURSIVE r AS "
890                     "( "
891                                         "SELECT parent_fileid, cookie, child_fileid "
892                                         "FROM dirent "
893                                         "WHERE child_fileid = $1 "
894                               "UNION ALL "
895                                         "SELECT d.parent_fileid, d.cookie, "
896                                         "d.child_fileid "
897                                         "FROM dirent AS d INNER JOIN r "
898                                         "ON d.child_fileid = r.parent_fileid "
899                     ") "
900                     "SELECT d.parent_fileid "
901                     "FROM dirent d "
902                     "JOIN r "
903                     "ON d.cookie = r.cookie "
904                     "FOR SHARE", INT8OID);
905           error = sendcmd(xc, c, child_fileid);
906           if (error != 0) {
907                     return error;
908           }
909           fetchinit(&s, xc);
910           do {
911                     static const Oid types[] = { INT8OID, };
912 
913                     error = FETCHNEXT(&s, types, &parent_fileid);
914                     if (error == ENOENT) {
915                               fetchdone(&s);
916                               return 0;
917                     }
918                     if (error != 0) {
919                               fetchdone(&s);
920                               return error;
921                     }
922           } while (gate_fileid != parent_fileid);
923           fetchdone(&s);
924           return EINVAL;
925 }
926 
927 int
isempty(struct Xconn * xc,fileid_t fileid,bool * emptyp)928 isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
929 {
930           int32_t dummy;
931           static struct cmd *c;
932           int error;
933 
934           CREATECMD(c,
935                     "SELECT 1 FROM dirent "
936                     "WHERE parent_fileid = $1 LIMIT 1", INT8OID);
937           error = sendcmd(xc, c, fileid);
938           if (error != 0) {
939                     return error;
940           }
941           error = simplefetch(xc, INT4OID, &dummy);
942           assert(error != 0 || dummy == 1);
943           if (error == ENOENT) {
944                     *emptyp = true;
945                     error = 0;
946           } else {
947                     *emptyp = false;
948           }
949           return error;
950 }
951