xref: /dragonfly/sys/dev/disk/xdisk/xdisk.c (revision 42e46aee3886bf921057b9d73ba56cb52657e469)
1 /*
2  * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
/*
 * This module allows disk devices to be created and associated with a
 * communications pipe or socket.  You open the device and issue an
 * ioctl() to install a new disk along with its communications descriptor.
 *
 * All further communication occurs via the descriptor using the DMSG
 * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
 * cluster controller, to the local cluster controller, etc.
 *
 * /dev/xdisk is the control device; issue ioctl()s on it to create the
 * /dev/xa%d devices.  These devices look like raw disks to the system.
 */
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/tree.h>
60 #include <sys/udev.h>
61 #include <sys/uuid.h>
62 #include <sys/kern_syscall.h>
63 
64 #include <sys/dmsg.h>
65 #include <sys/xdiskioctl.h>
66 
67 #include <sys/buf2.h>
68 
/*
 * Red-black tree of xa_softc structures.  Nodes are linked through the
 * rbnode member and ordered by xa_softc_cmp().
 */
struct xa_softc;
struct xa_softc_tree;
RB_HEAD(xa_softc_tree, xa_softc);
RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
73 
74 static int xa_active;
75 SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
76              "Number of active xdisk IOs");
77 static uint64_t xa_last;
78 SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
79              "Offset of last xdisk IO");
80 static int xa_debug = 1;
81 SYSCTL_INT(_debug, OID_AUTO, xa_debug, CTLFLAG_RW, &xa_debug, 0,
82              "xdisk debugging");
83 
84 /*
85  * Track a BIO tag
86  */
87 struct xa_tag {
88           TAILQ_ENTRY(xa_tag) entry;
89           struct xa_softc     *sc;
90           dmsg_blk_error_t status;
91           kdmsg_state_t       *state;
92           struct bio          *bio;
93           int                 waiting;
94           int                 async;
95           int                 done;
96 };
97 
98 typedef struct xa_tag         xa_tag_t;
99 
100 /*
101  * Track devices.
102  */
103 struct xa_softc {
104           struct kdmsg_state_list spanq;
105           RB_ENTRY(xa_softc) rbnode;
106           cdev_t              dev;
107           struct devstat      stats;
108           struct disk_info info;
109           struct disk         disk;
110           uuid_t              peer_id;
111           int                 unit;
112           int                 opencnt;
113           int                 spancnt;
114           uint64_t  keyid;
115           int                 serializing;
116           int                 last_error;
117           int                 terminating;
118           char                peer_label[64];     /* from LNK_SPAN host/dev */
119           char                pfs_label[64];      /* from LNK_SPAN serno */
120           xa_tag_t  *open_tag;
121           TAILQ_HEAD(, bio) bioq;                 /* pending BIOs */
122           TAILQ_HEAD(, xa_tag) tag_freeq;         /* available I/O tags */
123           TAILQ_HEAD(, xa_tag) tag_pendq;         /* running I/O tags */
124           struct lock         lk;
125 };
126 
127 typedef struct xa_softc       xa_softc_t;
128 
129 struct xa_iocom {
130           TAILQ_ENTRY(xa_iocom) entry;
131           kdmsg_iocom_t       iocom;
132           xa_softc_t          dummysc;
133 };
134 
135 typedef struct xa_iocom xa_iocom_t;
136 
137 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
138 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
139 static struct xa_softc_tree xa_device_tree;
140 
141 #define MAXTAGS               64        /* no real limit */
142 
143 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
144 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
145 static void xaio_exit(kdmsg_iocom_t *iocom);
146 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
147 
148 static void xa_terminate_check(struct xa_softc *sc);
149 
150 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
151 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
152 static void xa_done(xa_tag_t *tag, int wasbio);
153 static void xa_release(xa_tag_t *tag, int wasbio);
154 static uint32_t xa_wait(xa_tag_t *tag);
155 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
156 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
157 static void xa_restart_deferred(xa_softc_t *sc);
158 
/*
 * Conditional debug printf.  Emits the message, prefixed with "xdisk: ",
 * when xa_debug is at least (level).  ctl must be a string literal because
 * it is concatenated with the prefix, and at least one variadic argument
 * is required.
 *
 * The body is wrapped in do { } while (0) so the macro expands to a single
 * statement and cannot capture an `else` that follows the call site.
 */
#define xa_printf(level, ctl, ...)					\
	do {								\
		if (xa_debug >= (level))				\
			kprintf("xdisk: " ctl, __VA_ARGS__);		\
	} while (0)
161 
162 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
163 
164 /*
165  * Control device, issue ioctls to create xa devices.
166  */
167 static d_open_t xdisk_open;
168 static d_close_t xdisk_close;
169 static d_ioctl_t xdisk_ioctl;
170 
171 static struct dev_ops xdisk_ops = {
172           { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
173         .d_open =   xdisk_open,
174         .d_close =  xdisk_close,
175         .d_ioctl =  xdisk_ioctl
176 };
177 
178 /*
179  * XA disk devices
180  */
181 static d_open_t xa_open;
182 static d_close_t xa_close;
183 static d_ioctl_t xa_ioctl;
184 static d_strategy_t xa_strategy;
185 static d_psize_t xa_size;
186 
187 static struct dev_ops xa_ops = {
188           { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
189         .d_open =   xa_open,
190         .d_close =  xa_close,
191         .d_ioctl =  xa_ioctl,
192         .d_read =   physread,
193         .d_write =  physwrite,
194         .d_strategy =         xa_strategy,
195           .d_psize =          xa_size
196 };
197 
198 static int xdisk_opencount;
199 static cdev_t xdisk_dev;
200 struct lock xdisk_lk;
201 static TAILQ_HEAD(, xa_iocom) xaiocomq;
202 
203 /*
204  * Module initialization
205  */
206 static int
xdisk_modevent(module_t mod,int type,void * data)207 xdisk_modevent(module_t mod, int type, void *data)
208 {
209           switch (type) {
210           case MOD_LOAD:
211                     TAILQ_INIT(&xaiocomq);
212                     RB_INIT(&xa_device_tree);
213                     lockinit(&xdisk_lk, "xdisk", 0, 0);
214                     xdisk_dev = make_dev(&xdisk_ops, 0,
215                                              UID_ROOT, GID_WHEEL, 0600, "xdisk");
216                     break;
217           case MOD_UNLOAD:
218           case MOD_SHUTDOWN:
219                     if (!RB_EMPTY(&xa_device_tree))
220                               return (EBUSY);
221                     if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
222                               return (EBUSY);
223                     if (xdisk_dev) {
224                               destroy_dev(xdisk_dev);
225                               xdisk_dev = NULL;
226                     }
227                     dev_ops_remove_all(&xdisk_ops);
228                     dev_ops_remove_all(&xa_ops);
229                     break;
230           default:
231                     break;
232           }
233           return 0;
234 }
235 
236 DEV_MODULE(xdisk, xdisk_modevent, 0);
237 
238 static int
xa_softc_cmp(xa_softc_t * sc1,xa_softc_t * sc2)239 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
240 {
241           return(strcmp(sc1->pfs_label, sc2->pfs_label));
242 }
243 
244 /*
245  * Control device
246  */
247 static int
xdisk_open(struct dev_open_args * ap)248 xdisk_open(struct dev_open_args *ap)
249 {
250           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
251           ++xdisk_opencount;
252           lockmgr(&xdisk_lk, LK_RELEASE);
253           return(0);
254 }
255 
256 static int
xdisk_close(struct dev_close_args * ap)257 xdisk_close(struct dev_close_args *ap)
258 {
259           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
260           --xdisk_opencount;
261           lockmgr(&xdisk_lk, LK_RELEASE);
262           return(0);
263 }
264 
265 static int
xdisk_ioctl(struct dev_ioctl_args * ap)266 xdisk_ioctl(struct dev_ioctl_args *ap)
267 {
268           int error;
269 
270           switch(ap->a_cmd) {
271           case XDISKIOCATTACH:
272                     error = xdisk_attach((void *)ap->a_data);
273                     break;
274           case XDISKIOCDETACH:
275                     error = xdisk_detach((void *)ap->a_data);
276                     break;
277           default:
278                     error = ENOTTY;
279                     break;
280           }
281           return error;
282 }
283 
284 /************************************************************************
285  *                                      DMSG INTERFACE                                    *
286  ************************************************************************/
287 
288 static int
xdisk_attach(struct xdisk_attach_ioctl * xaioc)289 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
290 {
291           xa_iocom_t *xaio;
292           struct file *fp;
293 
294           /*
295            * Normalize ioctl params
296            */
297           fp = holdfp(curthread, xaioc->fd, -1);
298           if (fp == NULL)
299                     return EINVAL;
300           xa_printf(1, "xdisk_attach fp=%p\n", fp);
301 
302           /*
303            * See if the serial number is already present.  If we are
304            * racing a termination the disk subsystem may still have
305            * duplicate entries not yet removed so we wait a bit and
306            * retry.
307            */
308           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
309 
310           xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
311           kdmsg_iocom_init(&xaio->iocom, xaio,
312                                KDMSG_IOCOMF_AUTOCONN,
313                                M_XDISK, xaio_rcvdmsg);
314           xaio->iocom.exit_func = xaio_exit;
315 
316           kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
317 
318           /*
319            * Setup our LNK_CONN advertisement for autoinitiate.
320            *
321            * Our filter is setup to only accept PEER_BLOCK advertisements.
322            * XXX no peer_id filter.
323            *
324            * We need a unique pfs_fsid to avoid confusion.
325            */
326           xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_CLIENT;
327           xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
328           xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
329           ksnprintf(xaio->iocom.auto_lnk_conn.peer_label,
330                       sizeof(xaio->iocom.auto_lnk_conn.peer_label),
331                       "%s/xdisk",
332                       hostname);
333           /* kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1); */
334 
335           /*
336            * Setup our LNK_SPAN advertisement for autoinitiate
337            */
338           TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
339           kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
340 
341           lockmgr(&xdisk_lk, LK_RELEASE);
342 
343           return 0;
344 }
345 
346 static int
xdisk_detach(struct xdisk_attach_ioctl * xaioc)347 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
348 {
349           return EINVAL;
350 }
351 
352 /*
353  * Called from iocom core transmit thread upon disconnect.
354  */
355 static
356 void
xaio_exit(kdmsg_iocom_t * iocom)357 xaio_exit(kdmsg_iocom_t *iocom)
358 {
359           xa_iocom_t *xaio = iocom->handle;
360 
361           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
362           xa_printf(1, "%s", "xdisk_detach [xaio_exit()]\n");
363           TAILQ_REMOVE(&xaiocomq, xaio, entry);
364           lockmgr(&xdisk_lk, LK_RELEASE);
365 
366           kdmsg_iocom_uninit(&xaio->iocom);
367 
368           kfree(xaio, M_XDISK);
369 }
370 
371 /*
372  * Called from iocom core to handle messages that the iocom core does not
373  * handle itself and for which a state function callback has not yet been
374  * established.
375  *
376  * We primarily care about LNK_SPAN transactions here.
377  */
378 static int
xaio_rcvdmsg(kdmsg_msg_t * msg)379 xaio_rcvdmsg(kdmsg_msg_t *msg)
380 {
381           kdmsg_state_t       *state = msg->state;
382           xa_iocom_t          *xaio = state->iocom->handle;
383           xa_softc_t          *sc;
384 
385           if (state) {
386                     xa_printf(4,
387                               "xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
388                               state, state->rxcmd, state->txcmd,
389                               msg->any.head.cmd);
390           }
391           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
392 
393           switch(msg->tcmd) {
394           case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
395                     /*
396                      * A LNK_SPAN transaction which is opened and closed
397                      * degenerately is not useful to us, just ignore it.
398                      */
399                     kdmsg_msg_reply(msg, 0);
400                     break;
401           case DMSG_LNK_SPAN | DMSGF_CREATE:
402                     /*
403                      * Manage the tracking node for the remote LNK_SPAN.
404                      *
405                      * Return a streaming result, leaving the transaction open
406                      * in both directions to allow sub-transactions.
407                      */
408                     bcopy(msg->any.lnk_span.peer_label, xaio->dummysc.peer_label,
409                           sizeof(xaio->dummysc.peer_label));
410                     xaio->dummysc.peer_label[
411                               sizeof(xaio->dummysc.peer_label) - 1] = 0;
412 
413                     bcopy(msg->any.lnk_span.pfs_label, xaio->dummysc.pfs_label,
414                           sizeof(xaio->dummysc.pfs_label));
415                     xaio->dummysc.pfs_label[
416                               sizeof(xaio->dummysc.pfs_label) - 1] = 0;
417 
418                     xa_printf(3, "LINK_SPAN state %p create for %s\n",
419                                 msg->state, msg->any.lnk_span.pfs_label);
420 
421                     sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
422                     if (sc == NULL) {
423                               xa_softc_t *sctmp;
424                               xa_tag_t *tag;
425                               cdev_t dev;
426                               int unit;
427                               int n;
428 
429                               sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
430                               bcopy(msg->any.lnk_span.peer_label, sc->peer_label,
431                                     sizeof(sc->peer_label));
432                               sc->peer_label[sizeof(sc->peer_label) - 1] = 0;
433                               bcopy(msg->any.lnk_span.pfs_label, sc->pfs_label,
434                                     sizeof(sc->pfs_label));
435                               sc->pfs_label[sizeof(sc->pfs_label) - 1] = 0;
436 
437                               /* XXX FIXME O(N^2) */
438                               unit = -1;
439                               do {
440                                         ++unit;
441                                         RB_FOREACH(sctmp, xa_softc_tree,
442                                                      &xa_device_tree) {
443                                                   if (sctmp->unit == unit)
444                                                             break;
445                                         }
446                               } while (sctmp);
447 
448                               sc->unit = unit;
449                               sc->serializing = 1;
450                               sc->spancnt = 1;
451                               lockinit(&sc->lk, "xalk", 0, 0);
452                               TAILQ_INIT(&sc->spanq);
453                               TAILQ_INIT(&sc->bioq);
454                               TAILQ_INIT(&sc->tag_freeq);
455                               TAILQ_INIT(&sc->tag_pendq);
456 
457                               lockmgr(&sc->lk, LK_EXCLUSIVE);
458                               RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
459                               TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
460                               msg->state->any.xa_sc = sc;
461 
462                               /*
463                                * Setup block device
464                                */
465                               for (n = 0; n < MAXTAGS; ++n) {
466                                         tag = kmalloc(sizeof(*tag),
467                                                         M_XDISK, M_WAITOK|M_ZERO);
468                                         tag->sc = sc;
469                                         TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
470                               }
471 
472                               if (sc->dev == NULL) {
473                                         dev = disk_create(unit, &sc->disk, &xa_ops);
474                                         dev->si_drv1 = sc;
475                                         sc->dev = dev;
476                                         devstat_add_entry(&sc->stats, "xa", unit,
477                                                               DEV_BSIZE,
478                                                               DEVSTAT_NO_ORDERED_TAGS,
479                                                               DEVSTAT_TYPE_DIRECT |
480                                                               DEVSTAT_TYPE_IF_OTHER,
481                                                               DEVSTAT_PRIORITY_OTHER);
482                               }
483 
484                               sc->info.d_media_blksize =
485                                         msg->any.lnk_span.media.block.blksize;
486                               if (sc->info.d_media_blksize <= 0)
487                                         sc->info.d_media_blksize = 1;
488                               sc->info.d_media_blocks =
489                                         msg->any.lnk_span.media.block.bytes /
490                                         sc->info.d_media_blksize;
491                               sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
492                               sc->info.d_secpertrack = 32;
493                               sc->info.d_nheads = 64;
494                               sc->info.d_secpercyl = sc->info.d_secpertrack *
495                                                          sc->info.d_nheads;
496                               sc->info.d_ncylinders = 0;
497                               if (sc->pfs_label[0])
498                                         sc->info.d_serialno = sc->pfs_label;
499                               /*
500                                * WARNING! disk_setdiskinfo() must be asynchronous
501                                *            because we are in the rxmsg thread.  If
502                                *            it is synchronous and issues more disk
503                                *            I/Os, we will deadlock.
504                                */
505                               disk_setdiskinfo(&sc->disk, &sc->info);
506                               xa_restart_deferred(sc);      /* eats serializing */
507                               lockmgr(&sc->lk, LK_RELEASE);
508                     } else {
509                               lockmgr(&sc->lk, LK_EXCLUSIVE);
510                               ++sc->spancnt;
511                               TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
512                               msg->state->any.xa_sc = sc;
513                               if (sc->serializing == 0 && sc->open_tag == NULL) {
514                                         sc->serializing = 1;
515                                         xa_restart_deferred(sc); /* eats serializing */
516                               }
517                               lockmgr(&sc->lk, LK_RELEASE);
518                               if (sc->dev && sc->dev->si_disk) {
519                                         xa_printf(1, "reprobe disk: %s\n",
520                                                     sc->pfs_label);
521                                         disk_msg_send(DISK_DISK_REPROBE,
522                                                         sc->dev->si_disk,
523                                                         NULL);
524                               }
525                     }
526                     xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
527                     kdmsg_msg_result(msg, 0);
528                     break;
529           case DMSG_LNK_SPAN | DMSGF_DELETE:
530                     /*
531                      * Manage the tracking node for the remote LNK_SPAN.
532                      *
533                      * Return a final result, closing our end of the transaction.
534                      */
535                     sc = msg->state->any.xa_sc;
536                     xa_printf(3, "LINK_SPAN state %p delete for %s (sc=%p)\n",
537                                 msg->state, (sc ? sc->pfs_label : "(null)"), sc);
538                     lockmgr(&sc->lk, LK_EXCLUSIVE);
539                     msg->state->any.xa_sc = NULL;
540                     TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
541                     --sc->spancnt;
542 
543                     xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
544 
545                     /*
546                      * Spans can come and go as the graph stabilizes, so if
547                      * we lose a span along with sc->open_tag we may be able
548                      * to restart the I/Os on a different span.
549                      */
550                     if (sc->spancnt &&
551                         sc->serializing == 0 && sc->open_tag == NULL) {
552                               sc->serializing = 1;
553                               xa_restart_deferred(sc);
554                     }
555                     lockmgr(&sc->lk, LK_RELEASE);
556                     kdmsg_msg_reply(msg, 0);
557 
558 #if 0
559                     /*
560                      * Termination
561                      */
562                     if (sc->spancnt == 0)
563                               xa_terminate_check(sc);
564 #endif
565                     break;
566           case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
567                     /*
568                      * Ignore unimplemented streaming replies on our LNK_SPAN
569                      * transaction.
570                      */
571                     xa_printf(3, "LINK_SPAN state %p delete+reply\n",
572                                 msg->state);
573                     break;
574           case DMSG_LNK_SPAN | DMSGF_REPLY:
575                     /*
576                      * Ignore unimplemented streaming replies on our LNK_SPAN
577                      * transaction.
578                      */
579                     xa_printf(3, "LINK_SPAN state %p reply\n",
580                                 msg->state);
581                     break;
582           case DMSG_DBG_SHELL:
583                     /*
584                      * Execute shell command (not supported atm).
585                      *
586                      * This is a one-way packet but if not (e.g. if part of
587                      * a streaming transaction), we will have already closed
588                      * our end.
589                      */
590                     kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
591                     break;
592           case DMSG_DBG_SHELL | DMSGF_REPLY:
593                     /*
594                      * Receive one or more replies to a shell command
595                      * that we sent.  Just dump it to the console.
596                      *
597                      * This is a one-way packet but if not (e.g. if
598                      * part of a streaming transaction), we will have
599                      * already closed our end.
600                      */
601                     if (msg->aux_data) {
602                               msg->aux_data[msg->aux_size - 1] = 0;
603                               xa_printf(0, "DEBUGMSG: %s\n", msg->aux_data);
604                     }
605                     break;
606           default:
607                     /*
608                      * Unsupported one-way message, streaming message, or
609                      * transaction.
610                      *
611                      * Terminate any unsupported transactions with an error
612                      * and ignore any unsupported streaming messages.
613                      *
614                      * NOTE: This case also includes DMSG_LNK_ERROR messages
615                      *         which might be one-way, replying to those would
616                      *         cause an infinite ping-pong.
617                      */
618                     if (msg->any.head.cmd & DMSGF_CREATE)
619                               kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
620                     break;
621           }
622           lockmgr(&xdisk_lk, LK_RELEASE);
623 
624           return 0;
625 }
626 
627 /*
628  * Determine if we can destroy the xa_softc.
629  *
630  * Called with xdisk_lk held.
631  */
632 static
633 void
xa_terminate_check(struct xa_softc * sc)634 xa_terminate_check(struct xa_softc *sc)
635 {
636           xa_tag_t *tag;
637 
638           /*
639            * Determine if we can destroy the softc.
640            */
641           xa_printf(1, "Terminate check xa%d (%d,%d,%d) sc=%p ",
642                     sc->unit,
643                     sc->opencnt, sc->serializing, sc->spancnt,
644                     sc);
645 
646           if (sc->opencnt || sc->serializing || sc->spancnt ||
647               TAILQ_FIRST(&sc->bioq) || TAILQ_FIRST(&sc->tag_pendq)) {
648                     xa_printf(1, "%s", "(leave intact)\n");
649                     return;
650           }
651 
652           /*
653            * Remove from device tree, a race with a new incoming span
654            * will create a new softc and disk.
655            */
656           RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
657           sc->terminating = 1;
658 
659           /*
660            * Device has to go first to prevent device ops races.
661            */
662           if (sc->dev) {
663                     disk_destroy(&sc->disk);
664                     devstat_remove_entry(&sc->stats);
665                     sc->dev->si_drv1 = NULL;
666                     sc->dev = NULL;
667           }
668 
669           xa_printf(1, "%s", "(remove from tree)\n");
670           sc->serializing = 1;
671           KKASSERT(sc->opencnt == 0);
672           KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
673 
674           while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
675                     TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
676                     tag->sc = NULL;
677                     kfree(tag, M_XDISK);
678           }
679 
680           kfree(sc, M_XDISK);
681 }
682 
683 /************************************************************************
684  *                               XA DEVICE INTERFACE                                      *
685  ************************************************************************/
686 
687 static int
xa_open(struct dev_open_args * ap)688 xa_open(struct dev_open_args *ap)
689 {
690           cdev_t dev = ap->a_head.a_dev;
691           xa_softc_t *sc;
692           int error;
693 
694           dev->si_bsize_phys = 512;
695           dev->si_bsize_best = 32768;
696 
697           /*
698            * Interlock open with opencnt, wait for attachment operations
699            * to finish.
700            */
701           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
702 again:
703           sc = dev->si_drv1;
704           if (sc == NULL) {
705                     lockmgr(&xdisk_lk, LK_RELEASE);
706                     return ENXIO;       /* raced destruction */
707           }
708           if (sc->serializing) {
709                     tsleep(sc, 0, "xarace", hz / 10);
710                     goto again;
711           }
712           if (sc->terminating) {
713                     lockmgr(&xdisk_lk, LK_RELEASE);
714                     return ENXIO;       /* raced destruction */
715           }
716           sc->serializing = 1;
717 
718           /*
719            * Serialize initial open
720            */
721           if (sc->opencnt++ > 0) {
722                     sc->serializing = 0;
723                     wakeup(sc);
724                     lockmgr(&xdisk_lk, LK_RELEASE);
725                     return(0);
726           }
727 
728           /*
729            * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
730            */
731           if (sc->open_tag == NULL) {
732                     lockmgr(&sc->lk, LK_EXCLUSIVE);
733                     xa_restart_deferred(sc); /* eats serializing */
734                     lockmgr(&sc->lk, LK_RELEASE);
735           } else {
736                     sc->serializing = 0;
737                     wakeup(sc);
738           }
739           lockmgr(&xdisk_lk, LK_RELEASE);
740 
741           /*
742            * Wait for completion of the BLK_OPEN
743            */
744           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
745           while (sc->serializing)
746                     lksleep(sc, &xdisk_lk, 0, "xaopen", hz);
747 
748           error = sc->last_error;
749           if (error) {
750                     KKASSERT(sc->opencnt > 0);
751                     --sc->opencnt;
752                     xa_terminate_check(sc);
753                     sc = NULL;          /* sc may be invalid now */
754           }
755           lockmgr(&xdisk_lk, LK_RELEASE);
756 
757           return (error);
758 }
759 
760 static int
xa_close(struct dev_close_args * ap)761 xa_close(struct dev_close_args *ap)
762 {
763           cdev_t dev = ap->a_head.a_dev;
764           xa_softc_t *sc;
765           xa_tag_t *tag;
766 
767           lockmgr(&xdisk_lk, LK_EXCLUSIVE);
768           sc = dev->si_drv1;
769           if (sc == NULL)
770                     return ENXIO;       /* raced destruction */
771           if (sc->terminating) {
772                     lockmgr(&sc->lk, LK_RELEASE);
773                     return ENXIO;       /* raced destruction */
774           }
775           lockmgr(&sc->lk, LK_EXCLUSIVE);
776 
777           /*
778            * NOTE: Clearing open_tag allows a concurrent open to re-open
779            *         the device and prevents autonomous completion of the tag.
780            */
781           if (sc->opencnt == 1 && sc->open_tag) {
782                     tag = sc->open_tag;
783                     sc->open_tag = NULL;
784                     lockmgr(&sc->lk, LK_RELEASE);
785                     kdmsg_state_reply(tag->state, 0);       /* close our side */
786                     xa_wait(tag);                                     /* wait on remote */
787           } else {
788                     lockmgr(&sc->lk, LK_RELEASE);
789           }
790           KKASSERT(sc->opencnt > 0);
791           --sc->opencnt;
792           xa_terminate_check(sc);
793           lockmgr(&xdisk_lk, LK_RELEASE);
794 
795           return(0);
796 }
797 
798 static int
xa_strategy(struct dev_strategy_args * ap)799 xa_strategy(struct dev_strategy_args *ap)
800 {
801           xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
802           xa_tag_t *tag;
803           struct bio *bio = ap->a_bio;
804 
805           devstat_start_transaction(&sc->stats);
806           atomic_add_int(&xa_active, 1);
807           xa_last = bio->bio_offset;
808 
809           /*
810            * If no tags are available NULL is returned and the bio is
811            * placed on sc->bioq.
812            */
813           lockmgr(&sc->lk, LK_EXCLUSIVE);
814           tag = xa_setup_cmd(sc, bio);
815           if (tag)
816                     xa_start(tag, NULL, 1);
817           lockmgr(&sc->lk, LK_RELEASE);
818 
819           return(0);
820 }
821 
822 static int
xa_ioctl(struct dev_ioctl_args * ap)823 xa_ioctl(struct dev_ioctl_args *ap)
824 {
825           return(ENOTTY);
826 }
827 
828 static int
xa_size(struct dev_psize_args * ap)829 xa_size(struct dev_psize_args *ap)
830 {
831           struct xa_softc *sc;
832 
833           if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
834                     return (ENXIO);
835           ap->a_result = sc->info.d_media_blocks;
836           return (0);
837 }
838 
839 /************************************************************************
840  *                      XA BLOCK PROTOCOL STATE MACHINE                         *
841  ************************************************************************
842  *
843  * Implement tag/msg setup and related functions.
844  * Called with sc->lk held.
845  */
846 static xa_tag_t *
xa_setup_cmd(xa_softc_t * sc,struct bio * bio)847 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
848 {
849           xa_tag_t *tag;
850 
851           /*
852            * Only get a tag if we have a valid virtual circuit to the server.
853            */
854           if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
855                     TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
856                     tag->bio = bio;
857                     TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
858           }
859 
860           /*
861            * If we can't dispatch now and this is a bio, queue it for later.
862            */
863           if (tag == NULL && bio) {
864                     TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
865           }
866 
867           return (tag);
868 }
869 
870 /*
871  * Called with sc->lk held
872  */
873 static void
xa_start(xa_tag_t * tag,kdmsg_msg_t * msg,int async)874 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
875 {
876           xa_softc_t *sc = tag->sc;
877 
878           tag->done = 0;
879           tag->async = async;
880           tag->status.head.error = DMSG_ERR_IO;   /* fallback error */
881 
882           if (msg == NULL) {
883                     struct bio *bio;
884                     struct buf *bp;
885                     kdmsg_state_t *trans;
886 
887                     if (sc->opencnt == 0 || sc->open_tag == NULL) {
888                               TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
889                                         if ((trans->rxcmd & DMSGF_DELETE) == 0)
890                                                   break;
891                               }
892                     } else {
893                               trans = sc->open_tag->state;
894                     }
895                     if (trans == NULL)
896                               goto skip;
897 
898                     KKASSERT(tag->bio);
899                     bio = tag->bio;
900                     bp = bio->bio_buf;
901 
902                     switch(bp->b_cmd) {
903                     case BUF_CMD_READ:
904                               msg = kdmsg_msg_alloc(trans,
905                                                         DMSG_BLK_READ |
906                                                         DMSGF_CREATE |
907                                                         DMSGF_DELETE,
908                                                         xa_bio_completion, tag);
909                               msg->any.blk_read.keyid = sc->keyid;
910                               msg->any.blk_read.offset = bio->bio_offset;
911                               msg->any.blk_read.bytes = bp->b_bcount;
912                               break;
913                     case BUF_CMD_WRITE:
914                               msg = kdmsg_msg_alloc(trans,
915                                                         DMSG_BLK_WRITE |
916                                                         DMSGF_CREATE | DMSGF_DELETE,
917                                                         xa_bio_completion, tag);
918                               msg->any.blk_write.keyid = sc->keyid;
919                               msg->any.blk_write.offset = bio->bio_offset;
920                               msg->any.blk_write.bytes = bp->b_bcount;
921                               msg->aux_data = bp->b_data;
922                               msg->aux_size = bp->b_bcount;
923                               break;
924                     case BUF_CMD_FLUSH:
925                               msg = kdmsg_msg_alloc(trans,
926                                                         DMSG_BLK_FLUSH |
927                                                         DMSGF_CREATE | DMSGF_DELETE,
928                                                         xa_bio_completion, tag);
929                               msg->any.blk_flush.keyid = sc->keyid;
930                               msg->any.blk_flush.offset = bio->bio_offset;
931                               msg->any.blk_flush.bytes = bp->b_bcount;
932                               break;
933                     case BUF_CMD_FREEBLKS:
934                               msg = kdmsg_msg_alloc(trans,
935                                                         DMSG_BLK_FREEBLKS |
936                                                         DMSGF_CREATE | DMSGF_DELETE,
937                                                         xa_bio_completion, tag);
938                               msg->any.blk_freeblks.keyid = sc->keyid;
939                               msg->any.blk_freeblks.offset = bio->bio_offset;
940                               msg->any.blk_freeblks.bytes = bp->b_bcount;
941                               break;
942                     default:
943                               bp->b_flags |= B_ERROR;
944                               bp->b_error = EIO;
945                               devstat_end_transaction_buf(&sc->stats, bp);
946                               atomic_add_int(&xa_active, -1);
947                               biodone(bio);
948                               tag->bio = NULL;
949                               break;
950                     }
951           }
952 
953           /*
954            * If no msg was allocated we likely could not find a good span.
955            */
956 skip:
957           if (msg) {
958                     /*
959                      * Message was passed in or constructed.
960                      */
961                     tag->state = msg->state;
962                     lockmgr(&sc->lk, LK_RELEASE);
963                     kdmsg_msg_write(msg);
964                     lockmgr(&sc->lk, LK_EXCLUSIVE);
965           } else if (tag->bio &&
966                        (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) {
967                     /*
968                      * No spans available but BIO is not allowed to fail
969                      * on connectivity problems.  Requeue the BIO.
970                      */
971                     TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act);
972                     tag->bio = NULL;
973                     lockmgr(&sc->lk, LK_RELEASE);
974                     xa_done(tag, 1);
975                     lockmgr(&sc->lk, LK_EXCLUSIVE);
976           } else {
977                     /*
978                      * No spans available, bio is allowed to fail.
979                      */
980                     lockmgr(&sc->lk, LK_RELEASE);
981                     tag->status.head.error = DMSG_ERR_IO;
982                     xa_done(tag, 1);
983                     lockmgr(&sc->lk, LK_EXCLUSIVE);
984           }
985 }
986 
987 static uint32_t
xa_wait(xa_tag_t * tag)988 xa_wait(xa_tag_t *tag)
989 {
990           xa_softc_t *sc = tag->sc;
991           uint32_t error;
992 
993           lockmgr(&sc->lk, LK_EXCLUSIVE);
994           tag->waiting = 1;
995           while (tag->done == 0)
996                     lksleep(tag, &sc->lk, 0, "xawait", 0);
997           lockmgr(&sc->lk, LK_RELEASE);
998 
999           error = tag->status.head.error;
1000           tag->waiting = 0;
1001           xa_release(tag, 0);
1002 
1003           return error;
1004 }
1005 
1006 static void
xa_done(xa_tag_t * tag,int wasbio)1007 xa_done(xa_tag_t *tag, int wasbio)
1008 {
1009           KKASSERT(tag->bio == NULL);
1010 
1011           tag->state = NULL;
1012           tag->done = 1;
1013           if (tag->waiting)
1014                     wakeup(tag);
1015           if (tag->async)
1016                     xa_release(tag, wasbio);
1017 }
1018 
1019 /*
1020  * Release a tag.  If everything looks ok and there are pending BIOs
1021  * (due to all tags in-use), we can use the tag to start the next BIO.
1022  * Do not try to restart if the connection is currently failed.
1023  */
1024 static
1025 void
xa_release(xa_tag_t * tag,int wasbio)1026 xa_release(xa_tag_t *tag, int wasbio)
1027 {
1028           xa_softc_t *sc = tag->sc;
1029           struct bio *bio;
1030 
1031           if ((bio = tag->bio) != NULL) {
1032                     struct buf *bp = bio->bio_buf;
1033 
1034                     bp->b_error = EIO;
1035                     bp->b_flags |= B_ERROR;
1036                     devstat_end_transaction_buf(&sc->stats, bp);
1037                     atomic_add_int(&xa_active, -1);
1038                     biodone(bio);
1039                     tag->bio = NULL;
1040           }
1041 
1042           lockmgr(&sc->lk, LK_EXCLUSIVE);
1043 
1044           if (wasbio && sc->open_tag &&
1045               (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1046                     TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1047                     tag->bio = bio;
1048                     xa_start(tag, NULL, 1);
1049           } else {
1050                     TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
1051                     TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
1052           }
1053           lockmgr(&sc->lk, LK_RELEASE);
1054 }
1055 
1056 /*
1057  * Handle messages under the BLKOPEN transaction.
1058  */
1059 static int
xa_sync_completion(kdmsg_state_t * state,kdmsg_msg_t * msg)1060 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1061 {
1062           xa_tag_t *tag = state->any.any;
1063           xa_softc_t *sc;
1064           struct bio *bio;
1065 
1066           /*
1067            * If the tag has been cleaned out we already closed our side
1068            * of the transaction and we are waiting for the other side to
1069            * close.
1070            */
1071           xa_printf(1, "xa_sync_completion: tag %p msg %08x state %p\n",
1072                       tag, msg->any.head.cmd, msg->state);
1073 
1074           if (tag == NULL) {
1075                     if (msg->any.head.cmd & DMSGF_CREATE)
1076                               kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
1077                     return 0;
1078           }
1079           sc = tag->sc;
1080 
1081           /*
1082            * Validate the tag
1083            */
1084           lockmgr(&sc->lk, LK_EXCLUSIVE);
1085 
1086           /*
1087            * Handle initial response to our open and restart any deferred
1088            * BIOs on success.
1089            *
1090            * NOTE: DELETE may also be set.
1091            */
1092           if (msg->any.head.cmd & DMSGF_CREATE) {
1093                     switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1094                     case DMSG_LNK_ERROR | DMSGF_REPLY:
1095                               bzero(&tag->status, sizeof(tag->status));
1096                               tag->status.head = msg->any.head;
1097                               break;
1098                     case DMSG_BLK_ERROR | DMSGF_REPLY:
1099                               tag->status = msg->any.blk_error;
1100                               break;
1101                     }
1102                     sc->last_error = tag->status.head.error;
1103                     xa_printf(1, "blk_open completion status %d\n",
1104                                 sc->last_error);
1105                     if (sc->last_error == 0) {
1106                               while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1107                                         tag = xa_setup_cmd(sc, NULL);
1108                                         if (tag == NULL)
1109                                                   break;
1110                                         TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1111                                         tag->bio = bio;
1112                                         xa_start(tag, NULL, 1);
1113                               }
1114                     }
1115                     sc->serializing = 0;
1116                     wakeup(sc);
1117           }
1118 
1119           /*
1120            * Handle unexpected termination (or lost comm channel) from other
1121            * side.  Autonomous completion only if open_tag matches,
1122            * otherwise another thread is probably waiting on the tag.
1123            *
1124            * (see xa_close() for other interactions)
1125            */
1126           if (msg->any.head.cmd & DMSGF_DELETE) {
1127                     kdmsg_state_reply(tag->state, 0);
1128                     if (sc->open_tag == tag) {
1129                               sc->open_tag = NULL;
1130                               xa_done(tag, 0);
1131                     } else {
1132                               tag->async = 0;
1133                               xa_done(tag, 0);
1134                     }
1135           }
1136           lockmgr(&sc->lk, LK_RELEASE);
1137 
1138           return (0);
1139 }
1140 
1141 static int
xa_bio_completion(kdmsg_state_t * state,kdmsg_msg_t * msg)1142 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1143 {
1144           xa_tag_t *tag = state->any.any;
1145           xa_softc_t *sc = tag->sc;
1146           struct bio *bio;
1147           struct buf *bp;
1148 
1149           /*
1150            * Get the bio from the tag.  If no bio is present we just do
1151            * 'done' handling.
1152            */
1153           if ((bio = tag->bio) == NULL)
1154                     goto handle_done;
1155           bp = bio->bio_buf;
1156 
1157           /*
1158            * Process return status
1159            */
1160           switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1161           case DMSG_LNK_ERROR | DMSGF_REPLY:
1162                     bzero(&tag->status, sizeof(tag->status));
1163                     tag->status.head = msg->any.head;
1164                     if (tag->status.head.error)
1165                               tag->status.resid = bp->b_bcount;
1166                     else
1167                               tag->status.resid = 0;
1168                     break;
1169           case DMSG_BLK_ERROR | DMSGF_REPLY:
1170                     tag->status = msg->any.blk_error;
1171                     break;
1172           }
1173 
1174           /*
1175            * If the device is open stall the bio on DMSG errors.  If an
1176            * actual I/O error occured on the remote device, DMSG_ERR_IO
1177            * will be returned.
1178            */
1179           if (tag->status.head.error &&
1180               (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
1181                     if (tag->status.head.error != DMSG_ERR_IO)
1182                               goto handle_repend;
1183           }
1184 
1185           /*
1186            * Process bio completion
1187            *
1188            * For reads any returned data is zero-extended if necessary, so
1189            * the server can short-cut any all-zeros reads if it desires.
1190            */
1191           switch(bp->b_cmd) {
1192           case BUF_CMD_READ:
1193                     if (msg->aux_data && msg->aux_size) {
1194                               if (msg->aux_size < bp->b_bcount) {
1195                                         bcopy(msg->aux_data, bp->b_data, msg->aux_size);
1196                                         bzero(bp->b_data + msg->aux_size,
1197                                               bp->b_bcount - msg->aux_size);
1198                               } else {
1199                                         bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
1200                               }
1201                     } else {
1202                               bzero(bp->b_data, bp->b_bcount);
1203                     }
1204                     /* fall through */
1205           case BUF_CMD_WRITE:
1206           case BUF_CMD_FLUSH:
1207           case BUF_CMD_FREEBLKS:
1208           default:
1209                     if (tag->status.resid > bp->b_bcount)
1210                               tag->status.resid = bp->b_bcount;
1211                     bp->b_resid = tag->status.resid;
1212                     if (tag->status.head.error != 0) {
1213                               bp->b_error = EIO;
1214                               bp->b_flags |= B_ERROR;
1215                     } else {
1216                               bp->b_resid = 0;
1217                     }
1218                     devstat_end_transaction_buf(&sc->stats, bp);
1219                     atomic_add_int(&xa_active, -1);
1220                     biodone(bio);
1221                     tag->bio = NULL;
1222                     break;
1223           }
1224 
1225           /*
1226            * Handle completion of the transaction.  If the bioq is not empty
1227            * we can initiate another bio on the same tag.
1228            *
1229            * NOTE: Most of our transactions will be single-message
1230            *         CREATE+DELETEs, so we won't have to terminate the
1231            *         transaction separately, here.  But just in case they
1232            *         aren't be sure to terminate the transaction.
1233            */
1234 handle_done:
1235           if (msg->any.head.cmd & DMSGF_DELETE) {
1236                     xa_done(tag, 1);
1237                     if ((state->txcmd & DMSGF_DELETE) == 0)
1238                               kdmsg_msg_reply(msg, 0);
1239           }
1240           return (0);
1241 
1242           /*
1243            * Handle the case where the transaction failed due to a
1244            * connectivity issue.  The tag is put away with wasbio=0
1245            * and we put the BIO back onto the bioq for a later restart.
1246            *
1247            * probe I/Os (where the device is not open) will be failed
1248            * instead of requeued.
1249            */
1250 handle_repend:
1251           tag->bio = NULL;
1252           if (bio->bio_buf->b_flags & B_FAILONDIS) {
1253                     xa_printf(1, "xa_strategy: lost link, fail probe bp %p\n",
1254                                 bio->bio_buf);
1255                     bio->bio_buf->b_error = ENXIO;
1256                     bio->bio_buf->b_flags |= B_ERROR;
1257                     biodone(bio);
1258                     bio = NULL;
1259           } else {
1260                     xa_printf(1, "xa_strategy: lost link, requeue bp %p\n",
1261                                 bio->bio_buf);
1262           }
1263           xa_done(tag, 0);
1264           if ((state->txcmd & DMSGF_DELETE) == 0)
1265                     kdmsg_msg_reply(msg, 0);
1266 
1267           /*
1268            * Requeue the bio
1269            */
1270           if (bio) {
1271                     lockmgr(&sc->lk, LK_EXCLUSIVE);
1272                     TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
1273                     lockmgr(&sc->lk, LK_RELEASE);
1274           }
1275           return (0);
1276 }
1277 
1278 /*
1279  * Restart as much deferred I/O as we can.  The serializer is set and we
1280  * eat it (clear it) when done.
1281  *
1282  * Called with sc->lk held
1283  */
1284 static
1285 void
xa_restart_deferred(xa_softc_t * sc)1286 xa_restart_deferred(xa_softc_t *sc)
1287 {
1288           kdmsg_state_t *span;
1289           kdmsg_msg_t *msg;
1290           xa_tag_t *tag;
1291           int error;
1292 
1293           KKASSERT(sc->serializing);
1294 
1295           /*
1296            * Determine if a restart is needed.
1297            */
1298           if (sc->opencnt == 0) {
1299                     /*
1300                      * Device is not open, nothing to do, eat serializing.
1301                      */
1302                     sc->serializing = 0;
1303                     wakeup(sc);
1304           } else if (sc->open_tag == NULL) {
1305                     /*
1306                      * BLK_OPEN required before we can restart any BIOs.
1307                      * Select the best LNK_SPAN to issue the BLK_OPEN under.
1308                      *
1309                      * serializing interlocks waiting open()s.
1310                      */
1311                     error = 0;
1312                     TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1313                               if ((span->rxcmd & DMSGF_DELETE) == 0)
1314                                         break;
1315                     }
1316                     if (span == NULL)
1317                               error = ENXIO;
1318 
1319                     if (error == 0) {
1320                               tag = xa_setup_cmd(sc, NULL);
1321                               if (tag == NULL)
1322                                         error = ENXIO;
1323                     }
1324                     if (error == 0) {
1325                               sc->open_tag = tag;
1326                               msg = kdmsg_msg_alloc(span,
1327                                                         DMSG_BLK_OPEN |
1328                                                         DMSGF_CREATE,
1329                                                         xa_sync_completion, tag);
1330                               msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1331                               xa_printf(1,
1332                                           "BLK_OPEN tag %p state %p "
1333                                           "span-state %p\n",
1334                                           tag, msg->state, span);
1335                               xa_start(tag, msg, 0);
1336                     }
1337                     if (error) {
1338                               sc->serializing = 0;
1339                               wakeup(sc);
1340                     }
1341                     /* else leave serializing set until BLK_OPEN response */
1342           } else {
1343                     /* nothing to do */
1344                     sc->serializing = 0;
1345                     wakeup(sc);
1346           }
1347 }
1348